-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzip_codes.py
executable file
·368 lines (332 loc) · 14.4 KB
/
zip_codes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""
ZIP-Code Crosswalks.
Takes USA ZIP-Codes and returns 5 digit US Census Zip Code Tabulation Areas (ZCTAs)
and/or their latitude & longitude centroid coordinates.
By J. A. Cooper https://github.com/cooperjaXC
"""
import os, json, inspect, numpy as np, pandas as pd
from collections import defaultdict
def reverse_dict(dictionary: dict):
    """Invert a dictionary so that its values become keys and its keys become values.

    A value that is a list contributes one inverted key per list element.
    Input values do not have to be unique: every original key that pointed at
    a given value is collected, in iteration order, under that value.

    Returns a plain ``dict``. If at least one inverted key gathered more than
    one original key, every entry maps to a list of original keys. Otherwise
    (each inverted key has exactly one source) the single-item lists are
    unwrapped and every entry maps directly to its one original key.
    """
    inverted = {}
    for source_key, source_value in dictionary.items():
        # Lists are unhashable, so each element becomes its own inverted key.
        # https://stackoverflow.com/questions/13675296/how-to-overcome-typeerror-unhashable-type-list
        targets = source_value if isinstance(source_value, list) else [source_value]
        for target in targets:
            inverted.setdefault(target, []).append(source_key)

    # Keep the list form as soon as any inverted key has several sources.
    if any(len(sources) > 1 for sources in inverted.values()):
        return inverted

    # Every list holds a single item here, so hand back the bare items.
    return {target: sources[0] for target, sources in inverted.items()}
class ZipCodes:
    """Loads the ZIP-Code / ZCTA lookup tables for one Census vintage.

    The tables are read from JSON files named ``zipzcta_crosswalk_<year>.json``
    and ``zcta_latloncentroid_<year>.json`` inside a ``json`` directory that
    sits either next to this module or one directory level above it.
    2010 Crosswalk data here comes from https://udsmapper.org/zip-code-to-zcta-crosswalk/ .

    Attributes set by ``__init__``:
        year: The Census vintage actually used, as a string ("2010" or "2020").
        crosswalk: dict of ZIP-Code (key; string) -> ZCTA (value; string).
        reverse_crosswalk: ``reverse_dict(crosswalk)``, i.e. ZCTA -> ZIP-Code(s).
        latlon_centroids: dict of ZCTA (key; string) ->
            [Latitude, Longitude] coordinates (value; list).

    Every instantiation re-reads the JSON files from disk.
    """

    def __init__(self, year: int = 2020):
        """Resolve ``year`` to a Census vintage and load that vintage's tables.

        ``year`` is coerced with ``int()``. A value that cannot be converted
        prints a notice and falls back to 2020. Any year before 2020 uses the
        2010 definitions; 2020 and later use the 2020 definitions.

        Raises whatever ``open`` / ``json.load`` raise when a data file is
        missing or malformed (for example ``FileNotFoundError``).
        """
        # Establish the Census year in question.
        try:
            year = int(year)
        except (TypeError, ValueError, OverflowError):
            # Only genuine conversion failures select the default; a bare
            # `except` here would also swallow KeyboardInterrupt / SystemExit.
            print(
                "Year value of",
                year,
                "is an invalid input for the Zip Code Class. Default of '2020' will be set.",
            )
            year = 2020
        # Years before 2020 all operate on 2010 census definitions;
        # everything from 2020 onward uses the 2020 definitions.
        year = 2010 if year < 2020 else 2020
        self.year = str(year)

        # Find the path to the directory of JSONs containing ZIP-Code data.
        module_dir = os.path.realpath(
            os.path.abspath(os.path.dirname(inspect.getfile(inspect.currentframe())))
        )
        filepath = os.path.join(module_dir, "json")
        if not os.path.exists(filepath):
            # Fall back to a `json` directory that is a sibling of this
            # module's own directory (one level up).
            filepath = os.path.join(os.path.dirname(module_dir), "json")

        # Establish year-specific variables
        # # ZIP-Code -> ZCTA Crosswalks
        crosswalk_path = os.path.join(
            filepath, "zipzcta_crosswalk_" + self.year + ".json"
        )
        # JSON is opened as UTF-8 explicitly so parsing does not depend on the
        # platform's default text encoding.
        with open(crosswalk_path, encoding="utf-8") as open_cross:
            # A dictionary containing the ZIP-Code (key; string)
            # and corresponding ZCTA (value; string) for the given [year].
            self.crosswalk = json.load(open_cross)

        # # ZCTA -> ZIP-Code Reverse Crosswalks
        self.reverse_crosswalk = reverse_dict(self.crosswalk)

        # # ZCTA -> Latitude & Longitude Centroids
        latlon_path = os.path.join(
            filepath, "zcta_latloncentroid_" + self.year + ".json"
        )
        with open(latlon_path, encoding="utf-8") as open_ll:
            # A dictionary containing the ZCTA (key; string)
            # and corresponding [Latitude, Longitude] coordinates (value; list) for the given [year].
            self.latlon_centroids = json.load(open_ll)
def zip_code_formatter(postal_code):
    """Formats a USA ZIP-Code into the correct 5-digit format.

    Accepts strings or numbers. Handles ZIP+4 values ("12345-6789" or a
    9-digit run), codes that lost their leading zeros (3 or 4 characters are
    left-padded with "0"), whole-number floats such as ``1234.0``, and
    surrounding/embedded whitespace.

    Returns the 5-character string, or ``None`` for empty / null-like input
    ("", "0", "nan", "null", "none" in any letter case).
    Values shorter than 3 characters are returned unpadded.
    """
    # A float that carries a whole number (e.g. 1234.0 from a column that
    # contained NaN) should read as "1234", not "1234.0".
    if isinstance(postal_code, float) and postal_code.is_integer():
        postal_code = int(postal_code)

    # Drop every whitespace character, wherever it appears.
    compact = "".join(str(postal_code).split())

    # For ZIP+4, keep only the part before the first hyphen. This must happen
    # BEFORE truncating so that "501-1234" yields "501" rather than "5011".
    # A leading hyphen leaves nothing in front of it, so fall back to simply
    # removing the hyphens.
    zip5 = compact.split("-", 1)[0] or compact.replace("-", "")
    zip5 = zip5[:5]

    # Catch nulls and return None. This is checked before padding so that
    # "null" / "nan" / "none" cannot be turned into "0null" / "00nan" / "0none";
    # the padded spellings are still listed for callers that pass them as-is.
    null_tokens = ("0", "nan", "null", "none", "0none", "00nan", "0null")
    if not zip5 or zip5.lower() in null_tokens:
        return None

    # Restore leading zeros on 3- and 4-character codes.
    # No longer only uses postal codes "501" and "544". Expanded to include US overseas territories.
    if len(zip5) in (3, 4):
        zip5 = zip5.rjust(5, "0")
    return zip5
def zip_code_crosswalk(
    postal_code,
    year=2020,
    use_postalcode_if_error: bool = False,
    suppress_prints: bool = False,
):
    """Translate one postal ZIP Code into its Zip Code Tabulation Area (ZCTA).

    A ZCTA is the US Census-defined polygonal region for a ZIP Code. Postal
    ZIP Codes themselves do not describe a continuous region: they are
    functional attributes the US Postal Service uses to deliver mail & goods,
    and may refer to a single post office (point), a discontinuous region, or
    an area that crosses state borders. The ZCTA is the Census' way of
    polygonizing and ordering this messy but commonly-used identifier.
    Crosswalk data here comes from https://udsmapper.org/zip-code-to-zcta-crosswalk/ .

    The input is first normalised with `zip_code_formatter` (ZIP+4 values,
    missing leading zeros) and then looked up in the crosswalk of the requested
    census `year` (2020 definitions by default).

    Returns the ZCTA when the ZIP Code is known. Otherwise returns the
    formatted postal code when `use_postalcode_if_error` is True, else None.
    A notice is printed for unknown codes unless `suppress_prints` is set.
    """
    lookup = ZipCodes(year).crosswalk
    formatted_zip = zip_code_formatter(postal_code)

    # Known ZIP Code: nothing else to do.
    if formatted_zip in lookup:
        return lookup[formatted_zip]

    verbose = suppress_prints is False
    missing_note = f"{formatted_zip} is not in this repository's records."

    if use_postalcode_if_error is True:
        if verbose:
            print(missing_note, "The input postal code will be returned instead.")
        return formatted_zip

    # Null-like inputs (formatted to None) are skipped silently.
    if verbose and str(formatted_zip).lower() != "none":
        print(
            missing_note,
            "No ZCTA will be returned. Please double check your entry and try again.",
        )
    return None
def df_zip_crosswalk(
    dataframe: pd.DataFrame,
    zip_field_name: str,
    year: int = 2020,
    zcta_field_name: str = "zcta",
    use_postalcode_if_error: bool = False,
    suppress_prints: bool = False,
):
    """Takes a Pandas Dataframe with a ZIP-Code field and returns a ZCTA field using the crosswalk function.

    Each value of `zip_field_name` is passed through `zip_code_crosswalk`
    (with `year`, `use_postalcode_if_error` and `suppress_prints` forwarded)
    and the result is stored in `zcta_field_name`.

    Returns a copy of the dataframe with the added field. If `zip_field_name`
    is not a column, a notice is printed and the input dataframe is returned
    unchanged.
    """
    if zip_field_name not in dataframe.columns.to_list():
        print(
            zip_field_name,
            "not in the submitted dataframe. No ZCTA field will be added.",
        )
        return dataframe

    def _as_zip_text(value):
        # Numbers and numeric strings collapse to their integer text
        # (1234.0 -> "1234"). Anything int() rejects, such as a ZIP+4 string
        # like "12345-6789", is passed along as-is instead of aborting the
        # whole column the way a blanket `.astype(int)` would.
        try:
            return str(int(value))
        except (TypeError, ValueError, OverflowError):
            return str(value)

    outdf = dataframe.copy()
    outdf[zcta_field_name] = (
        outdf[zip_field_name]
        # Missing values become "0", which the formatter treats as null.
        .fillna("0")
        .apply(
            lambda raw: zip_code_crosswalk(
                _as_zip_text(raw),
                year=year,
                use_postalcode_if_error=use_postalcode_if_error,
                suppress_prints=suppress_prints,
            )
        )
    )
    return outdf
def reverse_zcta_crosswalk(
    zcta, year=2020, suppress_prints: bool = False, use_zcta_if_error: bool = True
):
    """Function takes a ZCTA and returns a list of all ZIP Codes that correspond to that ZCTA.

    The input is normalised with `zip_code_formatter` and looked up in the
    reverse (ZCTA -> ZIP Codes) crosswalk for the census `year`.

    Returns a new list in every case:
      * the matching ZIP Codes when the ZCTA is known;
      * `[formatted input]` when the value is not a ZCTA but is itself a known
        ZIP Code, or when `use_zcta_if_error` is True;
      * `[]` otherwise.
    A notice is printed on every miss unless `suppress_prints` is set.
    """
    # Load the tables once; both the reverse and the forward crosswalk may be
    # needed and each ZipCodes() construction re-reads the JSON files.
    tables = ZipCodes(year)
    zcta_reverse_xwalk = tables.reverse_crosswalk
    # Put in some safeguards here in case you get entries with zip 9s or zips w/o the leading 0s.
    zcta_formatted = zip_code_formatter(zcta)

    # Get ZIP Codes
    if zcta_formatted in zcta_reverse_xwalk:
        zips = zcta_reverse_xwalk[zcta_formatted]
        # reverse_dict stores bare values (not lists) when no ZCTA has more
        # than one ZIP Code, so wrap those to keep the list contract. A copy is
        # returned so callers never hold a reference into the lookup table.
        return list(zips) if isinstance(zips, list) else [zips]

    nozctawarning = (
        str(zcta_formatted) + " is not in this repository's records of ZCTAs."
    )
    # Check if the passed ZCTA is actually a non-ZCTA ZIP Code.
    if zcta_formatted in tables.crosswalk:
        if suppress_prints is False:
            print(
                nozctawarning,
                "However, it is already a ZIP Code.",
                "It will be returned inside a list as the only item.",
            )
        return [zcta_formatted]

    if use_zcta_if_error is True:
        if suppress_prints is False:
            print(
                nozctawarning,
                "It will be returned inside a list as the only item instead.",
            )
        return [zcta_formatted]

    # Null-like inputs (formatted to None) are skipped silently.
    if suppress_prints is False and str(zcta_formatted).lower() != "none":
        print(
            nozctawarning,
            "No zips will be returned. Please double check your entry and try again.",
        )
    return []
def df_reverse_zcta_crosswalk(
    dataframe: pd.DataFrame,
    zcta_field_name: str = "zcta",
    year_zip: int = 2020,
    zip_field_name: str = "zip_codes",
    use_zcta_if_error: bool = True,
    suppress_prints: bool = False,
):
    """Takes a Pandas Dataframe with a ZCTA field and returns a ZIP-Code field using the reverse crosswalk function.

    Each value of `zcta_field_name` is passed through `reverse_zcta_crosswalk`
    (with `year_zip` as the census year, and `use_zcta_if_error` /
    `suppress_prints` forwarded); the resulting list of ZIP Codes is stored in
    `zip_field_name`. The ZCTA column itself is rewritten with
    `zip_code_formatter` output.

    Returns a copy of the dataframe. If `zcta_field_name` is not a column, a
    notice is printed and the input dataframe is returned unchanged.
    """
    if zcta_field_name not in dataframe.columns.to_list():
        print(
            zcta_field_name,
            "not in the submitted dataframe. No ZIP Code field will be added.",
        )
        return dataframe

    def _as_zcta_text(value):
        # Numbers and numeric strings collapse to their integer text
        # (1234.0 -> "1234"). Anything int() rejects (e.g. a hyphenated
        # string) is passed along as-is instead of aborting the whole column
        # the way a blanket `.astype(int)` would.
        try:
            return str(int(value))
        except (TypeError, ValueError, OverflowError):
            return str(value)

    outdf = dataframe.copy()
    # Normalise the ZCTA column once; missing values become "0", which the
    # formatter treats as null. Both output columns derive from this series.
    zcta_text = outdf[zcta_field_name].fillna("0").apply(_as_zcta_text)

    # Create and fill the ZIP-Code field.
    outdf[zip_field_name] = zcta_text.apply(
        lambda code: reverse_zcta_crosswalk(
            code,
            year=year_zip,
            use_zcta_if_error=use_zcta_if_error,
            suppress_prints=suppress_prints,
        )
    )
    # Format the input ZCTA field for output.
    outdf[zcta_field_name] = zcta_text.apply(zip_code_formatter)
    return outdf
def lat_lon_centroid(
    postal_code,
    year: int = 2020,
    use_postalcode_if_error: bool = False,
    suppress_prints: bool = False,
):
    """Returns the latitude and longitude coordinates in the centroid of the postal ZIP code's ZCTA
    as defined by the US Census Bureau's TIGER shapefiles. The function will return a list: [lat, lon].

    The ZIP Code is first converted to its ZCTA with `zip_code_crosswalk`
    (`year`, `use_postalcode_if_error` and `suppress_prints` are forwarded),
    then the ZCTA is looked up in the centroid table for that census year.
    When no centroid is on record, `[None, None]` is returned and, unless
    `suppress_prints` is set, a notice is printed.

    These centroids are not guaranteed to be on land.
    If there is a body of water near the geometric center of the ZCTA, the centroid may be placed offshore.
    """
    zcta = zip_code_formatter(
        zip_code_crosswalk(postal_code, year, use_postalcode_if_error, suppress_prints)
    )
    latlon_crosswalk = ZipCodes(year).latlon_centroids
    if zcta in latlon_crosswalk:
        return latlon_crosswalk[zcta]

    if suppress_prints is False:
        # Compare against the *formatted* postal code: the raw input may be an
        # int, lack leading zeros, or carry a +4 suffix, and would then never
        # equal the formatted ZCTA even when the ZIP Code is its own ZCTA.
        zip_is_zcta = zcta == zip_code_formatter(postal_code)
        if zip_is_zcta:
            no_centroid_warning = (
                str(postal_code)
                + " does not have a centroid in this repository's records"
            )
        else:
            no_centroid_warning = (
                str(postal_code)
                + "'s tabulation area "
                + str(zcta)
                + " does not have a centroid in this repository's records"
            )
        no_centroid_warning = (
            no_centroid_warning
            + " for the census year "
            + str(year)
            + ".\n No centroid will be returned. Please double check your entry and try again."
        )
        print(no_centroid_warning)
    # There is hot debate in coding communities on whether to return None or an empty list for situations like this.
    # https://softwareengineering.stackexchange.com/questions/120355/is-it-better-to-return-null-or-empty-values-from-functions-methods-where-the-ret
    # https://www.reddit.com/r/Python/comments/30yb5t/return_none_or_not_to_return_none/
    # I have chosen to return the None as coordinates within a list
    # # so as to not destroy downstream list parsing efforts.
    return [None, None]
def df_latlon_centroids(
    dataframe: pd.DataFrame,
    zip_in_field_name: str,
    year: int = 2020,
    keep_coordinates_field: bool = False,
    use_postalcode_if_error: bool = False,
    suppress_prints: bool = False,
):
    """
    Add ZCTA centroid coordinates to a Pandas Dataframe that has a ZIP-Code field.

    Every value of `zip_in_field_name` is run through `lat_lon_centroid`
    (with `year`, `use_postalcode_if_error` and `suppress_prints` forwarded).
    The resulting [latitude, longitude] pair is stored in a "coordinates"
    column and split into separate "lat" and "lon" columns. The "coordinates"
    column is dropped again unless `keep_coordinates_field` says otherwise.

    Returns a copy of the dataframe with the new columns. If
    `zip_in_field_name` is not a column, a notice is printed and the input
    dataframe is returned unchanged.
    """
    # Guard clause: nothing to do without the ZIP-Code column.
    if zip_in_field_name not in dataframe.columns.to_list():
        print(
            zip_in_field_name,
            "not in the submitted dataframe. No ZCTA centroid fields will be added.",
        )
        return dataframe

    pair_column = "coordinates"
    result = dataframe.copy()

    def _centroid_for(zip_value):
        # One [lat, lon] list per ZIP Code.
        return lat_lon_centroid(
            zip_value,
            year=year,
            use_postalcode_if_error=use_postalcode_if_error,
            suppress_prints=suppress_prints,
        )

    result[pair_column] = result[zip_in_field_name].apply(_centroid_for)

    # Break the pair apart: index 0 is latitude, index 1 is longitude.
    result["lat"] = result[pair_column].apply(lambda pair: pair[0])
    result["lon"] = result[pair_column].apply(lambda pair: pair[1])

    # The combined column is only kept on request.
    if keep_coordinates_field is False:
        result = result.drop(pair_column, axis=1)
    return result