Skip to content

Commit

Permalink
#868, #879 remove/replace references to old 15min tables
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Jul 26, 2024
1 parent 88529ac commit 88b2654
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 94 deletions.
22 changes: 3 additions & 19 deletions dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def check_partitions():
create_annual_partition = PostgresOperator(
task_id='create_annual_partitions',
sql=["SELECT miovision_api.create_yyyy_volumes_partition('volumes', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'datetime_bin')",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min_mvt', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min_mvt_unfiltered', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
postgres_conn_id='miovision_api_bot',
autocommit=True
)
Expand Down Expand Up @@ -168,21 +167,6 @@ def zero_volume_anomalous_ranges_task(ds = None, **context):
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
agg_zero_volume_anomalous_ranges(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_15_min_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
time_period = (ds, ds_add(ds, 1))
#no user specified intersection
if context["params"]["intersection"] == [0]:
with mio_postgres.get_conn() as conn:
aggregate_15_min(conn, time_period=time_period)
#user specified intersection
else:
INTERSECTIONS = tuple(context["params"]["intersection"])
with mio_postgres.get_conn() as conn:
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
aggregate_15_min(conn, time_period=time_period, intersections=intersections)

@task
def aggregate_volumes_daily_task(ds = None, **context):
mio_postgres = PostgresHook("miovision_api_bot")
Expand Down Expand Up @@ -213,7 +197,7 @@ def get_report_dates_task(ds = None, **context):
intersections = get_intersection_info(conn, intersection=INTERSECTIONS)
get_report_dates(conn, time_period=time_period, intersections=intersections)

find_gaps_task() >> aggregate_15_min_mvt_task() >> [aggregate_15_min_task(), zero_volume_anomalous_ranges_task()] >> aggregate_volumes_daily_task()
find_gaps_task() >> aggregate_15_min_mvt_task() >> zero_volume_anomalous_ranges_task() >> aggregate_volumes_daily_task()
get_report_dates_task()

t_done = ExternalTaskMarker(
Expand All @@ -225,7 +209,7 @@ def get_report_dates_task(ds = None, **context):
@task_group(tooltip="Tasks to check critical data quality measures which could warrant re-running the DAG.")
def data_checks():
data_check_params = {
"table": "miovision_api.volumes_15min_mvt",
"table": "miovision_api.volumes_15min_mvt_unfiltered",
"lookback": '60 days',
"dt_col": 'datetime_bin',
"threshold": 0.7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ WITH temp AS (
),

aggregate_insert AS (
INSERT INTO miovision_api.volumes_15min_mvt(
INSERT INTO miovision_api.volumes_15min_mvt_unfiltered(
intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
)
SELECT DISTINCT ON (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ DECLARE
BEGIN

WITH aggregate_delete AS (
DELETE FROM miovision_api.volumes_15min_mvt
DELETE FROM miovision_api.volumes_15min_mvt_unfiltered
WHERE
intersection_uid = ANY(target_intersections)
AND datetime_bin >= start_date
Expand All @@ -41,7 +41,7 @@ BEGIN
SELECT COUNT(*) INTO n_deleted
FROM aggregate_delete;

RAISE NOTICE 'Deleted % rows from miovision_api.volumes_15min_mvt.', n_deleted;
RAISE NOTICE 'Deleted % rows from miovision_api.volumes_15min_mvt_unfiltered.', n_deleted;

END;

Expand All @@ -57,5 +57,5 @@ GRANT EXECUTE ON FUNCTION miovision_api.clear_15_min_mvt(timestamp, timestamp, i
TO miovision_admins;

COMMENT ON FUNCTION miovision_api.clear_15_min_mvt(timestamp, timestamp, integer [])
IS '''Clears data from `miovision_api.volumes_15min_mvt` in order to facilitate re-pulling.
IS '''Clears data from `miovision_api.volumes_15min_mvt_unfiltered` in order to facilitate re-pulling.
`intersections` param defaults to all intersections.''';
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ $BODY$;

COMMENT ON FUNCTION miovision_api.create_mm_nested_volumes_partitions(text, integer, integer) IS
'''Create a new month partition under the parent year table `base_table`. Only to be used for
miovision_api `volumes_15min` and `volumes_15min_mvt` tables.
Example: `SELECT miovision_api.create_yyyy_volumes_partition(''volumes_15min'', 2023)`''';
miovision_api `volumes_15min_mvt_unfiltered` table.
Example: `SELECT miovision_api.create_mm_nested_volumes_partitions(''volumes_15min_mvt_unfiltered'', 2023, 1)`''';

ALTER FUNCTION miovision_api.create_mm_nested_volumes_partitions(text, integer, integer) OWNER TO miovision_admins;
GRANT EXECUTE ON FUNCTION miovision_api.create_mm_nested_volumes_partitions(text, integer, integer) TO miovision_api_bot;
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ $BODY$;

COMMENT ON FUNCTION miovision_api.create_yyyy_volumes_15min_partition(text, integer) IS
'''Create a new year partition under the parent table `base_table`. Only to be used for
miovision_api `volumes_15min` and `volumes_15min_mvt` tables.
Example: `SELECT miovision_api.create_yyyy_volumes_partition(''volumes_15min'', 2023)`''';
miovision_api `volumes_15min_mvt_unfiltered` table.
Example: `SELECT miovision_api.create_yyyy_volumes_15min_partition(''volumes_15min_mvt_unfiltered'', 2023)`''';

ALTER FUNCTION miovision_api.create_yyyy_volumes_15min_partition(text, integer) OWNER TO miovision_admins;
GRANT EXECUTE ON FUNCTION miovision_api.create_yyyy_volumes_15min_partition(text, integer) TO miovision_api_bot;
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ BEGIN
v15.leg,
MIN(v15.datetime_bin) AS range_start,
MAX(v15.datetime_bin) + interval '15 minutes' AS range_end
FROM miovision_api.volumes_15min_mvt AS v15
FROM miovision_api.volumes_15min_mvt_unfiltered AS v15
WHERE
v15.datetime_bin >= start_date
AND v15.datetime_bin < start_date + interval '1 day'
--this script will only catch zeros for classification_uid 1,2,6,10
--since those are the ones that are zero padded in volumes_15min_mvt. Filter for additional speed.
--since those are the ones that are zero padded in volumes_15min_mvt_unfiltered. Filter for additional speed.
AND v15.classification_uid IN (1,2,6,10)
AND v15.intersection_uid = ANY(target_intersections)
GROUP BY
Expand Down
18 changes: 9 additions & 9 deletions volumes/miovision/sql/views/create-view-volumes_15min_atr.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
DROP VIEW gwolofs.miovision_15min_atr_filtered;
CREATE VIEW gwolofs.miovision_15min_atr_filtered AS (
DROP VIEW miovision_api.miovision_15min_atr_filtered;
CREATE VIEW miovision_api.miovision_15min_atr_filtered AS (

--entries
SELECT
Expand All @@ -9,8 +9,8 @@ CREATE VIEW gwolofs.miovision_15min_atr_filtered AS (
v15.leg,
mmm.entry_dir AS dir,
SUM(v15.volume) AS volume
FROM gwolofs.volumes_15min_mvt_filtered AS v15
JOIN gwolofs.miovision_movement_map_new AS mmm USING (movement_uid, leg)
FROM miovision_api.volumes_15min_mvt_filtered AS v15
JOIN miovision_api.miovision_movement_map_new AS mmm USING (movement_uid, leg)
GROUP BY
v15.intersection_uid,
v15.datetime_bin,
Expand All @@ -28,8 +28,8 @@ CREATE VIEW gwolofs.miovision_15min_atr_filtered AS (
mmm.exit_leg,
mmm.exit_dir,
SUM(v15.volume)
FROM gwolofs.volumes_15min_mvt_filtered AS v15
JOIN gwolofs.miovision_movement_map_new AS mmm USING (movement_uid, leg)
FROM miovision_api.volumes_15min_mvt_filtered AS v15
JOIN miovision_api.miovision_movement_map_new AS mmm USING (movement_uid, leg)
GROUP BY
v15.intersection_uid,
v15.datetime_bin,
Expand All @@ -40,12 +40,12 @@ CREATE VIEW gwolofs.miovision_15min_atr_filtered AS (

--test: 0.2 s with primary key
SELECT *
FROM gwolofs.miovision_15min_atr_filtered
FROM miovision_api.miovision_15min_atr_filtered
WHERE
intersection_uid = 6
AND classification_uid = 1
AND datetime_bin = '2024-06-25 12:00:00'
AND leg <> LEFT(dir, 1)
AND leg <> LEFT(dir, 1);

--DR i0627 test using new view
--41s for original, 1:06 for view (1.3M rows)
Expand All @@ -56,7 +56,7 @@ SELECT
volumes.dir,
classifications.classification,
SUM(volumes.volume) AS volume
FROM gwolofs.miovision_15min_atr_filtered AS volumes
FROM miovision_api.miovision_15min_atr_filtered AS volumes
INNER JOIN miovision_api.classifications USING (classification_uid)
WHERE
volumes.classification_uid NOT IN (2, 7)
Expand Down
41 changes: 0 additions & 41 deletions volumes/miovision/sql/views/create-view-volumes_15min_filtered.sql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
CREATE VIEW miovision_api.volumes_15min_mvt_filtered AS (
CREATE OR REPLACE VIEW miovision_api.volumes_15min_mvt_filtered AS (
SELECT
v15.volume_15min_mvt_uid,
v15.intersection_uid,
v15.datetime_bin,
v15.classification_uid,
v15.leg,
v15.movement_uid,
v15.volume,
v15.processed
FROM miovision_api.volumes_15min_mvt AS v15
v15.volume
FROM miovision_api.volumes_15min_mvt_unfiltered AS v15
--only include "common" movements
JOIN miovision_api.intersection_movements USING (
intersection_uid, classification_uid, leg, movement_uid
)
--anti join unacceptable_gaps
LEFT JOIN miovision_api.unacceptable_gaps AS un USING (datetime_bin, intersection_uid)
--anti join anomalous_ranges
LEFT JOIN miovision_api.anomalous_ranges AS ar
ON ar.problem_level = ANY(ARRAY['do-not-use', 'questionable'])
ON (ar.problem_level = ANY(ARRAY['do-not-use'::text, 'questionable'::text]))
AND ar.intersection_uid = v15.intersection_uid
AND (
ar.classification_uid = v15.classification_uid
OR ar.classification_uid IS NULL
) AND (
ar.leg = v15.leg
OR ar.leg IS NULL
) AND v15.datetime_bin >= ar.range_start
)
AND v15.datetime_bin >= ar.range_start
AND (
v15.datetime_bin <= ar.range_end
OR ar.range_end IS NULL
)
WHERE ar.uid IS NULL
WHERE
ar.uid IS NULL
AND un.datetime_bin IS NULL
);

COMMENT ON VIEW miovision_api.volumes_15min_mvt_filtered IS E''
'miovision_api.volumes_15min_mvt with anomalous_ranges labeled '
'''do-not-use'' or ''questionable'' filtered out.';
'miovision_api.volumes_15min_mvt_unfiltered with anomalous_ranges labeled '
'''do-not-use'' or ''questionable'' filtered out, unacceptable_gaps anti-joined,
and only common (>0.05%) movements (`intersection_movements`) included.';

GRANT SELECT ON TABLE miovision_api.volumes_15min_mvt_filtered TO bdit_humans;
16 changes: 11 additions & 5 deletions volumes/miovision/sql/views/create-view-volumes_15min_tmc.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
CREATE VIEW miovision_api.volumes_15min_tmc AS (
SELECT *
FROM miovision_api.volumes_15min_mvt
WHERE NOT (classification_uid IN (6, 7, 10))
SELECT
intersection_uid,
datetime_bin,
classification_uid,
leg,
movement_uid,
volume
FROM miovision_api.volumes_15min_mvt_filtered
WHERE classification_uid NOT IN (6, 7, 10)
);

ALTER TABLE miovision_api.volumes_15min_tmc
OWNER TO miovision_admins;
OWNER TO miovision_admins;

COMMENT ON VIEW miovision_api.volumes_15min_tmc
IS 'miovision_api.volumes_15min_mvt, but only including turning movement counts.';
IS 'miovision_api.volumes_15min_mvt_filtered, but only including turning movement counts.';

GRANT ALL ON TABLE miovision_api.volumes_15min_tmc TO bdit_humans;
GRANT ALL ON TABLE miovision_api.volumes_15min_tmc TO bdit_bots;
Expand Down

0 comments on commit 88b2654

Please sign in to comment.