Skip to content

Commit

Permalink
AIP-7738 retry_backoff_factor feature
Browse files Browse the repository at this point in the history
- change the default
  - minutes_between_retries = 2m
  - RETRY_BACKOFF_FACTOR = 3
  - 2 * 3 * 3 * 3 = 54 minutes
- this would give flows more time to be resilient to transient cluster failures.
note: If we believe that this should be more configurable, then the default retry strategy could be applied by the webhook.
  • Loading branch information
talebzeghmi committed Oct 17, 2023
1 parent 1508ca8 commit 7dfdeb6
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 4 deletions.
31 changes: 29 additions & 2 deletions metaflow/plugins/kfp/kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
PVC_CREATE_RETRY_COUNT,
EXIT_HANDLER_RETRY_COUNT,
BACKOFF_DURATION,
RETRY_BACKOFF_FACTOR,
)
from metaflow.plugins.kfp.kfp_decorator import KfpException

Expand Down Expand Up @@ -102,6 +103,7 @@ def __init__(
environment_decorator: EnvironmentDecorator,
total_retries: int,
minutes_between_retries: str,
retry_backoff_factor: float,
):
self.step_name = step_name
self.resource_requirements = resource_requirements
Expand All @@ -111,6 +113,7 @@ def __init__(
self.environment_decorator = environment_decorator
self.total_retries = total_retries
self.minutes_between_retries = minutes_between_retries
self.retry_backoff_factor = retry_backoff_factor

self.preceding_kfp_func: Callable = (
kfp_decorator.attributes.get("preceding_component", None)
Expand Down Expand Up @@ -294,6 +297,14 @@ def _get_minutes_between_retries(node: DAGNode) -> Optional[str]:
return f"{val}m" if isinstance(val, numbers.Number) else val
return None

@staticmethod
def _get_retry_backoff_factor(node: DAGNode) -> Optional[float]:
retry_deco = [deco for deco in node.decorators if deco.name == "retry"]
if retry_deco:
val = retry_deco[0].attributes.get("retry_backoff_factor")
return float(val)
return None

@staticmethod
def _get_resource_requirements(node: DAGNode) -> Dict[str, str]:
"""
Expand Down Expand Up @@ -430,6 +441,7 @@ def build_kfp_component(node: DAGNode, task_id: str) -> KfpComponent:
user_code_retries, total_retries = KubeflowPipelines._get_retries(node)
resource_requirements = self._get_resource_requirements(node)
minutes_between_retries = self._get_minutes_between_retries(node)
retry_backoff_factor = self._get_retry_backoff_factor(node)

return KfpComponent(
step_name=node.name,
Expand Down Expand Up @@ -468,6 +480,7 @@ def build_kfp_component(node: DAGNode, task_id: str) -> KfpComponent:
),
total_retries=total_retries,
minutes_between_retries=minutes_between_retries,
retry_backoff_factor=retry_backoff_factor,
)

# Mapping of steps to their KfpComponent
Expand Down Expand Up @@ -719,7 +732,10 @@ def _create_volume(
attribute_outputs=attribute_outputs,
)
resource.set_retry(
PVC_CREATE_RETRY_COUNT, policy="Always", backoff_duration=BACKOFF_DURATION
PVC_CREATE_RETRY_COUNT,
policy="Always",
backoff_duration=BACKOFF_DURATION,
backoff_factor=RETRY_BACKOFF_FACTOR,
)
volume = PipelineVolume(
name=f"{volume_name}-volume", pvc=resource.outputs["name"]
Expand Down Expand Up @@ -924,6 +940,7 @@ def build_kfp_dag(
kfp_component.total_retries,
policy="Always",
backoff_duration=kfp_component.minutes_between_retries,
backoff_factor=kfp_component.retry_backoff_factor,
)

if preceding_kfp_component_op:
Expand Down Expand Up @@ -1243,6 +1260,12 @@ def _create_workflow_uid_op(
file_outputs={"Output": "/tmp/outputs/Output/data"},
).set_display_name("get_workflow_uid")
KubeflowPipelines._set_minimal_container_resources(workflow_uid_op)
workflow_uid_op.set_retry(
S3_SENSOR_RETRY_COUNT,
policy="Always",
backoff_duration=BACKOFF_DURATION,
backoff_factor=RETRY_BACKOFF_FACTOR,
)
return workflow_uid_op
else:
return None
Expand Down Expand Up @@ -1317,7 +1340,10 @@ def _create_s3_sensor_op(

KubeflowPipelines._set_minimal_container_resources(s3_sensor_op)
s3_sensor_op.set_retry(
S3_SENSOR_RETRY_COUNT, policy="Always", backoff_duration=BACKOFF_DURATION
S3_SENSOR_RETRY_COUNT,
policy="Always",
backoff_duration=BACKOFF_DURATION,
backoff_factor=RETRY_BACKOFF_FACTOR,
)
return s3_sensor_op

Expand Down Expand Up @@ -1373,5 +1399,6 @@ def _create_exit_handler_op(self, package_commands: str) -> ContainerOp:
EXIT_HANDLER_RETRY_COUNT,
policy="Always",
backoff_duration=BACKOFF_DURATION,
backoff_factor=RETRY_BACKOFF_FACTOR,
)
)
3 changes: 2 additions & 1 deletion metaflow/plugins/kfp/kfp_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
S3_SENSOR_RETRY_COUNT = 7
PVC_CREATE_RETRY_COUNT = 7
EXIT_HANDLER_RETRY_COUNT = 7
BACKOFF_DURATION = "1m" # 1 minute
BACKOFF_DURATION = "2m" # 2 minute
RETRY_BACKOFF_FACTOR = 3 # 2 * 3 * 3 * 3 = 54 minutes

STEP_ENVIRONMENT_VARIABLES = "/tmp/step-environment-variables.sh"

Expand Down
16 changes: 16 additions & 0 deletions metaflow/plugins/kfp/tests/run_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,22 @@ def test_kfp_pod_default() -> None:
)


def test_retry_strategy() -> None:
    """Compile resources_flow with the default @retry decorator and verify
    the generated Argo retryStrategy backoff settings on every non-DAG
    template match the defaults (duration "2m", factor 3).
    """
    with tempfile.TemporaryDirectory() as yaml_tmp_dir:
        yaml_file_path: str = os.path.join(yaml_tmp_dir, "resources_flow.yaml")

        compile_to_yaml_cmd: str = (
            f" {_python()} flows/resources_flow.py --with retry --no-pylint --datastore s3 kfp run "
            f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path}"
        )
        flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)

        for template in flow_yaml["spec"]["templates"]:
            if not template.get("dag"):
                # Compare with ==, not `assert (value, expected)`: a
                # non-empty tuple is always truthy, so the original
                # assertions could never fail.
                assert template["retryStrategy"]["backoff"]["duration"] == "2m"
                assert template["retryStrategy"]["backoff"]["factor"] == 3


def test_kubernetes_service_account_compile_only() -> None:
service_account = "test-service-account"
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
Expand Down
9 changes: 8 additions & 1 deletion metaflow/plugins/retry_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ def myStep(self):
Number of times to retry this step. Defaults to 3
minutes_between_retries : int
Number of minutes between retries
retry_backoff_factor : int
Exponential backoff factor. If set to 3, the time between retries will triple each time.
Defaults to 3.
"""

name = "retry"
defaults = {"times": "3", "minutes_between_retries": "2"}
defaults = {
"times": "3",
"minutes_between_retries": "2",
"retry_backoff_factor": "3",
}

def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
# The total number of attempts must not exceed MAX_ATTEMPTS.
Expand Down

0 comments on commit 7dfdeb6

Please sign in to comment.