Command-line Python utility to index, partition, aggregate, or move large datasets (Inrix, HERE) within a PostgreSQL database with partitioned tables.
Because these datasets are so large, the data are partitioned by month in the database, and operations should be batched so that the workload can be scheduled for off-peak hours on the database server. While it would have been possible to write these loops in PostgreSQL, because of the way transaction isolation works in PostgreSQL no change would be committed until the loop finished. It is therefore preferable to call each operation from an external script, in this case Python.
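For illustration, here is a minimal sketch of that batching idea, assuming psycopg2 and a hypothetical per-month SQL function (the real functions are listed below): each month runs in its own transaction, so progress is committed incrementally.

```python
# Sketch only: each month is processed and committed separately, so work
# done before an off-peak window closes (or before a failure) is not lost.
import psycopg2

MONTHS = ['201207', '201208', '201209']  # hypothetical YYYYMM batch

con = psycopg2.connect(database='bigdata', host='localhost',
                       user='user', password='password')
for yyyymm in MONTHS:
    with con.cursor() as cur:
        # inrix.do_maintenance is a placeholder for one of the SQL
        # functions this utility calls
        cur.execute("SELECT inrix.do_maintenance(%s)", (yyyymm,))
    con.commit()  # commit after every month rather than once at the end
con.close()
```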
Note: this utility was initially created for Inrix data; it is being migrated to be more generalizable for looping over lengthy PostgreSQL maintenance tasks on big datasets.
Download the contents of this folder and test with `python -m unittest`. This project assumes Python 3.5 or above and has not been tested with Python 2.x. The only external dependency is psycopg2, a database connection package for PostgreSQL (installation instructions). The following SQL functions are necessary for the functionality to work:
- SQL aggregation functions
- Copying data to a new schema from TMCs that are not in the `gis.inrix_tmc_tor` table
- Removing the moved data
- Creating indexes
To run from the command line, see the help text below. For example, `inrix_util -i -y 201207 201212` would index the tables from July to December 2012 inclusive, with an index on each of the `tmc`, `tx`, and `score` columns. Descriptions of the main commands follow.
usage: data_util.py [-h] (-i | -p | -a | -r | -m)
(-y YYYYMM YYYYMM | -Y YEARSJSON) [-d DBSETTING]
[-t TABLENAME] [-s SCHEMANAME] [-tx TIMECOLUMN]
[--function FUNCTION] [--alldata] [--tmctable TMCTABLE]
[--idx IDX]
Index, partition, or aggregate traffic data in a database.
optional arguments:
-h, --help show this help message and exit
-i, --index Index the tables specified by the years argument
-p, --partition Add Check Constraints to specified tables to complete
table partitioning
-a, --aggregate Aggregate raw data
-r, --runfunction Run the specified function
-m, --movedata Remove data from TMCs outside Toronto
-y YYYYMM YYYYMM, --years YYYYMM YYYYMM
Range of months (YYYYMM) to operate over from startdate
to enddate
-Y YEARSJSON, --yearsjson YEARSJSON
Written dictionary which contains years as key and
start/end month like {'2012'=[1,12]}
-d DBSETTING, --dbsetting DBSETTING
Filename with connection settings to the database
(default: opens default.cfg)
-t TABLENAME, --tablename TABLENAME
Base table on which to perform operation, like ta_
-s SCHEMANAME, --schemaname SCHEMANAME
Base schema on which to perform operation, like here
-tx TIMECOLUMN, --timecolumn TIMECOLUMN
Time column for partitioning, default: tx
--function FUNCTION SQL function to run for the specified months, specify
only the name of the function.
--alldata For aggregating, specify using all rows, regardless of
score
--tmctable TMCTABLE Specify subset of tmcs to use, default:
gis.inrix_tmc_tor
--idx IDX Index functions to call, parameters are keys for
index_functions.json
The yearsjson parameter is an open issue.
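As an illustration of how the `-y` range is interpreted, here is a sketch of expanding a YYYYMM range into individual months. The function name is hypothetical and the utility's own parsing may differ.

```python
def expand_yyyymm_range(start, end):
    """Return a list of YYYYMM strings from start to end inclusive."""
    year, month = int(start[:4]), int(start[4:])
    end_year, end_month = int(end[:4]), int(end[4:])
    months = []
    while (year, month) <= (end_year, end_month):
        months.append('{:04d}{:02d}'.format(year, month))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return months

print(expand_yyyymm_range('201207', '201212'))
# ['201207', '201208', '201209', '201210', '201211', '201212']
```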
Your `db.cfg` file should look like this:
[DBSETTINGS]
database=bigdata
host=localhost
user=user
password=password
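For reference, a file in that format can be read with Python's `configparser` and passed straight to psycopg2, along these lines (a sketch, not necessarily the utility's exact code):

```python
# Read the [DBSETTINGS] section shown above and open a connection.
import configparser
import psycopg2

config = configparser.ConfigParser()
config.read('db.cfg')
dbset = dict(config['DBSETTINGS'])

# The section keys (database, host, user, password) match psycopg2's
# connect() keyword arguments, so they can be unpacked directly.
con = psycopg2.connect(**dbset)
```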
Index the specified months of disaggregate data. If none of `[--indextmc] [--indextx] [--indexscore]` are specified, the default is to create an index on each of those columns.
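A rough sketch of what this amounts to for one month, assuming the default three columns and `psycopg2.sql` (available in psycopg2 2.7+) for identifier quoting; the utility's real index definitions may differ:

```python
from psycopg2 import sql

def index_month(con, schema, table, yyyymm):
    """Create indexes on the tmc, tx, and score columns of one partition."""
    partition = '{0}{1}'.format(table, yyyymm)  # e.g. 'ta_' + '201207'
    with con.cursor() as cur:
        for col in ('tmc', 'tx', 'score'):
            # Index names are left to PostgreSQL's defaults.
            cur.execute(sql.SQL(
                "CREATE INDEX ON {schema}.{partition} ({col})"
            ).format(schema=sql.Identifier(schema),
                     partition=sql.Identifier(partition),
                     col=sql.Identifier(col)))
    con.commit()
```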
It is faster to add the `CHECK` constraints that fully enable table partitioning after all the data have been loaded. This function adds those constraints to the specified tables. Since the Inrix data are partitioned by month, the constraint is on the timestamp column (default `tx`).
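As a sketch only (not the utility's actual SQL), adding the month constraint to one partition could look like this, assuming the default `tx` timestamp column and `PARENT_YYYYMM` table naming. Note that psycopg2 interpolates parameters client-side, so the `%s` placeholders work here even though this is DDL.

```python
from psycopg2 import sql

def add_check_constraint(con, schema, table, yyyymm, timecol='tx'):
    """Constrain one month's partition to its calendar month."""
    partition = '{0}{1}'.format(table, yyyymm)      # e.g. 'ta_201207'
    month_start = yyyymm[:4] + '-' + yyyymm[4:] + '-01'
    with con.cursor() as cur:
        cur.execute(sql.SQL(
            "ALTER TABLE {schema}.{partition} ADD CHECK "
            "({timecol} >= %s::DATE AND "
            " {timecol} < %s::DATE + INTERVAL '1 month')"
        ).format(schema=sql.Identifier(schema),
                 partition=sql.Identifier(partition),
                 timecol=sql.Identifier(timecol)),
            (month_start, month_start))
    con.commit()
```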
Calls SQL aggregation functions to aggregate the disaggregate data into 15-minute bins for other analytic applications.
- `--tmctable`: specify the table of TMCs to keep in the aggregate table
- `--alldata`: the default is to use only `score = 30` records (observed data); if this flag is used, all available records are used (see the sketch below)
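Roughly, the per-month call made by this step looks something like the following. The function names here are placeholders; the real 15-minute aggregation functions are the SQL aggregation functions this repo depends on.

```python
def aggregate_month(con, yyyymm, tmctable='gis.inrix_tmc_tor', alldata=False):
    """Aggregate one month of disaggregate data (placeholder function names)."""
    with con.cursor() as cur:
        if alldata:
            # use every record, regardless of score
            cur.execute("SELECT inrix.aggregate_15min_alldata(%s, %s)",
                        (yyyymm, tmctable))
        else:
            # default: only score = 30 (observed) records
            cur.execute("SELECT inrix.aggregate_15min(%s, %s)",
                        (yyyymm, tmctable))
    con.commit()
```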
Removes data from TMCs outside the City of Toronto in two stages by calling two SQL functions:

- copying data to a new schema from TMCs that are not in the `gis.inrix_tmc_tor` table
- removing the moved data

Instead of running a `DELETE` operation, it is faster to copy the subset of data to retain, `DROP` the table, and then move the retained data back to the original table. This operation requires dropping all constraints and indexes as well.
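Purely to illustrate the copy-and-drop pattern (the real work is done by the two SQL functions above, and the table names here are hypothetical):

```python
# Illustration only: retain the Toronto TMCs, drop the original partition
# (which also drops its indexes and constraints, hence the note above),
# and put the retained rows back under the original name.
KEEP_AND_SWAP = """
    CREATE TABLE inrix.raw_data{yyyymm}_keep AS
        SELECT raw.*
        FROM inrix.raw_data{yyyymm} raw
        JOIN gis.inrix_tmc_tor USING (tmc);
    DROP TABLE inrix.raw_data{yyyymm};
    ALTER TABLE inrix.raw_data{yyyymm}_keep RENAME TO raw_data{yyyymm};
"""

def move_month(con, yyyymm):
    with con.cursor() as cur:
        cur.execute(KEEP_AND_SWAP.format(yyyymm=yyyymm))
    con.commit()
```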
Want to run an arbitrary function on a table partitioned by month? Create a SQL function that accepts one argument, `yyyymm`, and operates on tables of the form `PARENT_YYYYMM`. You can then execute that function over a range of months provided in the `-y` argument. E.g.

python data_util.py --runfunction -y 201601 201612 -d db.cfg -s here --function do_something

will execute:

SELECT here.do_something('201601');
SELECT here.do_something('201602');
--And so on
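Internally this boils down to a loop along these lines (a sketch, not the exact code):

```python
# Sketch of the loop behind --runfunction; psycopg2.sql quotes the schema
# and function names safely.
from psycopg2 import sql

def run_months(con, schemaname, function, months):
    for yyyymm in months:
        with con.cursor() as cur:
            cur.execute(sql.SQL("SELECT {schema}.{func}(%s)").format(
                schema=sql.Identifier(schemaname),
                func=sql.Identifier(function)),
                (yyyymm,))
        con.commit()  # one transaction per month
```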
Because of RAM issues on the server we used, it was necessary to restart the PostgreSQL engine nightly to free up memory. Because we wanted processing to continue afterwards, the utility had to handle these failures gracefully. I eventually wrote an execution wrapper based on the validation I got from asking this question. You can find that code here. Note that you should consider this very carefully for your own system: a connection that regularly fails isn't normal, and infinitely retrying queries because of dropped connections could be a Big Problem.
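For reference, here is a stripped-down sketch of that idea, assuming psycopg2 and a hard cap on retries rather than retrying forever:

```python
# Minimal sketch of an execution wrapper that tolerates a nightly restart.
# The retry cap is deliberate: retrying forever on a genuinely broken
# connection is exactly the "Big Problem" mentioned above.
import time
import psycopg2
from psycopg2 import InterfaceError, OperationalError

def execute_with_retry(dbset, query, params=None, max_retries=3, wait=120):
    """Run one statement, reconnecting if the server was restarted."""
    for attempt in range(max_retries + 1):
        try:
            con = psycopg2.connect(**dbset)
            try:
                with con.cursor() as cur:
                    cur.execute(query, params)
                con.commit()
            finally:
                con.close()
            return
        except (OperationalError, InterfaceError):
            if attempt == max_retries:
                raise
            time.sleep(wait)  # give the restarted server time to come back
```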
The advantages of making this a command-line application are:

- it is easier to run with different options without having to open an interactive Python shell or edit a script directly
- tasks can be scheduled with `cron` on Un*x or the Task Scheduler on Windows
Two handy resources for writing the argument parser were the argparse API documentation and this tutorial. For testing (see below) it was helpful to move the parsing into a function `parse_args()`, which returns the parsed arguments as a dictionary.
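A simplified sketch of that pattern (only a couple of the real flags are shown here):

```python
# Keeping the parser in its own function means tests can pass in a list of
# strings instead of relying on sys.argv.
import argparse

def parse_args(args=None):
    """Parse command line arguments; pass a list of strings when testing."""
    parser = argparse.ArgumentParser(
        description='Index, partition, or aggregate traffic data in a database.')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-i', '--index', action='store_true')
    group.add_argument('-p', '--partition', action='store_true')
    parser.add_argument('-y', '--years', nargs=2, metavar='YYYYMM')
    parser.add_argument('-d', '--dbsetting', default='default.cfg')
    return vars(parser.parse_args(args))  # a dictionary, as described above
```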
Unit testing code is awesome and totally worth the effort.
Think of it like exploring whether your code works in the interactive shell, except that you keep a history of every test you attempt. If you add or modify functionality, it only takes a second to see whether the functions still work the way you initially intended. It also forces you to break code up into functions that are easily testable, which generally makes code more readable and understandable, and it improves the portability of the code, so that functions can easily be extended and re-tested. I reused the argument parsing component when automating congestion mapping.
I was inspired by this answer to use a similar directory structure and the `unittest` module to structure my tests. The unittest documentation is here.
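A small example of the kind of test this enables, reusing the simplified `parse_args()` sketch above (the import assumes it lives in `data_util.py`):

```python
import unittest

from data_util import parse_args  # assumes the sketch above lives there

class ParseArgsTestCase(unittest.TestCase):

    def test_index_with_year_range(self):
        args = parse_args(['-i', '-y', '201207', '201212'])
        self.assertTrue(args['index'])
        self.assertEqual(args['years'], ['201207', '201212'])

if __name__ == '__main__':
    unittest.main()
```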
- Pull out parts that could be reusable to add to a possible `bdit` Python module, such as `YYYYMM` parsing and `db.cfg` command-line arguments
- Reorganize folders so that the SQL functions used in this utility are in the same folder (to make it easier to download everything necessary for this utility), and maybe raise an error if they aren't present when `data_util` is run