
Commit 1f13665

YYC committed: Initial check in
1 parent 3d841bd commit 1f13665

File tree

3 files changed: +191 −0 lines changed


appflow_ga.py

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
import json
import pandas as pd
import boto3
from sqlalchemy import create_engine
import os
import urllib.parse
import logging

logger = logging.getLogger()
if logger.handlers:
    for handler in logger.handlers:
        logger.removeHandler(handler)
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)


class PostgressDB():

    def __init__(self, username, password, port, host, database):
        self.user_name = username
        self.password = password
        self.port = port
        self.host = host
        self.database = database
        self.conn_string = self.__get_connect_string()

    def __get_connect_string(self):
        """
        Build the connection string for Postgres.
        :return: String: a valid connection string for a sqlalchemy engine
        """
        return 'postgresql://{}:{}@{}:{}/{}'.format(self.user_name, self.password, self.host, self.port, self.database)

    def create_engine(self):
        """
        Return a sqlalchemy engine with connection pooling.
        :return: sqlalchemy Engine
        """
        return create_engine(self.conn_string, pool_size=20, max_overflow=0)


def process_json_file(jsonf):
    """
    Process the result of an AppFlow Google Analytics extraction. The JSON structure
    needs to be converted to a table structure.
    :param jsonf: list of parsed JSON report objects
    :return: Pandas DataFrame
    """
    logger.info("Starting conversion from JSON format to table format.")
    logger.info("Detected {} valid JSON structures in the object".format(len(jsonf)))

    # jsonf is a list, but the columns and metrics are always the same across the JSON objects of one file
    cols = []
    try:
        cols = [r for r in jsonf[0]['reports'][0]['columnHeader']['dimensions']]
    except (KeyError, IndexError):
        logger.warning("No dimensions specified.")
    metrics = []
    try:
        metrics = [r['name'] for r in jsonf[0]['reports'][0]['columnHeader']['metricHeader']['metricHeaderEntries']]
    except (KeyError, IndexError):
        logger.warning("No metrics specified.")

    pd_result = None

    for list_index in range(len(jsonf)):
        data_rows = [r for r in jsonf[list_index]['reports'][0]['data']['rows']]
        dim_result_dict = {}

        for row in data_rows:
            # If there are dimensions, extract the dimension data and append the values per key
            for i in range(len(cols)):
                if cols[i] in dim_result_dict:
                    dim_result_dict[cols[i]].append(row['dimensions'][i])
                else:
                    dim_result_dict[cols[i]] = [row['dimensions'][i]]

            # If there are metrics, extract the metric data and append the values per key
            for i in range(len(metrics)):
                if metrics[i] in dim_result_dict:
                    dim_result_dict[metrics[i]].append(row['metrics'][0]['values'][i])
                else:
                    dim_result_dict[metrics[i]] = [row['metrics'][0]['values'][i]]

        # Create the dataframe for the first JSON object, otherwise append to the existing one
        if list_index == 0:
            pd_result = pd.DataFrame.from_dict(dim_result_dict)
        else:
            pd_result = pd_result.append(pd.DataFrame.from_dict(dim_result_dict))
    logger.info("Finished conversion from JSON format to table format.")
    return pd_result


def lambda_handler(event, context):
    logger.info("Starting appflow conversion")
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    s3_client = boto3.client('s3')

    logger.info("Processing bucket {}, filename {}".format(bucket_name, object_key))

    raw_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    # AppFlow writes one JSON object per line; join the objects with commas and wrap
    # them in brackets so the raw data parses as a single list of JSON objects
    raw_data = json.loads('[' + raw_object['Body'].read().decode('utf-8').replace('}\n{', '},{') + ']')
    pd_result = process_json_file(raw_data)

    db = PostgressDB(username=os.getenv("DB_USERNAME"),
                     password=os.getenv("DB_PASSWORD"),
                     port=5432,
                     host=os.getenv("DB_HOST"),
                     database=os.getenv("DB_DATABASE"))
    db_tmp_table = os.getenv("DB_TABLE_TMP")
    logger.info("Writing data to the table {}".format(db_tmp_table))

    pd_result.to_sql(name=db_tmp_table, con=db.create_engine(), index=False, if_exists='replace')

    logger.info("Finished appflow conversion")

packager.sh

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
#!/usr/bin/env bash
set -x

pFlag=0
rFlag=0
nFlag=0
zFlag=0

CreateZipFile(){
    cd package
    zip -r9 ../${ZIP_NAME}.zip .
    cd ../
    zip -g ${ZIP_NAME}.zip appflow_ga.py
}

UsageMessage(){
    echo "usage : packager -p <aws profile> -r <aws region> -n <name of the function> -z <zip name>"
}

while getopts "p:r:n:z:" option
do
    case ${option} in
        "p" ) AWS_PROFILE=${OPTARG}
              pFlag=1
              ;;
        "r" ) AWS_REGION=${OPTARG}
              rFlag=1
              ;;
        "n" ) FUNCTION_NAME=${OPTARG}
              nFlag=1
              ;;
        "z" ) ZIP_NAME=${OPTARG}
              zFlag=1
              ;;
        \? ) UsageMessage
             exit 4
             ;;
    esac
done

if [ ${pFlag} -eq 0 ] || [ ${rFlag} -eq 0 ] || [ ${nFlag} -eq 0 ] || [ ${zFlag} -eq 0 ]; then
    UsageMessage
    exit 4
fi

CreateZipFile

function_name_cli=$(aws lambda get-function --function-name ${FUNCTION_NAME} --profile ${AWS_PROFILE} --region=${AWS_REGION} | jq -r '.Configuration | .FunctionName')

if [ "${function_name_cli}" == "${FUNCTION_NAME}" ]; then
    echo "Found the function. Ready to update."
    aws lambda update-function-code --function-name ${FUNCTION_NAME} --zip-file fileb://${ZIP_NAME}.zip --profile ${AWS_PROFILE} --region=${AWS_REGION}
    aws lambda update-function-configuration --function-name ${FUNCTION_NAME} --profile ${AWS_PROFILE} --region=${AWS_REGION}
else
    echo "Unable to find the function. We will create the function for you."

    ACCOUNTID=$(aws sts get-caller-identity --region=${AWS_REGION} --profile=${AWS_PROFILE} | jq -r '.Account')
    ROLEARN=$(aws iam get-role --role-name lambda-cli-role --region=${AWS_REGION} --profile=${AWS_PROFILE} | jq -r '.Role | .Arn')

    if [ -z "${ROLEARN}" ]; then
        echo "The lambda role does not exist. Please ask your admin to create the role with the AWSLambdaBasicExecutionRole policy."
    else
        aws lambda create-function --function-name ${FUNCTION_NAME} --zip-file fileb://${ZIP_NAME}.zip --handler appflow_ga.lambda_handler --runtime python3.7 --role arn:aws:iam::${ACCOUNTID}:role/lambda-cli-role --profile ${AWS_PROFILE} --region=${AWS_REGION}
        aws lambda update-function-configuration --function-name ${FUNCTION_NAME} --profile ${AWS_PROFILE} --region=${AWS_REGION}
    fi
fi
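
For reference, a sketch of how the script might be invoked; the profile, region, function name, and zip name below are placeholders, and the package/ directory is assumed to already contain the dependencies (see requirements.txt):

# Hypothetical invocation; my-profile, eu-west-1, appflow-ga-loader and appflow-ga are placeholders
./packager.sh -p my-profile -r eu-west-1 -n appflow-ga-loader -z appflow-ga

If aws lambda get-function finds an existing function with that name, its code is updated in place; otherwise the script creates the function using the lambda-cli-role role.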

requirements.txt

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
psycopg2-binary==2.8.5
SQLAlchemy==1.2.18
pandas==0.24.2
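
CreateZipFile in packager.sh zips the contents of a package/ directory next to appflow_ga.py. One plausible way to populate that directory with these pinned dependencies, using pip's standard --target option:

# Install the pinned dependencies into the package/ directory that packager.sh zips
pip install -r requirements.txt -t package/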
