From 4a0158d3e4a1ce652d3f6faa563af26b2c3052c8 Mon Sep 17 00:00:00 2001 From: divija95 <60272396+divija95@users.noreply.github.com> Date: Wed, 17 Apr 2024 16:47:56 -0500 Subject: [PATCH] gluon support to exchange delta mirrors between edit batches (#54) --- .../include/galois/graphs/GluonSubstrate.h | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/libgluon/include/galois/graphs/GluonSubstrate.h b/libgluon/include/galois/graphs/GluonSubstrate.h index 9d53b080b..0f6615acc 100644 --- a/libgluon/include/galois/graphs/GluonSubstrate.h +++ b/libgluon/include/galois/graphs/GluonSubstrate.h @@ -339,6 +339,96 @@ class GluonSubstrate : public galois::runtime::GlobalObject { net.resetMemUsage(); } + // Delta mirrors proxy communication + void exchangeDeltaMirrors(std::vector>& delta_mirrors) { + auto& net = galois::runtime::getSystemNetworkInterface(); + + // send off the mirror nodes + for (unsigned x = 0; x < numHosts; ++x) { + if (x == id) + continue; + + galois::runtime::SendBuffer b; + gSerialize(b, delta_mirrors[x]); + net.sendTagged(x, galois::runtime::evilPhase, std::move(b)); + } + + std::vector> delta_masters(numHosts); + // receive the mirror nodes + for (unsigned x = 0; x < numHosts; ++x) { + if (x == id) + continue; + + decltype(net.recieveTagged(galois::runtime::evilPhase)) p; + do { + p = net.recieveTagged(galois::runtime::evilPhase); + } while (!p); + + galois::runtime::gDeserialize(p->second, delta_masters[x]); + for (size_t i = 0; i < delta_masters[x].size(); ++i) { + (*masterNodes)[x].push_back(delta_masters[x][i]); + } + } + incrementEvilPhase(); + } + + void addDeltaMirrors(std::vector>& delta_mirrors) { + for (size_t i = 0; i < delta_mirrors.size(); ++i) { + for (size_t j = 0; j < delta_mirrors[i].size(); ++j) + std::cout << "mirror " << delta_mirrors[i][j] << " host " << id + << " i " << i << "\n"; + } + std::vector curr_mirrors_sizes(numHosts); + std::vector curr_masters_sizes(numHosts); + for (uint32_t h = 0; h < numHosts; ++h) { + curr_mirrors_sizes[h] = (*mirrorNodes)[h].size(); + curr_masters_sizes[h] = (*masterNodes)[h].size(); + } + exchangeDeltaMirrors(delta_mirrors); + for (unsigned x = 0; x < numHosts; ++x) { + if (x == id) + continue; + for (size_t i = 0; i < delta_mirrors[x].size(); ++i) { + (*mirrorNodes)[x].push_back(delta_mirrors[x][i]); + } + } + + // convert the global ids stored in the master/mirror nodes arrays to local + // ids + for (uint32_t h = 0; h < masterNodes->size(); ++h) { + galois::do_all( + galois::iterate(size_t{curr_masters_sizes[h]}, + (*masterNodes)[h].size()), + [&](size_t n) { + (*masterNodes)[h][n] = userGraph.getLID((*masterNodes)[h][n]); + }, + galois::no_stats()); + } + + for (uint32_t h = 0; h < mirrorNodes->size(); ++h) { + galois::do_all( + galois::iterate(size_t{curr_mirrors_sizes[h]}, + (*mirrorNodes)[h].size()), + [&](size_t n) { + (*mirrorNodes)[h][n] = userGraph.getLID((*mirrorNodes)[h][n]); + }, + galois::no_stats()); + } + + maxSharedSize = 0; + for (auto x = 0U; x < masterNodes->size(); ++x) { + assert(x < mirrorNodes->size()); + if (x == id) + continue; + if ((*masterNodes)[x].size() > maxSharedSize) { + maxSharedSize = (*masterNodes)[x].size(); + } + if ((*mirrorNodes)[x].size() > maxSharedSize) { + maxSharedSize = (*mirrorNodes)[x].size(); + } + } + } + /** * Reports master/mirror stats. * Assumes that communication has already occured so that the host