🚀[FEA]: Distributed Training/Inference: handle scatter/gather better and more consistently #520

Open
stadlmax opened this issue May 23, 2024 · 3 comments
Assignees
Labels
? - Needs Triage Need team to review and classify distributed Distributed and model parallel tools enhancement New feature or request

Comments

@stadlmax
Collaborator

Is this a new feature, an improvement, or a change to existing functionality?

Improvement

How would you describe the priority of this feature request

Low (would be nice)

Please provide a clear description of problem you would like to solve.

The problem arises in model-parallel settings where not all ranks hold valid tensors, mainly around the gather and scatter routines.

Scatter

  • scatter assumes a single tensor on a source rank, which is distributed in parts across the other ranks
  • to receive these chunks, however, the other ranks need to know the dtype and other meta-information such as requires_grad, so that training pipelines do not break
  • current solutions either require the user to specify this information on each rank, or assume empty "dummy" tensors on each rank that carry it; the latter, however, might not be robust when registered in the compute graphs of autograd frameworks

Gather

  • the backward pass of gather is a scatter call, so similar problems arise, although this case can be handled more easily, e.g. by storing the meta-data in the corresponding context of the torch.autograd.Function
  • the main issue arises in upstream layers when gather returns None on the participating ranks that do not receive the result; it would be more informative to return an object indicating that this None is just the null part of a distributed tensor which is currently valid on rank X

Potential Solution

  • in general, we should make things more consistent throughout
  • a potential solution would be to define something like a TensorPlaceholder which carries meta-data on ranks where the tensor is currently not valid and is more informative than just None
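
A minimal sketch of what such a placeholder could look like, not a proposed implementation. Only the name TensorPlaceholder comes from the proposal above; the fields (shape, dtype, requires_grad, valid_rank) and the helpers is_valid_on and describe are hypothetical. The dtype is kept as a plain string so the sketch has no dependencies; a real version would presumably store a torch.dtype.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class TensorPlaceholder:
    """Hypothetical stand-in for a tensor that is not valid on this rank."""

    shape: Tuple[int, ...]  # global shape of the tensor on the valid rank
    dtype: str              # e.g. "float32" (assumption: a string, not torch.dtype)
    requires_grad: bool     # lets receiving ranks set up autograd consistently
    valid_rank: int         # rank on which the actual tensor currently lives

    def is_valid_on(self, rank: int) -> bool:
        """Return True if `rank` is the one holding the actual tensor."""
        return rank == self.valid_rank

    def describe(self) -> str:
        """Human-readable summary, more informative than a bare None."""
        return (
            f"TensorPlaceholder(shape={self.shape}, dtype={self.dtype}, "
            f"requires_grad={self.requires_grad}, valid on rank {self.valid_rank})"
        )


# Example: what a non-source rank could hold instead of None.
placeholder = TensorPlaceholder(
    shape=(4, 8), dtype="float32", requires_grad=True, valid_rank=0
)
print(placeholder.describe())
```

With an object like this, a scatter call on a non-source rank would have the dtype, shape, and requires_grad flag it needs without the user passing them explicitly or registering a dummy tensor in the autograd graph.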

Describe any alternatives you have considered

No response

@stadlmax stadlmax added enhancement New feature or request ? - Needs Triage Need team to review and classify labels May 23, 2024
@akshaysubr akshaysubr added the distributed Distributed and model parallel tools label May 28, 2024
@mnabian
Collaborator

mnabian commented Oct 18, 2024

@stadlmax could you please provide an update on this issue?

@stadlmax
Collaborator Author

I would say that the problem still exists in general. In our implementations, we should have enough workarounds in place to avoid these issues. If we want to simplify the lives of people implementing other distributed solutions, one could think of tackling that issue. I would say low priority.
Orthogonal to that, we can keep monitoring the progress on DTensor in upstream PyTorch which could be a solution to this minor problem.

@coreyjadams
Collaborator

On DTensor: It is unlikely that DTensor will ever be suitable for this task. The challenge is that DTensor explicitly assumes tensors are distributed across ranks as if you called torch.chunk and each chunk went to a subsequent device.
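
To make the torch.chunk assumption concrete, here is a small dependency-free sketch that reproduces the per-rank sizes torch.chunk would produce along one dimension, and checks whether a given set of local sizes matches that layout. The helper names chunk_sizes and matches_chunk_layout are hypothetical, and padding with zero-sized entries for ranks that receive no chunk is a convention of this sketch.

```python
from typing import List, Sequence


def chunk_sizes(length: int, num_ranks: int) -> List[int]:
    """Per-rank sizes when a dimension of `length` is split like torch.chunk.

    torch.chunk uses a chunk size of ceil(length / num_ranks) and may return
    fewer chunks than requested; here the missing ranks get size 0.
    """
    if length < 0 or num_ranks <= 0:
        raise ValueError("length must be >= 0 and num_ranks must be > 0")
    if length == 0:
        return [0] * num_ranks
    step = -(-length // num_ranks)  # ceiling division
    sizes = []
    remaining = length
    for _ in range(num_ranks):
        size = min(step, remaining)
        sizes.append(size)
        remaining -= size
    return sizes


def matches_chunk_layout(local_sizes: Sequence[int]) -> bool:
    """True if the per-rank sizes are exactly what a torch.chunk split gives."""
    return list(local_sizes) == chunk_sizes(sum(local_sizes), len(local_sizes))


print(chunk_sizes(10, 4))                  # even-as-possible split over 4 ranks
print(matches_chunk_layout([3, 3, 3, 1]))  # layout a chunk-based split produces
print(matches_chunk_layout([1, 4, 2, 3]))  # uneven layout, e.g. from a graph partition
```

Any per-rank layout for which matches_chunk_layout returns False, such as uneven partitions, falls outside what a chunk-based sharding assumption can describe.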

I have implemented ShardTensor in modulus as a relatively lightweight extension to DTensor to get around this, though it may not cover the exact cases seen here. In particular, scatter/gather are wrapped in syntax like:

Scatter a tensor:

# Assumption: recent PyTorch releases expose this publicly; older ones use torch.distributed._tensor
from torch.distributed.tensor import distribute_tensor

# a is a tensor on rank 0
# target_mesh is a torch DeviceMesh indicating a specific group of GPUs
# target_placements is a list of placements from DTensor (len(target_placements) == target_mesh.ndim) that
# specifies exactly how, and on which axis (or axes), a tensor is to be distributed. Supports sharding and replication.
scattered_a = distribute_tensor(a, target_mesh, target_placements)
# ^ this is differentiable

(all)Gather a tensor:

gathered_a = scattered_a.full_tensor()
# ^ this is differentiable

Note that this full_tensor operation implies an allgather, not a gather onto a single device, because it relies on DTensor concepts, which are largely focused on single-program, multi-GPU parallelism.

The above syntax works for DTensor and ShardTensor equally, though the DTensor gather/scatter will assume torch.chunk layouts as mentioned. ShardTensor will not make these assumptions but instead will use the appropriate (and differentiable) scatter_v/gather_v operations to coalesce a distributed tensor.

ShardTensor can also (correctly) work with this syntax, on every rank:

distributed_tensor = ShardTensor.from_local(local_slice, target_mesh, target_placements)

This works regardless of the size of local_slice, as long as the local tensors have the same number of dimensions and compatible shapes, so that they could be gathered into one tensor. That is, tensor dimensions must agree on all ranks, except along the dimensions that are being sharded.
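
The shape requirement can be illustrated with a small dependency-free sketch. It is not ShardTensor's actual validation logic: the helper gathered_shape is hypothetical and, as a simplifying assumption, handles sharding along a single tensor dimension only. It checks that all local shapes agree outside the sharded dimension and returns the global shape plus each rank's offset along that dimension.

```python
from typing import List, Sequence, Tuple


def gathered_shape(
    local_shapes: Sequence[Tuple[int, ...]], shard_dim: int
) -> Tuple[Tuple[int, ...], List[int]]:
    """Return (global_shape, per-rank offsets) for unevenly sharded tensors.

    `local_shapes[r]` is the shape of the local slice on rank r. Raises
    ValueError if the slices could not be concatenated along `shard_dim`.
    """
    if not local_shapes:
        raise ValueError("need at least one local shape")
    reference = local_shapes[0]
    ndim = len(reference)
    if not 0 <= shard_dim < ndim:
        raise ValueError(f"shard_dim {shard_dim} out of range for {ndim} dims")

    for rank, shape in enumerate(local_shapes):
        if len(shape) != ndim:
            raise ValueError(f"rank {rank} has {len(shape)} dims, expected {ndim}")
        for dim in range(ndim):
            # Only the sharded dimension is allowed to differ between ranks.
            if dim != shard_dim and shape[dim] != reference[dim]:
                raise ValueError(f"rank {rank} disagrees on dimension {dim}")

    offsets = []
    total = 0
    for shape in local_shapes:
        offsets.append(total)  # where this rank's slice starts globally
        total += shape[shard_dim]

    full = list(reference)
    full[shard_dim] = total
    return tuple(full), offsets


shape, offsets = gathered_shape([(3, 8), (5, 8), (2, 8)], shard_dim=0)
print(shape, offsets)
```

In this example the three ranks hold slices of different lengths along dimension 0 but agree on dimension 1, which is the situation described above.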

We should take a look at the use cases for scatter/gather in modulus to see if it makes sense to apply this in existing code, or if it will be useful in new developments.
