Merge pull request #890 from DalgoT4D/get_long_running_flow_runs
moved get_long_running_flow_runs out of admin.py
fatchat authored Nov 19, 2024
2 parents e1c2f99 + dfba9c1 commit 7e99e82
Showing 9 changed files with 127 additions and 148 deletions.
4 changes: 3 additions & 1 deletion .env.template
@@ -99,4 +99,6 @@ ADMIN_USER_PASSWORD="password"

DEV_SECRETS_MOUNT=
CLIENTS_DBT_MOUNT=
-LOGS_MOUNT=
+LOGS_MOUNT=
+
+ADMIN_EMAIL=
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
@@ -91,7 +91,7 @@ jobs:
          DEV_SECRETS_DIR: /tmp/
          CLIENTDBT_ROOT: /tmp
        run: |
-          coverage run --omit ddpui/api/superset_api.py,ddpui/api/task_api.py,ddpui/migrations/*.py -m pytest --durations=20
+          coverage run --omit ddpui/api/superset_api.py,ddpui/api/task_api.py,ddpui/migrations/*.py,ddpui/celeryworkers/tasks.py -m pytest --durations=20
          coverage xml
          coverage report --fail-under=70
62 changes: 33 additions & 29 deletions admin.py
@@ -2,57 +2,61 @@
import streamlit
import django
import dotenv
-import requests
-from datetime import datetime, timedelta
-import pytz

dotenv.load_dotenv(".env")

os.environ["DJANGO_SETTINGS_MODULE"] = "ddpui.settings"
django.setup()

from ddpui.models.org import Org
+from ddpui.models.tasks import OrgTask
+from ddpui.ddpprefect.prefect_service import get_long_running_flow_runs
+from ddpui.utils.helpers import find_key_in_dictionary


-def main():
-    """main function to run the streamlit app"""
-    org_to_workspace = Org.objects.values("name", "airbyte_workspace_id")
+def show_workspaces():
+    """streamlit function to show workspaces"""
+    org_to_workspace = Org.objects.order_by("name").values("name", "airbyte_workspace_id")
    streamlit.title("Airbyte workspace URLs")
    for org in org_to_workspace:
        org["airbyte_url"] = f"http://localhost:8000/workspaces/{org['airbyte_workspace_id']}"
        streamlit.markdown(f"[{org['name']}]({org['airbyte_url']})")


+def main():
+    """main function to run the streamlit app"""
+
+    show_workspaces()
+
    streamlit.title("Long-running flows")
-    twohoursago = datetime.now() - timedelta(seconds=2 * 3600)
-    r = requests.post(
-        "http://localhost:4200/api/flow_runs/filter",
-        json={
-            "flow_runs": {
-                "operator": "and_",
-                "state": {
-                    "operator": "and_",
-                    "type": {"any_": ["RUNNING"]},
-                },
-                "start_time": {"before_": twohoursago.astimezone(pytz.utc).isoformat()},
-            }
-        },
-        timeout=10,
-    )
-    flow_runs = r.json()
+    flow_runs = get_long_running_flow_runs(2)
    for flow_run in flow_runs:
        streamlit.write(flow_run["state_name"])

-        if "config" in flow_run["parameters"]:
+        flow_run_url = "http://localhost:4200/flow-runs/flow-run/" + flow_run["id"]
+        streamlit.markdown(f"[{flow_run['id']}]({flow_run_url})")

-            streamlit.write(flow_run["parameters"]["config"]["org_slug"])
+        org_slug = find_key_in_dictionary(flow_run["parameters"], "org_slug")
+        if org_slug:
+            streamlit.write(org_slug)

-            streamlit.write([x["slug"] for x in flow_run["parameters"]["config"]["tasks"]])
+        tasks = find_key_in_dictionary(flow_run["parameters"], "tasks")
+        if tasks:
+            streamlit.write([x["slug"] for x in tasks])

-        elif "payload" in flow_run["parameters"]:
-            streamlit.write(flow_run["parameters"]["payload"]["flow_name"])
+        flow_name = find_key_in_dictionary(flow_run["parameters"], "flow_name")
+        if flow_name:
+            streamlit.write(flow_name)

-        else:
-            streamlit.write(flow_run["parameters"].keys())
+        connection_id = find_key_in_dictionary(flow_run["parameters"], "connection_id")
+        if connection_id:
+            orgtask = OrgTask.objects.filter(connection_id=connection_id).first()
+            if orgtask:
+                streamlit.write(orgtask.org.slug)
+                connection_url = f"http://localhost:8000/workspaces/{orgtask.org.airbyte_workspace_id}/connections/{connection_id}"
+                streamlit.markdown(f"[{connection_id}]({connection_url})")
+            else:
+                streamlit.write(connection_id)

        streamlit.write("=" * 20)
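Both the reworked admin view above and the new celery task below lean on find_key_in_dictionary (from ddpui.utils.helpers) to pull values out of arbitrarily nested flow-run parameters. The helper itself is not part of this diff; a minimal sketch of the behavior the callers assume (depth-first search of nested dicts, first match wins, None when absent):

def find_key_in_dictionary(dictionary: dict, key: str):
    """depth-first search of a nested dict: return the first value found for key, else None"""
    if key in dictionary:
        return dictionary[key]
    for value in dictionary.values():
        if isinstance(value, dict):
            found = find_key_in_dictionary(value, key)
            if found is not None:
                return found
    return None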

12 changes: 0 additions & 12 deletions ddpui/api/dbt_api.py
@@ -284,18 +284,6 @@ def post_fetch_elementary_report(request):
    return result


-@dbt_router.post("/refresh-elementary-report/", auth=auth.CustomAuthMiddleware())
-@has_permission(["can_view_dbt_workspace"])
-def post_refresh_elementary_report(request):
-    """prepare the dbt docs single html"""
-    orguser: OrgUser = request.orguser
-    error, result = dbt_service.refresh_elementary_report(orguser.org)
-    if error:
-        raise HttpError(400, error)
-
-    return result
-
-
@dbt_router.post("/v1/refresh-elementary-report/", auth=auth.CustomAuthMiddleware())
@has_permission(["can_view_dbt_workspace"])
def post_refresh_elementary_report_via_prefect(request):
139 changes: 62 additions & 77 deletions ddpui/celeryworkers/tasks.py
@@ -1,3 +1,5 @@
+"""these are tasks which we run through celery"""
+
import os
import shutil
from pathlib import Path
@@ -13,9 +15,9 @@
from ddpui.models.userpreferences import UserPreferences
from ddpui.utils.discord import send_discord_notification
from ddpui.utils.sendgrid import send_email_notification, send_schema_changes_email
-from ddpui.utils import timezone
+from ddpui.utils import timezone, awsses
+from ddpui.utils.helpers import find_key_in_dictionary
from ddpui.utils.custom_logger import CustomLogger
-from ddpui.core.orgtaskfunctions import get_edr_send_report_task
from ddpui.models.org import (
    Org,
    OrgDbt,
@@ -45,31 +47,28 @@
from ddpui.utils import secretsmanager
from ddpui.utils.taskprogress import TaskProgress
from ddpui.utils.singletaskprogress import SingleTaskProgress
-from ddpui.utils.constants import (
-    TASK_DBTRUN,
-    TASK_DBTCLEAN,
-    TASK_DBTDEPS,
-    TASK_AIRBYTESYNC,
-)
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpairbyte import airbyte_service, airbytehelpers
from ddpui.ddpprefect.prefect_service import (
    get_flow_run_graphs,
    get_flow_run_logs,
    update_dbt_core_block_schema,
    get_dbt_cli_profile_block,
    prefect_get,
    get_flow_run_logs_v2,
    recurse_flow_run_logs,
+    get_long_running_flow_runs,
)
+from ddpui.utils.constants import (
+    TASK_DBTRUN,
+    TASK_DBTCLEAN,
+    TASK_DBTDEPS,
+    TASK_AIRBYTESYNC,
+    FLOW_RUN_LOGS_OFFSET_LIMIT,
+)
from ddpui.ddpprefect import DBTCLIPROFILE
from ddpui.datainsights.warehouse.warehouse_factory import WarehouseFactory
from ddpui.core import llm_service
from ddpui.utils.helpers import (
    convert_sqlalchemy_rows_to_csv_string,
    convert_sqlalchemy_rows_to_json_string,
)

logger = CustomLogger("ddpui")
@@ -509,70 +508,6 @@ def get_connection_catalog_task(task_key, org_id, connection_id):
    return connection_catalog


-@app.task(bind=False)
-def create_elementary_report(task_key: str, org_id: int, bucket_file_path: str):
-    """run edr report to create the elementary report and write to s3"""
-    taskprogress = SingleTaskProgress(task_key, int(os.getenv("EDR_TTL", "180")))
-
-    edr_binary = Path(os.getenv("DBT_VENV")) / "venv/bin/edr"
-    org = Org.objects.filter(id=org_id).first()
-    org_task = get_edr_send_report_task(org, create=True)
-
-    project_dir = Path(DbtProjectManager.get_dbt_project_dir(org.dbt))
-    # profiles_dir = project_dir / "elementary_profiles"
-    aws_access_key_id = os.getenv("ELEMENTARY_AWS_ACCESS_KEY_ID")
-    aws_secret_access_key = os.getenv("ELEMENTARY_AWS_SECRET_ACCESS_KEY")
-    s3_bucket_name = os.getenv("ELEMENTARY_S3_BUCKET")
-
-    # cli args based on the edr org task
-    cli_params = org_task.parameters
-    cli_options = cli_params.get("options", {})
-
-    # it's a relative path ({"profiles-dir": "elementary_profiles", ...}) since deployments run via a shell task in prefect that has a "working_dir"
-    if "profiles-dir" in cli_options:
-        profiles_dir = project_dir / cli_options["profiles-dir"]
-        cli_options["profiles-dir"] = str(profiles_dir)
-
-    if "bucket-file-path" in cli_options:
-        cli_options["bucket-file-path"] = bucket_file_path
-
-    os.environ["PATH"] += ":" + str(Path(os.getenv("DBT_VENV")) / "venv/bin")
-    cmd = [
-        str(edr_binary),
-        org_task.get_task_parameters(),
-        "--aws-access-key-id",
-        aws_access_key_id,
-        "--aws-secret-access-key",
-        aws_secret_access_key,
-        "--s3-bucket-name",
-        s3_bucket_name,
-    ]
-    taskprogress.add(
-        {
-            "message": "started",
-            "status": "running",
-        }
-    )
-    try:
-        runcmd(" ".join(cmd), project_dir)
-    except subprocess.CalledProcessError:
-        taskprogress.add(
-            {
-                "message": "edr failed",
-                # "error": str(error), # error contains the aws secrets
-                "status": "failed",
-            }
-        )
-        # logger.exception(error) # error contains the aws secrets
-        return
-    taskprogress.add(
-        {
-            "message": "generated edr report",
-            "status": "completed",
-        }
-    )
-
-


@app.task(bind=False)
def update_dbt_core_block_schema_task(block_name, default_schema):
    """single http PUT request to the prefect-proxy"""
@@ -758,7 +693,6 @@ def summarize_logs(

    # logs
    logs_text = ""
-    log_file_name = ""
    try:
        if type == LogsSummarizationType.DEPLOYMENT:
            all_task = get_flow_run_graphs(flow_run_id)
@@ -1024,6 +958,51 @@ def summarize_warehouse_results(
        return


+@app.task(bind=False)
+def check_for_long_running_flow_runs():
+    """checks for long-running flow runs in prefect"""
+    flow_runs = get_long_running_flow_runs(2)
+
+    email_body = ""
+
+    for flow_run in flow_runs:
+        logger.info(f"Found long running flow run {flow_run['id']} in prefect")
+
+        flow_run_url = "http://localhost:4200/flow-runs/flow-run/" + flow_run["id"]
+        email_body += f"Flow Run ID: {flow_run['id']} \n"
+        email_body += f"Flow Run URL: {flow_run_url} \n"
+
+        org_slug = find_key_in_dictionary(flow_run["parameters"], "org_slug")
+        if org_slug:
+            email_body += f"Org: {org_slug} \n"
+
+        tasks = find_key_in_dictionary(flow_run["parameters"], "tasks")
+        if tasks:
+            email_body += f"Tasks: {tasks} \n"
+            for x in tasks:
+                email_body += f"- {x['slug']} \n"
+
+        connection_id = find_key_in_dictionary(flow_run["parameters"], "connection_id")
+        if connection_id:
+            orgtask = OrgTask.objects.filter(connection_id=connection_id).first()
+            if orgtask:
+                email_body += f"Org: {orgtask.org.slug} \n"  # might appear above as well, we don't care
+                connection_url = f"http://localhost:8000/workspaces/{orgtask.org.airbyte_workspace_id}/connections/{connection_id}"
+                email_body += f"Connection URL: {connection_url} \n"
+            else:
+                email_body += f"Connection ID: {connection_id} \n"
+
+        email_body += "=" * 20
+
+    awsses.send_text_message(
+        os.getenv("ADMIN_EMAIL"),
+        "Long Running Flow Runs",
+        email_body,
+    )
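The awsses.send_text_message helper is not shown in this diff; presumably it wraps SES's send_email for plain-text mail. A sketch under that assumption, using boto3 (SES_SENDER_EMAIL here is a hypothetical variable for the verified sender address):

import os
import boto3

def send_text_message(to_email: str, subject: str, message: str):
    """send a plain-text email through amazon ses"""
    ses = boto3.client("ses", region_name=os.getenv("AWS_REGION", "us-east-1"))
    ses.send_email(
        Source=os.getenv("SES_SENDER_EMAIL"),  # hypothetical: the SES-verified sender
        Destination={"ToAddresses": [to_email]},
        Message={
            "Subject": {"Data": subject},
            "Body": {"Text": {"Data": message}},
        },
    )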


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
"""check for old locks every minute"""
@@ -1039,6 +1018,12 @@ def setup_periodic_tasks(sender, **kwargs):
        sync_flow_runs_of_deployments.s(),
        name="sync flow runs of deployments into our db",
    )
+    if os.getenv("ADMIN_EMAIL"):
+        sender.add_periodic_task(
+            3600 * 1.0,
+            check_for_long_running_flow_runs.s(),
+            name="check for long-running flow-runs",
+        )
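With this registration, celery beat fires the check once an hour (3600 seconds), and only when ADMIN_EMAIL is configured, matching the new entry in .env.template above.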


@app.task(bind=True)
25 changes: 0 additions & 25 deletions ddpui/ddpdbt/dbt_service.py
@@ -33,10 +33,7 @@
)
from ddpui.utils.timezone import as_ist
from ddpui.utils.custom_logger import CustomLogger
-from ddpui.utils.singletaskprogress import SingleTaskProgress
-from ddpui.utils.redis_client import RedisClient
-from ddpui.models.tasks import TaskProgressHashPrefix
-from ddpui.celeryworkers.tasks import create_elementary_report
from ddpui.core.orgdbt_manager import DbtProjectManager

logger = CustomLogger("ddpui")
@@ -233,28 +230,6 @@ def make_edr_report_s3_path(org: Org):
    return bucket_file_path


-def refresh_elementary_report(org: Org):
-    """refreshes the elementary report for the current date"""
-    if org.dbt is None:
-        return "dbt is not configured for this client", None
-
-    project_dir = Path(DbtProjectManager.get_dbt_project_dir(org.dbt))
-
-    if not os.path.exists(project_dir / "elementary_profiles"):
-        return "set up elementary profile first", None
-
-    task_str = f"{TaskProgressHashPrefix.RUNELEMENTARY}-{org.slug}"
-    if SingleTaskProgress.fetch(task_str) is None:
-        bucket_file_path = make_edr_report_s3_path(org)
-        logger.info("creating new elementary report at %s", bucket_file_path)
-        create_elementary_report.delay(task_str, org.id, bucket_file_path)
-        ttl = int(os.getenv("EDR_TTL", "180"))
-    else:
-        logger.info("edr already running for org %s", org.slug)
-        ttl = SingleTaskProgress.get_ttl(task_str)
-    return None, {"task_id": task_str, "ttl": ttl}
-
-
def fetch_elementary_report(org: Org):
    """fetch previously generated elementary report"""
    if org.dbt is None:
8 changes: 7 additions & 1 deletion ddpui/ddpprefect/prefect_service.py
@@ -1,12 +1,12 @@
import os
+from datetime import datetime
import requests

from ninja.errors import HttpError
from dotenv import load_dotenv
from django.db import transaction
from django.db.models import Window
from django.db.models.functions import RowNumber
-from datetime import datetime

from ddpui.ddpprefect.schema import (
    PrefectDataFlowCreateSchema3,
@@ -640,3 +640,9 @@ def recurse_flow_run_logs(
            break

    return logs
+
+
+def get_long_running_flow_runs(nhours: int):
+    """gets long running flow runs from prefect"""
+    flow_runs = prefect_get(f"flow_runs/long-running/{nhours}")
+    return flow_runs["flow_runs"]
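prefect_get is this module's helper for GET requests against the prefect-proxy service, so the Prefect query itself now lives behind the proxy's flow_runs/long-running/{nhours} endpoint. That endpoint is not part of this diff; presumably it issues the same filter the deleted admin.py code ran directly against the Prefect API. A sketch of that query as a standalone function (hypothetical name, Prefect server assumed on localhost:4200):

from datetime import datetime, timedelta

import pytz
import requests

def fetch_long_running_flow_runs(nhours: int) -> list[dict]:
    """flow runs still RUNNING whose start_time is more than nhours ago"""
    cutoff = datetime.now() - timedelta(hours=nhours)
    response = requests.post(
        "http://localhost:4200/api/flow_runs/filter",
        json={
            "flow_runs": {
                "operator": "and_",
                "state": {
                    "operator": "and_",
                    "type": {"any_": ["RUNNING"]},
                },
                "start_time": {"before_": cutoff.astimezone(pytz.utc).isoformat()},
            }
        },
        timeout=10,
    )
    response.raise_for_status()
    return response.json()

Both call sites in this PR pass nhours=2, matching the two-hour cutoff the old inline query used.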
