Skip to content

Commit

Permalink
Add unit tests & bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudw committed Aug 14, 2024
1 parent a783eb5 commit a88ce45
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 61 deletions.
4 changes: 2 additions & 2 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ def from_conf(name, default=None):
# Note: `ARGO_RUN_URL_PREFIX` is the URL prefix for ARGO runs on your ARGO cluster. The prefix includes
# all parts of the URL except the run_id at the end which we append once the run is created.
# For example, this would look like: "https://<your-kf-cluster-url>/argo-ui/workflows/"
ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "")
METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "")
ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "").rstrip("/")
METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "").rstrip("/")
AIP_MAX_PARALLELISM = int(from_conf("AIP_MAX_PARALLELISM", 10))
AIP_MAX_RUN_CONCURRENCY = int(from_conf("AIP_MAX_RUN_CONCURRENCY", 10))
AIP_SHOW_METAFLOW_UI_URL = bool(from_conf("AIP_SHOW_METAFLOW_UI_URL", False))
Expand Down
5 changes: 2 additions & 3 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,9 @@ def _create_workflow_yaml(
# Note the name has to follow k8s format.
# self.name is typically CamelCase as it's python class name.
# generateName contains a sanitized version of self.name from aip.compiler
default_workflow_name = workflow["metadata"].pop("generateName").rstrip("-")
workflow["metadata"]["name"] = (
sanitize_k8s_name(name)
if name
else workflow["metadata"].pop("generateName").rstrip("-")
sanitize_k8s_name(name) if name else default_workflow_name
)

# Service account is added through webhooks.
Expand Down
45 changes: 29 additions & 16 deletions metaflow/plugins/aip/argo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import datetime
from typing import Optional, Union, Dict, Any, Tuple, Callable

from metaflow.metaflow_config import ARGO_RUN_URL_PREFIX, METAFLOW_RUN_URL_PREFIX
from metaflow.metaflow_config import (
ARGO_RUN_URL_PREFIX,
METAFLOW_RUN_URL_PREFIX,
KUBERNETES_NAMESPACE,
)
from metaflow.plugins.aip.argo_client import ArgoClient
from metaflow.plugins.aip.aip_decorator import AIPException
from metaflow.plugins.aip.aip_utils import _get_aip_logger
Expand All @@ -13,7 +17,7 @@


class ArgoHelper:
def __init__(self, kubernetes_namespace: str):
def __init__(self, kubernetes_namespace: str = KUBERNETES_NAMESPACE):
"""
Args:
kubernetes_namespace: Namespace where Argo is running.
Expand Down Expand Up @@ -68,6 +72,7 @@ def trigger_latest(
template_prefix: Optional[str] = None,
project_name: Optional[str] = None,
branch_name: Optional[str] = None,
flow_name: Optional[str] = None,
filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None,
parameters: Optional[dict] = None,
wait_timeout: Union[int, float, datetime.timedelta] = 0,
Expand All @@ -87,16 +92,11 @@ def trigger_latest(
wait_timeout: Time to wait for the workflow to complete. Set to 0 to skip waiting.
**kwarg: Other parameters for the watch function. See `ArgoHelper.watch`.
"""
if not any([template_prefix, project_name, branch_name]):
raise AIPException(
"Running argo workflow with no specified template nor filters can be dangerous. "
"Please set at least one of project_name, branch_name or template_name_prefix."
)

template_name: str = self.template_get_latest(
project_name=project_name,
branch_name=branch_name,
template_name_prefix=template_prefix,
template_prefix=template_prefix,
flow_name=flow_name,
filter_func=filter_func,
)

Expand All @@ -109,28 +109,36 @@ def trigger_latest(

def template_get_latest(
self,
template_name_prefix: Optional[str] = None,
template_prefix: Optional[str] = None,
project_name: Optional[str] = None,
branch_name: Optional[str] = None,
flow_name: Optional[str] = None,
filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None,
name_only: bool = True,
) -> Union[str, Dict[str, Any]]:
"""
Args:
template_name_prefix: Prefix of the template name to match.
template_prefix: Prefix of the template name to match.
project_name: Project name to match.
branch_name: Branch name to match.
flow_name: Flow name to match.
filter_func: Custom filter function that is passed template, and should return boolean value
indicating if the template can be used.
name_only: Whether to return only the name of the template or the full manifest. Defaults to True.
Returns:
The name of the latest workflow template, or the full manifest if name_only is set to False.
"""
if not any(
[template_prefix, project_name, branch_name, flow_name, filter_func]
):
raise AIPException(
"Finding latest argo workflow with no specified filters risks picking up unexpected templates. "
"Please set at least one of project_name, branch_name, template_prefix, flow_name or filter_func."
)

# TODO:
# - Add filter by project_id instead of project name - project_id is not added as a label yet.
# - Add filter by flow_name - flow_name is not added as a label yet.

templates = self._client.list_workflow_template()["items"]

Expand All @@ -139,7 +147,7 @@ def template_get_latest(
for template in templates
if (
not project_name
or template["metadata"]["labels"].get("gitlab.zgtools.net/project-name")
or template["metadata"]["labels"].get("metaflow.org/tag_project-name")
== project_name
)
and (
Expand All @@ -148,15 +156,20 @@ def template_get_latest(
== branch_name
)
and (
not template_name_prefix
or template["metadata"]["name"].startswith(template_name_prefix)
not flow_name
or template["metadata"]["labels"].get("metaflow.org/flow_name")
== flow_name
)
and (
not template_prefix
or template["metadata"]["name"].startswith(template_prefix)
)
and (not filter_func or filter_func(template))
]
if not templates:
raise AIPException(
f"No workflow template found with constraints "
f"{project_name=}, {branch_name=}, {template_name_prefix=}, {filter_func=}"
f"{project_name=}, {branch_name=}, {template_prefix=}, {filter_func=}"
)

# Sort by creation timestamp to get the latest template.
Expand Down
137 changes: 97 additions & 40 deletions metaflow/plugins/aip/tests/flows/flow_triggering_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,59 +59,78 @@ def start(self):
# and template_name == 'wfdsk-ftf-test-he5d4--6rhai0z0wiysuew'
# where aip _create_workflow_yaml() calls sanitize_k8s_name() which returns
# 'wfdsk-ftf-test-he5d4-6rhai0z0wiysuew' without the double --
self.template_name = sanitize_k8s_name(
f"{TEST_TEMPLATE_NAME}-{generate_base64_uuid()}".lower()
)
logger.info(f"Creating workflow: {self.template_name}")
subprocess.run(
[
"python",
__file__,
"aip",
"create",
"--name",
self.template_name,
"--yaml-only",
"--pipeline-path",
"/tmp/ftf.yaml",
"--kind",
"WorkflowTemplate",
"--max-run-concurrency",
"0",
],
check=True,
)
subprocess.run(["cat", "/tmp/ftf.yaml"])

print(f"{KUBERNETES_NAMESPACE=}")
subprocess.run(
[
"argo",
"template",
"-n",
KUBERNETES_NAMESPACE,
"create",
"/tmp/ftf.yaml",
],
check=True,
)

# Create several templates so the "latest template" and filter_func lookups
# performed in the `end` step have more than one candidate to choose from.
self.workflow_template_names = [
    sanitize_k8s_name(
        f"{TEST_TEMPLATE_NAME}-{generate_base64_uuid()}".lower()
    )
    for _ in range(3)
]
# Tag keys attached to every template; the `end` step reads them back as
# "metaflow.org/tag_<key>" labels when filtering.
self.triggered_by_tag = "triggerred-by"
self.index_tag = "template-index"

# enumerate() yields (index, item) -- the index comes first.
for template_index, template_name in enumerate(self.workflow_template_names):
    path = f"/tmp/{template_name}.yaml"
    logger.info(f"Creating workflow: {template_name}")
    self.comiple_workflow(
        template_name,
        path,
        extra_args=[
            # Each "--tag" flag needs its own list element followed by its
            # value; a missing comma would silently concatenate the strings.
            "--tag",
            f"{self.triggered_by_tag}:{current.run_id}",
            "--tag",
            f"{self.index_tag}:{template_index}",
        ],
    )
    subprocess.run(["cat", path])
    self.submit_template(path)
    # The latest template is chosen by creation timestamp, so keep the
    # submissions at least a second apart.
    time.sleep(1)  # Spacing workflow template submission time.

self.next(self.end)

@step
def end(self):
"""Trigger downstream pipeline and test triggering behaviors"""
if self.trigger_enabled:
logger.info("\nTesting run_kubeflow_pipeline")
argo_helper = ArgoHelper(KUBERNETES_NAMESPACE)
argo_helper = ArgoHelper()

# ====== Test template filtering ======
# Test latest template is returned with prefix filter
assert self.workflow_template_names[-1] == argo_helper.template_get_latest(
template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()),
flow_name=current.flow_name,
filter_func=lambda template: template["metadata"]["labels"][
f"metaflow.org/tag_{self.triggered_by_tag}"
]
== current.run_id,
)

# Test filter func correctly filters
assert self.workflow_template_names[1] == argo_helper.template_get_latest(
template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()),
flow_name=current.flow_name,
filter_func=lambda template: template["metadata"]["labels"][
f"metaflow.org/tag_{self.triggered_by_tag}"
]
== current.run_id
and template["metadata"]["labels"][f"metaflow.org/tag_{self.index_tag}"]
== str(1),
)

# ====== Test template triggering ======
logger.info("\n Testing ArgoHelper.trigger")
run_id, run_uid = argo_helper.trigger(
template_name=self.template_name,
template_name=self.workflow_template_names[0],
parameters={
"trigger_enabled": False,
"triggered_by": current.run_id,
},
)
logger.info(f"{run_id=}, {run_uid=}")
logger.info(f"{get_argo_url(run_id, run_uid)=}")
logger.info(f"{get_argo_url(run_id, KUBERNETES_NAMESPACE, run_uid)=}")

logger.info("Testing timeout exception for wait_for_kfp_run_completion")
try:
Expand All @@ -136,8 +155,9 @@ def end(self):

_retry_sleep(self.assert_task_triggered_by, metaflow_path=metaflow_path)

logger.info(f"Deleting {self.template_name=}")
argo_helper.template_delete(self.template_name)
for template_name in self.workflow_template_names:
logger.info(f"Deleting {template_name}")
argo_helper.template_delete(template_name)
else:
logger.info(f"{self.trigger_enabled=}")

Expand All @@ -148,6 +168,43 @@ def assert_task_triggered_by(metaflow_path: str):
logger.info(f"assert {start_step.task.data.triggered_by=} == {current.run_id=}")
assert start_step.task.data.triggered_by == current.run_id

@staticmethod
def comiple_workflow(template_name, path, extra_args=None):
    """Invoke this file's `aip create` command to emit a WorkflowTemplate YAML.

    Args:
        template_name: Value passed to `--name`.
        path: Value passed to `--pipeline-path` (where the YAML is written).
        extra_args: Optional additional CLI arguments appended to the command.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero
            (check=True).
    """
    command = [
        "python",
        __file__,
        "aip",
        "create",
        "--name",
        template_name,
        "--yaml-only",
        "--pipeline-path",
        path,
        "--kind",
        "WorkflowTemplate",
        "--max-run-concurrency",
        "0",
    ]
    if extra_args:
        command.extend(extra_args)
    subprocess.run(command, check=True)

@staticmethod
def submit_template(path):
    """Create a workflow template from the manifest at `path` with the argo CLI.

    Runs `argo template -n <KUBERNETES_NAMESPACE> create <path>`.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero
            (check=True).
    """
    argo_command = ["argo", "template", "-n", KUBERNETES_NAMESPACE, "create", path]
    subprocess.run(argo_command, check=True)


if __name__ == "__main__":
FlowTriggeringFlow()

0 comments on commit a88ce45

Please sign in to comment.