Skip to content

Commit

Permalink
Merge pull request #880 from DalgoT4D/making-project-dir-dbt-venv-rel…
Browse files Browse the repository at this point in the history
…ative

making orgdbt's project_dir and dbt_venv relative
  • Loading branch information
Ishankoradia authored Nov 4, 2024
2 parents b7b3aba + be9d06d commit f4b64bf
Show file tree
Hide file tree
Showing 20 changed files with 313 additions and 144 deletions.
17 changes: 10 additions & 7 deletions ddpui/api/dbt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ddpui.ddpprefect.schema import OrgDbtGitHub, OrgDbtSchema, OrgDbtTarget
from ddpui.models.org import OrgPrefectBlockv1, Org
from ddpui.models.org_user import OrgUser, OrgUserResponse
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.utils.custom_logger import CustomLogger
from ddpui.utils.dbtdocs import create_single_html
from ddpui.utils.helpers import runcmd
Expand Down Expand Up @@ -69,13 +70,13 @@ def put_dbt_github(request, payload: OrgDbtGitHub):
org.dbt.gitrepo_access_token_secret = payload.gitrepoAccessToken
org.dbt.save()

project_dir = Path(os.getenv("CLIENTDBT_ROOT")) / org.slug
org_dir = DbtProjectManager.get_org_dir(org)

task = clone_github_repo.delay(
org.slug,
org.dbt.gitrepo_url,
org.dbt.gitrepo_access_token_secret,
str(project_dir),
org_dir,
None,
)

Expand Down Expand Up @@ -115,10 +116,11 @@ def get_dbt_workspace(request):
def post_dbt_git_pull(request):
"""Pull the dbt repo from github for the organization"""
orguser: OrgUser = request.orguser
if orguser.org.dbt is None:
orgdbt = orguser.org.dbt
if orgdbt is None:
raise HttpError(400, "dbt is not configured for this client")

project_dir = Path(os.getenv("CLIENTDBT_ROOT")) / orguser.org.slug
project_dir = Path(DbtProjectManager.get_dbt_project_dir(orgdbt))
if not os.path.exists(project_dir):
raise HttpError(400, "create the dbt env first")

Expand All @@ -134,11 +136,12 @@ def post_dbt_git_pull(request):
@has_permission(["can_create_dbt_docs"])
def post_dbt_makedocs(request):
"""prepare the dbt docs single html"""
orguser = request.orguser
if orguser.org.dbt is None:
orguser: OrgUser = request.orguser
orgdbt = orguser.org.dbt
if orgdbt is None:
raise HttpError(400, "dbt is not configured for this client")

project_dir = Path(os.getenv("CLIENTDBT_ROOT")) / orguser.org.slug
project_dir = Path(DbtProjectManager.get_dbt_project_dir(orgdbt))
if not os.path.exists(project_dir):
raise HttpError(400, "create the dbt env first")

Expand Down
31 changes: 15 additions & 16 deletions ddpui/api/orgtask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
from ddpui.ddpprefect.schema import (
PrefectSecretBlockCreate,
)
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.schemas.org_task_schema import CreateOrgTaskPayload, TaskParameters
from ddpui.core.dbtfunctions import gather_dbt_project_params

# from ddpui.core.dbtfunctions import DbtProjectManager.gather_dbt_project_params
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.core.orgtaskfunctions import (
create_default_transform_tasks,
create_prefect_deployment_for_dbtcore_task,
Expand Down Expand Up @@ -68,7 +71,8 @@
def post_orgtask(request, payload: CreateOrgTaskPayload):
"""Create a custom client org task (dbt or git). If base task is dbt run create a deployment"""
orguser: OrgUser = request.orguser
if orguser.org.dbt is None:
orgdbt = orguser.org.dbt
if orgdbt is None:
raise HttpError(400, "create a dbt workspace first")

task = Task.objects.filter(slug=payload.task_slug).first()
Expand All @@ -94,9 +98,9 @@ def post_orgtask(request, payload: CreateOrgTaskPayload):

dataflow = None
if task.slug in LONG_RUNNING_TASKS:
dbt_project_params, error = gather_dbt_project_params(orguser.org)
if error:
raise HttpError(400, error)
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

# fetch the cli profile block
cli_profile_block = OrgPrefectBlockv1.objects.filter(
Expand Down Expand Up @@ -319,10 +323,13 @@ def post_run_prefect_org_task(
if org_task.task.type not in ["dbt", "git", "edr"]:
raise HttpError(400, "task not supported")

if orguser.org.dbt is None:
orgdbt = orguser.org.dbt
if orgdbt is None:
raise HttpError(400, "dbt is not configured for this client")

dbt_project_params, error = gather_dbt_project_params(orguser.org)
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

# check if the task is locked
task_lock = TaskLock.objects.filter(orgtask=org_task).first()
Expand Down Expand Up @@ -357,12 +364,8 @@ def post_run_prefect_org_task(
raise HttpError(400, f"failed to run the shell task {org_task.task.slug}") from error

elif org_task.task.slug == TASK_GENERATE_EDR:
dbt_env_dir = Path(orguser.org.dbt.dbt_venv)
if not dbt_env_dir.exists():
raise HttpError(400, "create the dbt env first")

task_config = setup_edr_send_report_task_config(
org_task, dbt_project_params.project_dir, dbt_env_dir
org_task, dbt_project_params.project_dir, dbt_project_params.venv_binary
)

if task_config.flow_name is None:
Expand All @@ -379,10 +382,6 @@ def post_run_prefect_org_task(
raise HttpError(400, f"failed to run the shell task {org_task.task.slug}") from error

else:
dbt_env_dir = Path(orguser.org.dbt.dbt_venv)
if not dbt_env_dir.exists():
raise HttpError(400, "create the dbt env first")

# fetch the cli profile block
cli_profile_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
Expand Down
23 changes: 14 additions & 9 deletions ddpui/api/pipeline_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ddpui.utils.constants import TASK_DBTRUN, TASK_AIRBYTESYNC
from ddpui.utils.custom_logger import CustomLogger
from ddpui.schemas.org_task_schema import TaskParameters
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.utils.prefectlogs import parse_prefect_logs
from ddpui.utils.helpers import generate_hash_id
from ddpui.core.pipelinefunctions import (
Expand All @@ -33,7 +34,7 @@
lock_tasks_for_dataflow,
)
from ddpui.celeryworkers.tasks import summarize_logs
from ddpui.core.dbtfunctions import gather_dbt_project_params
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.auth import has_permission
from ddpui.models.tasks import TaskLock

Expand Down Expand Up @@ -98,15 +99,14 @@ def post_prefect_dataflow_v1(request, payload: PrefectDataFlowCreateSchema4):
logger.info(f"Pipline has {len(sync_orgtasks)} airbyte syncs")

# push dbt pipeline orgtasks
dbt_project_params = None
dbt_project_params: DbtProjectParams = None
dbt_git_orgtasks = []
orgdbt = orguser.org.dbt
if payload.transformTasks and len(payload.transformTasks) > 0:
logger.info("Dbt tasks being pushed to the pipeline")

# dbt params
dbt_project_params, error = gather_dbt_project_params(orguser.org)
if error:
raise HttpError(400, error)
dbt_project_params = DbtProjectManager.gather_dbt_project_params(orguser.org, orgdbt)

# dbt cli profile block
cli_block = OrgPrefectBlockv1.objects.filter(
Expand Down Expand Up @@ -400,13 +400,14 @@ def put_prefect_dataflow_v1(request, deployment_id, payload: PrefectDataFlowUpda
# push dbt pipeline orgtasks
dbt_project_params = None
dbt_git_orgtasks = []
orgdbt = orguser.org.dbt
if payload.transformTasks and len(payload.transformTasks) > 0:
logger.info(f"Dbt tasks being pushed to the pipeline")

# dbt params
dbt_project_params, error = gather_dbt_project_params(orguser.org)
if error:
raise HttpError(400, error)
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

# dbt cli profile block
cli_block = OrgPrefectBlockv1.objects.filter(
Expand Down Expand Up @@ -517,6 +518,8 @@ def post_run_prefect_org_deployment_task(request, deployment_id, payload: TaskPa
if dataflow_orgtasks.count() == 0:
raise HttpError(400, "no org task mapped to the deployment")

orgdbt = orguser.org.dbt

# ordered
org_tasks: list[OrgTask] = [dataflow_orgtask.orgtask for dataflow_orgtask in dataflow_orgtasks]

Expand All @@ -542,7 +545,9 @@ def post_run_prefect_org_deployment_task(request, deployment_id, payload: TaskPa
cli_profile_block = OrgPrefectBlockv1.objects.filter(
org=orguser.org, block_type=DBTCLIPROFILE
).first()
dbt_project_params, error = gather_dbt_project_params(orguser.org)
dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
orguser.org, orgdbt
)

# dont set any parameters if cli block is not present or there is an error
if cli_profile_block and not error:
Expand Down
21 changes: 11 additions & 10 deletions ddpui/api/transform_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
LockCanvasRequestSchema,
LockCanvasResponseSchema,
)
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.utils.taskprogress import TaskProgress
from ddpui.core.transformfunctions import validate_operation_config, check_canvas_locked
from ddpui.api.warehouse_api import get_warehouse_data
Expand Down Expand Up @@ -55,8 +56,8 @@ def create_dbt_project(request, payload: DbtProjectSchema):
orguser: OrgUser = request.orguser
org = orguser.org

project_dir = Path(os.getenv("CLIENTDBT_ROOT")) / org.slug
project_dir.mkdir(parents=True, exist_ok=True)
org_dir = Path(DbtProjectManager.get_org_dir(org))
org_dir.mkdir(parents=True, exist_ok=True)

# Call the post_dbt_workspace function
_, error = setup_local_dbt_workspace(
Expand All @@ -76,25 +77,25 @@ def delete_dbt_project(request, project_name: str):
"""
orguser: OrgUser = request.orguser
org = orguser.org
orgdbt = org.dbt

project_dir = Path(os.getenv("CLIENTDBT_ROOT")) / org.slug
org_dir = Path(DbtProjectManager.get_org_dir(org))

if not project_dir.exists():
if not org_dir.exists():
raise HttpError(404, f"Organization {org.slug} does not have any projects")

dbtrepo_dir: Path = project_dir / project_name
project_dir: Path = org_dir / project_name

if not dbtrepo_dir.exists():
if not project_dir.exists():
raise HttpError(422, f"Project {project_name} does not exist in organization {org.slug}")

if org.dbt:
dbt = org.dbt
if orgdbt:
org.dbt = None
org.save()

dbt.delete()
orgdbt.delete()

shutil.rmtree(dbtrepo_dir)
shutil.rmtree(project_dir)

return {"message": f"Project {project_name} deleted successfully"}

Expand Down
45 changes: 26 additions & 19 deletions ddpui/celeryworkers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
from ddpui.utils import secretsmanager
from ddpui.utils.taskprogress import TaskProgress
from ddpui.utils.singletaskprogress import SingleTaskProgress
from ddpui.core.orgdbt_manager import DbtProjectManager
from ddpui.ddpdbt.schema import DbtProjectParams
from ddpui.ddpairbyte import airbyte_service, airbytehelpers
from ddpui.ddpprefect.prefect_service import (
get_flow_run_graphs,
Expand Down Expand Up @@ -80,7 +82,7 @@ def clone_github_repo(
org_slug: str,
gitrepo_url: str,
gitrepo_access_token: str | None,
project_dir: str,
org_dir: str,
taskprogress: TaskProgress | None,
) -> bool:
"""clones an org's github repo"""
Expand All @@ -107,25 +109,25 @@ def clone_github_repo(
"github.com", "oauth2:" + gitrepo_access_token + "@github.com"
)

project_dir: Path = Path(project_dir)
dbtrepo_dir = project_dir / "dbtrepo"
if not project_dir.exists():
project_dir.mkdir()
org_dir: Path = Path(org_dir)
dbtrepo_dir = org_dir / "dbtrepo"
if not org_dir.exists():
org_dir.mkdir()
taskprogress.add(
{
"message": "created project_dir",
"status": "running",
}
)
logger.info("created project_dir %s", project_dir)
logger.info("created project_dir %s", org_dir)

elif dbtrepo_dir.exists():
shutil.rmtree(str(dbtrepo_dir))

cmd = f"git clone {gitrepo_url} dbtrepo"

try:
runcmd(cmd, project_dir)
runcmd(cmd, org_dir)
except Exception as error:
taskprogress.add(
{
Expand All @@ -135,15 +137,15 @@ def clone_github_repo(
}
)
logger.exception(error)
return False
return None

taskprogress.add(
{
"message": "cloned git repo",
"status": "running" if child else "completed",
}
)
return True
return dbtrepo_dir


@app.task(bind=True)
Expand Down Expand Up @@ -178,24 +180,25 @@ def setup_dbtworkspace(self, org_id: int, payload: dict) -> str:
org.save()

# this client'a dbt setup happens here
project_dir = Path(os.getenv("CLIENTDBT_ROOT")) / org.slug
org_dir = DbtProjectManager.get_org_dir(org)

# four parameters here is correct despite vscode thinking otherwise
if not clone_github_repo(
dbtcloned_repo_path = clone_github_repo(
org.slug,
payload["gitrepoUrl"],
payload["gitrepoAccessToken"],
str(project_dir),
org_dir,
taskprogress,
):
)
if not dbtcloned_repo_path:
raise Exception("Failed to clone git repo")

logger.info("git clone succeeded for org %s", org.name)

dbt = OrgDbt(
gitrepo_url=payload["gitrepoUrl"],
project_dir=str(project_dir),
dbt_venv=os.getenv("DBT_VENV"),
project_dir=DbtProjectManager.get_dbt_repo_relative_path(dbtcloned_repo_path),
dbt_venv="venv",
target_type=warehouse.wtype,
default_schema=payload["profile"]["target_configs_schema"],
transform_type=TransformType.GIT,
Expand Down Expand Up @@ -275,16 +278,20 @@ def run_dbt_commands(self, orguser_id: int, task_id: str, dbt_run_params: dict =
logger.error("need to set up a dbt cli profile first for org %s", org.name)
return

dbt_project_params: DbtProjectParams = DbtProjectManager.gather_dbt_project_params(
org, orgdbt
)

profile = get_dbt_cli_profile_block(dbt_cli_profile.block_name)["profile"]
profile_dirname = Path(os.getenv("CLIENTDBT_ROOT")) / org.slug / "dbtrepo" / "profiles"
profile_dirname = Path(dbt_project_params.project_dir) / "profiles"
os.makedirs(profile_dirname, exist_ok=True)
profile_filename = profile_dirname / "profiles.yml"
logger.info("writing dbt profile to " + str(profile_filename))
with open(profile_filename, "w", encoding="utf-8") as f:
yaml.safe_dump(profile, f)

dbt_binary = Path(os.getenv("DBT_VENV")) / "venv/bin/dbt"
project_dir = Path(orgdbt.project_dir) / "dbtrepo"
dbt_binary = dbt_project_params.dbt_binary
project_dir = dbt_project_params.project_dir

# dbt clean
taskprogress.add({"message": "starting dbt clean", "status": "running"})
Expand Down Expand Up @@ -511,7 +518,7 @@ def create_elementary_report(task_key: str, org_id: int, bucket_file_path: str):
org = Org.objects.filter(id=org_id).first()
org_task = get_edr_send_report_task(org, create=True)

project_dir = Path(org.dbt.project_dir) / "dbtrepo"
project_dir = Path(DbtProjectManager.get_dbt_project_dir(org.dbt))
# profiles_dir = project_dir / "elementary_profiles"
aws_access_key_id = os.getenv("ELEMENTARY_AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("ELEMENTARY_AWS_SECRET_ACCESS_KEY")
Expand Down
Loading

0 comments on commit f4b64bf

Please sign in to comment.