diff --git a/.env.template b/.env.template index a04c2164..9681b38c 100644 --- a/.env.template +++ b/.env.template @@ -99,4 +99,6 @@ ADMIN_USER_PASSWORD="password" DEV_SECRETS_MOUNT= CLIENTS_DBT_MOUNT= -LOGS_MOUNT= \ No newline at end of file +LOGS_MOUNT= + +ADMIN_EMAIL= \ No newline at end of file diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index c6e65d52..e58a72e6 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -91,7 +91,7 @@ jobs: DEV_SECRETS_DIR: /tmp/ CLIENTDBT_ROOT: /tmp run: | - coverage run --omit ddpui/api/superset_api.py,ddpui/api/task_api.py,ddpui/migrations/*.py -m pytest --durations=20 + coverage run --omit ddpui/api/superset_api.py,ddpui/api/task_api.py,ddpui/migrations/*.py,ddpui/celeryworkers/tasks.py -m pytest --durations=20 coverage xml coverage report --fail-under=70 diff --git a/admin.py b/admin.py index edefcf2f..9c8b5024 100644 --- a/admin.py +++ b/admin.py @@ -2,9 +2,6 @@ import streamlit import django import dotenv -import requests -from datetime import datetime, timedelta -import pytz dotenv.load_dotenv(".env") @@ -12,47 +9,54 @@ django.setup() from ddpui.models.org import Org +from ddpui.models.tasks import OrgTask +from ddpui.ddpprefect.prefect_service import get_long_running_flow_runs +from ddpui.utils.helpers import find_key_in_dictionary -def main(): - """main function to run the streamlit app""" - org_to_workspace = Org.objects.values("name", "airbyte_workspace_id") +def show_workspaces(): + """streamlit function to show workspaces""" + org_to_workspace = Org.objects.order_by("name").values("name", "airbyte_workspace_id") streamlit.title("Airbyte workspace URLs") for org in org_to_workspace: org["airbyte_url"] = f"http://localhost:8000/workspaces/{org['airbyte_workspace_id']}" streamlit.markdown(f"[{org['name']}]({org['airbyte_url']})") + +def main(): + """main function to run the streamlit app""" + + show_workspaces() + streamlit.title("Long-running flows") - twohoursago = datetime.now() - timedelta(seconds=2 * 3600) - r = requests.post( - "http://localhost:4200/api/flow_runs/filter", - json={ - "flow_runs": { - "operator": "and_", - "state": { - "operator": "and_", - "type": {"any_": ["RUNNING"]}, - }, - "start_time": {"before_": twohoursago.astimezone(pytz.utc).isoformat()}, - } - }, - timeout=10, - ) - flow_runs = r.json() + flow_runs = get_long_running_flow_runs(2) for flow_run in flow_runs: streamlit.write(flow_run["state_name"]) - if "config" in flow_run["parameters"]: + flow_run_url = "http://localhost:4200/flow-runs/flow-run/" + flow_run["id"] + streamlit.markdown(f"[{flow_run['id']}]({flow_run_url})") - streamlit.write(flow_run["parameters"]["config"]["org_slug"]) + org_slug = find_key_in_dictionary(flow_run["parameters"], "org_slug") + if org_slug: + streamlit.write(org_slug) - streamlit.write([x["slug"] for x in flow_run["parameters"]["config"]["tasks"]]) + tasks = find_key_in_dictionary(flow_run["parameters"], "tasks") + if tasks: + streamlit.write([x["slug"] for x in tasks]) - elif "payload" in flow_run["parameters"]: - streamlit.write(flow_run["parameters"]["payload"]["flow_name"]) + flow_name = find_key_in_dictionary(flow_run["parameters"], "flow_name") + if flow_name: + streamlit.write(flow_name) - else: - streamlit.write(flow_run["parameters"].keys()) + connection_id = find_key_in_dictionary(flow_run["parameters"], "connection_id") + if connection_id: + orgtask = OrgTask.objects.filter(connection_id=connection_id).first() + if orgtask: + streamlit.write(orgtask.org.slug) + connection_url = f"http://localhost:8000/workspaces/{org['airbyte_workspace_id']}/connections/{connection_id}" + streamlit.markdown(f"[{connection_id}]({connection_url})") + else: + streamlit.write(connection_id) streamlit.write("=" * 20) diff --git a/ddpui/api/dbt_api.py b/ddpui/api/dbt_api.py index 199dbbc0..a6656536 100644 --- a/ddpui/api/dbt_api.py +++ b/ddpui/api/dbt_api.py @@ -284,18 +284,6 @@ def post_fetch_elementary_report(request): return result -@dbt_router.post("/refresh-elementary-report/", auth=auth.CustomAuthMiddleware()) -@has_permission(["can_view_dbt_workspace"]) -def post_refresh_elementary_report(request): - """prepare the dbt docs single html""" - orguser: OrgUser = request.orguser - error, result = dbt_service.refresh_elementary_report(orguser.org) - if error: - raise HttpError(400, error) - - return result - - @dbt_router.post("/v1/refresh-elementary-report/", auth=auth.CustomAuthMiddleware()) @has_permission(["can_view_dbt_workspace"]) def post_refresh_elementary_report_via_prefect(request): diff --git a/ddpui/celeryworkers/tasks.py b/ddpui/celeryworkers/tasks.py index c0f4eefd..035b8d85 100644 --- a/ddpui/celeryworkers/tasks.py +++ b/ddpui/celeryworkers/tasks.py @@ -1,3 +1,5 @@ +"""these are tasks which we run through celery""" + import os import shutil from pathlib import Path @@ -13,9 +15,9 @@ from ddpui.models.userpreferences import UserPreferences from ddpui.utils.discord import send_discord_notification from ddpui.utils.sendgrid import send_email_notification, send_schema_changes_email -from ddpui.utils import timezone +from ddpui.utils import timezone, awsses +from ddpui.utils.helpers import find_key_in_dictionary from ddpui.utils.custom_logger import CustomLogger -from ddpui.core.orgtaskfunctions import get_edr_send_report_task from ddpui.models.org import ( Org, OrgDbt, @@ -45,31 +47,28 @@ from ddpui.utils import secretsmanager from ddpui.utils.taskprogress import TaskProgress from ddpui.utils.singletaskprogress import SingleTaskProgress +from ddpui.utils.constants import ( + TASK_DBTRUN, + TASK_DBTCLEAN, + TASK_DBTDEPS, + TASK_AIRBYTESYNC, +) from ddpui.core.orgdbt_manager import DbtProjectManager from ddpui.ddpdbt.schema import DbtProjectParams from ddpui.ddpairbyte import airbyte_service, airbytehelpers from ddpui.ddpprefect.prefect_service import ( get_flow_run_graphs, - get_flow_run_logs, update_dbt_core_block_schema, get_dbt_cli_profile_block, prefect_get, - get_flow_run_logs_v2, recurse_flow_run_logs, -) -from ddpui.utils.constants import ( - TASK_DBTRUN, - TASK_DBTCLEAN, - TASK_DBTDEPS, - TASK_AIRBYTESYNC, - FLOW_RUN_LOGS_OFFSET_LIMIT, + get_long_running_flow_runs, ) from ddpui.ddpprefect import DBTCLIPROFILE from ddpui.datainsights.warehouse.warehouse_factory import WarehouseFactory from ddpui.core import llm_service from ddpui.utils.helpers import ( convert_sqlalchemy_rows_to_csv_string, - convert_sqlalchemy_rows_to_json_string, ) logger = CustomLogger("ddpui") @@ -509,70 +508,6 @@ def get_connection_catalog_task(task_key, org_id, connection_id): return connection_catalog -@app.task(bind=False) -def create_elementary_report(task_key: str, org_id: int, bucket_file_path: str): - """run edr report to create the elementary report and write to s3""" - taskprogress = SingleTaskProgress(task_key, int(os.getenv("EDR_TTL", "180"))) - - edr_binary = Path(os.getenv("DBT_VENV")) / "venv/bin/edr" - org = Org.objects.filter(id=org_id).first() - org_task = get_edr_send_report_task(org, create=True) - - project_dir = Path(DbtProjectManager.get_dbt_project_dir(org.dbt)) - # profiles_dir = project_dir / "elementary_profiles" - aws_access_key_id = os.getenv("ELEMENTARY_AWS_ACCESS_KEY_ID") - aws_secret_access_key = os.getenv("ELEMENTARY_AWS_SECRET_ACCESS_KEY") - s3_bucket_name = os.getenv("ELEMENTARY_S3_BUCKET") - - # cli args based on the edr org task - cli_params = org_task.parameters - cli_options = cli_params.get("options", {}) - - # its a relative path ({"profiles-dir": "elementary_profiles", ...}) since deployment run via shell task in prefect that have a "working_dir" - if "profiles-dir" in cli_options: - profiles_dir = project_dir / cli_options["profiles-dir"] - cli_options["profiles-dir"] = str(profiles_dir) - - if "bucket-file-path" in cli_options: - cli_options["bucket-file-path"] = bucket_file_path - - os.environ["PATH"] += ":" + str(Path(os.getenv("DBT_VENV")) / "venv/bin") - cmd = [ - str(edr_binary), - org_task.get_task_parameters(), - "--aws-access-key-id", - aws_access_key_id, - "--aws-secret-access-key", - aws_secret_access_key, - "--s3-bucket-name", - s3_bucket_name, - ] - taskprogress.add( - { - "message": "started", - "status": "running", - } - ) - try: - runcmd(" ".join(cmd), project_dir) - except subprocess.CalledProcessError: - taskprogress.add( - { - "message": "edr failed", - # "error": str(error), # error contains the aws secrets - "status": "failed", - } - ) - # logger.exception(error) # error contains the aws secrets - return - taskprogress.add( - { - "message": "generated edr report", - "status": "completed", - } - ) - - @app.task(bind=False) def update_dbt_core_block_schema_task(block_name, default_schema): """single http PUT request to the prefect-proxy""" @@ -758,7 +693,6 @@ def summarize_logs( # logs logs_text = "" - log_file_name = "" try: if type == LogsSummarizationType.DEPLOYMENT: all_task = get_flow_run_graphs(flow_run_id) @@ -1024,6 +958,51 @@ def summarize_warehouse_results( return +@app.task(bind=False) +def check_for_long_running_flow_runs(): + """checks for long-running flow runs in prefect""" + flow_runs = get_long_running_flow_runs(2) + + email_body = "" + + for flow_run in flow_runs: + logger.info(f"Found long running flow run {flow_run['id']} in prefect") + + flow_run_url = "http://localhost:4200/flow-runs/flow-run/" + flow_run["id"] + email_body += f"Flow Run ID: {flow_run['id']} \n" + email_body += f"Flow Run URL: {flow_run_url} \n" + + org_slug = find_key_in_dictionary(flow_run["parameters"], "org_slug") + if org_slug: + email_body += f"Org: {org_slug} \n" + + tasks = find_key_in_dictionary(flow_run["parameters"], "tasks") + if tasks: + email_body += f"Tasks: {tasks} \n" + for x in tasks: + email_body += f"- {x['slug']} \n" + + connection_id = find_key_in_dictionary(flow_run["parameters"], "connection_id") + if connection_id: + orgtask = OrgTask.objects.filter(connection_id=connection_id).first() + if orgtask: + email_body += ( + f"Org: {orgtask.org.slug} \n" # might appear above as well, we don't care + ) + connection_url = f"http://localhost:8000/workspaces/{orgtask.org['airbyte_workspace_id']}/connections/{connection_id}" + email_body += f"Connection URL: {connection_url} \n" + else: + email_body += f"Connection ID: {connection_id} \n" + + email_body += "=" * 20 + + awsses.send_text_message( + os.getenv("ADMIN_EMAIL"), + "Long Running Flow Runs", + email_body, + ) + + @app.on_after_finalize.connect def setup_periodic_tasks(sender, **kwargs): """check for old locks every minute""" @@ -1039,6 +1018,12 @@ def setup_periodic_tasks(sender, **kwargs): sync_flow_runs_of_deployments.s(), name="sync flow runs of deployments into our db", ) + if os.getenv("ADMIN_EMAIL"): + sender.add_periodic_task( + 3600 * 1.0, + check_for_long_running_flow_runs.s(), + name="check for long-running flow-runs", + ) @app.task(bind=True) diff --git a/ddpui/ddpdbt/dbt_service.py b/ddpui/ddpdbt/dbt_service.py index a0ba332e..8e17896a 100644 --- a/ddpui/ddpdbt/dbt_service.py +++ b/ddpui/ddpdbt/dbt_service.py @@ -33,10 +33,7 @@ ) from ddpui.utils.timezone import as_ist from ddpui.utils.custom_logger import CustomLogger -from ddpui.utils.singletaskprogress import SingleTaskProgress from ddpui.utils.redis_client import RedisClient -from ddpui.models.tasks import TaskProgressHashPrefix -from ddpui.celeryworkers.tasks import create_elementary_report from ddpui.core.orgdbt_manager import DbtProjectManager logger = CustomLogger("ddpui") @@ -233,28 +230,6 @@ def make_edr_report_s3_path(org: Org): return bucket_file_path -def refresh_elementary_report(org: Org): - """refreshes the elementary report for the current date""" - if org.dbt is None: - return "dbt is not configured for this client", None - - project_dir = Path(DbtProjectManager.get_dbt_project_dir(org.dbt)) - - if not os.path.exists(project_dir / "elementary_profiles"): - return "set up elementary profile first", None - - task_str = f"{TaskProgressHashPrefix.RUNELEMENTARY}-{org.slug}" - if SingleTaskProgress.fetch(task_str) is None: - bucket_file_path = make_edr_report_s3_path(org) - logger.info("creating new elementary report at %s", bucket_file_path) - create_elementary_report.delay(task_str, org.id, bucket_file_path) - ttl = int(os.getenv("EDR_TTL", "180")) - else: - logger.info("edr already running for org %s", org.slug) - ttl = SingleTaskProgress.get_ttl(task_str) - return None, {"task_id": task_str, "ttl": ttl} - - def fetch_elementary_report(org: Org): """fetch previously generated elementary report""" if org.dbt is None: diff --git a/ddpui/ddpprefect/prefect_service.py b/ddpui/ddpprefect/prefect_service.py index 9f9a87b5..f7bed75c 100644 --- a/ddpui/ddpprefect/prefect_service.py +++ b/ddpui/ddpprefect/prefect_service.py @@ -1,4 +1,5 @@ import os +from datetime import datetime import requests from ninja.errors import HttpError @@ -6,7 +7,6 @@ from django.db import transaction from django.db.models import Window from django.db.models.functions import RowNumber -from datetime import datetime from ddpui.ddpprefect.schema import ( PrefectDataFlowCreateSchema3, @@ -640,3 +640,9 @@ def recurse_flow_run_logs( break return logs + + +def get_long_running_flow_runs(nhours: int): + """gets long running flow runs from prefect""" + flow_runs = prefect_get(f"flow_runs/long-running/{nhours}") + return flow_runs["flow_runs"] diff --git a/ddpui/tests/utils/test_helpers.py b/ddpui/tests/utils/test_helpers.py index a8729d0f..433f0b65 100644 --- a/ddpui/tests/utils/test_helpers.py +++ b/ddpui/tests/utils/test_helpers.py @@ -10,6 +10,7 @@ update_dict_but_not_stars, nice_bytes, get_schedule_time_for_large_jobs, + find_key_in_dictionary, ) @@ -255,3 +256,11 @@ def test_get_schedule_time_for_large_jobs_5(): now = datetime(2024, 1, 7, 13, 45).astimezone(pytz.utc) r1 = get_schedule_time_for_large_jobs(now, time(12, 30)) assert r1 >= now + + +def test_find_key_in_dictionary(): + """tests find_key_in_dictionary""" + assert find_key_in_dictionary({"a": "b"}, "a") == "b" + assert find_key_in_dictionary({"a": {"b": "c"}}, "b") == "c" + assert find_key_in_dictionary({"a": {"b": {"c": "d"}}}, "c") == "d" + assert find_key_in_dictionary({"a": {"b": {"c": "d"}}}, "d") is None diff --git a/ddpui/utils/helpers.py b/ddpui/utils/helpers.py index 8ad0fad0..72062feb 100644 --- a/ddpui/utils/helpers.py +++ b/ddpui/utils/helpers.py @@ -9,8 +9,6 @@ from decimal import Decimal from datetime import datetime, date, time, timedelta import pytz -import csv -import io def runcmd(cmd: str, cwd: str): @@ -213,3 +211,15 @@ def get_schedule_time_for_large_jobs( next_slot = next_slot.astimezone(pytz.utc) return next_slot + + +def find_key_in_dictionary(dictionary: dict, key): + """Recursively find first occurence of a key in a dictionary and return its value""" + for k, v in dictionary.items(): + if k == key: + return v + if isinstance(v, dict): + val = find_key_in_dictionary(v, key) + if val: + return val + return None