diff --git a/ddpui/api/dbt_api.py b/ddpui/api/dbt_api.py index e98e3362..eadd162e 100644 --- a/ddpui/api/dbt_api.py +++ b/ddpui/api/dbt_api.py @@ -292,3 +292,11 @@ def post_refresh_elementary_report(request): raise HttpError(400, error) return result + + +@dbtapi.post("/v1/refresh-elementary-report/", auth=auth.CustomAuthMiddleware()) +@has_permission(["can_view_dbt_workspace"]) +def post_refresh_elementary_report_via_prefect(request): + """prepare the dbt docs single html via prefect deployment""" + orguser: OrgUser = request.orguser + return dbt_service.refresh_elementary_report_via_prefect(orguser) diff --git a/ddpui/api/orgtask_api.py b/ddpui/api/orgtask_api.py index 024231d0..4ec42797 100644 --- a/ddpui/api/orgtask_api.py +++ b/ddpui/api/orgtask_api.py @@ -54,6 +54,7 @@ TRANSFORM_TASKS_SEQ, TASK_GENERATE_EDR, ) +from ddpui.core.orgtaskfunctions import get_edr_send_report_task from ddpui.core.pipelinefunctions import ( setup_dbt_core_task_config, setup_git_pull_shell_task_config, @@ -277,6 +278,15 @@ def post_system_transformation_tasks(request): return {"success": 1} +@orgtaskapi.get("elementary-lock/", auth=auth.CustomAuthMiddleware()) +@has_permission(["can_view_orgtasks"]) +def get_elemetary_task_lock(request): + """Check if the elementary report generation task is underway""" + org = request.orguser.org + org_task = get_edr_send_report_task(org) + return fetch_orgtask_lock(org_task) + + @orgtaskapi.get("transform/", auth=auth.CustomAuthMiddleware()) @has_permission(["can_view_orgtasks"]) def get_prefect_transformation_tasks(request): diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index a25e264d..426282e8 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -9,7 +9,7 @@ import requests import boto3 import boto3.exceptions - +from ninja.errors import HttpError from django.utils.text import slugify from dbt_automation import assets from ddpui.ddpprefect import ( @@ -17,6 +17,8 @@ DBTCLIPROFILE, SECRET, ) +from ddpui.models.org_user import OrgUser +from ddpui.models.tasks import OrgDataFlowv1 from ddpui.models.org import OrgDbt, OrgPrefectBlockv1, OrgWarehouse, TransformType from ddpui.models.org_user import Org from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask @@ -305,3 +307,31 @@ def fetch_elementary_report(org: Org): s3response["LastModified"] ).isoformat(), # e.g. 2024-06-07T06:14:08+05:30 } + + +def refresh_elementary_report_via_prefect(orguser: OrgUser) -> dict: + """refreshes the elementary report for the current date using the prefect deployment""" + org: Org = orguser.org + odf = OrgDataFlowv1.objects.filter( + org=org, name__startswith=f"pipeline-{org.slug}-generate-edr" + ).first() + + if odf is None: + return {"error": "pipeline not found"} + + locks = prefect_service.lock_tasks_for_deployment(odf.deployment_id, orguser) + + try: + res = prefect_service.create_deployment_flow_run(odf.deployment_id) + for tasklock in locks: + tasklock.flow_run_id = res["flow_run_id"] + tasklock.save() + + except Exception as error: + for task_lock in locks: + logger.info("deleting TaskLock %s", task_lock.orgtask.task.slug) + task_lock.delete() + logger.exception(error) + raise HttpError(400, "failed to start a run") from error + + return res diff --git a/ddpui/management/commands/createedrsendreportdataflow.py b/ddpui/management/commands/createedrsendreportdataflow.py index f6dc7a62..34e6807c 100644 --- a/ddpui/management/commands/createedrsendreportdataflow.py +++ b/ddpui/management/commands/createedrsendreportdataflow.py @@ -83,7 +83,7 @@ def handle(self, *args, **options): print( f"creating `{options['schedule']}` OrgDataFlowv1 named {dataflow['deployment']['name']} with deployment_id {dataflow['deployment']['id']}" ) - OrgDataFlowv1.objects.create( + data_flow = OrgDataFlowv1.objects.create( org=org, name=dataflow["deployment"]["name"], deployment_name=dataflow["deployment"]["name"], @@ -91,3 +91,5 @@ def handle(self, *args, **options): dataflow_type="manual", # we dont want it to show in flows/pipelines page cron=options["cron"] if options["schedule"] == "orchestrate" else None, ) + + DataflowOrgTask.objects.create(dataflow=data_flow, orgtask=org_task)