
Commit

Apply review comments.
thvasilo committed Jan 7, 2025
1 parent 55d817f commit 49e50bf
Showing 4 changed files with 58 additions and 43 deletions.
17 changes: 11 additions & 6 deletions sagemaker/pipeline/README.md
@@ -66,21 +66,25 @@ To create a new SageMaker pipeline for GraphStorm:

```bash
python create_sm_pipeline.py \
--execution-role arn:aws:iam::123456789012:role/SageMakerRole \
--region us-west-2 \
--graph-construction-config-filename my_gconstruct_config.json \
--graph-name my-graph \
--graphstorm-pytorch-cpu-image-url 123456789012.dkr.ecr.us-west-2.amazonaws.com/graphstorm:sagemaker-cpu \
--input-data-s3 s3://input-bucket/data \
--instance-count 2 \
--jobs-to-run gconstruct train inference \
--graph-name my-graph \
--graph-construction-config-filename my_gconstruct_config.json \
--input-data-s3 s3://input-bucket/data \
--output-prefix s3://output-bucket/results \
--pipeline-name my-graphstorm-pipeline \
--region us-west-2 \
--role arn:aws:iam::123456789012:role/SageMakerExecutionRole \
--train-inference-task node_classification \
--train-yaml-s3 s3://config-bucket/train.yaml
```

This command creates a new pipeline with the specified configuration. The pipeline will
include one GConstruct job, one training job, and one inference job.
The required `--role` argument provides the execution role SageMaker will use to
run the jobs, and `--graphstorm-pytorch-cpu-image-url` provides
the Docker image used during GConstruct and training.
The pipeline will use the configuration defined in `s3://input-bucket/data/my_gconstruct_config.json`
to construct the graph and the training config file at `s3://config-bucket/train.yaml`
to run training and inference.
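
Once the pipeline exists, you can launch it with `execute_sm_pipeline.py`. The sketch below is
a minimal, hypothetical invocation: the flag names are inferred from the script's argument names
(`--pipeline-name`, `--async-execution`) and should be verified against the script's `--help` output.

```bash
# Minimal sketch; flag names are assumptions based on the execute script's argument names.
python execute_sm_pipeline.py \
    --pipeline-name my-graphstorm-pipeline \
    --async-execution
```

Without `--async-execution`, the script waits for the execution to finish and logs its final status.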
@@ -174,7 +178,8 @@ This will create a pipeline that uses GSProcessing to process and prepare the da
use DistPart to partition the data, convert the partitioned data to the GraphBolt format,
then run a train and an inference job in sequence.
You can use this job sequence when your graph is too large to partition on one instance using
GConstruct (1+ TB is the suggested threshold to move to distributed partitioning).
GConstruct. Moving to distributed partitioning is suggested once your graph has more than
10B edges or your features are larger than 1 TB.
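
For large graphs, the distributed flow mainly changes the job sequence passed to `--jobs-to-run`.
The sketch below is a hypothetical invocation: the job names `gsprocessing` and `dist_part` appear
in this changeset, but the GraphBolt conversion job and any GSProcessing-specific image arguments
are omitted because their exact flags are not shown here, and the config and pipeline names are
placeholders.

```bash
# Hypothetical example; additional image-related arguments are likely required.
python create_sm_pipeline.py \
    --graph-construction-config-filename my_gsprocessing_config.json \
    --graph-name my-graph \
    --input-data-s3 s3://input-bucket/data \
    --jobs-to-run gsprocessing dist_part train inference \
    --output-prefix s3://output-bucket/results \
    --pipeline-name my-distributed-pipeline \
    --region us-west-2 \
    --role arn:aws:iam::123456789012:role/SageMakerExecutionRole \
    --train-inference-task node_classification \
    --train-yaml-s3 s3://config-bucket/train.yaml
```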

### Asynchronous Execution

39 changes: 20 additions & 19 deletions sagemaker/pipeline/create_sm_pipeline.py
@@ -16,6 +16,7 @@
Create a SageMaker pipeline for GraphStorm.
"""

import logging
import os
import re
from typing import List, Optional, Sequence, Union
@@ -108,10 +109,9 @@ def _get_or_create_pipeline_session(
def create_pipeline(self) -> Pipeline:
"""Create a SageMaker pipeline for GraphStorm.
The pipeline consists of the following steps:
1. Partition graph
2. Train graph
3. Encode node IDs
The pipeline can consist of the following steps:
1. Pre-process graph data using GConstruct or GSProcessing
2. Partition graph
4. Train model
5. Inference
@@ -385,8 +385,6 @@ def _create_gsprocessing_step(self, args: PipelineArgs) -> ProcessingStep:
],
)

print(gsprocessing_config.expr)

gsprocessing_arguments = [
"--config-filename",
self.graphconstruct_config_param,
@@ -538,6 +536,17 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep:
],
)

train_params = {
"graph-data-s3": self.next_step_data_input,
"graph-name": self.graph_name_param,
"log-level": args.task_config.log_level,
"model-artifact-s3": model_output_path,
"task-type": args.training_config.train_inference_task,
"train-yaml-s3": self.train_config_file_param,
"num-trainers": self.num_trainers_param,
"use-graphbolt": self.use_graphbolt_param,
}

# Training step
train_estimator = PyTorch(
entry_point=os.path.basename(args.script_paths.train_script),
@@ -547,16 +556,7 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep:
instance_count=self.instance_count_param,
instance_type=self.train_infer_instance,
py_version="py3",
hyperparameters={
"graph-data-s3": self.next_step_data_input,
"graph-name": self.graph_name_param,
"log-level": args.task_config.log_level,
"model-artifact-s3": model_output_path,
"task-type": args.training_config.train_inference_task,
"train-yaml-s3": self.train_config_file_param,
"num-trainers": self.num_trainers_param,
"use-graphbolt": self.use_graphbolt_param,
},
hyperparameters=train_params,
sagemaker_session=self.pipeline_session,
disable_profiler=True,
debugger_hook_config=False,
@@ -671,6 +671,7 @@ def _get_pipeline_name(args: PipelineArgs) -> str:
def main():
"""Create or update a GraphStorm SageMaker Pipeline."""
pipeline_args = parse_pipeline_args()
logging.basicConfig(level=logging.INFO)

save_pipeline_args(
pipeline_args, f"{pipeline_args.task_config.pipeline_name}-pipeline-args.json"
@@ -681,12 +682,12 @@ def main():
pipeline = pipeline_generator.create_pipeline()

if pipeline_args.update:
# TODO: If updating ensure pipeline exists first to get more informative error
# TODO: If updating, ensure pipeline exists first to get more informative error
pipeline.update(role_arn=pipeline_args.aws_config.execution_role)
print(f"Pipeline '{pipeline.name}' updated successfully.")
logging.info("Pipeline '%s' updated successfully.", pipeline.name)
else:
pipeline.create(role_arn=pipeline_args.aws_config.execution_role)
print(f"Pipeline '{pipeline.name}' created successfully.")
logging.info("Pipeline '%s' created successfully.", pipeline.name)


if __name__ == "__main__":
17 changes: 10 additions & 7 deletions sagemaker/pipeline/execute_sm_pipeline.py
@@ -17,6 +17,7 @@
"""

import argparse
import logging
import os
import subprocess
import sys
@@ -141,6 +142,7 @@ def parse_args():
def main():
"""Execute GraphStorm SageMaker pipeline"""
args = parse_args()
logging.basicConfig(level=logging.INFO)

pipeline_deploy_args = load_pipeline_args(
args.pipeline_args_json_file or f"{args.pipeline_name}-pipeline-args.json"
@@ -162,7 +164,7 @@ def main():
pipeline_generator = GraphStormPipelineGenerator(
pipeline_deploy_args, input_session=local_session
)
# Set shared memory to half the host's size, as SM does
# Set shared memory to half of the host's size, as SM does
instance_mem_mb = int(psutil.virtual_memory().total // (1024 * 1024))
local_session.config = {
"local": {"container_config": {"shm_size": f"{instance_mem_mb//2}M"}}
@@ -179,7 +181,7 @@ def main():

# Prepare parameter overrides
execution_params = {}
if args.instance_count is not None:
if args.instance_count:
execution_params["InstanceCount"] = args.instance_count
pipeline_deploy_args.instance_config.train_infer_instance_count = (
args.instance_count
@@ -261,14 +263,15 @@ def main():
if args.local_execution:
sys.exit(0)

print(f"Pipeline execution started: {execution.describe()}")
print(f"Execution ARN: {execution.arn}")
logging.info("Pipeline execution started: %s", execution.describe())
logging.info("Execution ARN: %s", execution.arn)

if not args.async_execution:
print("Waiting for pipeline execution to complete...")
logging.info("Waiting for pipeline execution to complete...")
execution.wait()
print("Pipeline execution completed.")
print(f"Final status: {execution.describe()['PipelineExecutionStatus']}")
logging.info("Pipeline execution completed.")
logging.info("Final status: %s",
execution.describe()['PipelineExecutionStatus'])


if __name__ == "__main__":
28 changes: 17 additions & 11 deletions sagemaker/pipeline/pipeline_parameters.py
@@ -69,9 +69,16 @@ def __post_init__(self):
assert (
self.cpu_instance_type
), "Need to provide a CPU instance type when training on CPU"
else:
assert (
self.gpu_instance_type
), "Need to provide a GPU instance type when training on GPU"

if not self.graph_construction_instance_type:
self.graph_construction_instance_type = self.cpu_instance_type
assert (
self.cpu_instance_type
), "Need to provide a CPU instance for graph construction."
logging.warning(
"No graph processing instance type specified, using the CPU instance type: %s",
self.cpu_instance_type,
@@ -106,7 +113,6 @@ def __post_init__(self):
"Should not try to run both GConstruct and GSProcessing steps, "
f"got job sequence: {self.jobs_to_run}"
)
# TODO: Should we allow running GConstruct with dist_part?

# When running gsprocessing ensure we run dist_part as well
if "gsprocessing" in self.jobs_to_run and "dist_part" not in self.jobs_to_run:
@@ -254,15 +260,15 @@ def get_hash_hex(self):

def __post_init__(self):
if not self.instance_config.train_on_cpu:
assert self.instance_config.gpu_instance_type, (
"GPU instance type must be specified if not training on CPU, "
f"got {self.instance_config.train_on_cpu=} "
f"{self.instance_config.gpu_instance_type=}"
)
assert self.aws_config.graphstorm_pytorch_cpu_image_url, (
assert self.aws_config.graphstorm_pytorch_gpu_image_url, (
"Must use provide GPU image when training on GPU. "
"use --graphstorm-pytorch-gpu-image-url"
)
else:
assert self.aws_config.graphstorm_pytorch_cpu_image_url, (
"Must use provide CPU image when training on CPU. "
"use --graphstorm-pytorch-cpu-image-url"
)

# Ensure we provide a GConstruct/GSProcessing config file when running construction
if (
@@ -330,8 +336,8 @@ def __post_init__(self):

# GConstruct uses 'metis', so just translate that if needed
if (
"gconstruct" in self.task_config.jobs_to_run and
self.partition_config.partition_algorithm.lower() == "parmetis"
and "gconstruct" in self.task_config.jobs_to_run
):
self.partition_config.partition_algorithm = "metis"

@@ -497,7 +503,7 @@ def parse_pipeline_args() -> PipelineArgs:
optional_args.add_argument(
"--volume-size-gb",
type=int,
help="Additional volume size for instances in GB.",
help="Additional volume size for SageMaker instances in GB.",
default=100,
)

Expand Down Expand Up @@ -526,15 +532,15 @@ def parse_pipeline_args() -> PipelineArgs:
"--base-job-name",
type=str,
default="gs",
help="Base job name for SageMaker jobs. Default: 'sm'",
help="Base job name for SageMaker jobs. Default: 'gs'",
)
required_args.add_argument(
"--jobs-to-run",
nargs="+",
required=True,
help=(
"Space-separated string of jobs to run in the pipeline, "
"e.g. 'gconstruct train inference'. "
"e.g. '--jobs-to-run gconstruct train inference'. "
f"Should be one or more of: {list(JOB_ORDER.keys())}. Required. "
),
)
