Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RODARS pipeline #1103

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
af506fb
#1088 explore new rodars data
gabrielwol Oct 28, 2024
d2003b0
#1088 initial rodars schema
gabrielwol Oct 29, 2024
3470497
#1088 add new divisions, fix null geometry issue, rename
gabrielwol Nov 20, 2024
7f4754f
#1088 RODARs DAG
gabrielwol Nov 29, 2024
b402e72
#1088 bug fix: update each locationindex geom separately
gabrielwol Nov 29, 2024
9a64344
#1088 readme updates
gabrielwol Nov 29, 2024
19c6726
#1088 switch to congestion_events schema, separate issues & locations
gabrielwol Dec 6, 2024
c78b0fb
#1088 update dag to 2 tasks
gabrielwol Dec 6, 2024
1eed832
#1088 lookup tables
gabrielwol Dec 6, 2024
e0f678b
#1088 fix issue insert
gabrielwol Dec 6, 2024
e4f4e1b
#1088 lanes affected fixes
gabrielwol Dec 9, 2024
95a124c
#1088 issues_locations -> issue_locations
gabrielwol Dec 10, 2024
32e6436
#1088 convert timestamps to eastern, data type fixes, misc
gabrielwol Dec 10, 2024
3eb07f9
#1088 vds_bot -> events_bot
gabrielwol Dec 12, 2024
2d402a2
#1088 add on insert delete trigger to locations
gabrielwol Dec 12, 2024
ef206a0
#1088 update permissions
gabrielwol Dec 12, 2024
11504f7
#1088 add back date filters
gabrielwol Dec 12, 2024
f917d9c
#1088 remove TM3 issues
gabrielwol Dec 20, 2024
b0da1ac
#1088 itsc_locations updates
gabrielwol Dec 20, 2024
6d08f6a
#1088 rename itsc->rodars
gabrielwol Dec 20, 2024
33c6b1b
#1088 rename itsc->rodars (files)
gabrielwol Dec 20, 2024
d0a411c
#1088 rename itsc->rodars (py)
gabrielwol Dec 20, 2024
0ba9459
#1088 rename itsc->rodars (py)
gabrielwol Dec 20, 2024
66f772a
#1088 rename itsc->rodars (harder)
gabrielwol Dec 20, 2024
c603cef
#1088 smol readme update
gabrielwol Dec 20, 2024
85830f3
#1088 sqlfluff
gabrielwol Jan 10, 2025
ab3388b
#1088 remove some dated exploration work
gabrielwol Jan 10, 2025
497c481
#1088 fix rodars_factors -> itsc_factors inconsistency
gabrielwol Jan 10, 2025
e73e818
#1088 rm outdated update sql
gabrielwol Jan 10, 2025
f06ef50
#1088 readme, comment updates
gabrielwol Jan 10, 2025
a5f895c
#1088 format callouts properly?
gabrielwol Jan 10, 2025
a3031de
#1088 format callouts properly? try 2
gabrielwol Jan 10, 2025
8d7ec93
#1088 add lanesaffectedpattern codes+description
gabrielwol Jan 13, 2025
baa69d6
#1088 get centreline_geom even when not in _latest
gabrielwol Jan 13, 2025
03a95ed
#1088 readme updates; add example queries, data dictionary
gabrielwol Jan 13, 2025
4640911
#1088 remove extra >s
gabrielwol Jan 14, 2025
0ca7062
#1134 add data-sources tag to DAGs
gabrielwol Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions dags/rodars_pull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
import sys
from functools import partial
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable

DAG_NAME = 'rodars_pull'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ['Unknown'])

repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)

from events.construction.rodars_functions import (
fetch_and_insert_issue_data, fetch_and_insert_location_data
)
from dags.dag_functions import task_fail_slack_alert, get_readme_docmd

README_PATH = os.path.join(repo_path, 'events/construction/readme.md')
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)

default_args = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past': False,
'start_date': datetime(2024, 11, 27),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True, #Allow for progressive longer waits between retries
'on_failure_callback': partial(task_fail_slack_alert, use_proxy = True),
'catchup': True,
}

@dag(
dag_id=DAG_NAME,
default_args=default_args,
max_active_runs=1,
template_searchpath=[
os.path.join(repo_path,'events/construction/sql')
],
doc_md=DOC_MD,
tags=['bdit_data-sources', 'rodars', 'pull', 'itsc_central'],
schedule='0 4 * * *' #daily at 4am
)

def rodars_dag():
@task
def pull_rodars_issues(ds = None):
"Get RODARS data from ITSC and insert into bigdata `congestion_events.itsc_issues`"
itsc_bot = PostgresHook('itsc_postgres')
events_bot = PostgresHook('events_bot')
fetch_and_insert_issue_data(select_conn=itsc_bot, insert_conn=events_bot, start_date=ds)

@task
def pull_rodar_locations(ds = None):
"Get RODARS data from ITSC and insert into bigdata `congestion_events.itsc_issue_locations`"
itsc_bot = PostgresHook('itsc_postgres')
events_bot = PostgresHook('events_bot')
fetch_and_insert_location_data(select_conn=itsc_bot, insert_conn=events_bot, start_date=ds)
#add a delete task to remove outdated revisions?

pull_rodars_issues()
pull_rodar_locations()

rodars_dag()
237 changes: 237 additions & 0 deletions events/construction/readme.md

Large diffs are not rendered by default.

Binary file added events/construction/rodars_form.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
200 changes: 200 additions & 0 deletions events/construction/rodars_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import os
import logging
import pandas as pd
from numpy import nan
import struct
import json
from io import BytesIO
from psycopg2 import sql, Error
from psycopg2.extras import execute_values

from airflow.providers.postgres.hooks.postgres import PostgresHook

SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql')

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def coordinates_from_binary(br):
'Read longitude and latitude as doubles (8 bytes each)'
longitude, latitude = struct.unpack('dd', br.read(16))
return (longitude, latitude)

def coordinates_to_geomfromtext(l):
'Formats points and line geoms to be ingested by postgres `st_geomfromtext`'
geom_type = 'POINT' if len(l) == 1 else 'LINESTRING'
coords = ', '.join([f"{x[0]} {x[1]}" for x in l])
return f"{geom_type}({coords})"

def geometry_from_bytes(geo_bytes):
'Initialize a stream to read binary data from the byte array'
coordinates_list = []
with BytesIO(geo_bytes) as ms:
# Read the first 4 bytes = length
try:
len_val = struct.unpack('i', ms.read(4))[0]
except struct.error:
#struct.error: unpack requires a buffer of 4 bytes
return None
# Iterate and unpack each pair of doubles as coordinates
for _ in range(len_val):
coordinates = coordinates_from_binary(ms)
coordinates_list.append(coordinates)
return coordinates_list

def process_lanesaffected(json_str):
'''Converts a json variable to pandas dataframe.

Top level json attributes are given _toplevel suffix,
while contents of LaneApproaches nested json keeps original keys,
with exception of FeatureId (centreline_id) and RoadId (linear_name_id).'''

if (json_str == 'Unknown') | (json_str is None):
return None
try:
lanesaffected = json.loads(json_str)
except json.decoder.JSONDecodeError as e:
LOGGER.debug("Json str not parsed: %s", json_str)
LOGGER.debug(e)
return None
#expand laneapproach nested json
try:
lanes = pd.json_normalize(lanesaffected, 'LaneApproaches')
except TypeError as e:
LOGGER.debug("Json str not parsed: %s", lanesaffected)
LOGGER.debug(e)
return None
lanes = lanes.rename(columns={
'FeatureId': 'centreline_id',
'RoadId': 'linear_name_id'
})
#add extra, non-nested variables
keys = list(lanesaffected.keys())
keys.remove('LaneApproaches')
if keys is not None:
for key in keys:
lanes.insert(0, f"{key}_toplevel", lanesaffected[key])
return lanes

def fetch_and_insert_issue_data(
select_conn = PostgresHook('rodars_postgres'),
insert_conn = PostgresHook('vds_bot'),
start_date = None
):
'''Fetch, process and insert data from ITS Central issuedata table.'''
select_fpath = os.path.join(SQL_DIR, 'select-rodars_issues.sql')
with open(select_fpath, 'r', encoding="utf-8") as file:
select_query = sql.SQL(file.read()).format(
start = sql.Literal(start_date)
)
try:
with select_conn.get_conn() as con, con.cursor() as cur:
LOGGER.info("Fetching RODARS data.")
cur.execute(select_query)
data = cur.fetchall()
df = pd.DataFrame(data)
df.columns=[x.name for x in cur.description]
except Error as exc:
LOGGER.critical("Error fetching RODARS data.")
LOGGER.critical(exc)
raise Exception()

#transform values for inserting
df_final = df.replace({pd.NaT: None, nan: None})
df_final = [tuple(x) for x in df_final.to_numpy()]

insert_fpath = os.path.join(SQL_DIR, 'insert-rodars_issues.sql')
with open(insert_fpath, 'r', encoding="utf-8") as file:
insert_query = sql.SQL(file.read())

with insert_conn.get_conn() as con, con.cursor() as cur:
execute_values(cur, insert_query, df_final)

def fetch_and_insert_location_data(
select_conn = PostgresHook('rodars_postgres'),
insert_conn = PostgresHook('vds_bot'),
start_date = None
):
'''Fetch, process and insert data from ITS Central issuelocationnew table.

- Fetches data from ITS Central
- Processes geometry data stored in binary (accounts for both points/lines).
- Unnests mutli layered lanesaffected json column into tabular form.
- Performs some checks on columns unnested from json.
- Inserts into RDS `congestion_events.rodars_issue_locations` table.
'''

select_fpath = os.path.join(SQL_DIR, 'select-rodars_issue_locations.sql')
with open(select_fpath, 'r', encoding="utf-8") as file:
select_query = sql.SQL(file.read()).format(
start = sql.Literal(start_date)
)
try:
with select_conn.get_conn() as con, con.cursor() as cur:
LOGGER.info("Fetching RODARS data.")
cur.execute(select_query)
data = cur.fetchall()
df = pd.DataFrame(data)
df.columns=[x.name for x in cur.description]
except Error as exc:
LOGGER.critical("Error fetching RODARS data.")
LOGGER.critical(exc)
raise Exception()

pkeys = ['divisionid', 'issueid', 'timestamputc', 'locationindex']

geom_data = df['geometry'].map(geometry_from_bytes)
valid_geoms = [not(x is None) for x in geom_data]
geoms_df = df[pkeys][valid_geoms]
geoms_df.insert(3, 'geom_text', geom_data[valid_geoms].map(coordinates_to_geomfromtext))
df_no_geom = pd.merge(df.drop('geometry', axis = 1), geoms_df, on = pkeys, how='left')

expanded_list = []
for row in df_no_geom.iterrows():
expanded = process_lanesaffected(row[1]['lanesaffected'])
# Add primary key columns to the expanded data
if expanded is None:
continue
for col in pkeys:
expanded[col] = row[1][col]
expanded_list.append(expanded)
df_expanded = pd.concat(expanded_list, ignore_index=True)
df_no_geom = pd.merge(df_no_geom, df_expanded, on = pkeys, how='left')

cols_to_insert = [
'divisionid', 'issueid', 'timestamputc', 'locationindex', 'mainroadname', 'fromroadname',
'toroadname', 'direction_toplevel', 'lanesaffected', 'streetnumber', 'locationtype', 'groupid',
'groupdescription', 'locationblocklevel_toplevel', 'roadclosuretype_toplevel',
'encodedcoordinates_toplevel', 'locationdescription_toplevel', 'direction', 'roadname',
'centreline_id', 'linear_name_id', 'lanesaffectedpattern', 'laneblocklevel',
'roadclosuretype', 'geom_text'
]
df_no_geom.columns = map(str.lower, df_no_geom.columns)
#check for extra columns unpacked from json.
extra_cols = [col for col in df_no_geom.columns if col not in cols_to_insert]
if extra_cols != []:
LOGGER.warning('There are extra columns unpacked from json not being inserted: %s', extra_cols)
#add missing columns (inconsistent jsons)
missing_cols = [col for col in cols_to_insert if col not in df_no_geom.columns]
if missing_cols != []:
for col in missing_cols:
df_no_geom.insert(0, col, None)

#convert some datatypes to int
cols_to_convert = ["locationblocklevel_toplevel", "roadclosuretype_toplevel", "direction", "centreline_id", "linear_name_id", "laneblocklevel", "roadclosuretype", "groupid"]
df_no_geom[cols_to_convert] = df_no_geom[cols_to_convert].replace({pd.NaT: 0, nan: 0})
df_no_geom[cols_to_convert] = df_no_geom[cols_to_convert].astype('int32')

#transform values for inserting
df_no_geom = df_no_geom.replace({pd.NaT: None, nan: None, '': None})

#arrange columns for inserting
df_no_geom = df_no_geom[cols_to_insert]
df_no_geom = [tuple(x) for x in df_no_geom.to_numpy()]

insert_fpath = os.path.join(SQL_DIR, 'insert-rodars_issue_locations.sql')
with open(insert_fpath, 'r', encoding="utf-8") as file:
insert_query = sql.SQL(file.read())

with insert_conn.get_conn() as con, con.cursor() as cur:
execute_values(cur, insert_query, df_no_geom, page_size = 1000)
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
CREATE OR REPLACE FUNCTION congestion_events.delete_old_rodars_issue_locations()
RETURNS trigger AS $$
BEGIN

WITH latest AS (
SELECT
divisionid,
issueid,
MAX(timestamputc) AS max_timestamputc
FROM congestion_events.rodars_issue_locations
GROUP BY
divisionid,
issueid
)

-- Delete records older than the current one for the same primary keys
DELETE FROM congestion_events.rodars_issue_locations AS iil
USING latest
WHERE
iil.divisionid = latest.divisionid
AND iil.issueid = latest.issueid
AND iil.timestamputc < latest.max_timestamputc;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

GRANT EXECUTE ON FUNCTION congestion_events.delete_old_rodars_issue_locations TO congestion_admins;
GRANT EXECUTE ON FUNCTION congestion_events.delete_old_rodars_issue_locations TO events_bot;

COMMENT ON FUNCTION congestion_events.delete_old_rodars_issue_locations IS
'Deletes old records from congestion_events.rodars_issue_locations on insert (trigger).';
51 changes: 51 additions & 0 deletions events/construction/sql/create-function-get_lanesaffected_sum.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-- FUNCTION: itsc_factors.get_lanesaffected_sums(text)

-- DROP FUNCTION IF EXISTS itsc_factors.get_lanesaffected_sums(text);

CREATE OR REPLACE FUNCTION itsc_factors.get_lanesaffected_sums(input_string text)
RETURNS TABLE (
lap_descriptions text[],

Check notice on line 7 in events/construction/sql/create-function-get_lanesaffected_sum.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

LT01: Expected single whitespace between 'text' keyword and start square bracket '['.
lane_open_auto integer,
lane_closed_auto integer,
lane_open_bike integer,
lane_closed_bike integer,
lane_open_ped integer,
lane_closed_ped integer,
lane_open_bus integer,
lane_closed_bus integer
)
LANGUAGE plpgsql
COST 100
STABLE PARALLEL SAFE
ROWS 1

AS $BODY$
DECLARE
code_list TEXT[];
BEGIN

-- Iterate over the list and aggregate sums for each code
RETURN QUERY
SELECT
ARRAY_AGG(lane_status) AS lap_descriptions,
COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Car'), 0)::int AS lane_open_auto,
COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Car'), 0)::int AS lane_closed_auto,
COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Bike'), 0)::int AS lane_open_bike,
COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Bike'), 0)::int AS lane_closed_bike,
COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Pedestrian'), 0)::int AS lane_open_ped,
COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Pedestrian'), 0)::int AS lane_closed_ped,
COALESCE(SUM(lane_open) FILTER (WHERE mode = 'Bus'), 0)::int AS lane_open_bus,
COALESCE(SUM(lane_closed) FILTER (WHERE mode = 'Bus'), 0)::int AS lane_closed_bus
FROM UNNEST(regexp_split_to_array(input_string, E'(?=(..)+$)')) AS c
JOIN itsc_factors.lanesaffectedpattern AS lap ON lap.code = c;

END;
$BODY$;

ALTER FUNCTION itsc_factors.get_lanesaffected_sums(text) OWNER TO congestion_admins;

GRANT EXECUTE ON FUNCTION itsc_factors.get_lanesaffected_sums(text) TO public;

GRANT EXECUTE ON FUNCTION itsc_factors.get_lanesaffected_sums(text) TO congestion_admins;

GRANT EXECUTE ON FUNCTION itsc_factors.get_lanesaffected_sums(text) TO events_bot;
17 changes: 17 additions & 0 deletions events/construction/sql/create-table-itsc_factors-direction.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Table: itsc_factors.direction

-- DROP TABLE IF EXISTS itsc_factors.direction;

CREATE TABLE IF NOT EXISTS itsc_factors.direction
(
code integer NOT NULL,
direction text COLLATE pg_catalog."default" NOT NULL,
CONSTRAINT itsc_factors_direction_pkey PRIMARY KEY (code)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS itsc_factors.direction
OWNER TO congestion_admins;

GRANT SELECT ON TABLE itsc_factors.direction TO events_bot;
Loading
Loading