-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_data_preprocessing.py
81 lines (65 loc) · 3.47 KB
/
aws_data_preprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import pandas as pd
import geopandas as gpd
import datetime as dt
# function to perform reverse geocoding
def conduct_reverse_geocoding(df: pd.DataFrame, gdf_shape):
    """
    Enrich the dataframe with the countries the long-lat data points are located in.

    :param df: the dataframe with the long-lat data (must have 'long' and 'lat' columns)
    :param gdf_shape: GeoDataFrame with the country borders of the entire world
    :return: GeoDataFrame with the original columns plus the country attributes
             joined from the shapefile (NaN where a point falls in no polygon)
    """
    # convert df to a GeoDataFrame so a spatial join is possible
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.long, df.lat))
    # left spatial join: keep every point, attach the polygon (country) it falls in
    point_in_polys = gpd.sjoin(gdf, gdf_shape, how='left')
    # drop shapefile metadata columns we don't need
    # (bugfix: 'status' was listed twice in the original drop list)
    point_in_polys = point_in_polys.drop(
        columns=['french_shor', 'status', 'index_right', 'color_code'])
    return point_in_polys
start = dt.datetime.now()
# Read in shapefile with country boundaries
gdf_shape = gpd.GeoDataFrame.from_file('./data/shapefiles/world/world-administrative-boundaries.shp')
# The following nested loops read in all single files and perform the necessary
# data preprocessing on them. For memory limitation reasons, the datasets will
# be combined to a shared dataset later.
path = './data/aws_data/performance/'
path2 = ''
n = 0  # count processed datasets for runtime checks
# iterate over connection categories (tuple unpacking instead of i[0]/i[1] indexing)
for type_dir, category in [("type=fixed/", "fixed"), ("type=mobile/", "mobile")]:
    path_i = path + path2 + type_dir
    for year_dir, year in [('year=2019/', 2019), ('year=2020/', 2020),
                           ('year=2021/', 2021), ('year=2022/', 2022)]:
        path_j = path_i + year_dir
        # quarters map to the first month of the quarter; 2022 data only exists up to Q3
        quarter_list = [('quarter=1/', '01'), ('quarter=2/', '04'), ('quarter=3/', '07')]
        if year != 2022:
            quarter_list.append(('quarter=4/', '10'))
        for quarter_dir, month in quarter_list:
            file_path = (path_j + quarter_dir
                         + f"{year}-{month}-01_performance_{category}_tiles.parquet")
            df = pd.read_parquet(file_path, engine='pyarrow')
            # add year, month and category information to the dataframe -
            # this information is only in file names, not in the files itself
            df['quarter'] = dt.date(year, int(month), 1)
            df['category'] = category
            # clean the location column, so that one latitude and one longitude
            # value remain: one vectorized regex pass replaces the four chained
            # .apply(lambda ...) passes of the original (same result, faster)
            df['tile'] = df['tile'].str.replace(r'POLYGON|[(),]', '', regex=True)
            df[['long', 'lat', 'rest']] = df['tile'].str.split(pat=" ", n=2, expand=True)
            # drop unnecessary columns
            df = df.drop(columns=['rest', 'tile', 'quadkey'])
            # retrieve country from lat-long data (reverse geocoding)
            df = conduct_reverse_geocoding(df, gdf_shape)
            # save final dataframe as csv
            df.to_csv(f'./data/preprocessed_files/whole_world/whole_world_{n}.csv', sep=';')
            # filter for Germany
            df_germany = df[df['iso3'] == 'DEU']
            df_germany.to_csv(f'./data/preprocessed_files/germany/germany_{n}.csv', sep=';')
            n += 1
            print(f"{n} Datasets processed in {dt.datetime.now() - start}")
print(f"Successful execution in {dt.datetime.now() - start}")