Skip to content

Commit

Permalink
AIP-7773 argo flow trigger flow (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
talebzeghmi authored Oct 16, 2023
1 parent 159d287 commit 4dbdbb0
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ on:
- feature/kfp
- feature/aip
- feature/kfp-argo
- tz/AIP-7385-aip-plugin
- tz/AIP-7418-remove-create
- tz/AIP-7773-argo-ftf
workflow_call:

jobs:
Expand Down
3 changes: 2 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ stages:
-e AIP_STEP_IMAGE=${IMAGE_REPOSITORY_TAG_AIP_STEP}
${IMAGE_REPOSITORY_TAG}
bash -c "
set -x &&
cd /home/zservice/metaflow/metaflow/plugins/aip/tests &&
python -m pytest -s -n 3 run_integration_tests.py --image ${IMAGE_REPOSITORY_TAG} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg
python -m pytest -s -n 3 run_integration_tests.py --image ${IMAGE_REPOSITORY_TAG} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg --pipeline-tag pipeline_iid_${CI_PIPELINE_IID}
"
artifacts:
when: always
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/aip/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ def pytest_addoption(parser):
parser.addoption(
"--opsgenie-api-token", dest="opsgenie_api_token", action="store", default=None
)
parser.addoption(
"--pipeline-tag", dest="pipeline_tag", action="store", default=None
)
29 changes: 28 additions & 1 deletion metaflow/plugins/aip/tests/flows/flow_triggering_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,34 @@ def start(self):
)
logger.info(f"Creating workflow: {self.template_name}")
subprocess.run(
["python", __file__, "aip", "create", "--name", self.template_name],
[
"python",
__file__,
"aip",
"create",
"--name",
self.template_name,
"--yaml-only",
"--pipeline-path",
"/tmp/ftf.yaml",
"--kind",
"WorkflowTemplate",
"--max-run-concurrency",
"0",
],
check=True,
)
subprocess.run(["cat", "/tmp/ftf.yaml"])
print(f"{KUBERNETES_NAMESPACE=}")
subprocess.run(
[
"argo",
"template",
"-n",
KUBERNETES_NAMESPACE,
"create",
"/tmp/ftf.yaml",
],
check=True,
)

Expand Down
24 changes: 17 additions & 7 deletions metaflow/plugins/aip/tests/run_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

disabled_test_flows = [
"aip_flow.py", # kfp_preceding_component feature has been deprecated.
"flow_triggering_flow.py", # TODO(talebz): will re-enable with Argo FTF
# "flow_triggering_flow.py",
# TODO(talebz) AIP-6717 re-enable for compilation changes or when cluster can handle
# "foreach_linear_foreach.py",
# "foreach_linear_split.py",
Expand All @@ -67,10 +67,12 @@ def test_s3_sensor_flow(pytestconfig) -> None:
s3_sensor_flow_cmd: str = (
f"{_python()} flows/s3_sensor_flow.py --datastore=s3 --with retry aip run "
f"--file_name {file_name} --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
s3_sensor_with_formatter_flow_cmd: str = (
f"{_python()} flows/s3_sensor_with_formatter_flow.py --datastore=s3 --with retry aip run "
f"--file_name_for_formatter_test {file_name_for_formatter_test} --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)

main_config_cmds: str = (
Expand Down Expand Up @@ -112,6 +114,7 @@ def test_s3_sensor_flow(pytestconfig) -> None:
f"--file_name {file_name} --file_name_for_formatter_test {file_name_for_formatter_test} "
f"--s3_sensor_argo_workflow_name {s3_sensor_argo_workflow_name} --s3_sensor_with_formatter_argo_workflow_name {s3_sensor_with_formatter_argo_workflow_name} "
f"--argo-wait "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
validate_s3_sensor_flow_cmd += main_config_cmds
validate_s3_sensor_flow_cmd += image_cmds
Expand All @@ -127,6 +130,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None:
f"{_python()} flows/raise_error_flow.py --datastore=s3 aip run "
f"--argo-wait --workflow-timeout 1800 "
f"--experiment metaflow_test --tag test_t1 --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
if pytestconfig.getoption("image"):
image_cmds: str = (
Expand Down Expand Up @@ -186,6 +190,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None:
f"--experiment metaflow_test --tag test_t1 "
f"--error_flow_id={error_flow_id} "
f"--notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
check_error_handling_flow_cmd += image_cmds
run_cmd_with_backoff_from_platform_errors(
Expand All @@ -207,6 +212,7 @@ def test_flows(pytestconfig, flow_file_path: str) -> None:
f"--argo-wait --workflow-timeout 1800 "
f"--max-parallelism 3 --experiment metaflow_test --tag test_t1 "
f"--sys-tag test_sys_t1:sys_tag_value "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
if pytestconfig.getoption("image"):
test_cmd += (
Expand Down Expand Up @@ -238,6 +244,7 @@ def run_cmd_with_backoff_from_platform_errors(
for interval in backoff_intervals_in_seconds:
time.sleep(interval)

print(f"Running: {aip_run_cmd=}")
run_and_wait_process: CompletedProcess = run(
aip_run_cmd,
universal_newlines=True,
Expand Down Expand Up @@ -311,13 +318,14 @@ def get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) -> Dict[str, str]:
return flow_yaml


def test_kfp_pod_default() -> None:
def test_kfp_pod_default(pytestconfig) -> None:
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow.yaml")

compile_to_yaml_cmd: str = (
f" {_python()} flows/s3_sensor_flow.py --no-pylint --datastore s3 aip run"
f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path}"
f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)

Expand All @@ -329,7 +337,7 @@ def test_kfp_pod_default() -> None:
)


def test_kubernetes_service_account_compile_only() -> None:
def test_kubernetes_service_account_compile_only(pytestconfig) -> None:
service_account = "test-service-account"
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(
Expand All @@ -339,7 +347,8 @@ def test_kubernetes_service_account_compile_only() -> None:
compile_to_yaml_cmd: str = (
f"export METAFLOW_KUBERNETES_SERVICE_ACCOUNT={service_account};"
f" {_python()} flows/toleration_and_affinity_flow.py aip run"
f" --yaml-only --pipeline-path {yaml_file_path}"
f" --yaml-only --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)

flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
Expand All @@ -354,7 +363,7 @@ def test_kubernetes_service_account_compile_only() -> None:
assert "METAFLOW_KUBERNETES_SERVICE_ACCOUNT" in env


def test_toleration_and_affinity_compile_only() -> None:
def test_toleration_and_affinity_compile_only(pytestconfig) -> None:
step_templates: Dict[str, str] = {}
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(
Expand All @@ -363,7 +372,8 @@ def test_toleration_and_affinity_compile_only() -> None:

compile_to_yaml_cmd: str = (
f"{_python()} flows/toleration_and_affinity_flow.py --datastore=s3 --with retry aip run"
f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path}"
f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)

flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
Expand Down

0 comments on commit 4dbdbb0

Please sign in to comment.