Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-8163 Remove KFP annotations & labels from Argo YAML #293

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,24 @@ def trigger(cls, kubernetes_namespace: str, name: str, parameters=None):
except Exception as e:
raise AIPException(str(e))

@staticmethod
def _remove_kfp_annotations_labels(workflow: Dict[Text, Any]):
def remove_keys(d: dict):
kf_prefix = "pipelines.kubeflow.org/"
for k in list(d):
if k.startswith(kf_prefix):
del d[k]

def remove_annotations_labels(d: dict):
remove_keys(d["metadata"]["annotations"])
remove_keys(d["metadata"]["labels"])

remove_annotations_labels(workflow)

for template in workflow["spec"]["templates"]:
if "metadata" in template:
remove_annotations_labels(template)

def _create_workflow_yaml(
self,
flow_parameters: Dict,
Expand All @@ -248,6 +266,9 @@ def _create_workflow_yaml(
pipeline_conf=pipeline_conf,
)

# mutates and removes kubeflow labels and annotations
KubeflowPipelines._remove_kfp_annotations_labels(workflow)

workflow["spec"]["arguments"]["parameters"] = [
dict(name=k, value=json.dumps(v) if isinstance(v, dict) else v)
for k, v in flow_parameters.items()
Expand Down
24 changes: 19 additions & 5 deletions metaflow/plugins/aip/tests/run_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
or not the run was successful. It also checks to make sure the logging
functionality works.

More specifically, the tests spawn KFP runs and ensure the spawning processes
have a returncode of 0. If any test fails within KFP, an exception
More specifically, the tests spawn Argo runs and ensure the spawning processes
have a returncode of 0. If any test fails within Argo, an exception
is raised, the test fails, and the user can access the run link to the failed
KFP run.
Argo run.

Parameters:
-n: specifies the number of parallel processes used by PyTest.

Sometimes, the tests may fail on KFP due to resource quota issues. If they do,
Sometimes, the tests may fail on Argo due to resource quota issues. If they do,
try reducing -n (number of parallel processes) so fewer simultaneous
KFP runs will be scheduled.
Argo runs will be scheduled.

"""

Expand Down Expand Up @@ -316,6 +316,20 @@ def get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) -> Dict[str, str]:
return flow_yaml


def test_kfp_labels(pytestconfig) -> None:
    """Compile s3_sensor_flow to Argo YAML and check no KFP metadata survives."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        pipeline_path: str = os.path.join(tmp_dir, "s3_sensor_flow_labels.yaml")

        cmd: str = (
            f" {_python()} flows/s3_sensor_flow.py --no-pylint --datastore s3 aip run"
            f" --no-s3-code-package --yaml-only --notify --pipeline-path {pipeline_path} "
            f"--tag {pytestconfig.getoption('pipeline_tag')} "
        )
        compiled = get_compiled_yaml(cmd, pipeline_path)

        # Serialize the whole document so keys, values, and nested fields
        # are all covered by a single substring check.
        assert "pipelines.kubeflow.org" not in json.dumps(compiled)


def test_kfp_pod_default(pytestconfig) -> None:
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow.yaml")
Expand Down
Loading