diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 87da670447..c199d0ad9a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -12,8 +12,8 @@ on:
       - feature/kfp
       - feature/aip
       - feature/kfp-argo
-      - tz/AIP-7385-aip-plugin
       - tz/AIP-7418-remove-create
+      - tz/AIP-7773-argo-ftf
   workflow_call:
 
 jobs:
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 9015f8980a..8801e8abe9 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -68,8 +68,9 @@ stages:
       -e AIP_STEP_IMAGE=${IMAGE_REPOSITORY_TAG_AIP_STEP}
       ${IMAGE_REPOSITORY_TAG}
       bash -c "
+      set -x &&
       cd /home/zservice/metaflow/metaflow/plugins/aip/tests &&
-      python -m pytest -s -n 3 run_integration_tests.py --image ${IMAGE_REPOSITORY_TAG} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg
+      python -m pytest -s -n 3 run_integration_tests.py --image ${IMAGE_REPOSITORY_TAG} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg --pipeline-tag pipeline_iid_${CI_PIPELINE_IID}
       "
   artifacts:
     when: always
diff --git a/metaflow/plugins/aip/tests/conftest.py b/metaflow/plugins/aip/tests/conftest.py
index f19753afd1..10ff5aee90 100644
--- a/metaflow/plugins/aip/tests/conftest.py
+++ b/metaflow/plugins/aip/tests/conftest.py
@@ -7,3 +7,6 @@ def pytest_addoption(parser):
     parser.addoption(
         "--opsgenie-api-token", dest="opsgenie_api_token", action="store", default=None
     )
+    parser.addoption(
+        "--pipeline-tag", dest="pipeline_tag", action="store", default=None
+    )
diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py
index 172fc08874..295e48c7a7 100644
--- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py
+++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py
@@ -61,7 +61,34 @@ def start(self):
         )
         logger.info(f"Creating workflow: {self.template_name}")
         subprocess.run(
-            ["python", __file__, "aip", "create", "--name", self.template_name],
+            [
+                "python",
+                __file__,
+                "aip",
+                "create",
+                "--name",
+                self.template_name,
+                "--yaml-only",
+                "--pipeline-path",
+                "/tmp/ftf.yaml",
+                "--kind",
+                "WorkflowTemplate",
+                "--max-run-concurrency",
+                "0",
+            ],
+            check=True,
+        )
+        subprocess.run(["cat", "/tmp/ftf.yaml"])
+        print(f"{KUBERNETES_NAMESPACE=}")
+        subprocess.run(
+            [
+                "argo",
+                "template",
+                "-n",
+                KUBERNETES_NAMESPACE,
+                "create",
+                "/tmp/ftf.yaml",
+            ],
             check=True,
         )
diff --git a/metaflow/plugins/aip/tests/run_integration_tests.py b/metaflow/plugins/aip/tests/run_integration_tests.py
index 7826959041..75783b91cd 100644
--- a/metaflow/plugins/aip/tests/run_integration_tests.py
+++ b/metaflow/plugins/aip/tests/run_integration_tests.py
@@ -50,7 +50,7 @@
 disabled_test_flows = [
     "aip_flow.py",  # kfp_preceding_component feature has been deprecated.
- "flow_triggering_flow.py", # TODO(talebz): will re-enable with Argo FTF + # "flow_triggering_flow.py", # TODO(talebz) AIP-6717 re-enable for compilation changes or when cluster can handle # "foreach_linear_foreach.py", # "foreach_linear_split.py", @@ -67,10 +67,12 @@ def test_s3_sensor_flow(pytestconfig) -> None: s3_sensor_flow_cmd: str = ( f"{_python()} flows/s3_sensor_flow.py --datastore=s3 --with retry aip run " f"--file_name {file_name} --notify " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) s3_sensor_with_formatter_flow_cmd: str = ( f"{_python()} flows/s3_sensor_with_formatter_flow.py --datastore=s3 --with retry aip run " f"--file_name_for_formatter_test {file_name_for_formatter_test} --notify " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) main_config_cmds: str = ( @@ -112,6 +114,7 @@ def test_s3_sensor_flow(pytestconfig) -> None: f"--file_name {file_name} --file_name_for_formatter_test {file_name_for_formatter_test} " f"--s3_sensor_argo_workflow_name {s3_sensor_argo_workflow_name} --s3_sensor_with_formatter_argo_workflow_name {s3_sensor_with_formatter_argo_workflow_name} " f"--argo-wait " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) validate_s3_sensor_flow_cmd += main_config_cmds validate_s3_sensor_flow_cmd += image_cmds @@ -127,6 +130,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None: f"{_python()} flows/raise_error_flow.py --datastore=s3 aip run " f"--argo-wait --workflow-timeout 1800 " f"--experiment metaflow_test --tag test_t1 --notify " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) if pytestconfig.getoption("image"): image_cmds: str = ( @@ -186,6 +190,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None: f"--experiment metaflow_test --tag test_t1 " f"--error_flow_id={error_flow_id} " f"--notify " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) check_error_handling_flow_cmd += image_cmds run_cmd_with_backoff_from_platform_errors( @@ -207,6 +212,7 @@ def test_flows(pytestconfig, flow_file_path: str) -> None: f"--argo-wait --workflow-timeout 1800 " f"--max-parallelism 3 --experiment metaflow_test --tag test_t1 " f"--sys-tag test_sys_t1:sys_tag_value " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) if pytestconfig.getoption("image"): test_cmd += ( @@ -238,6 +244,7 @@ def run_cmd_with_backoff_from_platform_errors( for interval in backoff_intervals_in_seconds: time.sleep(interval) + print(f"Running: {aip_run_cmd=}") run_and_wait_process: CompletedProcess = run( aip_run_cmd, universal_newlines=True, @@ -311,13 +318,14 @@ def get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) -> Dict[str, str]: return flow_yaml -def test_kfp_pod_default() -> None: +def test_kfp_pod_default(pytestconfig) -> None: with tempfile.TemporaryDirectory() as yaml_tmp_dir: yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow.yaml") compile_to_yaml_cmd: str = ( f" {_python()} flows/s3_sensor_flow.py --no-pylint --datastore s3 aip run" - f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path}" + f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path} " + f"--tag {pytestconfig.getoption('pipeline_tag')} " ) flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) @@ -329,7 +337,7 @@ def test_kfp_pod_default() -> None: ) -def test_kubernetes_service_account_compile_only() -> None: +def test_kubernetes_service_account_compile_only(pytestconfig) -> None: service_account = "test-service-account" with tempfile.TemporaryDirectory() as yaml_tmp_dir: yaml_file_path: str = 
@@ -339,7 +347,8 @@ def test_kubernetes_service_account_compile_only() -> None:
         compile_to_yaml_cmd: str = (
             f"export METAFLOW_KUBERNETES_SERVICE_ACCOUNT={service_account};"
             f" {_python()} flows/toleration_and_affinity_flow.py aip run"
-            f" --yaml-only --pipeline-path {yaml_file_path}"
+            f" --yaml-only --pipeline-path {yaml_file_path} "
+            f"--tag {pytestconfig.getoption('pipeline_tag')} "
         )
 
         flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
@@ -354,7 +363,7 @@ def test_kubernetes_service_account_compile_only() -> None:
         assert "METAFLOW_KUBERNETES_SERVICE_ACCOUNT" in env
 
 
-def test_toleration_and_affinity_compile_only() -> None:
+def test_toleration_and_affinity_compile_only(pytestconfig) -> None:
     step_templates: Dict[str, str] = {}
     with tempfile.TemporaryDirectory() as yaml_tmp_dir:
         yaml_file_path: str = os.path.join(
@@ -363,7 +372,8 @@
         compile_to_yaml_cmd: str = (
             f"{_python()} flows/toleration_and_affinity_flow.py --datastore=s3 --with retry aip run"
-            f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path}"
+            f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path} "
+            f"--tag {pytestconfig.getoption('pipeline_tag')} "
         )
         flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
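
Note on the new --pipeline-tag option: it is threaded through every test command as a Metaflow --tag, presumably so all Argo runs spawned by a single GitLab pipeline can be identified by the pipeline_iid_${CI_PIPELINE_IID} tag. Since the option defaults to None in conftest.py, running the suite without --pipeline-tag passes a literal "--tag None" to every flow. A minimal sketch of one way to guard that case; the pipeline_tag_cmd helper is hypothetical and not part of this diff:

# Hypothetical helper, not part of this diff: emit the --tag fragment only
# when --pipeline-tag was actually supplied, so a bare local `pytest` run
# does not tag every flow with the literal string "None".
def pipeline_tag_cmd(pytestconfig) -> str:
    tag = pytestconfig.getoption("pipeline_tag")
    return f"--tag {tag} " if tag else ""

Each command string would then append {pipeline_tag_cmd(pytestconfig)} in place of the unconditional f"--tag {pytestconfig.getoption('pipeline_tag')} " fragment.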
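For the FTF test itself, flow_triggering_flow.py now compiles the flow to YAML as an Argo WorkflowTemplate and registers it with the argo CLI, rather than relying on "aip create" end to end. A self-contained sketch of that two-step sequence, under the same assumptions the diff makes (the argo binary is on PATH and KUBERNETES_NAMESPACE is set); the helper name is illustrative only:

import subprocess

def register_workflow_template(flow_file: str, name: str, namespace: str) -> None:
    """Compile a Metaflow flow to Argo YAML, then register it as a WorkflowTemplate."""
    yaml_path = "/tmp/ftf.yaml"
    # --yaml-only writes the compiled template to --pipeline-path without running it;
    # --kind WorkflowTemplate emits a reusable template rather than a one-off Workflow.
    subprocess.run(
        [
            "python", flow_file, "aip", "create",
            "--name", name,
            "--yaml-only", "--pipeline-path", yaml_path,
            "--kind", "WorkflowTemplate",
            "--max-run-concurrency", "0",
        ],
        check=True,
    )
    # Register the template in the target namespace; `argo template create`
    # errors out if a template with the same name already exists.
    subprocess.run(
        ["argo", "template", "-n", namespace, "create", yaml_path],
        check=True,
    )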