diff --git a/megatron/__init__.py b/megatron/__init__.py index 09858d3b7c..d546646363 100644 --- a/megatron/__init__.py +++ b/megatron/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import torch +import deepspeed from .package_info import ( __description__, @@ -37,19 +38,19 @@ def print_rank_0(message): """If distributed is initialized, print only on rank 0.""" - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.is_initialized(): + if deepspeed.comm.get_rank() == 0: print(message, flush=True) else: print(message, flush=True) def is_last_rank(): - return torch.distributed.get_rank() == ( - torch.distributed.get_world_size() - 1) + return deepspeed.comm.get_rank() == ( + deepspeed.comm.get_world_size() - 1) def print_rank_last(message): """If distributed is initialized, print only on last rank.""" - if torch.distributed.is_initialized(): + if deepspeed.comm.is_initialized(): if is_last_rank(): print(message, flush=True) else: diff --git a/megatron/checkpointing.py b/megatron/checkpointing.py index 13259df4a8..cce7d613e3 100644 --- a/megatron/checkpointing.py +++ b/megatron/checkpointing.py @@ -21,6 +21,7 @@ import numpy as np import torch +import deepspeed from megatron import (get_args, mpu, @@ -118,7 +119,7 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler): print_rank_0('saving checkpoint at iteration {:7d} to {}'.format( iteration, args.save)) - if not torch.distributed.is_initialized() or mpu.get_data_parallel_rank() == 0 \ + if not deepspeed.comm.is_initialized() or mpu.get_data_parallel_rank() == 0 \ or args.deepspeed: # Arguments, iteration, and model. @@ -177,21 +178,21 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler): model[0].module.state_dict = original_state_dict # Wait so everyone is done (necessary) - if torch.distributed.is_initialized(): - torch.distributed.barrier() + if deepspeed.comm.is_initialized(): + deepspeed.comm.barrier() print_rank_0(' successfully saved checkpoint at iteration {:7d} to {}'.format( iteration, args.save)) # And update the latest iteration - if not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0: + if not deepspeed.comm.is_initialized() or deepspeed.comm.get_rank() == 0: tracker_filename = get_checkpoint_tracker_filename(args.save) with open(tracker_filename, 'w') as f: f.write(str(iteration)) # Wait so everyone is done (not necessary) - if torch.distributed.is_initialized(): - torch.distributed.barrier() + if deepspeed.comm.is_initialized(): + deepspeed.comm.barrier() def _transpose_first_dim(t, num_splits, num_splits_first, model): input_shape = t.size() @@ -419,8 +420,8 @@ def load_checkpoint(model, optimizer, lr_scheduler, load_arg='load', strict=True sys.exit() # Some utilities want to load a checkpoint without distributed being initialized - if torch.distributed.is_initialized(): - torch.distributed.barrier() + if deepspeed.comm.is_initialized(): + deepspeed.comm.barrier() print_rank_0(f' successfully loaded checkpoint from {args.load} ' f'at iteration {iteration}') @@ -448,7 +449,7 @@ def load_biencoder_checkpoint(model, only_query_model=False, checkpoint_name = get_checkpoint_name(load_path, iteration, False) if mpu.get_data_parallel_rank() == 0: print('global rank {} is loading checkpoint {}'.format( - torch.distributed.get_rank(), checkpoint_name)) + deepspeed.comm.get_rank(), checkpoint_name)) state_dict = torch.load(checkpoint_name, 
map_location='cpu') ret_state_dict = state_dict['model'] @@ -460,7 +461,7 @@ def load_biencoder_checkpoint(model, only_query_model=False, assert len(model) == 1 model[0].load_state_dict(ret_state_dict) - torch.distributed.barrier() + deepspeed.comm.barrier() if mpu.get_data_parallel_rank() == 0: print(' successfully loaded {}'.format(checkpoint_name)) diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index f7b3b961b8..2e34e5d754 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -3,6 +3,7 @@ import numpy as np import torch +import deepspeed from megatron import get_args, get_tokenizer, mpu, print_rank_0 from megatron.data.dataset_utils import create_masked_lm_predictions, \ @@ -157,7 +158,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo assert block_dataset.sizes.dtype == np.int32 # Build samples mapping - verbose = torch.distributed.get_rank() == 0 + verbose = deepspeed.comm.get_rank() == 0 start_time = time.time() print_rank_0(' > building samples index mapping for {} ...'.format( name)) @@ -188,8 +189,8 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo # device_index=rank which is not the case for model # parallel case counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - assert counts[0].item() == torch.distributed.get_world_size( + deepspeed.comm.all_reduce(counts, group=mpu.get_data_parallel_group()) + assert counts[0].item() == deepspeed.comm.get_world_size( group=mpu.get_data_parallel_group()) # Load indexed dataset. diff --git a/megatron/data/blendable_dataset.py b/megatron/data/blendable_dataset.py index 5ba4b98aa4..7d9e072bf5 100644 --- a/megatron/data/blendable_dataset.py +++ b/megatron/data/blendable_dataset.py @@ -19,6 +19,7 @@ import numpy as np import torch +import deepspeed from megatron import print_rank_0 from megatron import mpu @@ -53,7 +54,7 @@ def __init__(self, datasets, weights): helpers.build_blending_indices(self.dataset_index, self.dataset_sample_index, weights, num_datasets, self.size, - torch.distributed.get_rank() == 0) + deepspeed.comm.get_rank() == 0) print_rank_0('> elapsed time for building blendable dataset indices: ' '{:.2f} (sec)'.format(time.time() - start_time)) diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 3052e9fdd3..a26197c135 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -25,6 +25,7 @@ import numpy as np import torch +import deepspeed from megatron import ( get_args, @@ -662,7 +663,7 @@ def get_samples_mapping(indexed_dataset, indexmap_filename += '.npy' # Build the indexed mapping if not exist. 
- if torch.distributed.get_rank() == 0 and \ + if deepspeed.comm.get_rank() == 0 and \ not os.path.isfile(indexmap_filename): print(' > WARNING: could not find index map file {}, building ' 'the indices on rank 0 ...'.format(indexmap_filename)) @@ -672,7 +673,7 @@ def get_samples_mapping(indexed_dataset, assert indexed_dataset.sizes.dtype == np.int32 # Build samples mapping - verbose = torch.distributed.get_rank() == 0 + verbose = deepspeed.comm.get_rank() == 0 start_time = time.time() print_rank_0(' > building sapmles index mapping for {} ...'.format( name)) @@ -700,11 +701,11 @@ def get_samples_mapping(indexed_dataset, # device_index=rank which is not the case for model # parallel case counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) + deepspeed.comm.all_reduce(counts, group=mpu.get_data_parallel_group()) + deepspeed.comm.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) assert counts[0].item() == ( - torch.distributed.get_world_size() // - torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group())) + deepspeed.comm.get_world_size() // + deepspeed.comm.get_world_size(group=mpu.get_tensor_model_parallel_group())) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index e6c64e975d..c91259332c 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -20,6 +20,7 @@ import numpy as np import torch +import deepspeed from megatron import mpu, print_rank_0 from megatron.data.blendable_dataset import BlendableDataset @@ -212,7 +213,7 @@ def _build_index_mappings(name, data_prefix, documents, sizes, shuffle_idx_filename = _filename + '_shuffle_idx.npy' # Build the indexed mapping if not exist. - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: if (not os.path.isfile(doc_idx_filename)) or \ (not os.path.isfile(sample_idx_filename)) or \ (not os.path.isfile(shuffle_idx_filename)): @@ -297,11 +298,11 @@ def _build_index_mappings(name, data_prefix, documents, sizes, # device_index=rank which is not the case for model # parallel case counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) + deepspeed.comm.all_reduce(counts, group=mpu.get_data_parallel_group()) + deepspeed.comm.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) assert counts[0].item() == ( - torch.distributed.get_world_size() // - torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group())) + deepspeed.comm.get_world_size() // + deepspeed.comm.get_world_size(group=mpu.get_tensor_model_parallel_group())) # Load mappings. 
start_time = time.time() diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index aecf5549a7..360d1ec60d 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -3,6 +3,7 @@ import numpy as np import torch +import deepspeed from megatron import mpu, print_rank_0 from megatron.data.dataset_utils import create_masked_lm_predictions, pad_and_convert_to_numpy @@ -147,7 +148,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo assert block_dataset.sizes.dtype == np.int32 # Build samples mapping - verbose = torch.distributed.get_rank() == 0 + verbose = deepspeed.comm.get_rank() == 0 start_time = time.time() print_rank_0(' > building samples index mapping for {} ...'.format( name)) @@ -178,8 +179,8 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo # device_index=rank which is not the case for model # parallel case counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - assert counts[0].item() == torch.distributed.get_world_size( + deepspeed.comm.all_reduce(counts, group=mpu.get_data_parallel_group()) + assert counts[0].item() == deepspeed.comm.get_world_size( group=mpu.get_data_parallel_group()) # Load indexed dataset. diff --git a/megatron/global_vars.py b/megatron/global_vars.py index c486f0d6b1..571f23a5c5 100644 --- a/megatron/global_vars.py +++ b/megatron/global_vars.py @@ -20,6 +20,7 @@ import time import torch +import deepspeed from megatron.tokenizer import build_tokenizer from .arguments import parse_args @@ -254,9 +255,9 @@ def log(self, names, normalizer=1.0, reset=True): elapsed_time = self.timers[name].elapsed( reset=reset) * 1000.0 / normalizer string += ' | {}: {:.2f}'.format(name, elapsed_time) - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == ( - torch.distributed.get_world_size() - 1): + if deepspeed.comm.is_initialized(): + if deepspeed.comm.get_rank() == ( + deepspeed.comm.get_world_size() - 1): print(string, flush=True) else: print(string, flush=True) diff --git a/megatron/indexer.py b/megatron/indexer.py index c0d1ca7de1..a54abbf08b 100644 --- a/megatron/indexer.py +++ b/megatron/indexer.py @@ -1,6 +1,6 @@ import sys import torch -import torch.distributed as dist +import deepspeed.comm as dist from megatron import get_args from megatron import mpu @@ -112,7 +112,7 @@ def build_and_save_index(self): # This process signals to finalize its shard and then synchronize with # the other processes self.evidence_embedder_obj.save_shard() - torch.distributed.barrier() + dist.barrier() del self.model # rank 0 process builds the final copy @@ -124,4 +124,4 @@ def build_and_save_index(self): self.evidence_embedder_obj.clear() # complete building the final copy - torch.distributed.barrier() + dist.barrier() diff --git a/megatron/initialize.py b/megatron/initialize.py index 4264afa805..1d81250054 100644 --- a/megatron/initialize.py +++ b/megatron/initialize.py @@ -54,10 +54,10 @@ def initialize_megatron(extra_args_provider=None, args_defaults={}, args_defaults=args_defaults, ignore_unknown_args=ignore_unknown_args) - # torch.distributed initialization + # deepspeed.comm initialization def finish_mpu_init(): args = get_args() - # Pytorch distributed. + # DeepSpeed distributed. _initialize_distributed() # Random seeds for reproducibility. @@ -100,7 +100,7 @@ def _compile_dependencies(): # Compile dataset C++ code.
# ========================= # TODO: move this to ninja - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: start_time = time.time() print('> compiling dataset index builder ...') from megatron.data.dataset_utils import compile_helper @@ -131,20 +131,20 @@ def _compile_dependencies(): ' back to unfused kernel invocations.', flush=True) # Always build on rank zero first. - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: start_time = time.time() print('> compiling and loading fused kernels ...', flush=True) fused_kernels.load(args) - torch.distributed.barrier() + deepspeed.comm.barrier() else: - torch.distributed.barrier() + deepspeed.comm.barrier() fused_kernels.load(args) # Simple barrier to make sure all ranks have passed the # compilation phase successfully before moving on to the # rest of the program. We think this might ensure that # the lock is released. - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>>> done with compiling and loading fused kernels. ' 'Compilation time: {:.3f} seconds'.format( time.time() - start_time), flush=True) @@ -182,20 +182,20 @@ def setup_deepspeed_random_and_activation_checkpointing(args): def _initialize_distributed(): - """Initialize torch.distributed and mpu.""" + """Initialize deepspeed.comm and mpu.""" args = get_args() device_count = torch.cuda.device_count() - if torch.distributed.is_initialized(): + if deepspeed.comm.is_initialized(): if args.rank == 0: - print('torch distributed is already initialized, ' + print('deepspeed.comm is already initialized, ' 'skipping initialization ...', flush=True) - args.rank = torch.distributed.get_rank() - args.world_size = torch.distributed.get_world_size() + args.rank = deepspeed.comm.get_rank() + args.world_size = deepspeed.comm.get_world_size() else: if args.rank == 0: - print('> initializing torch distributed ...', flush=True) + print('> initializing deepspeed.comm ...', flush=True) # Manually set the device ids. if device_count > 0: device = args.rank % device_count @@ -238,9 +238,9 @@ def _init_autoresume(): """Set autoresume start time.""" autoresume = get_adlr_autoresume() if autoresume: - torch.distributed.barrier() + deepspeed.comm.barrier() autoresume.init() - torch.distributed.barrier() + deepspeed.comm.barrier() def _set_random_seed(seed_): diff --git a/megatron/memory.py b/megatron/memory.py index be5a117bcd..adae2bb57d 100644 --- a/megatron/memory.py +++ b/megatron/memory.py @@ -15,6 +15,7 @@ import torch +import deepspeed # A dictionary of all the memory buffers allocated. @@ -47,7 +48,7 @@ class MemoryBuffer: """ def __init__(self, name, numel, dtype, track_usage): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: element_size = torch.tensor([], dtype=dtype).element_size() print('> building the {} memory buffer with {} num elements ' 'and {} dtype ({:.1f} MB)...'.format( @@ -119,7 +120,7 @@ def print_average_usage(self): """Print memory usage average over time. We would like this value to be as high as possible.""" assert self.track_usage, 'You need to enable track usage.' 
- if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print(' > usage of {} memory buffer: {:.2f} %'.format( self.name, self.in_use_value * 100.0 / self.total_value), flush=True) diff --git a/megatron/model/biencoder_model.py b/megatron/model/biencoder_model.py index 51ac0a060d..b92fac428e 100644 --- a/megatron/model/biencoder_model.py +++ b/megatron/model/biencoder_model.py @@ -1,5 +1,6 @@ import os import torch +import deepspeed import sys from megatron import get_args, print_rank_0 @@ -165,7 +166,7 @@ def init_state_dict_from_bert(self): checkpoint_name = get_checkpoint_name(args.bert_load, iteration, False) if mpu.get_data_parallel_rank() == 0: print('global rank {} is loading BERT checkpoint {}'.format( - torch.distributed.get_rank(), checkpoint_name)) + deepspeed.comm.get_rank(), checkpoint_name)) # Load the checkpoint. try: diff --git a/megatron/model/distributed.py b/megatron/model/distributed.py index 53d3362231..21eb5bca3c 100644 --- a/megatron/model/distributed.py +++ b/megatron/model/distributed.py @@ -18,6 +18,7 @@ import torch from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors +import deepspeed from megatron import get_args from megatron import mpu @@ -191,7 +192,7 @@ def allreduce_gradients(self): if self._grad_buffers is not None: for _, buffer_ in self._grad_buffers.items(): buffer_.data /= mpu.get_data_parallel_world_size() - torch.distributed.all_reduce( + deepspeed.comm.all_reduce( buffer_.data, group=mpu.get_data_parallel_group()) else: # Otherwise, bucketize and all-reduce @@ -211,7 +212,7 @@ def allreduce_gradients(self): grads = [param.grad.data for param in bucket] coalesced = _flatten_dense_tensors(grads) coalesced /= mpu.get_data_parallel_world_size() - torch.distributed.all_reduce( + deepspeed.comm.all_reduce( coalesced, group=mpu.get_data_parallel_group()) for buf, synced in zip(grads, _unflatten_dense_tensors( coalesced, grads)): diff --git a/megatron/model/language_model.py b/megatron/model/language_model.py index b2d3799b70..377505c6c6 100644 --- a/megatron/model/language_model.py +++ b/megatron/model/language_model.py @@ -166,7 +166,7 @@ def add_tokentype_embeddings(self, num_tokentypes): """ if self.tokentype_embeddings is not None: raise Exception('tokentype embeddings is already initialized') - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('adding embedding for {} tokentypes'.format(num_tokentypes), flush=True) self.num_tokentypes = num_tokentypes diff --git a/megatron/model/module.py b/megatron/model/module.py index df92d95a9f..1ca76db9e3 100644 --- a/megatron/model/module.py +++ b/megatron/model/module.py @@ -18,6 +18,7 @@ import torch from torch.autograd import Variable from torch.nn.parameter import Parameter +import deepspeed from megatron import get_args from megatron import mpu @@ -99,9 +100,9 @@ def initialize_word_embeddings(self, init_method_normal): # Ensure that first and last stages have the same initial parameter # values. - if torch.distributed.is_initialized(): + if deepspeed.comm.is_initialized(): if mpu.is_pipeline_first_stage() or mpu.is_pipeline_last_stage(): - torch.distributed.all_reduce(self.word_embeddings_weight().data, + deepspeed.comm.all_reduce(self.word_embeddings_weight().data, group=mpu.get_embedding_group()) else: print("WARNING! 
Distributed processes aren't initialized, so " diff --git a/megatron/model/realm_model.py b/megatron/model/realm_model.py index 5730a85e36..460a5c44f0 100644 --- a/megatron/model/realm_model.py +++ b/megatron/model/realm_model.py @@ -1,5 +1,6 @@ import os import torch +import deepspeed from megatron import get_args, print_rank_0 from megatron.checkpointing import get_checkpoint_tracker_filename, get_checkpoint_name @@ -127,7 +128,7 @@ def init_state_dict_from_bert(self): checkpoint_name = get_checkpoint_name(args.bert_load, iteration, False) if mpu.get_data_parallel_rank() == 0: print('global rank {} is loading checkpoint {}'.format( - torch.distributed.get_rank(), checkpoint_name)) + deepspeed.comm.get_rank(), checkpoint_name)) try: state_dict = torch.load(checkpoint_name, map_location='cpu') diff --git a/megatron/mpu/cross_entropy.py b/megatron/mpu/cross_entropy.py index 8c790cd3e1..c816da5495 100644 --- a/megatron/mpu/cross_entropy.py +++ b/megatron/mpu/cross_entropy.py @@ -15,6 +15,7 @@ import torch +import deepspeed from .initialize import get_tensor_model_parallel_group from .initialize import get_tensor_model_parallel_rank @@ -29,8 +30,8 @@ def forward(ctx, vocab_parallel_logits, target): # Maximum value along vocab dimension across all GPUs. logits_max = torch.max(vocab_parallel_logits, dim=-1)[0] - torch.distributed.all_reduce(logits_max, - op=torch.distributed.ReduceOp.MAX, + deepspeed.comm.all_reduce(logits_max, + op=deepspeed.comm.ReduceOp.MAX, group=get_tensor_model_parallel_group()) # Subtract the maximum value. vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1)) @@ -60,16 +61,16 @@ def forward(ctx, vocab_parallel_logits, target): predicted_logits = predicted_logits_1d.view_as(target) predicted_logits[target_mask] = 0.0 # All reduce is needed to get the chunks from other GPUs. - torch.distributed.all_reduce(predicted_logits, - op=torch.distributed.ReduceOp.SUM, + deepspeed.comm.all_reduce(predicted_logits, + op=deepspeed.comm.ReduceOp.SUM, group=get_tensor_model_parallel_group()) # Sum of exponential of logits along vocab dimension across all GPUs. exp_logits = vocab_parallel_logits torch.exp(vocab_parallel_logits, out=exp_logits) sum_exp_logits = exp_logits.sum(dim=-1) - torch.distributed.all_reduce(sum_exp_logits, - op=torch.distributed.ReduceOp.SUM, + deepspeed.comm.all_reduce(sum_exp_logits, + op=deepspeed.comm.ReduceOp.SUM, group=get_tensor_model_parallel_group()) # Loss = log(sum(exp(logits))) - predicted-logit. diff --git a/megatron/mpu/data.py b/megatron/mpu/data.py index dd57a8ffc0..ff1d00d5f9 100644 --- a/megatron/mpu/data.py +++ b/megatron/mpu/data.py @@ -14,6 +14,7 @@ # limitations under the License. import torch +import deepspeed from .initialize import get_tensor_model_parallel_group from .initialize import get_tensor_model_parallel_rank @@ -47,7 +48,7 @@ def _build_key_size_numel_dictionaries(keys, data): # Move to GPU and broadcast. sizes_cuda = torch.cuda.LongTensor(sizes) - torch.distributed.broadcast(sizes_cuda, get_tensor_model_parallel_src_rank(), + deepspeed.comm.broadcast(sizes_cuda, get_tensor_model_parallel_src_rank(), group=get_tensor_model_parallel_group()) # Move back to cpu and unpack. 
@@ -101,7 +102,7 @@ def broadcast_data(keys, data, datatype): dtype=datatype) # Broadcast - torch.distributed.broadcast(flatten_data, get_tensor_model_parallel_src_rank(), + deepspeed.comm.broadcast(flatten_data, get_tensor_model_parallel_src_rank(), group=get_tensor_model_parallel_group()) # Unpack diff --git a/megatron/mpu/initialize.py b/megatron/mpu/initialize.py index c24d1179ad..bfac834276 100644 --- a/megatron/mpu/initialize.py +++ b/megatron/mpu/initialize.py @@ -19,6 +19,7 @@ import torch from .utils import ensure_divisibility +import deepspeed # Intra-layer model parallel group that the current rank belongs to. @@ -76,14 +77,14 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, with a total of 16 GPUs, rank 0 to 7 belong to the first box and ranks 8 to 15 belong to the second box. """ - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> initializing tensor model parallel with size {}'.format( tensor_model_parallel_size_)) print('> initializing pipeline model parallel with size {}'.format( pipeline_model_parallel_size_)) # Get world size and rank. Ensure some consistencies. - assert torch.distributed.is_initialized() - world_size = torch.distributed.get_world_size() + assert deepspeed.comm.is_initialized() + world_size = deepspeed.comm.get_world_size() tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size) pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size) ensure_divisibility(world_size, @@ -101,7 +102,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0 _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = virtual_pipeline_model_parallel_size_ - rank = torch.distributed.get_rank() + rank = deepspeed.comm.get_rank() # Build the data-parallel groups. 
global _DATA_PARALLEL_GROUP @@ -115,7 +116,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, ranks = range(start_rank + j, end_rank, tensor_model_parallel_size) all_data_parallel_group_ranks.append(list(ranks)) - group = torch.distributed.new_group(ranks) + group = deepspeed.comm.new_group(ranks) if rank in ranks: _DATA_PARALLEL_GROUP = group @@ -126,7 +127,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, for i in range(data_parallel_size): ranks = [data_parallel_group_ranks[i] for data_parallel_group_ranks in all_data_parallel_group_ranks] - group = torch.distributed.new_group(ranks) + group = deepspeed.comm.new_group(ranks) if rank in ranks: _MODEL_PARALLEL_GROUP = group @@ -137,7 +138,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, for i in range(num_tensor_model_parallel_groups): ranks = range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size) - group = torch.distributed.new_group(ranks) + group = deepspeed.comm.new_group(ranks) if rank in ranks: _TENSOR_MODEL_PARALLEL_GROUP = group @@ -153,7 +154,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, for i in range(num_pipeline_model_parallel_groups): ranks = range(i, world_size, num_pipeline_model_parallel_groups) - group = torch.distributed.new_group(ranks) + group = deepspeed.comm.new_group(ranks) if rank in ranks: _PIPELINE_MODEL_PARALLEL_GROUP = group _PIPELINE_GLOBAL_RANKS = ranks @@ -163,7 +164,7 @@ def initialize_model_parallel(tensor_model_parallel_size_=1, embedding_ranks = [ranks[0], ranks[-1]] else: embedding_ranks = ranks - group = torch.distributed.new_group(embedding_ranks) + group = deepspeed.comm.new_group(embedding_ranks) if rank in embedding_ranks: _EMBEDDING_GROUP = group @@ -229,7 +230,7 @@ def get_tensor_model_parallel_world_size(): global _MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE if _MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE is not None: return _MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE - return torch.distributed.get_world_size(group=get_tensor_model_parallel_group()) + return deepspeed.comm.get_world_size(group=get_tensor_model_parallel_group()) def get_model_parallel_world_size(): @@ -242,7 +243,7 @@ def get_pipeline_model_parallel_world_size(): global _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE if _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is not None: return _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE - return torch.distributed.get_world_size(group=get_pipeline_model_parallel_group()) + return deepspeed.comm.get_world_size(group=get_pipeline_model_parallel_group()) def set_tensor_model_parallel_rank(rank): @@ -262,7 +263,7 @@ def get_tensor_model_parallel_rank(): global _MPU_TENSOR_MODEL_PARALLEL_RANK if _MPU_TENSOR_MODEL_PARALLEL_RANK is not None: return _MPU_TENSOR_MODEL_PARALLEL_RANK - return torch.distributed.get_rank(group=get_tensor_model_parallel_group()) + return deepspeed.comm.get_rank(group=get_tensor_model_parallel_group()) def get_model_parallel_rank(): @@ -275,7 +276,7 @@ def get_pipeline_model_parallel_rank(): global _MPU_PIPELINE_MODEL_PARALLEL_RANK if _MPU_PIPELINE_MODEL_PARALLEL_RANK is not None: return _MPU_PIPELINE_MODEL_PARALLEL_RANK - return torch.distributed.get_rank(group=get_pipeline_model_parallel_group()) + return deepspeed.comm.get_rank(group=get_pipeline_model_parallel_group()) def is_pipeline_first_stage(ignore_virtual=False): @@ -321,7 +322,7 @@ def get_virtual_pipeline_model_parallel_world_size(): def get_tensor_model_parallel_src_rank(): """Calculate the global rank corresponding to the first local rank in the tensor 
model parallel group.""" - global_rank = torch.distributed.get_rank() + global_rank = deepspeed.comm.get_rank() local_world_size = get_tensor_model_parallel_world_size() return (global_rank // local_world_size) * local_world_size @@ -356,12 +357,12 @@ def get_pipeline_model_parallel_prev_rank(): def get_data_parallel_world_size(): """Return world size for the data parallel group.""" - return torch.distributed.get_world_size(group=get_data_parallel_group()) + return deepspeed.comm.get_world_size(group=get_data_parallel_group()) def get_data_parallel_rank(): """Return my rank for the data parallel group.""" - return torch.distributed.get_rank(group=get_data_parallel_group()) + return deepspeed.comm.get_rank(group=get_data_parallel_group()) def destroy_model_parallel(): diff --git a/megatron/mpu/mappings.py b/megatron/mpu/mappings.py index 821d9acfec..1c61362fac 100644 --- a/megatron/mpu/mappings.py +++ b/megatron/mpu/mappings.py @@ -14,6 +14,7 @@ # limitations under the License. import torch +import deepspeed from .initialize import get_tensor_model_parallel_group, get_tensor_model_parallel_world_size, get_tensor_model_parallel_rank from .utils import split_tensor_along_last_dim @@ -27,7 +28,7 @@ def _reduce(input_): return input_ # All-reduce. - torch.distributed.all_reduce(input_, group=get_tensor_model_parallel_group()) + deepspeed.comm.all_reduce(input_, group=get_tensor_model_parallel_group()) return input_ @@ -65,7 +66,7 @@ def _gather(input_): tensor_list = [torch.empty_like(input_) for _ in range(world_size)] tensor_list[rank] = input_ - torch.distributed.all_gather(tensor_list, input_, group=get_tensor_model_parallel_group()) + deepspeed.comm.all_gather(tensor_list, input_, group=get_tensor_model_parallel_group()) # Note: torch.cat already creates a contiguous tensor. output = torch.cat(tensor_list, dim=last_dim).contiguous() diff --git a/megatron/mpu/random.py b/megatron/mpu/random.py index a1c1a4c713..e8f4e87f68 100644 --- a/megatron/mpu/random.py +++ b/megatron/mpu/random.py @@ -23,6 +23,7 @@ from torch import _C from torch.cuda import _lazy_call, device as device_ctx_manager from torch.utils.checkpoint import detach_variable +import deepspeed from megatron import get_args from megatron.memory import allocate_mem_buff @@ -119,7 +120,7 @@ def gather_split_1d_tensor(tensor): device=torch.cuda.current_device(), requires_grad=False) chunks = [gathered[i*numel:(i+1)*numel] for i in range(world_size)] - torch.distributed.all_gather(chunks, tensor, + deepspeed.comm.all_gather(chunks, tensor, group=get_tensor_model_parallel_group()) return gathered @@ -228,11 +229,11 @@ def model_parallel_cuda_manual_seed(seed): # Data parallel gets the original seed. 
data_parallel_seed = seed - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> initializing model parallel cuda seeds on global rank {}, ' 'model parallel rank {}, and data parallel rank {} with ' 'model parallel seed: {} and data parallel seed: {}'.format( - torch.distributed.get_rank(), get_tensor_model_parallel_rank(), + deepspeed.comm.get_rank(), get_tensor_model_parallel_rank(), get_data_parallel_rank(), tensor_model_parallel_seed, data_parallel_seed), flush=True) _CUDA_RNG_STATE_TRACKER.reset() diff --git a/megatron/mpu/tests/commons.py b/megatron/mpu/tests/commons.py index 5e7a186728..f12d12d302 100644 --- a/megatron/mpu/tests/commons.py +++ b/megatron/mpu/tests/commons.py @@ -18,6 +18,7 @@ import random import numpy import torch +import deepspeed import mpu @@ -40,7 +41,7 @@ def set_random_seed(seed): def initialize_distributed(backend='nccl'): - """Initialize torch.distributed.""" + """Initialize deepspeed.comm.""" # Get local rank in case it is provided. parser = argparse.ArgumentParser() parser.add_argument('--local_rank', type=int, default=None, @@ -52,7 +53,7 @@ def initialize_distributed(backend='nccl'): rank = int(os.getenv('RANK', '0')) world_size = int(os.getenv("WORLD_SIZE", '1')) - print('> initializing torch.distributed with local rank: {}, ' + print('> initializing deepspeed.comm with local rank: {}, ' 'rank: {}, world size: {}'.format(local_rank, rank, world_size)) # Set the device id. @@ -66,7 +67,7 @@ def initialize_distributed(backend='nccl'): master_ip = os.getenv('MASTER_ADDR', 'localhost') master_port = os.getenv('MASTER_PORT', '6000') init_method += master_ip + ':' + master_port - torch.distributed.init_process_group( + deepspeed.comm.init_process_group( backend=backend, world_size=world_size, rank=rank, @@ -74,10 +75,10 @@ def initialize_distributed(backend='nccl'): def print_separator(message): - torch.distributed.barrier() + deepspeed.comm.barrier() filler_len = (78 - len(message)) // 2 filler = '-' * filler_len string = '\n' + filler + ' {} '.format(message) + filler - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print(string, flush=True) - torch.distributed.barrier() + deepspeed.comm.barrier() diff --git a/megatron/mpu/tests/test_cross_entropy.py b/megatron/mpu/tests/test_cross_entropy.py index 46d7ba981c..1db18e9ff7 100644 --- a/megatron/mpu/tests/test_cross_entropy.py +++ b/megatron/mpu/tests/test_cross_entropy.py @@ -21,6 +21,7 @@ import mpu import torch.nn.functional as F import torch +import deepspeed import random import sys sys.path.append("../..") @@ -57,7 +58,7 @@ def mpu_cross_entropy(batch_size, seq_length, vocab_size, def test_cross_entropy(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing cross entropy with model parallel size {} ...'. 
format(tensor_model_parallel_size)) @@ -80,26 +81,26 @@ def test_cross_entropy(tensor_model_parallel_size): error = loss_torch.sub_(loss_mpu).abs().max() print(' max error in loss on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 error = grad_torch.sub_(grad_mpu).abs().max() print(' max error in grad on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # Reset groups mpu.destroy_tensor_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') if __name__ == '__main__': initialize_distributed() - world_size = torch.distributed.get_world_size() + world_size = deepspeed.comm.get_world_size() tensor_model_parallel_size = 1 while tensor_model_parallel_size <= world_size: diff --git a/megatron/mpu/tests/test_data.py b/megatron/mpu/tests/test_data.py index ae36277036..5d48885d6a 100644 --- a/megatron/mpu/tests/test_data.py +++ b/megatron/mpu/tests/test_data.py @@ -21,12 +21,13 @@ import functools import operator import sys +import deepspeed sys.path.append("../..") def test_broadcast_data(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing broadcast_data with model parallel size {} ...'. format(tensor_model_parallel_size)) @@ -71,15 +72,15 @@ def test_broadcast_data(tensor_model_parallel_size): # Reset groups mpu.destroy_tensor_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') if __name__ == '__main__': initialize_distributed() - world_size = torch.distributed.get_world_size() + world_size = deepspeed.comm.get_world_size() tensor_model_parallel_size = 1 while tensor_model_parallel_size <= world_size: diff --git a/megatron/mpu/tests/test_initialize.py b/megatron/mpu/tests/test_initialize.py index ba505b8d5c..67e1f1ab63 100644 --- a/megatron/mpu/tests/test_initialize.py +++ b/megatron/mpu/tests/test_initialize.py @@ -18,35 +18,36 @@ import mpu import torch import sys +import deepspeed sys.path.append("../..") def test_initialize_model_parallel(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing initialize_model_parallel with size {} ...'.format( tensor_model_parallel_size)) tensor_model_parallel_size_ = min(tensor_model_parallel_size, - torch.distributed.get_world_size()) + deepspeed.comm.get_world_size()) assert not mpu.model_parallel_is_initialized() mpu.initialize_model_parallel(tensor_model_parallel_size_) assert mpu.model_parallel_is_initialized() # Checks. def check(group, world_size, rank): - assert world_size == torch.distributed.get_world_size(group=group) - assert rank == torch.distributed.get_rank(group=group) + assert world_size == deepspeed.comm.get_world_size(group=group) + assert rank == deepspeed.comm.get_rank(group=group) # Model parallel. world_size = tensor_model_parallel_size_ - rank = torch.distributed.get_rank() % tensor_model_parallel_size_ + rank = deepspeed.comm.get_rank() % tensor_model_parallel_size_ assert world_size == mpu.get_tensor_model_parallel_world_size() assert rank == mpu.get_tensor_model_parallel_rank() check(mpu.get_tensor_model_parallel_group(), world_size, rank) # Data parallel. 
- world_size = torch.distributed.get_world_size() // tensor_model_parallel_size_ - rank = torch.distributed.get_rank() // tensor_model_parallel_size + world_size = deepspeed.comm.get_world_size() // tensor_model_parallel_size_ + rank = deepspeed.comm.get_rank() // tensor_model_parallel_size assert world_size == mpu.get_data_parallel_world_size() assert rank == mpu.get_data_parallel_rank() check(mpu.get_data_parallel_group(), world_size, rank) @@ -54,38 +55,38 @@ def check(group, world_size, rank): # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') def test_get_tensor_model_parallel_src_rank(tensor_model_parallel_size_): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing get_tensor_model_parallel_src_rank with size {} ...'.format( tensor_model_parallel_size_)) tensor_model_parallel_size = min(tensor_model_parallel_size_, - torch.distributed.get_world_size()) + deepspeed.comm.get_world_size()) assert not mpu.model_parallel_is_initialized() mpu.initialize_model_parallel(tensor_model_parallel_size) assert mpu.model_parallel_is_initialized() # Checks - src_rank = torch.distributed.get_rank() - mpu.get_tensor_model_parallel_rank() + src_rank = deepspeed.comm.get_rank() - mpu.get_tensor_model_parallel_rank() assert mpu.get_tensor_model_parallel_src_rank() == src_rank # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') if __name__ == '__main__': initialize_distributed() - world_size = torch.distributed.get_world_size() + world_size = deepspeed.comm.get_world_size() tensor_model_parallel_size = 1 while tensor_model_parallel_size <= world_size: print_separator('test initialize model parallel') diff --git a/megatron/mpu/tests/test_layers.py b/megatron/mpu/tests/test_layers.py index b12f48509b..e082e7cd6f 100644 --- a/megatron/mpu/tests/test_layers.py +++ b/megatron/mpu/tests/test_layers.py @@ -21,6 +21,7 @@ from torch.nn.parameter import Parameter import torch.nn.init as init import torch +import deepspeed import random import sys sys.path.append("../..") @@ -28,7 +29,7 @@ def test_parallel_embedding(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing parallel embedding with model parallel size {} ...'. 
format(tensor_model_parallel_size)) @@ -67,16 +68,16 @@ def test_parallel_embedding(tensor_model_parallel_size): loss_vocab_parallel = torch.mul(output, loss_weight).sum() loss_vocab_parallel.backward() - torch.distributed.barrier() + deepspeed.comm.barrier() error = loss_parallel.sub(loss_original).abs() print(' error in loss (parallel) on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-12, 'error: {}'.format(error) - torch.distributed.barrier() + deepspeed.comm.barrier() error = loss_vocab_parallel.sub(loss_original).abs() print(' error in loss (vocab parallel) on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-12, 'error: {}'.format(error) weight_grad_orig = torch.split(embedding_original.weight.grad, @@ -84,7 +85,7 @@ def test_parallel_embedding(tensor_model_parallel_size): 1)[mpu.get_tensor_model_parallel_rank()] error = embedding_parallel.weight.grad.sub(weight_grad_orig).abs().max() print(' error in grad (parallel) on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-12, 'error: {}'.format(error) weight_grad_orig = torch.split(embedding_original.weight.grad, @@ -93,21 +94,21 @@ def test_parallel_embedding(tensor_model_parallel_size): error = embedding_vocab_parallel.weight.grad.sub( weight_grad_orig).abs().max() print(' error in grad (vocab parallel) on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-12, 'error: {}'.format(error) # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') def test_initialize_affine_weight(tensor_model_parallel_size): mpu.initialize_model_parallel(tensor_model_parallel_size) - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing initialize_affine_weight with model parallel ' 'size: {}'.format(tensor_model_parallel_size)) tensor_model_parallel_size = mpu.get_tensor_model_parallel_world_size() @@ -137,9 +138,9 @@ def test_initialize_affine_weight(tensor_model_parallel_size): # Compare. error = weight.sub(my_weight).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' column parallel max error (should be zero) on global rank ' - '{}: {}'.format(torch.distributed.get_rank(), error)) + '{}: {}'.format(deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # ------------ @@ -160,16 +161,16 @@ def test_initialize_affine_weight(tensor_model_parallel_size): # Compare. 
error = weight.sub(my_weight).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' row parallel max error (should be zero) on global rank ' - '{}: {}'.format(torch.distributed.get_rank(), error)) + '{}: {}'.format(deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print(' >> passed the test :-)') @@ -186,7 +187,7 @@ def forward(self): def test_column_parallel_linear(tensor_model_parallel_size): mpu.initialize_model_parallel(tensor_model_parallel_size) - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing ColumnParallelLinear with model parallel ' 'size: {}'.format(tensor_model_parallel_size)) tensor_model_parallel_size = mpu.get_tensor_model_parallel_world_size() @@ -223,37 +224,37 @@ def test_column_parallel_linear(tensor_model_parallel_size): my_dLdA = torch.split(dLdA, output_size_coeff, dim=0)[rank].contiguous().clone() error = my_dLdA.sub(linear_layer.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' error in dLdA on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 my_dLdb = torch.split(dLdb, output_size_coeff, dim=0)[rank].contiguous().clone() error = my_dLdb.sub(linear_layer.bias.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' error in dLdb on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 error = dLdX.sub(identity_layer.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' error in dLdX on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print(' >> passed the test :-)') def test_row_parallel_linear(tensor_model_parallel_size): mpu.initialize_model_parallel(tensor_model_parallel_size) - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing RowParallelLinear with model parallel ' 'size: {}'.format(tensor_model_parallel_size)) tensor_model_parallel_size = mpu.get_tensor_model_parallel_world_size() @@ -290,28 +291,28 @@ def test_row_parallel_linear(tensor_model_parallel_size): my_dLdA = torch.split(dLdA, input_size_coeff, dim=1)[rank].contiguous().clone() error = my_dLdA.sub(linear_layer.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' error in dLdA on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 error = dLdb.sub(linear_layer.bias.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' error in dLdb on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 error = dLdX.sub(identity_layer.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' error in dLdX on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # Reset groups mpu.destroy_model_parallel() - 
torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print(' >> passed the test :-)') @@ -335,7 +336,7 @@ def parallel_self_attention(tensor_model_parallel_size, num_att_heads_per_partit set_random_seed(seed) num_att_heads = num_att_heads_per_partition * \ - torch.distributed.get_world_size() + deepspeed.comm.get_world_size() hidden_size = hidden_size_per_att_head * num_att_heads # Network @@ -360,7 +361,7 @@ def parallel_self_attention(tensor_model_parallel_size, num_att_heads_per_partit def test_parallel_self_attention(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing ParallelSelfAttention with model parallel ' 'size: {}'.format(tensor_model_parallel_size)) @@ -382,9 +383,9 @@ def test_parallel_self_attention(tensor_model_parallel_size): assert hideen_size_1 == hidden_size error = loss_1.sub(loss).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' loss error on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 5.0e-6 my_lin_grad_list = torch.split( @@ -393,20 +394,20 @@ def test_parallel_self_attention(tensor_model_parallel_size): my_lin_grad = torch.cat(my_lin_grad_list, dim=0) error = my_lin_grad.sub( attention_layer.query_key_value.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' weight gradient error on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 5.0e-6 error = identity_layer_1.weight.grad.sub( identity_layer.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' input gradient error on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 5.0e-6 - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print(' >> passed the test :-)') @@ -420,7 +421,7 @@ def parallel_transformer(tensor_model_parallel_size, num_att_heads_per_partition set_random_seed(seed) num_att_heads = num_att_heads_per_partition * \ - torch.distributed.get_world_size() + deepspeed.comm.get_world_size() hidden_size = hidden_size_per_att_head * num_att_heads intermediate_size = 4 * hidden_size @@ -448,7 +449,7 @@ def parallel_transformer(tensor_model_parallel_size, num_att_heads_per_partition def test_parallel_transformer_layer(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing ParallelTransformerLayer with model parallel ' 'size: {}'.format(tensor_model_parallel_size)) @@ -468,20 +469,20 @@ def test_parallel_transformer_layer(tensor_model_parallel_size): hidden_size_per_att_head, batch_size, sequence_length) error = loss_1.sub(loss).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' loss error on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 5.0e-5, 'error: {}'.format(error) error = identity_layer_1.weight.grad.sub( identity_layer.weight.grad).abs().max() - torch.distributed.barrier() + deepspeed.comm.barrier() print(' input gradient error on global rank {}: {}'.format( - torch.distributed.get_rank(), error)) + deepspeed.comm.get_rank(), error)) assert error < 5.0e-5, 'error: {}'.format(error) - 
torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print(' >> passed the test :-)') @@ -491,7 +492,7 @@ def test_parallel_transformer_layer(tensor_model_parallel_size): torch.backends.cudnn.benchmark = False initialize_distributed() - world_size = torch.distributed.get_world_size() + world_size = deepspeed.comm.get_world_size() print_separator('test initialize affine weight') tensor_model_parallel_size = 1 diff --git a/megatron/mpu/tests/test_random.py b/megatron/mpu/tests/test_random.py index 9c9c503410..1820704092 100644 --- a/megatron/mpu/tests/test_random.py +++ b/megatron/mpu/tests/test_random.py @@ -17,13 +17,14 @@ from commons import initialize_distributed import mpu import torch +import deepspeed import sys sys.path.append("../..") def test_set_cuda_rng_state(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing set_rng_state with size {} ...'. format(tensor_model_parallel_size)) @@ -51,7 +52,7 @@ def test_set_cuda_rng_state(tensor_model_parallel_size): new_rng_state = torch.cuda.get_rng_state() max_diff = new_rng_state.sub(rng_state).max() print(' max diff in rng state (should be non-zero) on global rank {}: {}'. - format(torch.distributed.get_rank(), max_diff)) + format(deepspeed.comm.get_rank(), max_diff)) assert max_diff > 0 # Reset the rng state and do the same stuff. @@ -66,26 +67,26 @@ def test_set_cuda_rng_state(tensor_model_parallel_size): # Results should be the same error = result_2.sub(result_1).abs().max() print(' max error in generated tensors (should be zero) on ' - 'global rank {}: {}'.format(torch.distributed.get_rank(), error)) + 'global rank {}: {}'.format(deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # Input state should have remained intact. error = rng_state.sub(rng_state_copy).max() print(' max error in rng state (should be zero) on global rank {}: {}'. - format(torch.distributed.get_rank(), error)) + format(deepspeed.comm.get_rank(), error)) assert error == 0 # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') def test_cuda_rng_tracker(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing cuda rng tracker with size {} ...'. 
format(tensor_model_parallel_size)) @@ -133,14 +134,14 @@ def test_cuda_rng_tracker(tensor_model_parallel_size): diff = result_11.sub(result_21).abs().max() diff = min(diff, result_12.sub(result_22).abs().max()) print(' max diff in generated tensors (should be non-zero) on ' - 'global rank {}: {}'.format(torch.distributed.get_rank(), diff)) + 'global rank {}: {}'.format(deepspeed.comm.get_rank(), diff)) assert diff > 1.0e-6 error = max(result_11.sub(target_11).abs().max(), result_12.sub(target_12).abs().max()) error = max(error, result_21.sub(target_21).abs().max()) error = max(error, result_22.sub(target_22).abs().max()) print(' max error in generated tensors (should be zero) on ' - 'global rank {}: {}'.format(torch.distributed.get_rank(), error)) + 'global rank {}: {}'.format(deepspeed.comm.get_rank(), error)) assert error < 1.0e-6 # Reset the tracker @@ -149,14 +150,14 @@ def test_cuda_rng_tracker(tensor_model_parallel_size): # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') def test_model_parallel_cuda_manual_seed(tensor_model_parallel_size): - if torch.distributed.get_rank() == 0: + if deepspeed.comm.get_rank() == 0: print('> testing model parallel cuda manual seed with size {} ...'. format(tensor_model_parallel_size)) @@ -175,15 +176,15 @@ def test_model_parallel_cuda_manual_seed(tensor_model_parallel_size): # Reset groups mpu.destroy_model_parallel() - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: + deepspeed.comm.barrier() + if deepspeed.comm.get_rank() == 0: print('>> passed the test :-)') if __name__ == '__main__': initialize_distributed() - world_size = torch.distributed.get_world_size() + world_size = deepspeed.comm.get_world_size() tensor_model_parallel_size = 1 while tensor_model_parallel_size <= world_size: diff --git a/megatron/optimizer/clip_grads.py b/megatron/optimizer/clip_grads.py index 036a1d4c4c..69322ab002 100644 --- a/megatron/optimizer/clip_grads.py +++ b/megatron/optimizer/clip_grads.py @@ -17,6 +17,7 @@ import torch from torch._six import inf +import deepspeed from apex.multi_tensor_apply import multi_tensor_applier import amp_C @@ -76,8 +77,8 @@ def clip_grad_norm_fp32(parameters, max_norm, norm_type=2): total_norm = max(grad.abs().max() for grad in grads_for_norm) total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)]) # Take max across all model-parallel GPUs. - torch.distributed.all_reduce(total_norm_cuda, - op=torch.distributed.ReduceOp.MAX, + deepspeed.comm.all_reduce(total_norm_cuda, + op=deepspeed.comm.ReduceOp.MAX, group=mpu.get_model_parallel_group()) total_norm = total_norm_cuda[0].item() @@ -103,8 +104,8 @@ def clip_grad_norm_fp32(parameters, max_norm, norm_type=2): total_norm += grad_norm ** norm_type # Sum across all model-parallel GPUs. - torch.distributed.all_reduce(total_norm, - op=torch.distributed.ReduceOp.SUM, + deepspeed.comm.all_reduce(total_norm, + op=deepspeed.comm.ReduceOp.SUM, group=mpu.get_model_parallel_group()) total_norm = total_norm.item() ** (1.0 / norm_type) @@ -140,8 +141,8 @@ def count_zeros_fp32(parameters): total_num_zeros = num_zeros + total_num_zeros # Sum across all model-parallel GPUs. 
-    torch.distributed.all_reduce(total_num_zeros,
-                                 op=torch.distributed.ReduceOp.SUM,
+    deepspeed.comm.all_reduce(total_num_zeros,
+                                 op=deepspeed.comm.ReduceOp.SUM,
                                  group=mpu.get_model_parallel_group())
     total_num_zeros = total_num_zeros.item()
diff --git a/megatron/optimizer/optimizer.py b/megatron/optimizer/optimizer.py
index 77baddd62a..2091ddaf23 100644
--- a/megatron/optimizer/optimizer.py
+++ b/megatron/optimizer/optimizer.py
@@ -19,6 +19,7 @@
 from abc import abstractmethod
 import torch
+import deepspeed
 from apex.multi_tensor_apply import multi_tensor_applier
 import amp_C
@@ -330,8 +331,8 @@ def _unscale_main_grads_and_check_for_nan(self):
         torch._amp_foreach_non_finite_check_and_unscale_(
             main_grads, self.found_inf, self.grad_scaler.inv_scale)
         # Update across all model parallel instances.
-        torch.distributed.all_reduce(self.found_inf,
-                                     op=torch.distributed.ReduceOp.MAX,
+        deepspeed.comm.all_reduce(self.found_inf,
+                                     op=deepspeed.comm.ReduceOp.MAX,
                                      group=mpu.get_model_parallel_group())
         # Check for nan.
diff --git a/megatron/p2p_communication.py b/megatron/p2p_communication.py
index 863a60b0ad..5a00024aa0 100644
--- a/megatron/p2p_communication.py
+++ b/megatron/p2p_communication.py
@@ -35,7 +35,7 @@ def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
                        previous rank.
         recv_next: boolean for whether tensor should be received from next
                    rank.
-        use_ring_exchange: boolean for whether torch.distributed.ring_exchange()
+        use_ring_exchange: boolean for whether deepspeed.comm.ring_exchange()
                            API should be used.
     Returns:
diff --git a/megatron/text_generation_utils.py b/megatron/text_generation_utils.py
index 7e81e5fae2..f4ebb69b0c 100644
--- a/megatron/text_generation_utils.py
+++ b/megatron/text_generation_utils.py
@@ -22,6 +22,7 @@
 import torch
 import torch.nn.functional as F
+import deepspeed
 from megatron import get_args
 from megatron import get_tokenizer
@@ -138,7 +139,7 @@ def generate_samples_input_from_file(model):
             input_info = [terminate_runs, raw_text_len, context_length]
             input_info_tensor = torch.cuda.LongTensor(input_info)
-            torch.distributed.all_reduce(input_info_tensor,
+            deepspeed.comm.all_reduce(input_info_tensor,
                                          group=mpu.get_model_parallel_group())
             terminate_runs = input_info_tensor[0].item()
             raw_text_len = input_info_tensor[1].item()
@@ -155,14 +156,14 @@ def generate_samples_input_from_file(model):
                 src = mpu.get_pipeline_model_parallel_first_rank()
                 group = mpu.get_pipeline_model_parallel_group()
                 context_tokens_tensor = torch.cuda.LongTensor(context_tokens)
-                torch.distributed.broadcast(context_tokens_tensor, src, group)
+                deepspeed.comm.broadcast(context_tokens_tensor, src, group)
             else:
                 src = mpu.get_pipeline_model_parallel_first_rank()
                 group = mpu.get_pipeline_model_parallel_group()
                 context_tokens_tensor = torch.empty(context_length, dtype=torch.int64, device=torch.device("cuda"))
-                torch.distributed.broadcast(context_tokens_tensor, src, group)
+                deepspeed.comm.broadcast(context_tokens_tensor, src, group)
                 context_tokens = context_tokens_tensor.cpu().numpy().tolist()
             token_stream = get_token_stream(model, [context_tokens])
@@ -260,7 +261,7 @@ def generate_samples_interactive(model, print_frequency=24):
             input_info = [terminate_runs, raw_text_len, context_length]
             input_info_tensor = torch.cuda.LongTensor(input_info)
-            torch.distributed.all_reduce(input_info_tensor,
+            deepspeed.comm.all_reduce(input_info_tensor,
                                          group=mpu.get_model_parallel_group())
             terminate_runs = input_info_tensor[0].item()
             raw_text_len = input_info_tensor[1].item()
@@ -277,14 +278,14 @@ def generate_samples_interactive(model, print_frequency=24):
                 src = mpu.get_pipeline_model_parallel_first_rank()
                 group = mpu.get_pipeline_model_parallel_group()
                 context_tokens_tensor = torch.cuda.LongTensor(context_tokens)
-                torch.distributed.broadcast(context_tokens_tensor, src, group)
+                deepspeed.comm.broadcast(context_tokens_tensor, src, group)
             else:
                 src = mpu.get_pipeline_model_parallel_first_rank()
                 group = mpu.get_pipeline_model_parallel_group()
                 context_tokens_tensor = torch.empty(context_length, dtype=torch.int64, device=torch.device("cuda"))
-                torch.distributed.broadcast(context_tokens_tensor, src, group)
+                deepspeed.comm.broadcast(context_tokens_tensor, src, group)
                 context_tokens = context_tokens_tensor.cpu().numpy().tolist()
             token_stream = get_token_stream(model, [context_tokens])
@@ -403,10 +404,10 @@ def get_token_stream(model, context_tokens, model_latencies=[], single_token_lat
     context_tokens_tensor = torch.cuda.LongTensor(context_tokens)
     context_length_tensor = torch.cuda.LongTensor(context_lengths)
-    torch.distributed.broadcast(context_length_tensor,
+    deepspeed.comm.broadcast(context_length_tensor,
                                 mpu.get_tensor_model_parallel_src_rank(),
                                 group=mpu.get_tensor_model_parallel_group())
-    torch.distributed.broadcast(context_tokens_tensor,
+    deepspeed.comm.broadcast(context_tokens_tensor,
                                 mpu.get_tensor_model_parallel_src_rank(),
                                 group=mpu.get_tensor_model_parallel_group())
@@ -571,7 +572,7 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
                     tokens[:, context_length] = new_tokens
                     src = mpu.get_pipeline_model_parallel_last_rank()
                     group = mpu.get_embedding_group()
-                    torch.distributed.broadcast(new_tokens, src, group)
+                    deepspeed.comm.broadcast(new_tokens, src, group)
                     done_token = (prev == eos_id).byte() & started.byte()
                     just_finished = (done_token & ~is_done).bool()
@@ -581,7 +582,7 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
                     done = torch.all(is_done)
                     src = mpu.get_pipeline_model_parallel_last_rank()
                     group = mpu.get_pipeline_model_parallel_group()
-                    torch.distributed.broadcast(done, src, group)
+                    deepspeed.comm.broadcast(done, src, group)
                     yield tokens, lengths
                 else:
@@ -589,7 +590,7 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
                     src = mpu.get_pipeline_model_parallel_last_rank()
                     group = mpu.get_embedding_group()
                     new_tokens = torch.empty_like(tokens[:, context_length])
-                    torch.distributed.broadcast(new_tokens, src, group)
+                    deepspeed.comm.broadcast(new_tokens, src, group)
                     tokens[:, context_length] = new_tokens
                     yield tokens, None
                 else:
@@ -598,7 +599,7 @@ def sample_sequence_batch(model, context_tokens, context_lengths,
                     done = torch.cuda.ByteTensor([0])
                     src = mpu.get_pipeline_model_parallel_last_rank()
                     group = mpu.get_pipeline_model_parallel_group()
-                    torch.distributed.broadcast(done, src, group)
+                    deepspeed.comm.broadcast(done, src, group)
             context_length += 1
             counter += 1
diff --git a/megatron/training.py b/megatron/training.py
index ba090b08e9..da7a0cad07 100644
--- a/megatron/training.py
+++ b/megatron/training.py
@@ -58,7 +58,7 @@ def print_datetime(string):
     """Note that this call will sync across all ranks."""
-    torch.distributed.barrier()
+    deepspeed.comm.barrier()
     time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
     print_rank_0('[' + string + '] datetime: {} '.format(time_str))
@@ -101,8 +101,8 @@ def pretrain(train_valid_test_dataset_provider,
     # image ... launches.
     global _TRAIN_START_TIME
     start_time_tensor = torch.cuda.FloatTensor([_TRAIN_START_TIME])
-    torch.distributed.all_reduce(start_time_tensor,
-                                 op=torch.distributed.ReduceOp.MIN)
+    deepspeed.comm.all_reduce(start_time_tensor,
+                                 op=deepspeed.comm.ReduceOp.MIN)
     _TRAIN_START_TIME = start_time_tensor.item()
     print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
         time.time() - _TRAIN_START_TIME))
@@ -437,13 +437,13 @@ def setup_model_and_optimizer(model_provider_func, teacher=False):
         timers = get_timers()
         # Extra barrier is added to make sure all ranks report the
         # max time.
-        torch.distributed.barrier()
+        deepspeed.comm.barrier()
         timers('load-checkpoint').start()
         if args.mos:
             args.iteration = load_checkpoint(model, optimizer, lr_scheduler, strict=False, load_only_weights=False)
         else:
             args.iteration = load_checkpoint(model, optimizer, lr_scheduler)
-        torch.distributed.barrier()
+        deepspeed.comm.barrier()
         timers('load-checkpoint').stop()
         timers.log(['load-checkpoint'])
     else:
@@ -529,7 +529,7 @@ def train_step(forward_step_func, data_iterator,
                 grad = word_embeddings_weight.main_grad
             else:
                 grad = word_embeddings_weight.grad
-            torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())
+            deepspeed.comm.all_reduce(grad, group=mpu.get_embedding_group())
     timers('backward-embedding-all-reduce').stop()
     # Update parameters.
@@ -718,30 +718,30 @@ def add_to_logging(name):
                 opt_stats_2[1] = max(opt_stats_2[1], optimizer.state[param]['exp_avg_sq'].sqrt().abs_().max().item())
                 opt_stats_2[2] = max(opt_stats_2[2], abs(optimizer.state[param]['exp_avg'].max().item()), abs(optimizer.state[param]['exp_avg'].min().item()))
                 opt_stats_2[3] = max(opt_stats_2[3], abs(param.max().item()), abs(param.min().item()))
-        # print('step {} rank {} before sync opt_stats {}, {}'.format(iteration, torch.distributed.get_rank(), opt_stats_2, opt_stats))
+        # print('step {} rank {} before sync opt_stats {}, {}'.format(iteration, deepspeed.comm.get_rank(), opt_stats_2, opt_stats))
         if args.zero_stage > 0:
             # ZeRO partiions optimizer states
             opt_stats = torch.cuda.FloatTensor(opt_stats)
-            torch.distributed.all_reduce(opt_stats, group=mpu.get_data_parallel_group())
+            deepspeed.comm.all_reduce(opt_stats, group=mpu.get_data_parallel_group())
             opt_stats_2 = torch.cuda.FloatTensor(opt_stats_2)
-            torch.distributed.all_reduce(opt_stats_2, op=torch.distributed.ReduceOp.MAX,
+            deepspeed.comm.all_reduce(opt_stats_2, op=deepspeed.comm.ReduceOp.MAX,
                 group=mpu.get_data_parallel_group())
         if args.tensor_model_parallel_size > 1:
             opt_stats = torch.cuda.FloatTensor(opt_stats)
-            torch.distributed.all_reduce(opt_stats, group=mpu.get_tensor_model_parallel_group())
+            deepspeed.comm.all_reduce(opt_stats, group=mpu.get_tensor_model_parallel_group())
             opt_stats_2 = torch.cuda.FloatTensor(opt_stats_2)
-            torch.distributed.all_reduce(opt_stats_2, op=torch.distributed.ReduceOp.MAX,
+            deepspeed.comm.all_reduce(opt_stats_2, op=deepspeed.comm.ReduceOp.MAX,
                 group=mpu.get_tensor_model_parallel_group())
         if args.pipeline_model_parallel_size > 1:
             opt_stats = torch.cuda.FloatTensor(opt_stats)
-            torch.distributed.all_reduce(opt_stats, group=mpu.get_pipeline_model_parallel_group())
+            deepspeed.comm.all_reduce(opt_stats, group=mpu.get_pipeline_model_parallel_group())
             opt_stats_2 = torch.cuda.FloatTensor(opt_stats_2)
-            torch.distributed.all_reduce(opt_stats_2, op=torch.distributed.ReduceOp.MAX,
+            deepspeed.comm.all_reduce(opt_stats_2, op=deepspeed.comm.ReduceOp.MAX,
                 group=mpu.get_pipeline_model_parallel_group())
-        # print('step {} rank {} after sync opt_stats {}, {}'.format(iteration, torch.distributed.get_rank(), opt_stats_2, opt_stats))
+        # print('step {} rank {} after sync opt_stats {}, {}'.format(iteration, deepspeed.comm.get_rank(), opt_stats_2, opt_stats))
         if writer and is_last_rank():
             writer.add_scalar('optimizer/variance_l2 vs tokens', opt_stats[0]**0.5, args.consumed_train_tokens)
             writer.add_scalar('optimizer/variance_sqrt_l2 vs tokens', opt_stats[1]**0.5, args.consumed_train_tokens)
@@ -830,10 +830,10 @@ def save_checkpoint_and_time(iteration, model, optimizer, lr_scheduler):
     timers = get_timers()
     # Extra barrier is added to make sure
     # all ranks report the max time.
-    torch.distributed.barrier()
+    deepspeed.comm.barrier()
    timers('save-checkpoint').start()
     save_checkpoint(iteration, model, optimizer, lr_scheduler)
-    torch.distributed.barrier()
+    deepspeed.comm.barrier()
     timers('save-checkpoint').stop()
     timers.log(['save-checkpoint'])
@@ -933,8 +933,8 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
             train_time = (time.time() - _TRAIN_START_TIME) / 60.0
             done_cuda = torch.cuda.IntTensor(
                 [train_time > args.exit_duration_in_mins])
-            torch.distributed.all_reduce(
-                done_cuda, op=torch.distributed.ReduceOp.MAX)
+            deepspeed.comm.all_reduce(
+                done_cuda, op=deepspeed.comm.ReduceOp.MAX)
             done = done_cuda.item()
             if done:
                 if not saved_checkpoint:
@@ -948,7 +948,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
             if not saved_checkpoint:
                 save_checkpoint_and_time(iteration, model, optimizer, lr_scheduler)
-            torch.distributed.barrier()
+            deepspeed.comm.barrier()
             print_datetime('exiting program at iteration {}'.format(iteration))
             sys.exit()
@@ -1132,7 +1132,7 @@ def build_train_valid_test_data_iterators(
         flags = torch.cuda.LongTensor([0, 0, 0])
     # Broadcast num tokens.
-    torch.distributed.broadcast(flags,
+    deepspeed.comm.broadcast(flags,
                                 mpu.get_tensor_model_parallel_src_rank(),
                                 group=mpu.get_tensor_model_parallel_group())
     args.do_train = flags[0].item()
diff --git a/megatron/utils.py b/megatron/utils.py
index ed7047c952..32deb7f432 100644
--- a/megatron/utils.py
+++ b/megatron/utils.py
@@ -18,6 +18,7 @@
 import sys
 import torch
+import deepspeed
 from torch.nn.parallel import DistributedDataParallel as torchDDP
 from apex.multi_tensor_apply import multi_tensor_applier
@@ -72,8 +73,8 @@ def calc_params_l2_norm(model):
     )
     norm_2 = norm * norm
     # Sum across all model-parallel GPUs.
-    torch.distributed.all_reduce(norm_2,
-                                 op=torch.distributed.ReduceOp.SUM,
+    deepspeed.comm.all_reduce(norm_2,
+                                 op=deepspeed.comm.ReduceOp.SUM,
                                  group=mpu.get_model_parallel_group())
     return norm_2.item() ** 0.5
@@ -82,10 +83,10 @@ def average_losses_across_data_parallel_group(losses):
     """Reduce a tensor of losses across all GPUs."""
     averaged_losses = torch.cat(
         [loss.clone().detach().view(1) for loss in losses])
-    torch.distributed.all_reduce(averaged_losses,
+    deepspeed.comm.all_reduce(averaged_losses,
                                  group=mpu.get_data_parallel_group())
     averaged_losses = averaged_losses / \
-        torch.distributed.get_world_size(group=mpu.get_data_parallel_group())
+        deepspeed.comm.get_world_size(group=mpu.get_data_parallel_group())
     return averaged_losses
@@ -103,14 +104,14 @@ def report_memory(name):
     string += ' | max reserved: {}'.format(
         torch.cuda.max_memory_reserved() / mega_bytes)
     if mpu.get_data_parallel_rank() == 0:
-        print("[Rank {}] {}".format(torch.distributed.get_rank(), string),
+        print("[Rank {}] {}".format(deepspeed.comm.get_rank(), string),
               flush=True)
 def print_params_min_max_norm(optimizer, iteration):
     """Print min, max, and norm of all parameters."""
     index = 0
-    rank = torch.distributed.get_rank()
+    rank = deepspeed.comm.get_rank()
     string = 'iteration, rank, index, tensor-model-parallel, min, max, norm\n'
     optimizer_ = optimizer.optimizer
     for param_group in optimizer_.param_groups:
@@ -133,12 +134,12 @@ def check_adlr_autoresume_termination(iteration, model,
     args = get_args()
     autoresume = get_adlr_autoresume()
     # Add barrier to ensure consistnecy.
-    torch.distributed.barrier()
+    deepspeed.comm.barrier()
     if autoresume.termination_requested():
         if args.save:
             save_checkpoint(iteration, model, optimizer, lr_scheduler)
         print_rank_0(">>> autoresume termination request found!")
-        if torch.distributed.get_rank() == 0:
+        if deepspeed.comm.get_rank() == 0:
             autoresume.request_resume()
         print_rank_0(">>> training terminated. Returning")
         sys.exit(0)
@@ -205,7 +206,7 @@ def get_ltor_masks_and_position_ids(data,
 def get_parameters_in_billions(model):
-    gpus_per_model = torch.distributed.get_world_size(group=mpu.get_model_parallel_group())
+    gpus_per_model = deepspeed.comm.get_world_size(group=mpu.get_model_parallel_group())
     approx_parameters_in_billions = sum([sum([p.ds_numel if hasattr(p,'ds_id') else p.nelement() for p in model_module.parameters()])
                                         for model_module in model])
@@ -215,7 +216,7 @@ def get_parameters_in_billions(model):
 def flops_calculator(model, args, iteration_time):
     return # currently broken
-    gpus_per_model = torch.distributed.get_world_size(group = mpu.get_model_parallel_group())
+    gpus_per_model = deepspeed.comm.get_world_size(group = mpu.get_model_parallel_group())
     approx_parameters_in_billions = get_parameters_in_billions(model)
diff --git a/pretrain_ict.py b/pretrain_ict.py
index 1438b3d578..b6fd6f51f9 100644
--- a/pretrain_ict.py
+++ b/pretrain_ict.py
@@ -17,7 +17,7 @@
 import math
 import torch
-import torch.distributed as dist
+import deepspeed.comm as dist
 import torch.nn.functional as F
 from megatron import get_args
@@ -43,8 +43,8 @@ def pretrain_ict_model_provider():
 def get_group_world_size_rank():
     group = mpu.get_data_parallel_group()
-    rank = torch.distributed.get_rank(group=group)
-    world_size = torch.distributed.get_world_size(group=group)
+    rank = deepspeed.comm.get_rank(group=group)
+    world_size = deepspeed.comm.get_world_size(group=group)
     return group, rank, world_size
@@ -58,7 +58,7 @@ def forward(ctx, input_):
         tensor_list = [torch.empty_like(input_) for _ in range(world_size)]
         tensor_list[rank] = input_
-        torch.distributed.all_gather(tensor_list, input_, group=group)
+        deepspeed.comm.all_gather(tensor_list, input_, group=group)
         output = torch.cat(tensor_list, dim=0).contiguous()
diff --git a/tasks/eval_harness/evaluate.py b/tasks/eval_harness/evaluate.py
index e5a8521871..fe5a3d6d17 100644
--- a/tasks/eval_harness/evaluate.py
+++ b/tasks/eval_harness/evaluate.py
@@ -20,6 +20,7 @@
 import time
 import torch
+import deepspeed
 from megatron import get_args
 from megatron import print_rank_0
 from megatron import get_tokenizer
@@ -346,7 +347,7 @@ def load_ds_checkpoint_and_setup_megatron(extra_args_provider):
     megatron.global_vars._GLOBAL_ARGS = args
     initialize_megatron()
-    torch.distributed.barrier()
+    deepspeed.comm.barrier()
     # Initializing megatron will update eg. tokenizer size. Override again.
     override_args(args, cp_args, skip_keys, skip_if_specified)
@@ -384,7 +385,7 @@ def load_ds_checkpoint_and_setup_megatron(extra_args_provider):
     if args.eval_fp32:
         model = model.float()
-    torch.distributed.barrier()
+    deepspeed.comm.barrier()
     return model
 def tasks_args(parser):
diff --git a/tasks/eval_utils.py b/tasks/eval_utils.py
index 7549f4a094..933c4d629c 100644
--- a/tasks/eval_utils.py
+++ b/tasks/eval_utils.py
@@ -20,6 +20,7 @@
 from functools import partial
 import torch
+import deepspeed
 from megatron import get_args
 from megatron import print_rank_last, is_last_rank
@@ -173,7 +174,7 @@ def correct_answers_forward_step(batch, model):
         # Reduce.
         if mpu.is_pipeline_last_stage():
             unreduced = torch.cuda.LongTensor([correct, total])
-            torch.distributed.all_reduce(unreduced,
+            deepspeed.comm.all_reduce(unreduced,
                                          group=mpu.get_data_parallel_group())
             # Print on screen.
diff --git a/tasks/orqa/evaluate_utils.py b/tasks/orqa/evaluate_utils.py
index ebee03522e..c7ee91716c 100644
--- a/tasks/orqa/evaluate_utils.py
+++ b/tasks/orqa/evaluate_utils.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 import torch
+import deepspeed
 from megatron import get_args, print_rank_0
 from megatron.checkpointing import load_biencoder_checkpoint
@@ -79,7 +80,7 @@ def faiss_wrapper(self):
                 use_gpu=self.faiss_use_gpu)
         # Wait for the FAISS index to be initialized in all the nodes
-        torch.distributed.barrier()
+        deepspeed.comm.barrier()
     def generate_query_vectors(self, qa_data, split):
@@ -120,16 +121,16 @@ def evaluate(self, qa_data, split):
         query_tensor, reference_list = self.generate_query_vectors(qa_data, \
             split)
         local_rank = args.local_rank
-        rank = torch.distributed.get_rank()
+        rank = deepspeed.comm.get_rank()
         device_count = torch.cuda.device_count()
-        num_nodes = torch.distributed.get_world_size() // device_count
+        num_nodes = deepspeed.comm.get_world_size() // device_count
         node_id = rank // device_count
         for node in range(num_nodes):
             start_rank = node * device_count
             end_rank = (node + 1) * device_count
             ranks_list = list(range(start_rank, end_rank))
-            node_group = torch.distributed.new_group(ranks=ranks_list)
+            node_group = deepspeed.comm.new_group(ranks=ranks_list)
             if node_id == node:
                 device_start_rank = start_rank
@@ -137,7 +138,7 @@ def evaluate(self, qa_data, split):
         input_ = torch.empty_like(query_tensor).copy_(query_tensor).detach_()
         tensor_list = [torch.empty_like(input_) for _ in range(device_count)]
-        torch.distributed.all_gather(tensor_list, query_tensor, group=group)
+        deepspeed.comm.all_gather(tensor_list, query_tensor, group=group)
         if local_rank == 0 and self.mips_index is not None:
             all_query_tensor = torch.cat(tensor_list, dim=0).contiguous()
@@ -154,9 +155,9 @@ def evaluate(self, qa_data, split):
             topkindex = torch.empty(device_count * len(query_tensor), \
                 args.faiss_topk_retrievals, dtype=torch.int64).cuda()
-        torch.distributed.broadcast(distance, src=device_start_rank, \
+        deepspeed.comm.broadcast(distance, src=device_start_rank, \
             group=group)
-        torch.distributed.broadcast(topkindex, src=device_start_rank, \
+        deepspeed.comm.broadcast(topkindex, src=device_start_rank, \
             group=group)
         distance = torch.split(distance, len(query_tensor), dim=0)\
diff --git a/tasks/vision/eval_utils.py b/tasks/vision/eval_utils.py
index aabc04a159..1fe1bc3943 100644
--- a/tasks/vision/eval_utils.py
+++ b/tasks/vision/eval_utils.py
@@ -23,6 +23,7 @@
 from tasks.vision.finetune_utils import build_data_loader
 from tasks.vision.finetune_utils import process_batch
 from torchvision import datasets, transforms
+import deepspeed
 def accuracy_func_provider():
@@ -87,7 +88,7 @@ def calculate_correct_answers(model, dataloader, epoch):
         # Reduce.
         unreduced = torch.cuda.LongTensor([correct, total])
-        torch.distributed.all_reduce(unreduced, group=mpu.get_data_parallel_group())
+        deepspeed.comm.all_reduce(unreduced, group=mpu.get_data_parallel_group())
         # Print on screen.
         correct_ans = unreduced[0].item()
diff --git a/tasks/zeroshot_gpt/evaluate.py b/tasks/zeroshot_gpt/evaluate.py
index 6366cfb60f..f9c27098dd 100644
--- a/tasks/zeroshot_gpt/evaluate.py
+++ b/tasks/zeroshot_gpt/evaluate.py
@@ -18,6 +18,7 @@
 import math
 import torch
+import deepspeed
 from megatron import get_args
 from megatron import print_rank_0, is_last_rank
@@ -143,7 +144,7 @@ def evaluate(data_loader, model, eval_metric):
             # Reduce across processes.
             if mpu.is_pipeline_last_stage():
-                torch.distributed.all_reduce(output,
+                deepspeed.comm.all_reduce(output,
                                              group=mpu.get_data_parallel_group())
                 total_output += output
diff --git a/tools/generate_samples_gpt.py b/tools/generate_samples_gpt.py
index 5df2c698e4..29d49301bf 100644
--- a/tools/generate_samples_gpt.py
+++ b/tools/generate_samples_gpt.py
@@ -145,7 +145,7 @@ def main():
     #if torch.cuda.current_device() == 0:
-    if torch.distributed.get_rank() == 0:
+    if deepspeed.comm.get_rank() == 0:
        print_latency(latencies)
        print_latency(model_latencies, "model_latencies")
        print_latency(single_token_latency, "single_token_latency")
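
The change throughout this patch is mechanical: each call site swaps the torch.distributed prefix for deepspeed.comm, and get_rank, get_world_size, is_initialized, barrier, all_reduce, broadcast, all_gather, new_group, and ReduceOp keep the same names and argument shapes, so only the module prefix in each hunk changes. The following is a minimal standalone sketch of the replacement API, not taken from this repository; it assumes DeepSpeed is installed, one CUDA device per rank, and a standard distributed launch (for example the deepspeed or torchrun launcher populating RANK, WORLD_SIZE, and MASTER_ADDR).

import torch
import deepspeed

# Initialize the backend that powers deepspeed.comm; in Megatron-DeepSpeed
# this happens inside initialize_megatron(), not in user code.
deepspeed.init_distributed(dist_backend='nccl')

rank = deepspeed.comm.get_rank()
world_size = deepspeed.comm.get_world_size()
torch.cuda.set_device(rank % torch.cuda.device_count())

# Same call shape as torch.distributed.all_reduce: in-place SUM across ranks.
ones = torch.ones(1, device='cuda')
deepspeed.comm.all_reduce(ones, op=deepspeed.comm.ReduceOp.SUM)
assert int(ones.item()) == world_size

# Broadcast from rank 0 and synchronize, mirroring the broadcast()/barrier()
# call sites patched in megatron/training.py and text_generation_utils.py.
flag = torch.cuda.LongTensor([1 if rank == 0 else 0])
deepspeed.comm.broadcast(flag, src=0)
deepspeed.comm.barrier()
if rank == 0:
    print('value seen after broadcast:', flag.item(), flush=True)

Because the signatures match, the sketch would behave identically with torch.distributed substituted back in; the patch relies on exactly that drop-in property.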