# fiserv_DB.py — GitHub page chrome and line-number gutter removed from scrape.
# Standard library imports
import os
import ntpath
import logging
import argparse
from datetime import datetime
# Related third-party imports
import boto3
import pandas as pd
from pypgrest import Postgrest
import utils
from config.fiserv import FIELD_MAPPING, REQUIRED_FIELDS
# Environment variables
AWS_ACCESS_ID = os.getenv("AWS_ACCESS_ID")
AWS_PASS = os.getenv("AWS_PASS")
BUCKET_NAME = os.getenv("BUCKET_NAME")
POSTGREST_TOKEN = os.getenv("POSTGREST_TOKEN")
POSTGREST_ENDPOINT = os.getenv("POSTGREST_ENDPOINT")
def handle_year_month_args(year, month, lastmonth, aws_s3_client):
    """
    Resolve the year/month CLI arguments into a list of S3 CSV keys.

    Parameters
    ----------
    year : int or None
        Argument provided value for year; falls back to the current year.
    month : int or None
        Argument provided value for month; falls back to the current month.
    lastmonth : bool
        If True, also query the previous month's folder. NOTE: this is only
        honored when neither --year nor --month was supplied (preserves the
        original behavior).
    aws_s3_client : boto3 client object
        For sending on to get_csv_list.

    Returns
    -------
    csv_file_list : list
        A list of the csv files to be downloaded and upserted to Postgres.
    """
    # If args are missing, default to current month and/or year
    f_year = year if year else datetime.now().year
    f_month = month if month else datetime.now().month

    csv_file_list = get_csv_list(f_year, f_month, aws_s3_client)

    if not month and not year:
        if lastmonth:
            prev_month = f_month - 1
            prev_year = f_year
            # January wraps back to December of the previous year
            if prev_month == 0:
                prev_year = prev_year - 1
                prev_month = 12
            logger.debug(
                f"Getting data from folders: {prev_month}-{prev_year} and {f_month}-{f_year}"
            )
            csv_file_list.extend(get_csv_list(prev_year, prev_month, aws_s3_client))
        else:
            logger.debug(f"Getting data from folders: {f_month}-{f_year}")

    # Keep only real CSV objects; the S3 listing can include folder keys
    return [f for f in csv_file_list if f.endswith(".csv")]
def get_file_name(file_key):
    """
    Return the bare file name from a full s3 file path.

    :param file_key: the file path
    :return: string (last path component)
    """
    # ntpath.basename copes with both forward- and back-slash separators
    base_name = ntpath.basename(file_key)
    return base_name
def get_csv_list(year, month, client):
    """
    Materialize the pending S3 keys for a year/month into an actual list
    (as opposed to the generator returned by aws_list_files).

    :return: list of strings
    """
    return list(aws_list_files(year, month, client))
def aws_list_files(year, month, client):
    """
    Yield the S3 keys of email files under the processed folder for the
    given year and month.

    :return: generator of key strings
    """
    # list_objects returns at most one page; keys live under Contents
    listing = client.list_objects(
        Bucket=BUCKET_NAME,
        Prefix=f"emails/current_processed/{year}/{month}",
    )
    for entry in listing.get("Contents", []):
        yield entry.get("Key")
def id_field_creation(invoice_id, batch_number):
    """
    Build the field used to match records between Fiserv and Smartfolio.

    Defined as the batch number concatenated with the invoice ID. The old
    ID also included the batch sequence number, but that value changed over
    time in PARD transactions, so it was dropped.

    :return: str
    """
    # f-string interpolation is equivalent to str() concatenation here
    return f"{batch_number}{invoice_id}"
def determine_submit_date_field(row):
    """
    Row-wise helper for a dataframe apply: pick the submit-date source.

    Account 885 (PARD) uses the Funded Date — except for AMEX cards —
    and every other row uses the Batch Date.
    """
    is_pard_non_amex = row["account"] == 885 and row["card_type"] != "AMEX"
    return row["funded_date"] if is_pard_non_amex else row["Batch Date"]
def transform(fiserv_df):
    """
    Format and add columns to a dataframe of Fiserv data for upload to the
    postgres DB.

    Args: dataframe of data from a fiserv report csv
    Returns: formatted dataframe conforming to the postgres schema
    Raises: AssertionError when a required report column is missing.
    """
    # Validate the report shape up front before touching the data
    for field in REQUIRED_FIELDS:
        assert field in list(
            fiserv_df.columns
        ), "Incorrect report supplied. Check required fields."

    # "Product Code" is optional; carry it through only when present
    if "Product Code" in fiserv_df.columns:
        selected = REQUIRED_FIELDS + ["Product Code"]
    else:
        selected = REQUIRED_FIELDS

    # Select the columns we need and rename them to match the schema
    df = fiserv_df[selected].rename(columns=FIELD_MAPPING)

    # Only the last three digits of the account number are stored
    df["account"] = df["account"].astype(str).str[-3:].astype(int)

    # Submit date depends on which account each row belongs to
    df["submit_date"] = df.apply(determine_submit_date_field, axis=1)
    df = df.drop(["Batch Date"], axis=1)

    # Normalize numeric columns before upsert
    for numeric_col in ("invoice_id", "batch_number", "batch_sequence_number", "meter_id"):
        df[numeric_col] = df[numeric_col].astype("int64")

    # Field for matching between Fiserv and Flowbird
    df["id"] = df.apply(
        lambda x: id_field_creation(x["invoice_id"], x["batch_number"]),
        axis=1,
    )

    # Subtract one day from the submit date column to align old Fiserv
    # email reports with the new ones
    df["submit_date"] = pd.to_datetime(df["submit_date"]) - pd.Timedelta(1, unit="D")
    df["submit_date"] = df["submit_date"].dt.strftime("%m/%d/%Y")

    # Duplicate records are sometimes emailed; keep the first occurrence
    return df.drop_duplicates(subset=["id"], keep="first")
def to_postgres(fiserv_df):
    """
    Upsert fiserv data to the postgres DB via PostgREST.

    Args: fiserv_df — formatted dataframe from the transform function
    Returns: None.
    Raises: re-raises any exception from the upsert after logging the
    last HTTP response body.
    """
    payload = fiserv_df.to_dict(orient="records")
    # Connect to local DB
    client = Postgrest(
        POSTGREST_ENDPOINT,
        token=POSTGREST_TOKEN,
        headers={"Prefer": "return=representation"},
    )
    # Upsert to postgres DB; the return value was previously bound to an
    # unused local, so it is discarded here.
    try:
        client.upsert(resource="fiserv_reports_raw", data=payload)
    except Exception as e:
        # NOTE(review): this assumes pypgrest stores the last HTTP response
        # on client.res — confirm against the pypgrest version in use
        logger.error(client.res.text)
        raise e
def main(args):
    """
    Download Fiserv report CSVs from S3 for the requested month(s) and
    upsert each non-empty one to postgres.
    """
    aws_s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_ID,
        aws_secret_access_key=AWS_PASS,
    )

    csv_file_list = handle_year_month_args(
        args.year, args.month, args.lastmonth, aws_s3_client
    )
    if not csv_file_list:
        logger.debug("No Files found for selected months, nothing happened.")
        return 0

    # Fetch each file from S3 and load it into a dataframe
    for csv_f in csv_file_list:
        s3_object = aws_s3_client.get_object(Bucket=BUCKET_NAME, Key=csv_f)
        df = pd.read_csv(s3_object.get("Body"))
        logger.debug(f"Loaded CSV File: {csv_f}")
        # Some emails (notably "Contactless-Detail" reports) arrive with
        # only column headers — skip those empty CSVs
        if not df.empty:
            to_postgres(transform(df))
# CLI arguments definition
parser = argparse.ArgumentParser()
parser.add_argument(
    "--year",
    type=int,
    help="Year of folder to select, defaults to current year",
)
parser.add_argument(
    "--month",
    type=int,
    help="Month of folder to select. defaults to current month",
)
parser.add_argument(
    "--lastmonth",
    # BUG FIX: type=bool is an argparse pitfall — bool("False") is True, so
    # any supplied value (even "False") enabled the flag. store_true makes
    # --lastmonth a proper no-argument flag; the default remains False.
    action="store_true",
    default=False,
    help="Will download from current month folder as well as previous.",
)

# Guard so that importing this module no longer triggers argument parsing
# and the S3/postgres run as a side effect.
if __name__ == "__main__":
    args = parser.parse_args()
    logger = utils.get_logger(__file__, level=logging.DEBUG)
    main(args)