From 49e50bfbcdb829b33a5317ae8aeb20e7ba3f3167 Mon Sep 17 00:00:00 2001
From: Theodore Vasiloudis
Date: Tue, 7 Jan 2025 15:34:08 +0100
Subject: [PATCH] Apply review comments.

---
 sagemaker/pipeline/README.md               | 17 ++++++----
 sagemaker/pipeline/create_sm_pipeline.py   | 39 ++++++++++++-----------
 sagemaker/pipeline/execute_sm_pipeline.py  | 17 ++++++----
 sagemaker/pipeline/pipeline_parameters.py  | 28 +++++++++-------
 4 files changed, 58 insertions(+), 43 deletions(-)

diff --git a/sagemaker/pipeline/README.md b/sagemaker/pipeline/README.md
index ac1c20f47f..ab1ef5ff30 100644
--- a/sagemaker/pipeline/README.md
+++ b/sagemaker/pipeline/README.md
@@ -66,21 +66,25 @@ To create a new SageMaker pipeline for GraphStorm:
 
 ```bash
 python create_sm_pipeline.py \
-    --execution-role arn:aws:iam::123456789012:role/SageMakerRole \
-    --region us-west-2 \
+    --graph-construction-config-filename my_gconstruct_config.json \
+    --graph-name my-graph \
     --graphstorm-pytorch-cpu-image-url 123456789012.dkr.ecr.us-west-2.amazonaws.com/graphstorm:sagemaker-cpu \
+    --input-data-s3 s3://input-bucket/data \
     --instance-count 2 \
     --jobs-to-run gconstruct train inference \
-    --graph-name my-graph \
-    --graph-construction-config-filename my_gconstruct_config.json \
-    --input-data-s3 s3://input-bucket/data \
     --output-prefix s3://output-bucket/results \
+    --pipeline-name my-graphstorm-pipeline \
+    --region us-west-2 \
+    --role arn:aws:iam::123456789012:role/SageMakerExecutionRole \
     --train-inference-task node_classification \
     --train-yaml-s3 s3://config-bucket/train.yaml
 ```
 
 This command creates a new pipeline with the specified configuration. The pipeline
 will include one GConstruct job, one training job and one inference job.
+The `--role` argument is required to provide the execution role SageMaker will use
+to run the jobs, and `--graphstorm-pytorch-cpu-image-url` provides the Docker
+image to use during training and GConstruct.
 It will use the configuration defined in `s3://input-bucket/data/my_gconstruct_config.json`
 to construct the graph and the train config file at `s3://config-bucket/train.yaml`
 to run training and inference.
@@ -174,7 +178,8 @@ This will create a pipeline that uses GSProcessing to process and prepare the da
 use DistPart to partition the data, convert the partitioned data to the GraphBolt format,
 then run a train and an inference job in sequence.
 You can use this job sequence when your graph is too large to partition on one instance using
-GConstruct (1+ TB is the suggested threshold to move to distributed partitioning).
+GConstruct. We suggest moving to distributed partitioning when your graph has more than
+10B edges, or when your features are larger than 1TB.
 
 ### Asynchronous Execution
 
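For the distributed processing sequence described in the README hunk above, pipeline creation might look like the sketch below. This is only an illustration that mirrors the GConstruct example earlier in the README: the job names `gsprocessing` and `dist_part` appear in the code changed by this patch, but the config filename and bucket names are placeholders, the GraphBolt conversion job's name is not shown in this patch, and GSProcessing may additionally require its own processing image argument that this patch does not touch.

```bash
# Hypothetical invocation (sketch only): mirrors the GConstruct example above,
# swapping in the distributed jobs. Add the GraphBolt conversion job to
# --jobs-to-run if needed; its name is not visible in this patch.
python create_sm_pipeline.py \
    --graph-construction-config-filename my_gsprocessing_config.json \
    --graph-name my-graph \
    --graphstorm-pytorch-cpu-image-url 123456789012.dkr.ecr.us-west-2.amazonaws.com/graphstorm:sagemaker-cpu \
    --input-data-s3 s3://input-bucket/data \
    --instance-count 4 \
    --jobs-to-run gsprocessing dist_part train inference \
    --output-prefix s3://output-bucket/results \
    --pipeline-name my-dist-graphstorm-pipeline \
    --region us-west-2 \
    --role arn:aws:iam::123456789012:role/SageMakerExecutionRole \
    --train-inference-task node_classification \
    --train-yaml-s3 s3://config-bucket/train.yaml
```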
diff --git a/sagemaker/pipeline/create_sm_pipeline.py b/sagemaker/pipeline/create_sm_pipeline.py
index de1e9fe097..5ffd46d335 100644
--- a/sagemaker/pipeline/create_sm_pipeline.py
+++ b/sagemaker/pipeline/create_sm_pipeline.py
@@ -16,6 +16,7 @@
 Create a SageMaker pipeline for GraphStorm.
 """
 
+import logging
 import os
 import re
 from typing import List, Optional, Sequence, Union
@@ -108,10 +109,9 @@ def _get_or_create_pipeline_session(
     def create_pipeline(self) -> Pipeline:
         """Create a SageMaker pipeline for GraphStorm.
 
-        The pipeline consists of the following steps:
-        1. Partition graph
-        2. Train graph
-        3. Encode node IDs
-        4. Train model
-        5. Inference
+        The pipeline can consist of the following steps:
+        1. Pre-process graph data using GConstruct or GSProcessing
+        2. Partition graph
+        3. Train model
+        4. Inference
 
@@ -385,8 +385,6 @@ def _create_gsprocessing_step(self, args: PipelineArgs) -> ProcessingStep:
             ],
         )
 
-        print(gsprocessing_config.expr)
-
         gsprocessing_arguments = [
             "--config-filename",
             self.graphconstruct_config_param,
@@ -538,6 +536,17 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep:
             ],
         )
 
+        train_params = {
+            "graph-data-s3": self.next_step_data_input,
+            "graph-name": self.graph_name_param,
+            "log-level": args.task_config.log_level,
+            "model-artifact-s3": model_output_path,
+            "task-type": args.training_config.train_inference_task,
+            "train-yaml-s3": self.train_config_file_param,
+            "num-trainers": self.num_trainers_param,
+            "use-graphbolt": self.use_graphbolt_param,
+        }
+
         # Training step
         train_estimator = PyTorch(
             entry_point=os.path.basename(args.script_paths.train_script),
@@ -547,16 +556,7 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep:
             instance_count=self.instance_count_param,
             instance_type=self.train_infer_instance,
             py_version="py3",
-            hyperparameters={
-                "graph-data-s3": self.next_step_data_input,
-                "graph-name": self.graph_name_param,
-                "log-level": args.task_config.log_level,
-                "model-artifact-s3": model_output_path,
-                "task-type": args.training_config.train_inference_task,
-                "train-yaml-s3": self.train_config_file_param,
-                "num-trainers": self.num_trainers_param,
-                "use-graphbolt": self.use_graphbolt_param,
-            },
+            hyperparameters=train_params,
             sagemaker_session=self.pipeline_session,
             disable_profiler=True,
             debugger_hook_config=False,
@@ -671,6 +671,7 @@ def _get_pipeline_name(args: PipelineArgs) -> str:
 def main():
     """Create or update a GraphStorm SageMaker Pipeline."""
     pipeline_args = parse_pipeline_args()
+    logging.basicConfig(level=logging.INFO)
 
     save_pipeline_args(
         pipeline_args, f"{pipeline_args.task_config.pipeline_name}-pipeline-args.json"
@@ -681,12 +682,12 @@ def main():
     pipeline = pipeline_generator.create_pipeline()
 
     if pipeline_args.update:
-        # TODO: If updating ensure pipeline exists first to get more informative error
+        # TODO: If updating, ensure pipeline exists first to get more informative error
         pipeline.update(role_arn=pipeline_args.aws_config.execution_role)
-        print(f"Pipeline '{pipeline.name}' updated successfully.")
+        logging.info("Pipeline '%s' updated successfully.", pipeline.name)
     else:
         pipeline.create(role_arn=pipeline_args.aws_config.execution_role)
-        print(f"Pipeline '{pipeline.name}' created successfully.")
+        logging.info("Pipeline '%s' created successfully.", pipeline.name)
 
 
 if __name__ == "__main__":
diff --git a/sagemaker/pipeline/execute_sm_pipeline.py b/sagemaker/pipeline/execute_sm_pipeline.py
index ec150b2506..f257cc8087 100644
--- a/sagemaker/pipeline/execute_sm_pipeline.py
+++ b/sagemaker/pipeline/execute_sm_pipeline.py
@@ -17,6 +17,7 @@
 """
 
 import argparse
+import logging
 import os
 import subprocess
 import sys
@@ -141,6 +142,7 @@ def parse_args():
 def main():
     """Execute GraphStorm SageMaker pipeline"""
     args = parse_args()
+    logging.basicConfig(level=logging.INFO)
 
     pipeline_deploy_args = load_pipeline_args(
         args.pipeline_args_json_file or f"{args.pipeline_name}-pipeline-args.json"
@@ -162,7 +164,7 @@ def main():
         pipeline_generator = GraphStormPipelineGenerator(
             pipeline_deploy_args, input_session=local_session
         )
-        # Set shared memory to half the host's size, as SM does
+        # Set shared memory to half of the host's size, as SM does
        instance_mem_mb = int(psutil.virtual_memory().total // (1024 * 1024))
         local_session.config = {
             "local": {"container_config": {"shm_size": f"{instance_mem_mb//2}M"}}
@@ -179,7 +181,7 @@ def main():
 
     # Prepare parameter overrides
     execution_params = {}
-    if args.instance_count is not None:
+    if args.instance_count:
         execution_params["InstanceCount"] = args.instance_count
         pipeline_deploy_args.instance_config.train_infer_instance_count = (
             args.instance_count
@@ -261,14 +263,15 @@ def main():
     if args.local_execution:
         sys.exit(0)
 
-    print(f"Pipeline execution started: {execution.describe()}")
-    print(f"Execution ARN: {execution.arn}")
+    logging.info("Pipeline execution started: %s", execution.describe())
+    logging.info("Execution ARN: %s", execution.arn)
 
     if not args.async_execution:
-        print("Waiting for pipeline execution to complete...")
+        logging.info("Waiting for pipeline execution to complete...")
         execution.wait()
-        print("Pipeline execution completed.")
-        print(f"Final status: {execution.describe()['PipelineExecutionStatus']}")
+        logging.info("Pipeline execution completed.")
+        logging.info("Final status: %s",
+                     execution.describe()['PipelineExecutionStatus'])
 
 
 if __name__ == "__main__":
diff --git a/sagemaker/pipeline/pipeline_parameters.py b/sagemaker/pipeline/pipeline_parameters.py
index d353955cb6..2397126360 100644
--- a/sagemaker/pipeline/pipeline_parameters.py
+++ b/sagemaker/pipeline/pipeline_parameters.py
@@ -69,9 +69,16 @@ def __post_init__(self):
             assert (
                 self.cpu_instance_type
             ), "Need to provide a CPU instance type when training on CPU"
+        else:
+            assert (
+                self.gpu_instance_type
+            ), "Need to provide a GPU instance type when training on GPU"
 
         if not self.graph_construction_instance_type:
             self.graph_construction_instance_type = self.cpu_instance_type
+            assert (
+                self.cpu_instance_type
+            ), "Need to provide a CPU instance type for graph construction."
             logging.warning(
                 "No graph processing instance type specified, using the CPU instance type: %s",
                 self.cpu_instance_type,
@@ -106,7 +113,6 @@ def __post_init__(self):
                 "Should not try to run both GConstruct and GSProcessing steps, "
                 f"got job sequence: {self.jobs_to_run}"
             )
-        # TODO: Should we allow running GConstruct with dist_part?
 
         # When running gsprocessing ensure we run dist_part as well
         if "gsprocessing" in self.jobs_to_run and "dist_part" not in self.jobs_to_run:
@@ -254,15 +260,15 @@ def get_hash_hex(self):
 
     def __post_init__(self):
         if not self.instance_config.train_on_cpu:
-            assert self.instance_config.gpu_instance_type, (
-                "GPU instance type must be specified if not training on CPU, "
-                f"got {self.instance_config.train_on_cpu=} "
-                f"{self.instance_config.gpu_instance_type=}"
-            )
-            assert self.aws_config.graphstorm_pytorch_cpu_image_url, (
+            assert self.aws_config.graphstorm_pytorch_gpu_image_url, (
                 "Must use provide GPU image when training on GPU. "
                 "use --graphstorm-pytorch-gpu-image-url"
             )
+        else:
+            assert self.aws_config.graphstorm_pytorch_cpu_image_url, (
+                "Must provide CPU image when training on CPU. "
" + "use --graphstorm-pytorch-cpu-image-url" + ) # Ensure we provide a GConstruct/GSProcessing config file when running construction if ( @@ -330,8 +336,8 @@ def __post_init__(self): # GConstruct uses 'metis', so just translate that if needed if ( + "gconstruct" in self.task_config.jobs_to_run and self.partition_config.partition_algorithm.lower() == "parmetis" - and "gconstruct" in self.task_config.jobs_to_run ): self.partition_config.partition_algorithm = "metis" @@ -497,7 +503,7 @@ def parse_pipeline_args() -> PipelineArgs: optional_args.add_argument( "--volume-size-gb", type=int, - help="Additional volume size for instances in GB.", + help="Additional volume size for SageMaker instances in GB.", default=100, ) @@ -526,7 +532,7 @@ def parse_pipeline_args() -> PipelineArgs: "--base-job-name", type=str, default="gs", - help="Base job name for SageMaker jobs. Default: 'sm'", + help="Base job name for SageMaker jobs. Default: 'gs'", ) required_args.add_argument( "--jobs-to-run", @@ -534,7 +540,7 @@ def parse_pipeline_args() -> PipelineArgs: required=True, help=( "Space-separated string of jobs to run in the pipeline, " - "e.g. 'gconstruct train inference'. " + "e.g. '--jobs-to-run gconstruct train inference'. " f"Should be one or more of: {list(JOB_ORDER.keys())}. Required. " ), )