From f5e2538b99a0c4bd6607ec15e9a8fdf4e258fb9d Mon Sep 17 00:00:00 2001 From: gdeng Date: Thu, 16 Jan 2025 20:51:14 -0800 Subject: [PATCH 1/6] add support for expert tensor parallelism Signed-off-by: gdeng --- nemo/lightning/_strategy_lib.py | 2 ++ nemo/lightning/megatron_init.py | 3 +++ .../pytorch/strategies/megatron_strategy.py | 5 +++++ nemo/utils/app_state.py | 16 ++++++++++++++++ 4 files changed, 26 insertions(+) diff --git a/nemo/lightning/_strategy_lib.py b/nemo/lightning/_strategy_lib.py index 884ea416ce10..44bb40df98af 100644 --- a/nemo/lightning/_strategy_lib.py +++ b/nemo/lightning/_strategy_lib.py @@ -81,6 +81,7 @@ def init_parallel_ranks( local_rank=init_local_rank, tensor_model_parallel_size=parallel_config.tensor_model_parallel_size, expert_model_parallel_size=parallel_config.expert_model_parallel_size, + expert_tensor_parallel_size=parallel_config.expert_tensor_parallel_size, pipeline_model_parallel_size=parallel_config.pipeline_model_parallel_size, virtual_pipeline_model_parallel_size=parallel_config.virtual_pipeline_model_parallel_size, context_parallel_size=parallel_config.context_parallel_size, @@ -121,6 +122,7 @@ def init_model_parallel(model: Optional[nn.Module] = None) -> None: encoder_tensor_model_parallel_size=app_state.encoder_tensor_model_parallel_size, context_parallel_size=app_state.context_parallel_size, expert_model_parallel_size=app_state.expert_model_parallel_size, + expert_tensor_parallel_size=app_state.expert_tensor_parallel_size, ) # assert that fake tp and pp rank match after model parallel init diff --git a/nemo/lightning/megatron_init.py b/nemo/lightning/megatron_init.py index 948113642f91..11508713a6bd 100644 --- a/nemo/lightning/megatron_init.py +++ b/nemo/lightning/megatron_init.py @@ -91,6 +91,7 @@ def initialize_model_parallel_for_nemo( local_rank, tensor_model_parallel_size=1, expert_model_parallel_size=1, + expert_tensor_parallel_size=None, pipeline_model_parallel_size=1, virtual_pipeline_model_parallel_size=None, pipeline_model_parallel_split_rank=None, @@ -126,6 +127,7 @@ def initialize_model_parallel_for_nemo( app_state.encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size app_state.use_fp8 = use_fp8 app_state.init_mpi_proc_group = init_mpi_proc_group + app_state.expert_tensor_parallel_size=expert_tensor_parallel_size ( app_state.tensor_model_parallel_rank, app_state.pipeline_model_parallel_rank, @@ -143,6 +145,7 @@ def initialize_model_parallel_for_nemo( pipeline_model_parallel_split_rank_=pipeline_model_parallel_split_rank, context_parallel_size_=context_parallel_size, expert_model_parallel_size_=expert_model_parallel_size, + expert_tensor_parallel_size_=expert_tensor_parallel_size, encoder_tensor_model_parallel_size_=encoder_tensor_model_parallel_size, encoder_pipeline_model_parallel_size_=encoder_pipeline_model_parallel_size, use_tp_pp_dp_mapping=use_tp_pp_dp_mapping, diff --git a/nemo/lightning/pytorch/strategies/megatron_strategy.py b/nemo/lightning/pytorch/strategies/megatron_strategy.py index d38753bd7935..39bc7b46be16 100644 --- a/nemo/lightning/pytorch/strategies/megatron_strategy.py +++ b/nemo/lightning/pytorch/strategies/megatron_strategy.py @@ -100,6 +100,7 @@ class ParallelismConfig: encoder_tensor_model_parallel_size: int = 0 encoder_pipeline_model_parallel_size: int = 0 use_te_rng_tracker: bool = False + expert_tensor_parallel_size: Optional[int] = None class MegatronStrategy(DDPStrategy, io.IOMixin): @@ -125,6 +126,7 @@ class MegatronStrategy(DDPStrategy, io.IOMixin): parallelizing layer norms and 
dropout sequentially. Defaults to False. expert_model_parallel_size (int): Distributes MoE Experts across sub data parallel dimension. Defaults to 1. + expert_tensor_parallel_size (Optional[int]): Sets MoE Experts tensor parallelism size. Defaults to None. moe_extended_tp (bool): Alternative parallelization strategy for expert parallelism. Defaults to False. data_sampler (Optional['DataSampler']): Custom data sampler for distributed training. Defaults to None. parallel_devices (Optional[List[torch.device]]): List of devices to use for parallelism. Defaults to None. @@ -187,6 +189,7 @@ def __init__( context_parallel_size: int = 1, sequence_parallel: bool = False, expert_model_parallel_size: int = 1, + expert_tensor_parallel_size: Optional[int] = None, moe_extended_tp: bool = False, encoder_tensor_model_parallel_size: Optional[int] = 0, encoder_pipeline_model_parallel_size: Optional[int] = 0, @@ -237,6 +240,7 @@ def __init__( ) self.context_parallel_size = context_parallel_size self.expert_model_parallel_size = expert_model_parallel_size + self.expert_tensor_parallel_size = expert_tensor_parallel_size self.moe_extended_tp = moe_extended_tp self.virtual_pipeline_model_parallel_size = virtual_pipeline_model_parallel_size self.sequence_parallel = sequence_parallel @@ -899,6 +903,7 @@ def parallelism(self) -> ParallelismConfig: context_parallel_size=self.context_parallel_size, sequence_parallel=self.sequence_parallel, expert_model_parallel_size=self.expert_model_parallel_size, + expert_tensor_parallel_size=self.expert_tensor_parallel_size, moe_extended_tp=self.moe_extended_tp, encoder_tensor_model_parallel_size=self.encoder_tensor_model_parallel_size, encoder_pipeline_model_parallel_size=self.encoder_pipeline_model_parallel_size, diff --git a/nemo/utils/app_state.py b/nemo/utils/app_state.py index 37193cfdd8c5..c7acb0f07d9f 100644 --- a/nemo/utils/app_state.py +++ b/nemo/utils/app_state.py @@ -186,6 +186,22 @@ def expert_model_parallel_size(self, size): """ self._expert_model_parallel_size = size + @property + def expert_tensor_parallel_size(self): + """Property returns the number of GPUs in each expert tensor parallel group. + Returns: + Number of GPUs in each expert tensor parallel group. + """ + return self._expert_tensor_parallel_size + + @expert_tensor_parallel_size.setter + def expert_tensor_parallel_size(self, size): + """Property sets the number of GPUs in each expert tensor parallel group. + Args: + size (int): Number of GPUs in each expert tensor parallel group. + """ + self._expert_tensor_parallel_size = size + @property def pipeline_model_parallel_size(self): """Property returns the number of GPUs in each model parallel group. 
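PATCH 1/6 above only plumbs the new knob through; for context, here is a minimal sketch of how it could be set from user code once the series lands. Only the expert_tensor_parallel_size argument comes from the hunks above; the surrounding trainer wiring and device counts are illustrative assumptions, and leaving the value at None is expected to keep today's behaviour (Megatron-Core falls back to the dense tensor-parallel size).

# Illustrative sketch only: the parallel sizes below are made up for an 8-GPU example.
from nemo import lightning as nl

strategy = nl.MegatronStrategy(
    tensor_model_parallel_size=4,      # TP size for the dense (non-expert) layers
    pipeline_model_parallel_size=1,
    expert_model_parallel_size=2,      # experts sharded across 2 ranks
    expert_tensor_parallel_size=1,     # new knob: TP size used inside each expert
)
trainer = nl.Trainer(devices=8, num_nodes=1, accelerator="gpu", strategy=strategy)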
From 6d8bd3f733415d89f99886323dc2871fe56ec40c Mon Sep 17 00:00:00 2001 From: gdeng Date: Thu, 16 Jan 2025 22:08:42 -0800 Subject: [PATCH 2/6] remove function that's going to be deprecated Signed-off-by: gdeng --- .../vision_language_foundation/clip/megatron_clip_models.py | 2 +- .../language_modeling/megatron_lm_encoder_decoder_model.py | 2 +- nemo/core/optim/optimizer_with_main_params.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py b/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py index 84718f99262f..9016ec054f84 100644 --- a/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py +++ b/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py @@ -812,7 +812,7 @@ def setup_mcore_distributed_parallel(self): ddp_config, model_chunk, data_parallel_group=parallel_state.get_data_parallel_group(with_context_parallel=True), - expert_data_parallel_group=parallel_state.get_data_modulo_expert_parallel_group(), + expert_data_parallel_group=parallel_state.get_expert_data_parallel_group(), # Turn off bucketing for model_chunk 2 onwards, since communication for these # model chunks is overlapped with compute anyway. disable_bucketing=(model_chunk_idx > 0), diff --git a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py index e530a40d8aaa..43fd971ca117 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py @@ -1882,7 +1882,7 @@ def setup_mcore_distributed_parallel(self): ddp_config, model_chunk, data_parallel_group=parallel_state.get_data_parallel_group(with_context_parallel=True), - expert_data_parallel_group=parallel_state.get_data_modulo_expert_parallel_group(), + expert_data_parallel_group=parallel_state.get_expert_data_parallel_group(), # Turn off bucketing for model_chunk 2 onwards, since communication for these # model chunks is overlapped with compute anyway. 
disable_bucketing=(model_chunk_idx > 0), diff --git a/nemo/core/optim/optimizer_with_main_params.py b/nemo/core/optim/optimizer_with_main_params.py index 0f549443b772..d6a0c6e22158 100755 --- a/nemo/core/optim/optimizer_with_main_params.py +++ b/nemo/core/optim/optimizer_with_main_params.py @@ -30,7 +30,7 @@ try: from megatron.core.parallel_state import ( - get_data_modulo_expert_parallel_group, + get_expert_data_parallel_group, get_data_parallel_group, get_data_parallel_world_size, ) @@ -74,7 +74,7 @@ def _multi_tensor_copy_this_to_that(this, that, overflow_buf): def _get_grad_data_group(is_expert_group): if is_expert_group: - data_group = get_data_modulo_expert_parallel_group() + data_group = get_expert_data_parallel_group() else: data_group = get_data_parallel_group(with_context_parallel=True) return data_group From 18d9b0edea7d7f63668d868543c7f0db0310ac37 Mon Sep 17 00:00:00 2001 From: gdeng Date: Thu, 16 Jan 2025 22:15:50 -0800 Subject: [PATCH 3/6] format Signed-off-by: gdeng --- nemo/lightning/megatron_init.py | 2 +- nemo/lightning/pytorch/strategies/megatron_strategy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo/lightning/megatron_init.py b/nemo/lightning/megatron_init.py index 11508713a6bd..90f96dc83034 100644 --- a/nemo/lightning/megatron_init.py +++ b/nemo/lightning/megatron_init.py @@ -127,7 +127,7 @@ def initialize_model_parallel_for_nemo( app_state.encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size app_state.use_fp8 = use_fp8 app_state.init_mpi_proc_group = init_mpi_proc_group - app_state.expert_tensor_parallel_size=expert_tensor_parallel_size + app_state.expert_tensor_parallel_size = expert_tensor_parallel_size ( app_state.tensor_model_parallel_rank, app_state.pipeline_model_parallel_rank, diff --git a/nemo/lightning/pytorch/strategies/megatron_strategy.py b/nemo/lightning/pytorch/strategies/megatron_strategy.py index 39bc7b46be16..36000432706a 100644 --- a/nemo/lightning/pytorch/strategies/megatron_strategy.py +++ b/nemo/lightning/pytorch/strategies/megatron_strategy.py @@ -100,7 +100,7 @@ class ParallelismConfig: encoder_tensor_model_parallel_size: int = 0 encoder_pipeline_model_parallel_size: int = 0 use_te_rng_tracker: bool = False - expert_tensor_parallel_size: Optional[int] = None + expert_tensor_parallel_size: Optional[int] = None class MegatronStrategy(DDPStrategy, io.IOMixin): From 3d5accf4ac0e5826ba4ffce6ae5cfd1fbef7572d Mon Sep 17 00:00:00 2001 From: gdengk Date: Fri, 17 Jan 2025 18:34:09 +0000 Subject: [PATCH 4/6] Apply isort and black reformatting Signed-off-by: gdengk --- nemo/core/optim/optimizer_with_main_params.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/core/optim/optimizer_with_main_params.py b/nemo/core/optim/optimizer_with_main_params.py index d6a0c6e22158..9723a876e58a 100755 --- a/nemo/core/optim/optimizer_with_main_params.py +++ b/nemo/core/optim/optimizer_with_main_params.py @@ -30,9 +30,9 @@ try: from megatron.core.parallel_state import ( - get_expert_data_parallel_group, get_data_parallel_group, get_data_parallel_world_size, + get_expert_data_parallel_group, ) from megatron.core.tensor_parallel import copy_tensor_model_parallel_attributes From 6d7e393a18d56554a61a408ec798f8a2f0f0935b Mon Sep 17 00:00:00 2001 From: gdeng Date: Tue, 21 Jan 2025 11:12:36 -0800 Subject: [PATCH 5/6] add missing initilization Signed-off-by: gdeng --- nemo/utils/app_state.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo/utils/app_state.py b/nemo/utils/app_state.py index 
c7acb0f07d9f..ad702babd2f3 100644 --- a/nemo/utils/app_state.py +++ b/nemo/utils/app_state.py @@ -48,6 +48,7 @@ def __init__(self): self._tensor_model_parallel_size = None self._tensor_model_parallel_group = None self._expert_model_parallel_size = None + self._expert_tensor_parallel_size = None self._pipeline_model_parallel_size = None self._virtual_pipeline_model_parallel_size = None self._encoder_tensor_model_parallel_size = None From 7a399f27175577e401a890b44d1608f91ecd927b Mon Sep 17 00:00:00 2001 From: gdeng Date: Fri, 24 Jan 2025 16:49:39 -0800 Subject: [PATCH 6/6] fix unit test Signed-off-by: gdeng --- tests/lightning/test_strategy_lib.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/lightning/test_strategy_lib.py b/tests/lightning/test_strategy_lib.py index 017c325842d4..197fc8d4982c 100644 --- a/tests/lightning/test_strategy_lib.py +++ b/tests/lightning/test_strategy_lib.py @@ -78,6 +78,7 @@ def test_init_parallel_ranks() -> None: mock_parallel_config.virtual_pipeline_model_parallel_size = 4 mock_parallel_config.context_parallel_size = 2 mock_parallel_config.expert_model_parallel_size = 2 + mock_parallel_config.expert_tensor_parallel_size = None mock_parallel_config.encoder_tensor_model_parallel_size = 0 mock_parallel_config.encoder_pipeline_model_parallel_size = 0 mock_parallel_config.tp_comm_overlap = False
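Taken together, the series threads the new setting from the user-facing strategy down to Megatron-Core, while PATCH 2/6 moves the existing expert-data-parallel lookups off the soon-to-be-deprecated get_data_modulo_expert_parallel_group in favour of get_expert_data_parallel_group (PATCH 4/6 then re-sorts the new import). The sketch below condenses the resulting call path; it paraphrases the hunks above rather than quoting either file, and it assumes torch.distributed is already initialized and AppState has been populated by init_parallel_ranks.

# Condensed, not verbatim: how the value reaches Megatron-Core after this series.
# ParallelismConfig.expert_tensor_parallel_size -> init_parallel_ranks -> AppState
#   -> init_model_parallel -> megatron.core.parallel_state.initialize_model_parallel
from megatron.core import parallel_state
from nemo.utils import AppState

app_state = AppState()
parallel_state.initialize_model_parallel(
    tensor_model_parallel_size=app_state.tensor_model_parallel_size,
    pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
    context_parallel_size=app_state.context_parallel_size,
    expert_model_parallel_size=app_state.expert_model_parallel_size,
    expert_tensor_parallel_size=app_state.expert_tensor_parallel_size,  # added by this series
)

PATCH 6/6 updates the unit test for the same reason: the mocked ParallelismConfig gains an explicit expert_tensor_parallel_size of None so the new argument is forwarded with a defined value.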