Skip to content

Commit

Permalink
Remove unused library imports, update dbt task (#539)
Browse files Browse the repository at this point in the history
* Remove unused library imports, update dbt task default

* Format isort

* Revert operator default and pass on caller

* Lint errors

* Bump images

* Use exclude package name on dbt tasks

* Reformat code

* Update task_sla key

* Update all task names

* Lint

* Remove logic to append exclude to task name

* Bump stellar-dbt image

* Update dbt image
  • Loading branch information
sydneynotthecity authored Nov 18, 2024
1 parent 00a6e92 commit 33c0ff8
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 48 deletions.
3 changes: 1 addition & 2 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"partnership_assets__account_holders_activity_fact": false,
"partnership_assets__asset_activity_fact": false
},
"dbt_image_name": "stellar/stellar-dbt:96cd862b1",
"dbt_image_name": "stellar/stellar-dbt:34d5542c9",
"dbt_internal_source_db": "test-hubble-319619",
"dbt_internal_source_schema": "test_crypto_stellar_internal",
"dbt_job_execution_timeout_seconds": 300,
Expand Down Expand Up @@ -352,7 +352,6 @@
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
"enriched_history_operations_with_exclude": 780,
"fee_stats": 840,
"history_assets": 720,
"liquidity_pool_trade_volume": 1140,
Expand Down
3 changes: 1 addition & 2 deletions airflow_variables_prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:96cd862b1",
"dbt_image_name": "stellar/stellar-dbt:34d5542c9",
"dbt_internal_source_db": "hubble-261722",
"dbt_internal_source_schema": "crypto_stellar_internal_2",
"dbt_job_execution_timeout_seconds": 2400,
Expand Down Expand Up @@ -350,7 +350,6 @@
"elementary_dbt_data_quality": 2100,
"elementary_generate_report": 1200,
"enriched_history_operations": 1800,
"enriched_history_operations_with_exclude": 1800,
"fee_stats": 360,
"history_assets": 360,
"liquidity_pool_trade_volume": 1200,
Expand Down
8 changes: 1 addition & 7 deletions dags/dbt_data_quality_alerts_dag.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

init_sentry()

Expand Down
4 changes: 2 additions & 2 deletions dags/dbt_enriched_base_tables_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@

# DBT models to run
enriched_history_operations_task = dbt_task(
dag, tag="enriched_history_operations", excluded="singular_test"
dag, tag="enriched_history_operations", operator="+"
)
current_state_task = dbt_task(dag, tag="current_state")
current_state_task = dbt_task(dag, tag="current_state", operator="+")

# DAG task graph
wait_on_history_table >> enriched_history_operations_task
Expand Down
8 changes: 1 addition & 7 deletions dags/dbt_recency_tests_dag.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

init_sentry()

Expand Down
8 changes: 1 addition & 7 deletions dags/dbt_singular_tests_dag.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

init_sentry()

Expand Down
47 changes: 33 additions & 14 deletions dags/dbt_stellar_marts_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,39 @@
)

# DBT models to run
ohlc_task = dbt_task(dag, tag="ohlc")
liquidity_pool_trade_volume_task = dbt_task(dag, tag="liquidity_pool_trade_volume")
ohlc_task = dbt_task(dag, tag="ohlc", operator="+", excluded="stellar_dbt_public")
liquidity_pool_trade_volume_task = dbt_task(
dag, tag="liquidity_pool_trade_volume", operator="+", excluded="stellar_dbt_public"
)

liquidity_providers_task = dbt_task(dag, tag="liquidity_providers")
liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value")
liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history")
trade_agg_task = dbt_task(dag, tag="trade_agg")
liquidity_providers_task = dbt_task(
dag, tag="liquidity_providers", excluded="stellar_dbt_public"
)
liquidity_pools_values_task = dbt_task(
dag, tag="liquidity_pools_value", operator="+", excluded="stellar_dbt_public"
)
liquidity_pools_value_history_task = dbt_task(
dag,
tag="liquidity_pools_value_history",
operator="+",
excluded="stellar_dbt_public",
)
trade_agg_task = dbt_task(dag, tag="trade_agg", operator="+")
fee_stats_agg_task = dbt_task(dag, tag="fee_stats")
asset_stats_agg_task = dbt_task(dag, tag="asset_stats")
network_stats_agg_task = dbt_task(dag, tag="network_stats")
partnership_assets_task = dbt_task(dag, tag="partnership_assets")
history_assets = dbt_task(dag, tag="history_assets")
soroban = dbt_task(dag, tag="soroban")
snapshot_state = dbt_task(dag, tag="snapshot_state")
asset_stats_agg_task = dbt_task(
dag, tag="asset_stats", operator="+", excluded="stellar_dbt_public"
)
network_stats_agg_task = dbt_task(
dag, tag="network_stats", excluded="stellar_dbt_public"
)
partnership_assets_task = dbt_task(
dag, tag="partnership_assets", operator="+", excluded="stellar_dbt_public"
)
history_assets = dbt_task(dag, tag="history_assets", operator="+")
# Disable soroban tables because they're broken
# soroban = dbt_task(dag, tag="soroban", operator="+")
# Disable snapshot state tables because they're broken
# snapshot_state = dbt_task(dag, tag="snapshot_state")
# Disable releveant_asset_trades due to bugs in SCD tables
# relevant_asset_trades = dbt_task(dag, tag="relevant_asset_trades")

Expand All @@ -62,6 +81,6 @@
wait_on_dbt_enriched_base_tables >> network_stats_agg_task
wait_on_dbt_enriched_base_tables >> partnership_assets_task
wait_on_dbt_enriched_base_tables >> history_assets
wait_on_dbt_enriched_base_tables >> soroban
wait_on_dbt_enriched_base_tables >> snapshot_state
# wait_on_dbt_enriched_base_tables >> soroban
# wait_on_dbt_enriched_base_tables >> snapshot_state
# wait_on_dbt_enriched_base_tables >> relevant_asset_trades
8 changes: 2 additions & 6 deletions dags/dbt_stellar_marts_mgi_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

init_sentry()

Expand Down Expand Up @@ -38,7 +34,7 @@

# DBT models to run

mgi_task = dbt_task(dag, tag="mgi")
mgi_task = dbt_task(dag, tag="mgi", operator="+", excluded="stellar_dbt_public")

wait_on_dbt_enriched_base_tables >> mgi_task
wait_on_partner_pipeline_dag >> mgi_task
1 change: 0 additions & 1 deletion dags/stellar_etl_airflow/build_dbt_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def dbt_task(
# --exclude selector added for necessary use cases
# Argument should be string or list of strings
if excluded:
task_name = f"{task_name}_with_exclude"
args.append("--exclude")
if isinstance(excluded, list):
args.append(" ".join(excluded))
Expand Down

0 comments on commit 33c0ff8

Please sign in to comment.