Skip to content

Commit

Permalink
gluon support to exchange delta mirrors between edit batches (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
divija95 authored Apr 17, 2024
1 parent 7895677 commit 4a0158d
Showing 1 changed file with 90 additions and 0 deletions.
90 changes: 90 additions & 0 deletions libgluon/include/galois/graphs/GluonSubstrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,96 @@ class GluonSubstrate : public galois::runtime::GlobalObject {
net.resetMemUsage();
}

// Delta mirrors proxy communication
void exchangeDeltaMirrors(std::vector<std::vector<size_t>>& delta_mirrors) {
auto& net = galois::runtime::getSystemNetworkInterface();

// send off the mirror nodes
for (unsigned x = 0; x < numHosts; ++x) {
if (x == id)
continue;

galois::runtime::SendBuffer b;
gSerialize(b, delta_mirrors[x]);
net.sendTagged(x, galois::runtime::evilPhase, std::move(b));
}

std::vector<std::vector<size_t>> delta_masters(numHosts);
// receive the mirror nodes
for (unsigned x = 0; x < numHosts; ++x) {
if (x == id)
continue;

decltype(net.recieveTagged(galois::runtime::evilPhase)) p;
do {
p = net.recieveTagged(galois::runtime::evilPhase);
} while (!p);

galois::runtime::gDeserialize(p->second, delta_masters[x]);
for (size_t i = 0; i < delta_masters[x].size(); ++i) {
(*masterNodes)[x].push_back(delta_masters[x][i]);
}
}
incrementEvilPhase();
}

void addDeltaMirrors(std::vector<std::vector<size_t>>& delta_mirrors) {
for (size_t i = 0; i < delta_mirrors.size(); ++i) {
for (size_t j = 0; j < delta_mirrors[i].size(); ++j)
std::cout << "mirror " << delta_mirrors[i][j] << " host " << id
<< " i " << i << "\n";
}
std::vector<uint32_t> curr_mirrors_sizes(numHosts);
std::vector<uint32_t> curr_masters_sizes(numHosts);
for (uint32_t h = 0; h < numHosts; ++h) {
curr_mirrors_sizes[h] = (*mirrorNodes)[h].size();
curr_masters_sizes[h] = (*masterNodes)[h].size();
}
exchangeDeltaMirrors(delta_mirrors);
for (unsigned x = 0; x < numHosts; ++x) {
if (x == id)
continue;
for (size_t i = 0; i < delta_mirrors[x].size(); ++i) {
(*mirrorNodes)[x].push_back(delta_mirrors[x][i]);
}
}

// convert the global ids stored in the master/mirror nodes arrays to local
// ids
for (uint32_t h = 0; h < masterNodes->size(); ++h) {
galois::do_all(
galois::iterate(size_t{curr_masters_sizes[h]},
(*masterNodes)[h].size()),
[&](size_t n) {
(*masterNodes)[h][n] = userGraph.getLID((*masterNodes)[h][n]);
},
galois::no_stats());
}

for (uint32_t h = 0; h < mirrorNodes->size(); ++h) {
galois::do_all(
galois::iterate(size_t{curr_mirrors_sizes[h]},
(*mirrorNodes)[h].size()),
[&](size_t n) {
(*mirrorNodes)[h][n] = userGraph.getLID((*mirrorNodes)[h][n]);
},
galois::no_stats());
}

maxSharedSize = 0;
for (auto x = 0U; x < masterNodes->size(); ++x) {
assert(x < mirrorNodes->size());
if (x == id)
continue;
if ((*masterNodes)[x].size() > maxSharedSize) {
maxSharedSize = (*masterNodes)[x].size();
}
if ((*mirrorNodes)[x].size() > maxSharedSize) {
maxSharedSize = (*mirrorNodes)[x].size();
}
}
}

/**
* Reports master/mirror stats.
* Assumes that communication has already occured so that the host
Expand Down

0 comments on commit 4a0158d

Please sign in to comment.