Merge pull request #292 from zillow/tz/AIP-8056-remove-rwmany
AIP-8056 remove ReadWriteMany volume
talebzeghmi authored Mar 12, 2024
2 parents f61a4a2 + 3eaab39 commit dc204ab
Showing 8 changed files with 9 additions and 533 deletions.
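For orientation, here is a minimal, hypothetical sketch (not part of this PR) of requesting a per-step volume with the attributes that remain after this change (volume, volume_dir, and volume_type, per metaflow/plugins/resources_decorator.py below). The removed volume_mode="ReadWriteMany" option for sharing a volume across foreach splits is no longer available; the flow name and file path in the sketch are illustrative assumptions.

from metaflow import FlowSpec, resources, step


class VolumeExampleFlow(FlowSpec):
    # Hypothetical flow for illustration; not taken from this repository.
    @resources(volume="12G", volume_dir="/opt/metaflow_volume")
    @step
    def start(self):
        # The volume is provisioned for this step only and persists across
        # retries of the step (see the @resources docstring in this diff).
        with open("/opt/metaflow_volume/scratch.txt", "w") as f:
            f.write("hello")
        self.next(self.end)

    @step
    def end(self):
        print("All done.")


if __name__ == "__main__":
    VolumeExampleFlow()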
80 changes: 9 additions & 71 deletions metaflow/plugins/aip/aip.py
@@ -799,29 +799,19 @@ def _set_container_volume(
self,
container_op: ContainerOp,
aip_component: AIPComponent,
workflow_uid: str,
shared_volumes: Dict[str, Dict[str, Tuple[ResourceOp, PipelineVolume]]],
) -> ResourceOp:
resource_requirements: Dict[str, Any] = aip_component.resource_requirements
resource_op: Optional[ResourceOp] = None

if "volume" in resource_requirements:
mode = resource_requirements["volume_mode"]
volume_dir = resource_requirements["volume_dir"]

if mode == "ReadWriteMany":
# ReadWriteMany shared volumes are created way before
(resource_op, volume) = shared_volumes[aip_component.step_name]
container_op.add_pvolumes(volume)
else:
(resource_op, volume) = self._create_volume(
step_name=aip_component.step_name,
size=resource_requirements["volume"],
workflow_uid=ARGO_WORKFLOW_UID,
mode=mode,
volume_type=resource_requirements.get("volume_type"),
)
container_op.add_pvolumes({volume_dir: volume})
(resource_op, volume) = self._create_volume(
step_name=aip_component.step_name,
size=resource_requirements["volume"],
volume_type=resource_requirements.get("volume_type"),
)
container_op.add_pvolumes({volume_dir: volume})

return resource_op

@@ -945,13 +935,9 @@ def _create_volume(
self,
step_name: str,
size: str,
workflow_uid: str,
mode: str,
volume_type: Optional[str],
) -> Tuple[ResourceOp, PipelineVolume]:
volume_name = (
sanitize_k8s_name(step_name) if mode == "ReadWriteMany" else "{{pod.name}}"
)
volume_name = "{{pod.name}}"
attribute_outputs = {"size": "{.status.capacity.storage}"}
requested_resources = V1ResourceRequirements(requests={"storage": size})

@@ -967,7 +953,7 @@
controller=True,
kind="Workflow",
name="{{workflow.name}}",
uid="{{workflow.uid}}",
uid=ARGO_WORKFLOW_UID,
)
owner_references = [owner_reference]
pvc_metadata = V1ObjectMeta(
@@ -1180,20 +1166,13 @@ def build_kfp_dag(
passed_in_split_indexes: str = "",
preceding_kfp_component_op: ContainerOp = None,
preceding_component_outputs_dict: Dict[str, dsl.PipelineParam] = None,
workflow_uid: str = None,
shared_volumes: Dict[
str, Dict[str, Tuple[ResourceOp, PipelineVolume]]
] = None,
):
if node.name in visited:
return

if preceding_component_outputs_dict is None:
preceding_component_outputs_dict = {}

if shared_volumes is None:
shared_volumes = {}

# If any of this node's children has a preceding_kfp_func then
# create (kfp_decorator_component, preceding_component_inputs)
next_aip_decorator_component: Optional[AIPComponent] = None
@@ -1281,7 +1260,7 @@ def build_kfp_dag(
metaflow_step_op, aip_component
)
resource_op: ResourceOp = self._set_container_volume(
metaflow_step_op, aip_component, workflow_uid, shared_volumes
metaflow_step_op, aip_component
)
if resource_op:
visited_resource_ops[node.name] = resource_op
@@ -1301,8 +1280,6 @@
split_index,
preceding_kfp_component_op=next_aip_component_op,
preceding_component_outputs_dict=next_preceding_component_outputs_dict,
workflow_uid=workflow_uid,
shared_volumes=shared_volumes,
)

# Handle the ParallelFor join step, and pass in
@@ -1312,8 +1289,6 @@
passed_in_split_indexes,
preceding_kfp_component_op=next_aip_component_op,
preceding_component_outputs_dict=next_preceding_component_outputs_dict,
workflow_uid=workflow_uid,
shared_volumes=shared_volumes,
)
else:
for step in node.out_funcs:
@@ -1333,8 +1308,6 @@
passed_in_split_indexes,
preceding_kfp_component_op=next_aip_component_op,
preceding_component_outputs_dict=next_preceding_component_outputs_dict,
workflow_uid=workflow_uid,
shared_volumes=shared_volumes,
)

# The following exit handlers get created and added as a ContainerOp
@@ -1358,8 +1331,6 @@
)
build_kfp_dag(
self.graph["start"],
workflow_uid=ARGO_WORKFLOW_UID,
shared_volumes=self.create_shared_volumes(step_name_to_aip_component),
)

# Instruct KFP of the DAG order by iterating over the Metaflow
@@ -1404,39 +1375,6 @@ def build_kfp_dag(
)
return kfp_pipeline_from_flow, pipeline_conf

def create_shared_volumes(
self,
step_name_to_aip_component: Dict[str, AIPComponent],
) -> Dict[str, Dict[str, Tuple[ResourceOp, PipelineVolume]]]:
"""
A volume to be shared across foreach split nodes, but not downstream steps.
An example use case is PyTorch distributed training where gradients are communicated
via the shared volume.
Returns: Dict[step_name, Dict[volume_dir, PipelineVolume]]
"""
shared_volumes: Dict[str, Dict[str, Tuple[ResourceOp, PipelineVolume]]] = {}

for aip_component in step_name_to_aip_component.values():
resources = aip_component.resource_requirements
if (
"volume_mode" in resources
and resources["volume_mode"] == "ReadWriteMany"
):
volume_dir = resources["volume_dir"]
(resource_op, volume) = self._create_volume(
step_name=f"{aip_component.step_name}-shared",
size=resources["volume"],
workflow_uid=ARGO_WORKFLOW_UID,
mode=resources["volume_mode"],
volume_type=resources.get("volume_type"),
)
shared_volumes[aip_component.step_name] = (
resource_op,
{volume_dir: volume},
)

return shared_volumes

def _create_metaflow_step_op(
self,
node: DAGNode,
37 changes: 0 additions & 37 deletions metaflow/plugins/aip/tests/flows/resources_flow.py
@@ -210,43 +210,6 @@ def join_step(self, inputs):
assert "12G" in str(output)
self.next(self.end)

# AIP-8056(talebz): Flaky WFSDK test volume_mode="ReadWriteMany"
# @step
# def split_step(self):
# self.items = [1, 2]
# self.next(self.shared_volume_foreach_step, foreach="items")
#
# @resources(volume="13G", volume_mode="ReadWriteMany")
# @step
# def shared_volume_foreach_step(self):
# output = subprocess.check_output(
# "df -h | grep /opt/metaflow_volume", shell=True
# )
# assert "13G" in str(output)
#
# file_path = "/opt/metaflow_volume/test.txt"
# message = "hello world!"
#
# # validate the volume is shared across the foreach splits
# if self.input == 1:
# with open(file_path, "w") as f:
# f.write(message)
# else:
# while not os.path.exists(file_path):
# time.sleep(1)
# print(".")
#
# with open(file_path, "r") as f:
# read_lines = f.readlines()
# print("read_lines", read_lines)
# assert message == read_lines[0]
#
# self.next(self.shared_volume_join_step)
#
# @step
# def shared_volume_join_step(self, inputs):
# self.next(self.end)

@step
def end(self):
print("All done.")
10 changes: 0 additions & 10 deletions metaflow/plugins/aip/tests/flows/s3_sensor_with_formatter_flow.py
@@ -37,16 +37,6 @@ class S3SensorWithFormatterFlow(FlowSpec):
@step
def start(self):
print("S3SensorWithFormatterFlow is starting.")
self.items = [1, 2]
self.next(self.shared_volume_foreach_step, foreach="items")

@resources(volume="13G", volume_mode="ReadWriteMany")
@step
def shared_volume_foreach_step(self):
self.next(self.shared_volume_join_step)

@step
def shared_volume_join_step(self, inputs):
self.next(self.end)

@step
9 changes: 0 additions & 9 deletions metaflow/plugins/resources_decorator.py
@@ -61,14 +61,6 @@ def myStep(self):
Defaults None - relying on Kubernetes defaults.
**Note:** The volume persists state across step (container) retries.
Default unit is MB - see notes above for more units.
volume_mode: str
Not for AWS batch.
[ReadWriteOnce, ReadWriteMany]
ReadWriteOnce: can be used by this step only
ReadWriteMany:
A volume to be shared across foreach split nodes, but not downstream steps.
An example use case is PyTorch distributed training where gradients are communicated
via the shared volume.
volume_dir: str
Default "/opt/metaflow_volume"
volume_type: str
@@ -90,7 +82,6 @@ def myStep(self):
# Only AIP supported attributes
"gpu_vendor": None,
"volume": None,
"volume_mode": "ReadWriteOnce",
"volume_dir": "/opt/metaflow_volume",
"volume_type": None,
# Deprecated - kept only to show a meaningful error message
102 changes: 0 additions & 102 deletions metaflow/tutorials/10-pytorch/hello_pytorch.py

This file was deleted.

21 changes: 0 additions & 21 deletions metaflow/tutorials/10-pytorch/models/cnn.py

This file was deleted.

