Skip to content

Commit

Permalink
AIP-7774 tag integration test w/ pipeline-id (#262)
Browse files Browse the repository at this point in the history
* AIP-7774 tag integration test w/ pipeline-id

* debug

* debug 2

* debug 3

* --pipeline-tag pipeline_iid_${CI_PIPELINE_IID}
  • Loading branch information
talebzeghmi authored Oct 16, 2023
1 parent 36c2c11 commit ba1f997
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ on:
- feature/aip
- feature/kfp-argo
- tz/AIP-7418-remove-create
- tz/AIP-7773-argo-ftf
workflow_call:

jobs:
Expand Down
3 changes: 2 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ stages:
-e AIP_STEP_IMAGE=${IMAGE_REPOSITORY_TAG_AIP_STEP}
${IMAGE_REPOSITORY_TAG}
bash -c "
set -x &&
cd /home/zservice/metaflow/metaflow/plugins/aip/tests &&
python -m pytest -s -n 3 run_integration_tests.py --image ${IMAGE_REPOSITORY_TAG} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg
python -m pytest -s -n 3 run_integration_tests.py --image ${IMAGE_REPOSITORY_TAG} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg --pipeline-tag pipeline_iid_${CI_PIPELINE_IID}
"
artifacts:
when: always
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/aip/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ def pytest_addoption(parser):
parser.addoption(
"--opsgenie-api-token", dest="opsgenie_api_token", action="store", default=None
)
parser.addoption(
"--pipeline-tag", dest="pipeline_tag", action="store", default=None
)
22 changes: 16 additions & 6 deletions metaflow/plugins/aip/tests/run_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ def test_s3_sensor_flow(pytestconfig) -> None:
s3_sensor_flow_cmd: str = (
f"{_python()} flows/s3_sensor_flow.py --datastore=s3 --with retry aip run "
f"--file_name {file_name} --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
s3_sensor_with_formatter_flow_cmd: str = (
f"{_python()} flows/s3_sensor_with_formatter_flow.py --datastore=s3 --with retry aip run "
f"--file_name_for_formatter_test {file_name_for_formatter_test} --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)

main_config_cmds: str = (
Expand Down Expand Up @@ -112,6 +114,7 @@ def test_s3_sensor_flow(pytestconfig) -> None:
f"--file_name {file_name} --file_name_for_formatter_test {file_name_for_formatter_test} "
f"--s3_sensor_argo_workflow_name {s3_sensor_argo_workflow_name} --s3_sensor_with_formatter_argo_workflow_name {s3_sensor_with_formatter_argo_workflow_name} "
f"--argo-wait "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
validate_s3_sensor_flow_cmd += main_config_cmds
validate_s3_sensor_flow_cmd += image_cmds
Expand All @@ -127,6 +130,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None:
f"{_python()} flows/raise_error_flow.py --datastore=s3 aip run "
f"--argo-wait --workflow-timeout 1800 "
f"--experiment metaflow_test --tag test_t1 --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
if pytestconfig.getoption("image"):
image_cmds: str = (
Expand Down Expand Up @@ -186,6 +190,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None:
f"--experiment metaflow_test --tag test_t1 "
f"--error_flow_id={error_flow_id} "
f"--notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
check_error_handling_flow_cmd += image_cmds
run_cmd_with_backoff_from_platform_errors(
Expand All @@ -207,6 +212,7 @@ def test_flows(pytestconfig, flow_file_path: str) -> None:
f"--argo-wait --workflow-timeout 1800 "
f"--max-parallelism 3 --experiment metaflow_test --tag test_t1 "
f"--sys-tag test_sys_t1:sys_tag_value "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
if pytestconfig.getoption("image"):
test_cmd += (
Expand Down Expand Up @@ -238,6 +244,7 @@ def run_cmd_with_backoff_from_platform_errors(
for interval in backoff_intervals_in_seconds:
time.sleep(interval)

print(f"Running: {aip_run_cmd=}")
run_and_wait_process: CompletedProcess = run(
aip_run_cmd,
universal_newlines=True,
Expand Down Expand Up @@ -311,13 +318,14 @@ def get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) -> Dict[str, str]:
return flow_yaml


def test_kfp_pod_default() -> None:
def test_kfp_pod_default(pytestconfig) -> None:
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow.yaml")

compile_to_yaml_cmd: str = (
f" {_python()} flows/s3_sensor_flow.py --no-pylint --datastore s3 aip run"
f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path}"
f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)

Expand All @@ -329,7 +337,7 @@ def test_kfp_pod_default() -> None:
)


def test_kubernetes_service_account_compile_only() -> None:
def test_kubernetes_service_account_compile_only(pytestconfig) -> None:
service_account = "test-service-account"
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(
Expand All @@ -339,7 +347,8 @@ def test_kubernetes_service_account_compile_only() -> None:
compile_to_yaml_cmd: str = (
f"export METAFLOW_KUBERNETES_SERVICE_ACCOUNT={service_account};"
f" {_python()} flows/toleration_and_affinity_flow.py aip run"
f" --yaml-only --pipeline-path {yaml_file_path}"
f" --yaml-only --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)

flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
Expand All @@ -354,7 +363,7 @@ def test_kubernetes_service_account_compile_only() -> None:
assert "METAFLOW_KUBERNETES_SERVICE_ACCOUNT" in env


def test_toleration_and_affinity_compile_only() -> None:
def test_toleration_and_affinity_compile_only(pytestconfig) -> None:
step_templates: Dict[str, str] = {}
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(
Expand All @@ -363,7 +372,8 @@ def test_toleration_and_affinity_compile_only() -> None:

compile_to_yaml_cmd: str = (
f"{_python()} flows/toleration_and_affinity_flow.py --datastore=s3 --with retry aip run"
f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path}"
f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)

flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
Expand Down

0 comments on commit ba1f997

Please sign in to comment.