Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1134 add data-sources tag to active DAGs #1135

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dags/assets_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ def pull_traffic_signal():
default_args=DEFAULT_ARGS,
max_active_runs=1,
template_searchpath=[os.path.join(AIRFLOW_ROOT, 'assets/rlc/airflow/tasks')],
tags=["bdit_data-sources", "data_pull", "traffic_signals"]
schedule='0 4 * * 1-5')
# minutes past each hour | Hours (0-23) | Days of the month (1-31) | Months (1-12) | Days of the week (0-7, Sunday represented as either/both 0 and 7)

Expand Down
2 changes: 1 addition & 1 deletion dags/bluetooth_check_readers_temp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def format_br_list(returned_list):
default_args=default_args,
schedule='0 8 * * *',
catchup=False,
tags=['bluetooth', 'data_checks']
tags=["bdit_data-sources", 'bluetooth', 'data_checks']
)
def blip_pipeline():
## Tasks ##
Expand Down
2 changes: 1 addition & 1 deletion dags/citywide_tti_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
default_args=default_args,
schedule=None, # gets triggered by HERE dag
doc_md = doc_md,
tags=["HERE"],
tags=["bdit_data-sources", "HERE"],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregation too?

catchup=False
)

Expand Down
2 changes: 1 addition & 1 deletion dags/ecocounter_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
schedule='5 10 * * MON', # Run at 10:05 AM on Monday
catchup=True,
template_searchpath=os.path.join(repo_path, 'volumes/ecocounter/data_checks'),
tags=["ecocounter", "data_checks"],
tags=["bdit_data-sources", "ecocounter", "data_checks"],
doc_md=DOC_MD
)
def ecocounter_check_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/eoy_create_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
dag_id=DAG_NAME,
default_args=default_args,
schedule='0 0 1 12 *', # At 00:00 on 1st in December.
tags=["partition_create", "yearly"],
tags=["bdit_data-sources", "partition_create", "yearly"],
catchup=False,
doc_md=__doc__
)
Expand Down
2 changes: 1 addition & 1 deletion dags/gcc_layers_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def create_gcc_puller_dag(dag_id, default_args, name, conn_id):
dag_id=dag_id,
default_args=default_args,
catchup=False,
tags=['gcc', name],
tags=["bdit_data-sources", 'gcc', name],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see some other DAG has yearly and monthly if its not running daily, then I think this should also have quarterly

schedule='0 7 1 */3 *' #'@quarterly'
)
def gcc_layers_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/miovision_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
schedule='0 4 * * MON', # Run at 4 AM on Monday
catchup=False,
template_searchpath=os.path.join(repo_path,'volumes/miovision/sql/data_checks'),
tags=["miovision", "data_checks"],
tags=["bdit_data-sources", "miovision", "data_checks"],
doc_md=DOC_MD
)
def miovision_check_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/miovision_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
default_args=default_args,
schedule='0 2 * * *',
catchup=False,
tags=["miovision", "data_pull"],
tags=["bdit_data-sources", "miovision", "data_pull"],
doc_md=DOC_MD
)
def pull_miovision_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
items={"type": "number"},
)
},
tags=["miovision", "data_pull", "partition_create", "data_checks"],
tags=["bdit_data-sources", "miovision", "data_pull", "partition_create", "data_checks"],
doc_md=DOC_MD
)
def pull_miovision_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/pull_here.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
schedule='0 17 * * * ' ,
catchup=False,
doc_md = doc_md,
tags=["HERE", "data_pull"]
tags=["bdit_data-sources", "HERE", "data_pull"]
)

def pull_here():
Expand Down
2 changes: 1 addition & 1 deletion dags/pull_here_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
schedule='30 10 * * *' ,
catchup=False,
doc_md = doc_md,
tags=["HERE", "data_pull"]
tags=["bdit_data-sources", "HERE", "data_pull"]
)

def pull_here_path():
Expand Down
2 changes: 1 addition & 1 deletion dags/refresh_wys_monthly.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def last_month(ds):
with DAG(dag_id = dag_name,
default_args=default_args,
max_active_runs=1,
tags=["wys", "aggregation", "monthly"],
tags=["bdit_data-sources", "wys", "aggregation", "monthly"],
doc_md = DOC_MD,
user_defined_macros={
'last_month' : last_month
Expand Down
2 changes: 1 addition & 1 deletion dags/replicator_table_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
#loosely coupled with the two replicator DAGs which are externally triggered at 430am
schedule='0 4 * * *',
doc_md=DOC_MD,
tags=["replicator", "data_checks"]
tags=["bdit_data-sources", "replicator", "data_checks"]
)
def replicator_DAG():

Expand Down
2 changes: 1 addition & 1 deletion dags/replicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def create_replicator_dag(dag_id, short_name, tables_var, conn, doc_md, default_
max_active_tasks=5,
schedule=None, #triggered externally
doc_md=doc_md,
tags=[short_name, "replicator"]
tags=["bdit_data-sources", short_name, "replicator"]
)
def replicator_DAG():
f"""The main function of the {short_name} DAG."""
Expand Down
2 changes: 1 addition & 1 deletion dags/tti_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
default_args=default_args,
schedule=None, # gets triggered by HERE dag
doc_md = doc_md,
tags=["HERE"],
tags=["bdit_data-sources", "HERE"],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregation

catchup=False
)

Expand Down
2 changes: 1 addition & 1 deletion dags/vds_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
schedule='0 5 * * FRI', # Run at 5 AM on Friday
catchup=False,
template_searchpath=os.path.join(repo_path,'volumes/vds/sql/select'),
tags=["vds", "data_checks"],
tags=["bdit_data-sources", "vds", "data_checks"],
doc_md=DOC_MD
)
def vds_check_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/vds_pull_vdsdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
os.path.join(repo_path,'dags/sql')
],
doc_md=DOC_MD,
tags=['vds', 'vdsdata', 'data_checks', 'pull', 'detector_inventory'],
tags=["bdit_data-sources", 'vds', 'vdsdata', 'data_checks', 'pull', 'detector_inventory'],
schedule='0 4 * * *' #daily at 4am
)
def vdsdata_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/vds_pull_vdsvehicledata.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
os.path.join(repo_path,'dags/sql')
],
doc_md=DOC_MD,
tags=['vds', 'vdsvehicledata', 'data_checks', 'pull'],
tags=["bdit_data-sources", 'vds', 'vdsvehicledata', 'data_checks', 'pull'],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we doing data_pull or pull ?

schedule='5 4 * * *' #daily at 4:05am
)
def vdsvehicledata_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/vz_google_sheets.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
items={"type": "number"},
)
},
tags=["Vision Zero", "google_sheets"]
tags=["bdit_data-sources", "Vision Zero", "google_sheets"]
)
def get_vz_data():
"""The main function of the SSZ DAG."""
Expand Down
2 changes: 1 addition & 1 deletion dags/weather_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
default_args=default_args,
schedule='0 8 * * *', #Historical weather is available at 1000UTC which is 6AM EDT and 5AM EST: https://climate.weather.gc.ca/FAQ_e.html#Q17
catchup=False,
tags=['weather', 'data_pull'],
tags=["bdit_data-sources", 'weather', 'data_pull'],
template_searchpath=os.path.join(repo_path, 'weather/sql'),
doc_md=__doc__
)
Expand Down
2 changes: 1 addition & 1 deletion dags/wys_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
max_active_runs=1,
template_searchpath=os.path.join(repo_path,'dags/sql'),
schedule='0 18 * * *', # Run at 6:00 PM local time every day
tags=["wys", "data_checks"],
tags=["bdit_data-sources", "wys", "data_checks"],
doc_md=DOC_MD
)
def wys_check_dag():
Expand Down
2 changes: 1 addition & 1 deletion dags/wys_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
max_active_runs=5,
template_searchpath=os.path.join(repo_path,'dags/sql'),
schedule='0 17 * * *', # Run at 5:00 PM local time every day
tags=["wys", "data_pull", "partition_create", "data_checks", "google_sheets"],
tags=["bdit_data-sources", "wys", "data_pull", "partition_create", "data_checks", "google_sheets"],
doc_md=DOC_MD
)
def pull_wys_dag():
Expand Down
Loading