#2240: Add Rabenseifner and Recursive doubling allreduce algorithms for ObjGroup #2272

Closed

29 commits
7def560
#2240: Initial work for new allreduce
JacobDomagala Mar 24, 2024
01b6afb
#2240: Semi working Rabenseifner
JacobDomagala Mar 27, 2024
5289362
#2240: Working Rabenseifner (non-commutative ops)
JacobDomagala Apr 4, 2024
d52afeb
#2240: Fix non power of 2 for new allreduce
JacobDomagala Apr 7, 2024
5372da5
#2240: Initial work for adding recursive doubling allreduce algorithm
JacobDomagala Apr 10, 2024
90a20e0
#2240: Make sure the order of reduce operations is correct
JacobDomagala Apr 11, 2024
8bf1cc9
#2240: Working Recursive doubling
JacobDomagala Apr 15, 2024
bb1ca10
#2240: Code cleanup and make Rabenseifner work with any Op type
JacobDomagala Apr 16, 2024
166f231
#2240: Improve accuracy of timing allreduce algorithms in allreduce.cc
JacobDomagala Apr 26, 2024
fa16fa1
#2240: Add unit tests for new allreduce and cleanup code
JacobDomagala May 21, 2024
a0fdad8
#2240: DataHandler for Rabenseifner allreduce that provides common AP…
JacobDomagala May 28, 2024
f9a60fa
#2240: Fix warnings
JacobDomagala May 28, 2024
63b39f5
#2240: Update ObjGroup test to use custom DataHandler for Rabenseifne…
JacobDomagala May 30, 2024
a07f6c8
#2240: Add unit test for Rabenseifner with Kokkos::View as DataType a…
JacobDomagala May 31, 2024
316bfb8
#2240: Move function definitions to impl.h file for Rabenseifner
JacobDomagala Jun 3, 2024
b8cd612
#2240: Add allreduce print category and use it in rabenseifner instea…
JacobDomagala Jun 4, 2024
5f40e4b
#2240: Provide documentation for RecursiveDoubling algorithm
JacobDomagala Jun 4, 2024
b200ecd
#2240: Use vt_debug_print for RecursiveDoubling allreduce
JacobDomagala Jun 4, 2024
eb1bc40
#2240: Update allreduce perf tests to use array of payload sizes
JacobDomagala Jun 5, 2024
977e9e3
#2240: Fix runtime failure in allreduce perf test
JacobDomagala Jun 7, 2024
1456dd5
#2240: Working allreduce perf test with Kokkos
JacobDomagala Jun 16, 2024
5803848
#2240: Working RecursiveDoubling with multiple allreduce in flight
JacobDomagala Jun 17, 2024
2015e78
#2240: Update Rabenseifner to use ID for each allreduce and update tests
JacobDomagala Jun 18, 2024
87ad4cf
#2240: Fix failing unit and performance tests for multiple allreduce …
JacobDomagala Jun 25, 2024
ab0357b
#2240: Fix compile issues on some compilers and runtime issue with pa…
JacobDomagala Jul 2, 2024
be3ee2c
#2240: Update logs
JacobDomagala Jul 6, 2024
28139a7
#2240: Fix issues with handlers being executed and payload not being …
JacobDomagala Jul 16, 2024
c5232dc
#2240: Add helpers and use Kokkos::View for internals of Rabenseifner…
JacobDomagala Jul 17, 2024
57b8cab
#2240: Store Reducers by tuple(ProxyType, DataType, OperandType)
JacobDomagala Jul 18, 2024
#2240: Semi working Rabenseifner
JacobDomagala committed Jul 18, 2024
commit 01b6afb2c427c90d922387d10cb5563893c4f5b2
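This commit implements Rabenseifner's allreduce in three phases: an
adjustment phase that folds non-power-of-two stragglers into a
power-of-two set of virtual nodes, a reduce-scatter phase based on
recursive vector halving, and an allgather phase based on recursive
vector doubling. The following is a minimal standalone sketch (not VT
code) of the send/receive schedule that initialize() computes below,
assuming a power-of-two node count so that virtual rank equals node
rank:

#include <cmath>
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  int32_t const num_nodes = 8;  // assumed power of two for brevity
  int32_t const this_node = 3;  // example rank
  size_t const data_size = 64;  // payload element count

  int32_t const num_steps = static_cast<int32_t>(std::log2(num_nodes));
  std::vector<int32_t> r_index(num_steps, 0), r_count(num_steps, 0);
  std::vector<int32_t> s_index(num_steps, 0), s_count(num_steps, 0);

  int step = 0;
  size_t wsize = data_size;
  for (int32_t mask = 1; mask < num_nodes; mask <<= 1) {
    int32_t const dest = this_node ^ mask;  // butterfly partner
    if (this_node < dest) {
      r_count[step] = wsize / 2;              // half kept and reduced
      s_count[step] = wsize - r_count[step];  // half sent to partner
      s_index[step] = r_index[step] + r_count[step];
    } else {
      s_count[step] = wsize / 2;
      r_count[step] = wsize - s_count[step];
      r_index[step] = s_index[step] + s_count[step];
    }
    if (step + 1 < num_steps) {
      r_index[step + 1] = r_index[step];
      s_index[step + 1] = r_index[step];
      wsize = r_count[step];
      step++;
    }
  }
  for (int i = 0; i < num_steps; ++i) {
    std::printf(
      "step %d: send [%d..%d)  recv [%d..%d)\n", i, s_index[i],
      s_index[i] + s_count[i], r_index[i], r_index[i] + r_count[i]);
  }
}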
230 changes: 205 additions & 25 deletions src/vt/collective/reduce/allreduce/allreduce.h
@@ -47,11 +47,12 @@
#include "vt/config.h"
#include "vt/context/context.h"
#include "vt/messaging/message/message.h"
#include "vt/objgroup/proxy/proxy_objgroup.h"

#include <tuple>
#include <cstdint>

-namespace vt::collective::reduce::alleduce {
+namespace vt::collective::reduce::allreduce {

template <typename DataT>
struct AllreduceMsg
@@ -66,65 +67,244 @@ struct AllreduceMsg
explicit AllreduceMsg(DataT&& in_val)
: MessageParentType(),
val_(std::forward<DataT>(in_val)) { }
-explicit AllreduceMsg(DataT const& in_val)
+explicit AllreduceMsg(DataT const& in_val, int step = 0)
: MessageParentType(),
-val_(in_val) { }
+val_(in_val),
+step_(step) { }

template <typename SerializeT>
void serialize(SerializeT& s) {
MessageParentType::serialize(s);
s | val_;
s | step_;
}

DataT val_ = {};
int32_t step_ = {};
};

template <typename DataT>
struct Allreduce {
-void rightHalf(AllreduceMsg<DataT>* msg) {
-for (int i = 0; i < msg->vec_.size(); i++) {
-val_[(val_.size() / 2) + i] += msg->vec_[i];
+void initialize(
+const DataT& data, vt::objgroup::proxy::Proxy<Allreduce> proxy,
+uint32_t num_nodes) {
this_node_ = vt::theContext()->getNode();
is_even_ = this_node_ % 2 == 0;
val_ = data;
proxy_ = proxy;
num_steps_ = static_cast<int32_t>(log2(num_nodes));
nprocs_pof2_ = 1 << num_steps_;
nprocs_rem_ = num_nodes - nprocs_pof2_;
is_part_of_adjustment_group_ = this_node_ < (2 * nprocs_rem_);
if (is_part_of_adjustment_group_) {
if (is_even_) {
vrt_node_ = this_node_ / 2;
} else {
vrt_node_ = -1;
}
} else {
vrt_node_ = this_node_ - nprocs_rem_;
}

r_index_.resize(num_steps_, 0);
r_count_.resize(num_steps_, 0);
s_index_.resize(num_steps_, 0);
s_count_.resize(num_steps_, 0);

w_size_ = data.size();

int step = 0;
size_t wsize = data.size();
for (int mask = 1; mask < nprocs_pof2_; mask <<= 1) {
auto vdest = vrt_node_ ^ mask;
auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_;

if (this_node_ < dest) {
r_count_[step] = wsize / 2;
s_count_[step] = wsize - r_count_[step];
s_index_[step] = r_index_[step] + r_count_[step];
} else {
s_count_[step] = wsize / 2;
r_count_[step] = wsize - s_count_[step];
r_index_[step] = s_index_[step] + s_count_[step];
}

if (step + 1 < num_steps_) {
r_index_[step + 1] = r_index_[step];
s_index_[step + 1] = r_index_[step];
wsize = r_count_[step];
step++;
}
}

// std::string str(1024, 0x0);
// for (int i = 0; i < num_steps_; ++i) {
// str.append(fmt::format(
// "Step{}: send_idx = {} send_count = {} recieve_idx = {} recieve_count "
// "= {}\n",
// i, s_index_[i], s_count_[i], r_index_[i], r_count_[i]));
// }
// fmt::print(
// "[{}] Initialize with size = {} num_steps {} \n {}", this_node_, w_size_,
// num_steps_, str);
}

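// Phase 1 (adjustment for non-power-of-two node counts): the first
// 2 * nprocs_rem_ nodes pair up; the even node of each pair ends up
// holding the pair's reduced vector and continues as a virtual node,
// while its odd partner (vrt_node_ == -1) drops out of the remaining
// phases.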
void partOneCollective() {
if (is_part_of_adjustment_group_) {
auto const partner = is_even_ ? this_node_ + 1 : this_node_ - 1;

if (is_even_) {
proxy_[partner].template send<&Allreduce::partOneRightHalf>(
std::vector<int32_t>{val_.begin() + (val_.size() / 2), val_.end()});
vrt_node_ = this_node_ / 2;
} else {
proxy_[partner].template send<&Allreduce::partOneLeftHalf>(
std::vector<int32_t>{val_.begin(), val_.end() - (val_.size() / 2)});
vrt_node_ = -1;
}
} else {
vrt_node_ = this_node_ - nprocs_rem_;
}

if (nprocs_rem_ == 0) {
partTwo();
}
}

-void rightHalfComplete(AllreduceMsg<DataT>* msg) {
-for (int i = 0; i < msg->vec_.size(); i++) {
-val_[(val_.size() / 2) + i] = msg->vec_[i];
+void partOneRightHalf(AllreduceMsg<DataT>* msg) {
+for (int i = 0; i < msg->val_.size(); i++) {
+val_[(val_.size() / 2) + i] += msg->val_[i];
}

// Send to left node
proxy_[theContext()->getNode() - 1]
.template send<&Allreduce::partOneFinalPart>(
std::vector<int32_t>{val_.begin() + (val_.size() / 2), val_.end()});
}

-void leftHalf(AllreduceMsg<DataT>* msg) {
-for (int i = 0; i < msg->vec_.size(); i++) {
-val_[i] += msg->vec_[i];
+void partOneLeftHalf(AllreduceMsg<DataT>* msg) {
+for (int i = 0; i < msg->val_.size(); i++) {
+val_[i] += msg->val_[i];
}
}

-void leftHalfComplete(AllreduceMsg<DataT>* msg) {
-for (int i = 0; i < msg->vec_.size(); i++) {
-val_[i] = msg->vec_[i];
+void partOneFinalPart(AllreduceMsg<DataT>* msg) {
+for (int i = 0; i < msg->val_.size(); i++) {
+val_[(val_.size() / 2) + i] = msg->val_[i];
}

partTwo();
}

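// Phase 2 (reduce-scatter via recursive vector halving): at each step
// a node sends s_count_[step_] elements starting at s_index_[step_] to
// its butterfly partner and reduces the incoming half into its own
// shrinking window.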
-void sendHandler(AllreduceMsg<DataT>* msg) {
-uint32_t start = is_even_ ? 0 : val_.size() / 2;
-uint32_t end = is_even_ ? val_.size() / 2 : val_.size();
-for (int i = 0; start < end; start++) {
-val_[start] += msg->vec_[i++];
+void partTwo() {
auto vdest = vrt_node_ ^ mask_;
auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_;

// fmt::print(
// "[{}] Part2 Step {}: Sending to Node {} starting with idx = {} and count "
// "{} \n",
// this_node_, step_, dest, s_index_[step_], s_count_[step_]);
proxy_[dest].template send<&Allreduce::partTwoHandler>(
std::vector<int32_t>{
val_.begin() + (s_index_[step_]),
val_.begin() + (s_index_[step_]) + s_count_[step_]},
step_);

mask_ <<= 1;
if (step_ + 1 < num_steps_) {
step_++;
}
}

-void reducedHan(AllreduceMsg<DataT>* msg) {
-for (int i = 0; i < msg->vec_.size(); i++) {
-val_[val_.size() / 2 + i] = msg->vec_[i];
+void partTwoHandler(AllreduceMsg<DataT>* msg) {
+for (int i = 0; i < msg->val_.size(); i++) {
+val_[r_index_[msg->step_] + i] += msg->val_[i];
}

// std::string data(128, 0x0);
// for (auto val : msg->val_) {
// data.append(fmt::format("{} ", val));
// }
// fmt::print(
// "[{}] Part2 Step {}: Received data ({}) idx = {} from {}\n", this_node_,
// msg->step_, data, r_index_[msg->step_],
// theContext()->getFromNodeCurrentTask());

if (mask_ < nprocs_pof2_) {
partTwo();
} else {
step_ = num_steps_ - 1;
mask_ = nprocs_pof2_ >> 1;
partThree();
}
}

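// Phase 3 (allgather via recursive vector doubling): the butterfly is
// walked in reverse (mask_ shrinking, step_ decreasing) and each node
// forwards the segment it owns, so every participating node ends up
// with the complete reduced vector.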
-Allreduce() { is_even_ = theContext()->getNode() % 2 == 0; }
+void partThree() {
auto vdest = vrt_node_ ^ mask_;
auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_;

// std::string data(128, 0x0);
// auto subV = std::vector<int32_t>{
// val_.begin() + (r_index_[step_]),
// val_.begin() + (r_index_[step_]) + r_count_[step_]};
// for (auto val : subV) {
// data.append(fmt::format("{} ", val));
// }

// fmt::print(
// "[{}] Part3 Step {}: Sending to Node {} starting with idx = {} and count "
// "{} "
// "data={} \n",
// this_node_, step_, dest, r_index_[step_], r_count_[step_], data);

proxy_[dest].template send<&Allreduce::partThreeHandler>(
std::vector<int32_t>{
val_.begin() + (r_index_[step_]),
val_.begin() + (r_index_[step_]) + r_count_[step_]},
step_);

mask_ >>= 1;
step_--;
}

void partThreeHandler(AllreduceMsg<DataT>* msg) {
for (int i = 0; i < msg->val_.size(); i++) {
val_[s_index_[msg->step_] + i] = msg->val_[i];
}

// std::string data(128, 0x0);
// for (auto val : msg->val_) {
// data.append(fmt::format("{} ", val));
// }
// fmt::print(
// "[{}] Part3 Step {}: Received data ({}) idx = {} from {}\n", this_node_,
// msg->step_, data, s_index_[msg->step_],
// theContext()->getFromNodeCurrentTask());

if (mask_ > 0) {
partThree();
}
}

NodeType this_node_ = {};
bool is_even_ = false;
vt::objgroup::proxy::Proxy<Allreduce> proxy_ = {};
DataT val_ = {};
NodeType vrt_node_ = {};
bool is_part_of_adjustment_group_ = false;
int32_t num_steps_ = {};
int32_t nprocs_pof2_ = {};
int32_t nprocs_rem_ = {};
int32_t mask_ = 1;

size_t w_size_ = {};
int32_t step_ = 0;
std::vector<int32_t> r_index_ = {};
std::vector<int32_t> r_count_ = {};
std::vector<int32_t> s_index_ = {};
std::vector<int32_t> s_count_ = {};
};

-} // namespace vt::collective::reduce::alleduce
+} // namespace vt::collective::reduce::allreduce

#endif /*INCLUDED_VT_COLLECTIVE_REDUCE_REDUCE_H*/
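For context (standard analysis of Rabenseifner's algorithm, not part
of this commit): with p nodes and n data elements, the reduce-scatter
and allgather phases each take log2(p) steps, and every node transfers
roughly 2 * (p - 1) / p * n elements in total, about half the
bandwidth cost of a recursive-doubling allreduce for large payloads.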
12 changes: 6 additions & 6 deletions src/vt/collective/reduce/allreduce/rabenseifner.h
@@ -9,11 +9,10 @@

#include <utility>

-namespace vt::collective::reduce::alleduce {
+namespace vt::collective::reduce::allreduce {

template <auto f, template <typename Arg> class Op, typename... Args>
-void allreduce(Args&&... data) {
-
+void allreduce_r(Args&&... data) {
auto msg = vt::makeMessage<AllreduceMsg>(std::forward<Args>(data)...);
auto const this_node = vt::theContext()->getNode();
auto const num_nodes = theContext()->getNumNodes();
@@ -39,7 +38,8 @@ void allreduce(Args&&... data) {
vt::runInEpochCollective([=] {
if (is_part_of_adjustment_group) {
auto const partner = is_even ? this_node + 1 : this_node - 1;
-grp_proxy[partner].send<&Reducer::sendHandler>(std::forward<Args...>(data...));
+grp_proxy[partner].send<&Reducer::sendHandler>(
+std::forward<Args...>(data...));
}
});

@@ -123,6 +123,6 @@ void allreduce(Args&&... data) {
*/
}

-} // namespace vt::collective::reduce::alleduce
+} // namespace vt::collective::reduce::allreduce

-#endif // INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H
\ No newline at end of file
+#endif // INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H
3 changes: 3 additions & 0 deletions src/vt/objgroup/manager.h
@@ -291,6 +291,9 @@ struct ObjGroupManager : runtime::component::Component<ObjGroupManager> {
ProxyType<ObjT> proxy, std::string const& name, std::string const& parent = ""
);

template <auto f, typename ObjT, template <typename Arg> class Op, typename DataT>
ObjGroupManager::PendingSendType allreduce_r(ProxyType<ObjT> proxy, const DataT& data);

/**
* \brief Perform a reduction over an objgroup
*
29 changes: 29 additions & 0 deletions src/vt/objgroup/manager.impl.h
@@ -41,6 +41,8 @@
//@HEADER
*/

#include "vt/messaging/message/smart_ptr.h"
#include <utility>
#if !defined INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H
#define INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H

@@ -57,6 +59,7 @@
#include "vt/collective/collective_alg.h"
#include "vt/messaging/active.h"
#include "vt/elm/elm_id_bits.h"
#include "vt/collective/reduce/allreduce/allreduce.h"

#include <memory>

@@ -262,6 +265,32 @@ ObjGroupManager::PendingSendType ObjGroupManager::broadcast(MsgSharedPtr<MsgT> m
return objgroup::broadcast(msg,han);
}

template <
auto f, typename ObjT, template <typename Arg> class Op, typename DataT>
ObjGroupManager::PendingSendType
ObjGroupManager::allreduce_r(ProxyType<ObjT> proxy, const DataT& data) {
// check payload size and choose appropriate algorithm

auto const this_node = vt::theContext()->getNode();
auto const num_nodes = theContext()->getNumNodes();

using Reducer = collective::reduce::allreduce::Allreduce<DataT>;

auto grp_proxy =
vt::theObjGroup()->makeCollective<Reducer>("allreduce_rabenseifner");

grp_proxy[this_node].template invoke<&Reducer::initialize>(
data, grp_proxy, num_nodes);

vt::runInEpochCollective([=] {
grp_proxy[this_node].template invoke<&Reducer::partOneCollective>();
});

proxy[this_node].template invoke<f>(grp_proxy.get()->val_);

return PendingSendType{nullptr};
}

template <typename ObjT, typename MsgT, ActiveTypedFnType<MsgT> *f>
ObjGroupManager::PendingSendType ObjGroupManager::reduce(
ProxyType<ObjT> proxy, MsgSharedPtr<MsgT> msg,
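A hedged usage sketch for the allreduce_r() entry point above. MyObj,
handleResult(), and the payload are hypothetical, and note that at
this commit the reduction applied inside the Allreduce handlers is a
hard-coded += over std::vector<int32_t>, regardless of the Op
parameter:

#include <vt/transport.h>
#include <vector>

struct MyObj {
  void handleResult(std::vector<int32_t> result) {
    // The final allreduce value is delivered here on each node.
    fmt::print(
      "[{}] result[0] = {}\n", vt::theContext()->getNode(), result[0]);
  }
};

void runExample() {
  auto proxy = vt::theObjGroup()->makeCollective<MyObj>("example");
  std::vector<int32_t> data(64, vt::theContext()->getNode() + 1);
  vt::theObjGroup()
    ->allreduce_r<&MyObj::handleResult, MyObj, vt::collective::PlusOp>(
      proxy, data);
}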
9 changes: 9 additions & 0 deletions src/vt/objgroup/proxy/proxy_objgroup.h
@@ -198,6 +198,15 @@ struct Proxy {
Args&&... args
) const;

template <
auto f,
template <typename Arg> class Op = collective::NoneOp,
typename... Args
>
PendingSendType allreduce_h(
Args&&... args
) const;

/**
* \brief Reduce back to a point target. Performs a reduction using operator
* `Op` followed by a send to `f` with the result.