From 12d40d8a2a4fba342c4a77c4d57b57911aba8996 Mon Sep 17 00:00:00 2001 From: Taleb Zeghmi Date: Fri, 1 Mar 2024 11:00:51 -0800 Subject: [PATCH] AIP-8095 perm argo-ui link w/ uid --- metaflow/plugins/aip/aip.py | 2 ++ metaflow/plugins/aip/aip_cli.py | 7 ++++++- metaflow/plugins/aip/aip_exit_handler.py | 8 +++++--- metaflow/plugins/aip/aip_udf_exit_handler.py | 8 +++----- metaflow/plugins/aip/argo_utils.py | 6 ++---- metaflow/plugins/aip/tests/flows/flow_triggering_flow.py | 2 +- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index f77e48d3b4..9053ddae08 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -1724,6 +1724,7 @@ def _get_aip_exit_handler_op( " && python -m metaflow.plugins.aip.aip_exit_handler" f" --flow_name {self.name}" f" --run_id {METAFLOW_RUN_ID}" + f" --argo_workflow_uid {ARGO_WORKFLOW_UID}" f" --env_variables_json {json.dumps(json.dumps(env_variables))}" f" --flow_parameters_json {flow_parameters_json if flow_parameters else '{}'}" " --status {{workflow.status}}" @@ -1775,6 +1776,7 @@ def _get_user_defined_exit_handler_op( f" && METAFLOW_USER=aip-user python {os.path.basename(sys.argv[0])} {top_level} aip user-defined-exit-handler" f" --flow_name {self.name}" f" --run_id {METAFLOW_RUN_ID}" + f" --argo_workflow_uid {ARGO_WORKFLOW_UID}" f" --env_variables_json {json.dumps(json.dumps(env_variables))}" f" --flow_parameters_json {flow_parameters_json if flow_parameters else '{}'}" " --status {{workflow.status}}" diff --git a/metaflow/plugins/aip/aip_cli.py b/metaflow/plugins/aip/aip_cli.py index 1effaca600..7e534f2aaa 100644 --- a/metaflow/plugins/aip/aip_cli.py +++ b/metaflow/plugins/aip/aip_cli.py @@ -51,6 +51,7 @@ def kubeflow_pipelines(obj): @click.option("--flow_name") @click.option("--status") @click.option("--run_id") +@click.option("--argo_workflow_uid") @click.option("--env_variables_json") @click.option("--flow_parameters_json") @click.option("--metaflow_configs_json") @@ -61,6 +62,7 @@ def user_defined_exit_handler( flow_name: str, status: str, run_id: str, + argo_workflow_uid: str, env_variables_json: str, flow_parameters_json: str, metaflow_configs_json: str, @@ -72,6 +74,7 @@ def user_defined_exit_handler( flow_name, status, run_id, + argo_workflow_uid, env_variables_json, flow_parameters_json, metaflow_configs_json, @@ -420,7 +423,9 @@ def _echo_workflow_run( argo_workflow_uid = workflow_manifest["metadata"]["uid"] metaflow_run_id = to_metaflow_run_id(argo_workflow_uid) metaflow_ui_url = run_id_to_metaflow_url(flow_name, argo_workflow_uid) - argo_ui_url = run_id_to_url(argo_workflow_name, kubernetes_namespace) + argo_ui_url = run_id_to_url( + argo_workflow_name, kubernetes_namespace, argo_workflow_uid + ) obj.echo(f"Metaflow run_id=*{metaflow_run_id}*\n", fg="magenta") obj.echo(f"*Argo UI:* {argo_ui_url}", fg="cyan") if AIP_SHOW_METAFLOW_UI_URL: diff --git a/metaflow/plugins/aip/aip_exit_handler.py b/metaflow/plugins/aip/aip_exit_handler.py index b25f14df26..a1d1c3f113 100644 --- a/metaflow/plugins/aip/aip_exit_handler.py +++ b/metaflow/plugins/aip/aip_exit_handler.py @@ -2,6 +2,7 @@ from metaflow._vendor import click import logging +from metaflow.plugins.aip import run_id_to_url _logger = logging.getLogger(__name__) @@ -10,6 +11,7 @@ @click.option("--flow_name") @click.option("--status") @click.option("--run_id") +@click.option("--argo_workflow_uid") @click.option("--env_variables_json") @click.option("--flow_parameters_json") @click.option("--run_email_notify", is_flag=True, default=False) @@ -18,6 +20,7 @@ def exit_handler( flow_name: str, status: str, run_id: str, + argo_workflow_uid: str, env_variables_json: str, flow_parameters_json: str, run_email_notify: bool = False, @@ -66,11 +69,10 @@ def email_notify(send_to): argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "") email_body = get_env("METAFLOW_NOTIFY_EMAIL_BODY", "") - argo_url_prefix = get_env("ARGO_RUN_URL_PREFIX", "") k8s_namespace = get_env("POD_NAMESPACE", "") - argo_ui_url = ( - f"{argo_url_prefix}/argo-ui/workflows/{k8s_namespace}/{argo_workflow_name}" + argo_ui_url = run_id_to_url( + argo_workflow_name, k8s_namespace, argo_workflow_uid ) body = ( f"status = {status}
\n" diff --git a/metaflow/plugins/aip/aip_udf_exit_handler.py b/metaflow/plugins/aip/aip_udf_exit_handler.py index c9784577fb..9e13007e7b 100644 --- a/metaflow/plugins/aip/aip_udf_exit_handler.py +++ b/metaflow/plugins/aip/aip_udf_exit_handler.py @@ -4,6 +4,7 @@ from metaflow.decorators import flow_decorators, FlowDecorator from metaflow.graph import FlowGraph +from metaflow.plugins.aip import run_id_to_url _logger = logging.getLogger(__name__) @@ -13,6 +14,7 @@ def invoke_user_defined_exit_handler( flow_name: str, status: str, run_id: str, + argo_workflow_uid: str, env_variables_json: str, flow_parameters_json: str, metaflow_configs_json: str, @@ -40,13 +42,9 @@ def invoke_user_defined_exit_handler( def get_env(name, default=None) -> str: return env_variables.get(name, os.environ.get(name, default=default)) - cluster_env = get_env("K8S_CLUSTER_ENV", "") argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "") - argo_url_prefix = get_env("ARGO_RUN_URL_PREFIX", "") k8s_namespace = get_env("POD_NAMESPACE", "") - argo_ui_url = ( - f"{argo_url_prefix}/argo-ui/workflows/{k8s_namespace}/{argo_workflow_name}" - ) + argo_ui_url = run_id_to_url(argo_workflow_name, k8s_namespace, argo_workflow_uid) metaflow_configs: Dict[str, str] = json.loads(metaflow_configs_json) metaflow_configs_new: Dict[str, str] = { diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index c7a3393092..ab54d445c6 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -60,10 +60,8 @@ def to_metaflow_run_id(argo_run_uid: str): ) -def run_id_to_url(argo_run_id: str, kubernetes_namespace: str): - argo_ui_url = ( - f"{ARGO_RUN_URL_PREFIX}/argo-ui/workflows/{kubernetes_namespace}/{argo_run_id}" - ) +def run_id_to_url(argo_run_id: str, kubernetes_namespace: str, argo_workflow_uid: str): + argo_ui_url = f"{ARGO_RUN_URL_PREFIX}/argo-ui/workflows/{kubernetes_namespace}/{argo_run_id}?uid={argo_workflow_uid}" return argo_ui_url diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 86e0196871..662fbffd33 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -113,7 +113,7 @@ def end(self): }, ) logger.info(f"{run_id=}, {run_uid=}") - logger.info(f"{run_id_to_url(run_id, KUBERNETES_NAMESPACE)=}") + logger.info(f"{run_id_to_url(run_id, KUBERNETES_NAMESPACE, run_uid)=}") logger.info("Testing timeout exception for wait_for_kfp_run_completion") try: