Merge pull request #293 from zillow/tz/AIP-8163-remove-kfp-annotations
AIP-8163 Remove KFP annotations & labels from Argo YAML
talebzeghmi authored Mar 12, 2024
2 parents dc204ab + ca1a9ef commit b15a7fa
Showing 2 changed files with 40 additions and 5 deletions.
21 changes: 21 additions & 0 deletions metaflow/plugins/aip/aip.py
@@ -229,6 +229,24 @@ def trigger(cls, kubernetes_namespace: str, name: str, parameters=None):
         except Exception as e:
             raise AIPException(str(e))
 
+    @staticmethod
+    def _remove_kfp_annotations_labels(workflow: Dict[Text, Any]):
+        def remove_keys(d: dict):
+            kf_prefix = "pipelines.kubeflow.org/"
+            for k in list(d):
+                if k.startswith(kf_prefix):
+                    del d[k]
+
+        def remove_annotations_labels(d: dict):
+            remove_keys(d["metadata"]["annotations"])
+            remove_keys(d["metadata"]["labels"])
+
+        remove_annotations_labels(workflow)
+
+        for template in workflow["spec"]["templates"]:
+            if "metadata" in template:
+                remove_annotations_labels(template)
+
     def _create_workflow_yaml(
         self,
         flow_parameters: Dict,
@@ -248,6 +266,9 @@ def _create_workflow_yaml
             pipeline_conf=pipeline_conf,
         )
 
+        # mutates and removes kubeflow labels and annotations
+        KubeflowPipelines._remove_kfp_annotations_labels(workflow)
+
         workflow["spec"]["arguments"]["parameters"] = [
             dict(name=k, value=json.dumps(v) if isinstance(v, dict) else v)
             for k, v in flow_parameters.items()
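To make the mutation above concrete, here is a minimal usage sketch of _remove_kfp_annotations_labels on a toy workflow dict. The import path, annotation keys, and template names are illustrative assumptions; only the pipelines.kubeflow.org/ prefix and the method's in-place behavior come from the diff above.

# Minimal usage sketch (not from the PR): the import path and key names below
# are assumptions chosen for illustration.
from metaflow.plugins.aip.aip import KubeflowPipelines

workflow = {
    "metadata": {
        "annotations": {"pipelines.kubeflow.org/pipeline_spec": "{}", "owner": "aip"},
        "labels": {"pipelines.kubeflow.org/kfp_sdk_version": "1.8.1"},
    },
    "spec": {
        "templates": [
            {
                "name": "start",
                "metadata": {
                    "annotations": {"pipelines.kubeflow.org/component_spec": "{}"},
                    "labels": {},
                },
            },
            {"name": "exit-handler"},  # no "metadata" key, so it is skipped
        ]
    },
}

KubeflowPipelines._remove_kfp_annotations_labels(workflow)  # mutates in place

assert workflow["metadata"]["annotations"] == {"owner": "aip"}
assert workflow["metadata"]["labels"] == {}
assert workflow["spec"]["templates"][0]["metadata"]["annotations"] == {}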
24 changes: 19 additions & 5 deletions metaflow/plugins/aip/tests/run_integration_tests.py
@@ -26,17 +26,17 @@
 or not the run was successful. It also checks to make sure the logging
 functionality works.
-More specifically, the tests spawn KFP runs and ensure the spawning processes
-have a returncode of 0. If any test fails within KFP, an exception
+More specifically, the tests spawn Argo runs and ensure the spawning processes
+have a returncode of 0. If any test fails within Argo, an exception
 is raised, the test fails, and the user can access the run link to the failed
-KFP run.
+Argo run.
 Parameters:
 -n: specifies the number of parallel processes used by PyTest.
-Sometimes, the tests may fail on KFP due to resource quota issues. If they do,
+Sometimes, the tests may fail on Argo due to resource quota issues. If they do,
 try reducing -n (number of parallel processes) so less simultaneous
-KFP runs will be scheduled.
+Argo runs will be scheduled.
 """

@@ -316,6 +316,20 @@ def get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) -> Dict[str, str]:
     return flow_yaml
 
 
+def test_kfp_labels(pytestconfig) -> None:
+    with tempfile.TemporaryDirectory() as yaml_tmp_dir:
+        yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow_labels.yaml")
+
+        compile_to_yaml_cmd: str = (
+            f" {_python()} flows/s3_sensor_flow.py --no-pylint --datastore s3 aip run"
+            f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path} "
+            f"--tag {pytestconfig.getoption('pipeline_tag')} "
+        )
+        flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
+
+        assert "pipelines.kubeflow.org" not in json.dumps(flow_yaml)
+
+
 def test_kfp_pod_default(pytestconfig) -> None:
     with tempfile.TemporaryDirectory() as yaml_tmp_dir:
         yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow.yaml")
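The same check that test_kfp_labels applies to the compiled dict can also be run by hand against any workflow file; the standalone sketch below assumes PyYAML is installed and is not part of the test module.

# Standalone sketch (assumes PyYAML): report whether any KFP-prefixed
# annotation or label survived in a compiled Argo workflow file.
import json
import yaml

def has_kfp_metadata(yaml_path: str) -> bool:
    with open(yaml_path) as f:
        workflow = yaml.safe_load(f)
    # Same trick as the test: serialize the whole document and search for the prefix.
    return "pipelines.kubeflow.org" in json.dumps(workflow)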
