Skip to content

Commit

Permalink
fixes #780
Browse files Browse the repository at this point in the history
add instance type for aws_batch_scheduler multinode jobs
  • Loading branch information
azzhipa committed Oct 19, 2023
1 parent a711634 commit db3d484
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 69 deletions.
5 changes: 5 additions & 0 deletions torchx/schedulers/aws_batch_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ def _role_to_node_properties(
"mountPoints": mount_points,
"volumes": volumes,
}
if role.num_replicas > 1:
# making instance_type work for any resource that explicitly specify it
instance_type = getattr(role.resource, "instance_type", None)
if instance_type is not None:
container["instanceType"] = instance_type

return {
"targetNodes": f"{start_idx}:{start_idx + role.num_replicas - 1}",
Expand Down
39 changes: 36 additions & 3 deletions torchx/schedulers/test/aws_batch_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
from torchx.specs import AppState, Resource


def _test_app() -> specs.AppDef:
def _test_app(
num_replicas: int = 2, resource: Optional[Resource] = None
) -> specs.AppDef:
trainer_role = specs.Role(
name="trainer",
image="pytorch/torchx:latest",
Expand All @@ -41,13 +43,14 @@ def _test_app() -> specs.AppDef:
f" --rank0_host $${{{specs.macros.rank0_env}:=localhost}}",
],
env={"FOO": "bar"},
resource=specs.Resource(
resource=resource
or specs.Resource(
cpu=2,
memMB=3000,
gpu=4,
),
port_map={"foo": 1234},
num_replicas=2,
num_replicas=num_replicas,
max_retries=3,
mounts=[
specs.BindMount(src_path="/src", dst_path="/dst", read_only=True),
Expand Down Expand Up @@ -156,6 +159,36 @@ def test_submit_dryrun_privileged(self) -> None:
self.assertEqual(1, len(node_groups))
self.assertTrue(node_groups[0]["container"]["privileged"])

def test_submit_dryrun_instance_type_multinode(self) -> None:
cfg = AWSBatchOpts({"queue": "ignored_in_test", "privileged": True})
resource = specs.named_resources_aws.aws_p3dn_24xlarge()
app = _test_app(num_replicas=2, resource=resource)
info = create_scheduler("test").submit_dryrun(app, cfg)
node_groups = info.request.job_def["nodeProperties"]["nodeRangeProperties"]
self.assertEqual(1, len(node_groups))
self.assertEqual(
resource.capabilities[specs.named_resources_aws.K8S_ITYPE],
node_groups[0]["container"]["instanceType"],
)

def test_submit_dryrun_no_instance_type_singlenode(self) -> None:
cfg = AWSBatchOpts({"queue": "ignored_in_test", "privileged": True})
resource = specs.named_resources_aws.aws_p3dn_24xlarge()
app = _test_app(num_replicas=1, resource=resource)
info = create_scheduler("test").submit_dryrun(app, cfg)
node_groups = info.request.job_def["nodeProperties"]["nodeRangeProperties"]
self.assertEqual(1, len(node_groups))
self.assertTrue("instanceType" not in node_groups[0]["container"])

def test_submit_dryrun_no_instance_type_non_aws(self) -> None:
cfg = AWSBatchOpts({"queue": "ignored_in_test", "privileged": True})
resource = specs.named_resources_aws.aws_p3dn_24xlarge()
app = _test_app(num_replicas=2)
info = create_scheduler("test").submit_dryrun(app, cfg)
node_groups = info.request.job_def["nodeProperties"]["nodeRangeProperties"]
self.assertEqual(1, len(node_groups))
self.assertTrue("instanceType" not in node_groups[0]["container"])

@mock_rand()
def test_submit_dryrun(self) -> None:
cfg = AWSBatchOpts({"queue": "testqueue", "user": "testuser"})
Expand Down
106 changes: 40 additions & 66 deletions torchx/specs/named_resources_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"""

from dataclasses import dataclass
from typing import Callable, Mapping

from torchx.specs.api import Resource
Expand All @@ -45,161 +46,134 @@
GiB: int = int(1024 * MEM_TAX)


@dataclass
class AwsResource(Resource):
instance_type: str = ""

def __post_init__(self):
if not self.instance_type:
raise TypeError("__init__ missing 1 required argument: 'instance_type'")

Check warning on line 55 in torchx/specs/named_resources_aws.py

View check run for this annotation

Codecov / codecov/patch

torchx/specs/named_resources_aws.py#L55

Added line #L55 was not covered by tests
if K8S_ITYPE not in self.capabilities:
self.capabilities[K8S_ITYPE] = self.instance_type


def aws_p3_2xlarge() -> Resource:
return Resource(
cpu=8, gpu=1, memMB=61 * GiB, capabilities={K8S_ITYPE: "p3.2xlarge"}
)
return AwsResource(cpu=8, gpu=1, memMB=61 * GiB, instance_type="p3.2xlarge")


def aws_p3_8xlarge() -> Resource:
return Resource(
cpu=32, gpu=4, memMB=244 * GiB, capabilities={K8S_ITYPE: "p3.8xlarge"}
)
return AwsResource(cpu=32, gpu=4, memMB=244 * GiB, instance_type="p3.8xlarge")


def aws_p3_16xlarge() -> Resource:
return Resource(
cpu=64, gpu=8, memMB=488 * GiB, capabilities={K8S_ITYPE: "p3.16xlarge"}
)
return AwsResource(cpu=64, gpu=8, memMB=488 * GiB, instance_type="p3.16xlarge")


def aws_p3dn_24xlarge() -> Resource:
return Resource(
return AwsResource(
cpu=96,
gpu=8,
memMB=768 * GiB,
capabilities={K8S_ITYPE: "p3dn.24xlarge"},
instance_type="p3dn.24xlarge",
devices={EFA_DEVICE: 1},
)


def aws_p4d_24xlarge() -> Resource:
return Resource(
return AwsResource(
cpu=96,
gpu=8,
memMB=1152 * GiB,
capabilities={K8S_ITYPE: "p4d.24xlarge"},
instance_type="p4d.24xlarge",
devices={EFA_DEVICE: 4},
)


def aws_p4de_24xlarge() -> Resource:
# p4de has same cpu, gpu, memMB as p4d but gpu memory is 2x (32GB vs 64GB per GPU)
return Resource(
return AwsResource(
cpu=96,
gpu=8,
memMB=1152 * GiB,
capabilities={K8S_ITYPE: "p4de.24xlarge"},
instance_type="p4de.24xlarge",
devices={EFA_DEVICE: 4},
)


def aws_t3_medium() -> Resource:
return Resource(cpu=2, gpu=0, memMB=4 * GiB, capabilities={K8S_ITYPE: "t3.medium"})
return AwsResource(cpu=2, gpu=0, memMB=4 * GiB, instance_type="t3.medium")


def aws_m5_2xlarge() -> Resource:
return Resource(
cpu=8, gpu=0, memMB=32 * GiB, capabilities={K8S_ITYPE: "m5.2xlarge"}
)
return AwsResource(cpu=8, gpu=0, memMB=32 * GiB, instance_type="m5.2xlarge")


def aws_g4dn_xlarge() -> Resource:
return Resource(
cpu=4, gpu=1, memMB=16 * GiB, capabilities={K8S_ITYPE: "g4dn.xlarge"}
)
return AwsResource(cpu=4, gpu=1, memMB=16 * GiB, instance_type="g4dn.xlarge")


def aws_g4dn_2xlarge() -> Resource:
return Resource(
cpu=8, gpu=1, memMB=32 * GiB, capabilities={K8S_ITYPE: "g4dn.2xlarge"}
)
return AwsResource(cpu=8, gpu=1, memMB=32 * GiB, instance_type="g4dn.2xlarge")


def aws_g4dn_4xlarge() -> Resource:
return Resource(
cpu=16, gpu=1, memMB=64 * GiB, capabilities={K8S_ITYPE: "g4dn.4xlarge"}
)
return AwsResource(cpu=16, gpu=1, memMB=64 * GiB, instance_type="g4dn.4xlarge")


def aws_g4dn_8xlarge() -> Resource:
return Resource(
cpu=32, gpu=1, memMB=128 * GiB, capabilities={K8S_ITYPE: "g4dn.8xlarge"}
)
return AwsResource(cpu=32, gpu=1, memMB=128 * GiB, instance_type="g4dn.8xlarge")


def aws_g4dn_16xlarge() -> Resource:
return Resource(
cpu=64, gpu=1, memMB=256 * GiB, capabilities={K8S_ITYPE: "g4dn.16xlarge"}
)
return AwsResource(cpu=64, gpu=1, memMB=256 * GiB, instance_type="g4dn.16xlarge")


def aws_g4dn_12xlarge() -> Resource:
return Resource(
cpu=48, gpu=4, memMB=192 * GiB, capabilities={K8S_ITYPE: "g4dn.12xlarge"}
)
return AwsResource(cpu=48, gpu=4, memMB=192 * GiB, instance_type="g4dn.12xlarge")


def aws_g4dn_metal() -> Resource:
return Resource(
cpu=96, gpu=8, memMB=384 * GiB, capabilities={K8S_ITYPE: "g4dn.metal"}
)
return AwsResource(cpu=96, gpu=8, memMB=384 * GiB, instance_type="g4dn.metal")


def aws_g5_xlarge() -> Resource:
return Resource(cpu=4, gpu=1, memMB=16 * GiB, capabilities={K8S_ITYPE: "g5.xlarge"})
return AwsResource(cpu=4, gpu=1, memMB=16 * GiB, instance_type="g5.xlarge")


def aws_g5_2xlarge() -> Resource:
return Resource(
cpu=8, gpu=1, memMB=32 * GiB, capabilities={K8S_ITYPE: "g5.2xlarge"}
)
return AwsResource(cpu=8, gpu=1, memMB=32 * GiB, instance_type="g5.2xlarge")


def aws_g5_4xlarge() -> Resource:
return Resource(
cpu=16, gpu=1, memMB=64 * GiB, capabilities={K8S_ITYPE: "g5.4xlarge"}
)
return AwsResource(cpu=16, gpu=1, memMB=64 * GiB, instance_type="g5.4xlarge")


def aws_g5_8xlarge() -> Resource:
return Resource(
cpu=32, gpu=1, memMB=128 * GiB, capabilities={K8S_ITYPE: "g5.8xlarge"}
)
return AwsResource(cpu=32, gpu=1, memMB=128 * GiB, instance_type="g5.8xlarge")


def aws_g5_16xlarge() -> Resource:
return Resource(
cpu=64, gpu=1, memMB=256 * GiB, capabilities={K8S_ITYPE: "g5.16xlarge"}
)
return AwsResource(cpu=64, gpu=1, memMB=256 * GiB, instance_type="g5.16xlarge")


def aws_g5_12xlarge() -> Resource:
return Resource(
cpu=48, gpu=4, memMB=192 * GiB, capabilities={K8S_ITYPE: "g5.12xlarge"}
)
return AwsResource(cpu=48, gpu=4, memMB=192 * GiB, instance_type="g5.12xlarge")


def aws_g5_24xlarge() -> Resource:
return Resource(
cpu=96, gpu=4, memMB=384 * GiB, capabilities={K8S_ITYPE: "g5.24xlarge"}
)
return AwsResource(cpu=96, gpu=4, memMB=384 * GiB, instance_type="g5.24xlarge")


def aws_g5_48xlarge() -> Resource:
return Resource(
cpu=192, gpu=8, memMB=768 * GiB, capabilities={K8S_ITYPE: "g5.48xlarge"}
)
return AwsResource(cpu=192, gpu=8, memMB=768 * GiB, instance_type="g5.48xlarge")


def aws_trn1_2xl() -> Resource:
return Resource(cpu=8, gpu=0, memMB=32 * GiB, capabilities={K8S_ITYPE: "trn1.2xl"})
return AwsResource(cpu=8, gpu=0, memMB=32 * GiB, instance_type="trn1.2xl")


def aws_trn1_32xl() -> Resource:
return Resource(
cpu=128, gpu=0, memMB=512 * GiB, capabilities={K8S_ITYPE: "trn1.32xl"}
)
return AwsResource(cpu=128, gpu=0, memMB=512 * GiB, instance_type="trn1.32xl")


NAMED_RESOURCES: Mapping[str, Callable[[], Resource]] = {
Expand Down

0 comments on commit db3d484

Please sign in to comment.