From acec9ea10ab7006645c63dd7b9f2df4b5e764aab Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Mon, 29 Jul 2024 22:52:34 +0530 Subject: [PATCH 1/3] wrote the refresh_elementary_report_via_prefect function --- ddpui/ddpdbt/dbt_service.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index a25e264d..1333a492 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -19,7 +19,7 @@ ) from ddpui.models.org import OrgDbt, OrgPrefectBlockv1, OrgWarehouse, TransformType from ddpui.models.org_user import Org -from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask +from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask, OrgDataFlowv1 from ddpui.models.dbt_workflow import OrgDbtModel from ddpui.utils import secretsmanager from ddpui.utils.timezone import as_ist @@ -252,6 +252,16 @@ def refresh_elementary_report(org: Org): return None, {"task_id": task_str, "ttl": ttl} +def refresh_elementary_report_via_prefect(org: Org) -> dict: + """refreshes the elementary report for the current date using the prefect deployment""" + odf = OrgDataFlowv1.objects.filter( + org=org, name__startswith=f"pipeline-{org.slug}-generate-edr" + ).first() + if odf: + return prefect_service.create_deployment_flow_run(odf.deployment_id) + return {"error": "pipeline not found"} + + def fetch_elementary_report(org: Org): """fetch previously generated elementary report""" if org.dbt is None: From 0011369abcc478b6c7170b4eddba1ad47ff1b7c5 Mon Sep 17 00:00:00 2001 From: mdshamoon Date: Tue, 6 Aug 2024 11:50:29 +0530 Subject: [PATCH 2/3] added task for deployment --- ddpui/api/dbt_api.py | 32 ++++++++++++++++++++++++++++++++ ddpui/ddpdbt/dbt_service.py | 12 +----------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/ddpui/api/dbt_api.py b/ddpui/api/dbt_api.py index e98e3362..cf1e7202 100644 --- a/ddpui/api/dbt_api.py +++ b/ddpui/api/dbt_api.py @@ -14,6 +14,7 @@ run_dbt_commands, setup_dbtworkspace, ) +from ddpui.models.tasks import OrgDataFlowv1 from ddpui.models.tasks import TaskProgressHashPrefix from ddpui.utils.taskprogress import TaskProgress from ddpui.ddpdbt import dbt_service @@ -292,3 +293,34 @@ def post_refresh_elementary_report(request): raise HttpError(400, error) return result + + +@dbtapi.post("/v1/refresh-elementary-report/", auth=auth.CustomAuthMiddleware()) +@has_permission(["can_view_dbt_workspace"]) +def post_refresh_elementary_report_via_prefect(request): + """prepare the dbt docs single html via prefect deployment""" + + orguser: OrgUser = request.orguser + org: Org = orguser.org + odf = OrgDataFlowv1.objects.filter( + org=org, name__startswith=f"pipeline-{org.slug}-generate-edr" + ).first() + + if odf is None: + return {"error": "pipeline not found"} + + locks = prefect_service.lock_tasks_for_deployment(odf.deployment_id, orguser) + try: + res = prefect_service.create_deployment_flow_run(odf.deployment_id) + for tasklock in locks: + tasklock.flow_run_id = res["flow_run_id"] + tasklock.save() + + except Exception as error: + for task_lock in locks: + logger.info("deleting TaskLock %s", task_lock.orgtask.task.slug) + task_lock.delete() + logger.exception(error) + raise HttpError(400, "failed to start a run") from error + + return res diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index 1333a492..a25e264d 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -19,7 +19,7 @@ ) from ddpui.models.org import OrgDbt, OrgPrefectBlockv1, OrgWarehouse, TransformType from ddpui.models.org_user import Org -from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask, OrgDataFlowv1 +from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask from ddpui.models.dbt_workflow import OrgDbtModel from ddpui.utils import secretsmanager from ddpui.utils.timezone import as_ist @@ -252,16 +252,6 @@ def refresh_elementary_report(org: Org): return None, {"task_id": task_str, "ttl": ttl} -def refresh_elementary_report_via_prefect(org: Org) -> dict: - """refreshes the elementary report for the current date using the prefect deployment""" - odf = OrgDataFlowv1.objects.filter( - org=org, name__startswith=f"pipeline-{org.slug}-generate-edr" - ).first() - if odf: - return prefect_service.create_deployment_flow_run(odf.deployment_id) - return {"error": "pipeline not found"} - - def fetch_elementary_report(org: Org): """fetch previously generated elementary report""" if org.dbt is None: From 6c6f39855a966de94b58f3a51f79f0ab75876709 Mon Sep 17 00:00:00 2001 From: mdshamoon Date: Wed, 7 Aug 2024 13:23:20 +0530 Subject: [PATCH 3/3] run elementary through prefect --- ddpui/api/dbt_api.py | 26 +-------------- ddpui/api/orgtask_api.py | 10 ++++++ ddpui/ddpdbt/dbt_service.py | 32 ++++++++++++++++++- .../commands/createedrsendreportdataflow.py | 4 ++- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/ddpui/api/dbt_api.py b/ddpui/api/dbt_api.py index cf1e7202..eadd162e 100644 --- a/ddpui/api/dbt_api.py +++ b/ddpui/api/dbt_api.py @@ -14,7 +14,6 @@ run_dbt_commands, setup_dbtworkspace, ) -from ddpui.models.tasks import OrgDataFlowv1 from ddpui.models.tasks import TaskProgressHashPrefix from ddpui.utils.taskprogress import TaskProgress from ddpui.ddpdbt import dbt_service @@ -299,28 +298,5 @@ def post_refresh_elementary_report(request): @has_permission(["can_view_dbt_workspace"]) def post_refresh_elementary_report_via_prefect(request): """prepare the dbt docs single html via prefect deployment""" - orguser: OrgUser = request.orguser - org: Org = orguser.org - odf = OrgDataFlowv1.objects.filter( - org=org, name__startswith=f"pipeline-{org.slug}-generate-edr" - ).first() - - if odf is None: - return {"error": "pipeline not found"} - - locks = prefect_service.lock_tasks_for_deployment(odf.deployment_id, orguser) - try: - res = prefect_service.create_deployment_flow_run(odf.deployment_id) - for tasklock in locks: - tasklock.flow_run_id = res["flow_run_id"] - tasklock.save() - - except Exception as error: - for task_lock in locks: - logger.info("deleting TaskLock %s", task_lock.orgtask.task.slug) - task_lock.delete() - logger.exception(error) - raise HttpError(400, "failed to start a run") from error - - return res + return dbt_service.refresh_elementary_report_via_prefect(orguser) diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py index 024231d0..4ec42797 100644 --- a/ddpui/api/orgtask_api.py +++ b/ddpui/api/orgtask_api.py @@ -54,6 +54,7 @@ TRANSFORM_TASKS_SEQ, TASK_GENERATE_EDR, ) +from ddpui.core.orgtaskfunctions import get_edr_send_report_task from ddpui.core.pipelinefunctions import ( setup_dbt_core_task_config, setup_git_pull_shell_task_config, @@ -277,6 +278,15 @@ def post_system_transformation_tasks(request): return {"success": 1} +@orgtaskapi.get("elementary-lock/", auth=auth.CustomAuthMiddleware()) +@has_permission(["can_view_orgtasks"]) +def get_elemetary_task_lock(request): + """Check if the elementary report generation task is underway""" + org = request.orguser.org + org_task = get_edr_send_report_task(org) + return fetch_orgtask_lock(org_task) + + @orgtaskapi.get("transform/", auth=auth.CustomAuthMiddleware()) @has_permission(["can_view_orgtasks"]) def get_prefect_transformation_tasks(request): diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index a25e264d..426282e8 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -9,7 +9,7 @@ import requests import boto3 import boto3.exceptions - +from ninja.errors import HttpError from django.utils.text import slugify from dbt_automation import assets from ddpui.ddpprefect import ( @@ -17,6 +17,8 @@ DBTCLIPROFILE, SECRET, ) +from ddpui.models.org_user import OrgUser +from ddpui.models.tasks import OrgDataFlowv1 from ddpui.models.org import OrgDbt, OrgPrefectBlockv1, OrgWarehouse, TransformType from ddpui.models.org_user import Org from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask @@ -305,3 +307,31 @@ def fetch_elementary_report(org: Org): s3response["LastModified"] ).isoformat(), # e.g. 2024-06-07T06:14:08+05:30 } + + +def refresh_elementary_report_via_prefect(orguser: OrgUser) -> dict: + """refreshes the elementary report for the current date using the prefect deployment""" + org: Org = orguser.org + odf = OrgDataFlowv1.objects.filter( + org=org, name__startswith=f"pipeline-{org.slug}-generate-edr" + ).first() + + if odf is None: + return {"error": "pipeline not found"} + + locks = prefect_service.lock_tasks_for_deployment(odf.deployment_id, orguser) + + try: + res = prefect_service.create_deployment_flow_run(odf.deployment_id) + for tasklock in locks: + tasklock.flow_run_id = res["flow_run_id"] + tasklock.save() + + except Exception as error: + for task_lock in locks: + logger.info("deleting TaskLock %s", task_lock.orgtask.task.slug) + task_lock.delete() + logger.exception(error) + raise HttpError(400, "failed to start a run") from error + + return res diff --git a/ddpui/management/commands/createedrsendreportdataflow.py b/ddpui/management/commands/createedrsendreportdataflow.py index f6dc7a62..34e6807c 100644 --- a/ddpui/management/commands/createedrsendreportdataflow.py +++ b/ddpui/management/commands/createedrsendreportdataflow.py @@ -83,7 +83,7 @@ def handle(self, *args, **options): print( f"creating `{options['schedule']}` OrgDataFlowv1 named {dataflow['deployment']['name']} with deployment_id {dataflow['deployment']['id']}" ) - OrgDataFlowv1.objects.create( + data_flow = OrgDataFlowv1.objects.create( org=org, name=dataflow["deployment"]["name"], deployment_name=dataflow["deployment"]["name"], @@ -91,3 +91,5 @@ def handle(self, *args, **options): dataflow_type="manual", # we dont want it to show in flows/pipelines page cron=options["cron"] if options["schedule"] == "orchestrate" else None, ) + + DataflowOrgTask.objects.create(dataflow=data_flow, orgtask=org_task)