Skip to content

Commit

Permalink
Merge pull request #795 from DalgoT4D/778-edr-report-generation-shoul…
Browse files Browse the repository at this point in the history
…d-always-run-through-prefect

Refresh elementary report through Prefect
  • Loading branch information
fatchat authored Aug 7, 2024
2 parents 9aa18b4 + 6c6f398 commit 88d9b85
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 2 deletions.
8 changes: 8 additions & 0 deletions ddpui/api/dbt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,11 @@ def post_refresh_elementary_report(request):
raise HttpError(400, error)

return result


@dbtapi.post("/v1/refresh-elementary-report/", auth=auth.CustomAuthMiddleware())
@has_permission(["can_view_dbt_workspace"])
def post_refresh_elementary_report_via_prefect(request):
"""prepare the dbt docs single html via prefect deployment"""
orguser: OrgUser = request.orguser
return dbt_service.refresh_elementary_report_via_prefect(orguser)
10 changes: 10 additions & 0 deletions ddpui/api/orgtask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
TRANSFORM_TASKS_SEQ,
TASK_GENERATE_EDR,
)
from ddpui.core.orgtaskfunctions import get_edr_send_report_task
from ddpui.core.pipelinefunctions import (
setup_dbt_core_task_config,
setup_git_pull_shell_task_config,
Expand Down Expand Up @@ -277,6 +278,15 @@ def post_system_transformation_tasks(request):
return {"success": 1}


@orgtaskapi.get("elementary-lock/", auth=auth.CustomAuthMiddleware())
@has_permission(["can_view_orgtasks"])
def get_elemetary_task_lock(request):
"""Check if the elementary report generation task is underway"""
org = request.orguser.org
org_task = get_edr_send_report_task(org)
return fetch_orgtask_lock(org_task)


@orgtaskapi.get("transform/", auth=auth.CustomAuthMiddleware())
@has_permission(["can_view_orgtasks"])
def get_prefect_transformation_tasks(request):
Expand Down
32 changes: 31 additions & 1 deletion ddpui/ddpdbt/dbt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
import requests
import boto3
import boto3.exceptions

from ninja.errors import HttpError
from django.utils.text import slugify
from dbt_automation import assets
from ddpui.ddpprefect import (
prefect_service,
DBTCLIPROFILE,
SECRET,
)
from ddpui.models.org_user import OrgUser
from ddpui.models.tasks import OrgDataFlowv1
from ddpui.models.org import OrgDbt, OrgPrefectBlockv1, OrgWarehouse, TransformType
from ddpui.models.org_user import Org
from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask
Expand Down Expand Up @@ -305,3 +307,31 @@ def fetch_elementary_report(org: Org):
s3response["LastModified"]
).isoformat(), # e.g. 2024-06-07T06:14:08+05:30
}


def refresh_elementary_report_via_prefect(orguser: OrgUser) -> dict:
"""refreshes the elementary report for the current date using the prefect deployment"""
org: Org = orguser.org
odf = OrgDataFlowv1.objects.filter(
org=org, name__startswith=f"pipeline-{org.slug}-generate-edr"
).first()

if odf is None:
return {"error": "pipeline not found"}

locks = prefect_service.lock_tasks_for_deployment(odf.deployment_id, orguser)

try:
res = prefect_service.create_deployment_flow_run(odf.deployment_id)
for tasklock in locks:
tasklock.flow_run_id = res["flow_run_id"]
tasklock.save()

except Exception as error:
for task_lock in locks:
logger.info("deleting TaskLock %s", task_lock.orgtask.task.slug)
task_lock.delete()
logger.exception(error)
raise HttpError(400, "failed to start a run") from error

return res
4 changes: 3 additions & 1 deletion ddpui/management/commands/createedrsendreportdataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ def handle(self, *args, **options):
print(
f"creating `{options['schedule']}` OrgDataFlowv1 named {dataflow['deployment']['name']} with deployment_id {dataflow['deployment']['id']}"
)
OrgDataFlowv1.objects.create(
data_flow = OrgDataFlowv1.objects.create(
org=org,
name=dataflow["deployment"]["name"],
deployment_name=dataflow["deployment"]["name"],
deployment_id=dataflow["deployment"]["id"],
dataflow_type="manual", # we dont want it to show in flows/pipelines page
cron=options["cron"] if options["schedule"] == "orchestrate" else None,
)

DataflowOrgTask.objects.create(dataflow=data_flow, orgtask=org_task)

0 comments on commit 88d9b85

Please sign in to comment.