Skip to content

Commit

Permalink
changed Graph Update Manager structure (#57)
Browse files Browse the repository at this point in the history
* changed Graph Update Manager structure

* fixing pre-commit
  • Loading branch information
divija95 authored Apr 24, 2024
1 parent 0ae8100 commit 74f886f
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 138 deletions.
78 changes: 23 additions & 55 deletions libcusp/include/galois/graphs/DistributedLocalGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -991,14 +991,12 @@ class DistLocalGraph {
std::vector<NodeTy> data;
data.push_back(dstData.value()[i]);
graph->addVertices(data);
mirrorNodes[getHostID(token)].push_back(token);
}
i++;
if (!isOwned(token)) {
mirrorNodes[getHostID(token)].push_back(token);
} else {
if (edge_begin(getLID(token)) == edge_end(getLID(token))) {
numNodesWithEdges++;
}
if (isOwned(token) &&
(edge_begin(getLID(token)) == edge_end(getLID(token)))) {
numNodesWithEdges++;
}
}
numEdges += dsts.value().size();
Expand All @@ -1007,77 +1005,47 @@ class DistLocalGraph {

/** Topology Modifications **/
void addVertexTopologyOnly(uint32_t token) {
uint64_t belongsTo = getHostID(token);
if (belongsTo == id) {
updateVariables(true, token);
graph->addVertexTopologyOnly();
} else {
sendModifyRequest(belongsTo, ADD_VERTEX_TOPOLOGY_ONLY, token);
}
updateVariables(true, token);
graph->addVertexTopologyOnly();
}

template <typename T>
void addVertex(T data) {
uint64_t token = data.id;
uint64_t belongsTo = getHostID(token);
uint64_t token = data.id;
std::vector<T> dataVec;
dataVec.push_back(data);
if (belongsTo == id) {
updateVariables(true, token);
graph->addVertices(dataVec);
} else {
sendModifyRequest(belongsTo, ADD_VERTEX, dataVec);
}
updateVariables(true, token);
graph->addVertices(dataVec);
}

void addEdgesTopologyOnly(uint64_t src, std::vector<uint64_t> dsts) {
uint64_t belongsTo = getHostID(src);
if (belongsTo == id) {
updateVariables(false, src, dsts);
std::vector<uint64_t> lids;
for (uint32_t i = 0; i < dsts.size(); i++) {
lids.push_back(getLID(dsts[i]));
}
graph->addEdgesTopologyOnly(getLID(src), lids);
} else {
sendModifyRequest(belongsTo, ADD_EDGES_TOPOLOGY_ONLY, src, dsts);
updateVariables(false, src, dsts);
std::vector<uint64_t> lids;
for (uint32_t i = 0; i < dsts.size(); i++) {
lids.push_back(getLID(dsts[i]));
}
graph->addEdgesTopologyOnly(getLID(src), lids);
}

void addEdges(uint64_t src, std::vector<uint64_t> dsts,
std::vector<EdgeTy> data, std::vector<NodeTy> dstData) {
uint64_t belongsTo = getHostID(src);
if (belongsTo == id) {
updateVariables(false, src, dsts, dstData);
std::vector<uint64_t> lids;
for (uint32_t i = 0; i < dsts.size(); i++) {
lids.push_back(getLID(dsts[i]));
}
graph->addEdges(getLID(src), lids, data);
} else {
sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data, dstData);
updateVariables(false, src, dsts, dstData);
std::vector<uint64_t> lids;
for (uint32_t i = 0; i < dsts.size(); i++) {
lids.push_back(getLID(dsts[i]));
}
graph->addEdges(getLID(src), lids, data);
}

void deleteVertex(uint64_t src) {
uint64_t belongsTo = getHostID(src);
if (belongsTo == id) {
// TODO(Divija): Uncomment when we have the graph API
// graph.deleteVertex(getLID(src));
} else {
sendModifyRequest(belongsTo, DELETE_VERTEX, src);
}
// TODO(Divija): Uncomment when we have the graph API
// graph.deleteVertex(getLID(src));
}

void deleteEdges(uint64_t src, std::vector<edge_iterator> edges) {
// TODO:Remove dst tokens from local map?
uint64_t belongsTo = getHostID(src);
if (belongsTo == id) {
// TODO(Divija): Uncomment when we have the graph API
// return graph.deleteEdges(getLID(src), edges);
} else {
sendModifyRequest(belongsTo, DELETE_EDGES, src, edges);
}
// TODO(Divija): Uncomment when we have the graph API
// return graph.deleteEdges(getLID(src), edges);
}
};

Expand Down
170 changes: 93 additions & 77 deletions libgalois/include/galois/runtime/GraphUpdateManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
#include <galois/Timer.h>
#include "galois/wmd/graphTypes.h"

// Usage: call start() to start the ingestion of the file
// call stop() to stop the ingestion of the file
// call setBatchSize() to set the batch size
// Usage: call update() to ingest the file(s) and then refresh the graph ranges
// Refer to wmd-graph-build for an example of how to use this class

using namespace agile::workflow1;
Expand All @@ -31,49 +29,38 @@ class graphUpdateManager {
graphUpdateManager(graphUpdateManager&&) = delete;
graphUpdateManager& operator=(graphUpdateManager&&) = delete;

void start() {
// start the dynamic changes
startIngest = std::thread(&graphUpdateManager::ingestFile, this);
checkThread = std::thread(&graphUpdateManager::checkForMessages, this);
void update() {
ingestFile();
graph->updateRanges();
}

void setBatchSize(uint64_t size) { batchSize = size; }

uint64_t getBatchSize() { return batchSize; }

void setPeriod(uint64_t period) { periodForCheck = period; }

uint64_t getPeriod() { return periodForCheck; }

bool stop() {
if (stopIngest) {
while (!checkThread.joinable())
;
startIngest.join();
}
return stopIngest;
}
bool stop2() {
std::this_thread::sleep_for(std::chrono::milliseconds(10 * periodForCheck));
stopCheck = true;
while (!checkThread.joinable())
;
graph->updateRanges();
checkThread.join();
return stopIngest;
}

private:
std::thread checkThread;
std::thread startIngest;
uint64_t periodForCheck;
std::string graphFile;
T* graph;
uint64_t batchSize = 10;
bool stopIngest = false;
bool stopCheck = false;
std::unique_ptr<galois::graphs::FileParser<NodeData, EdgeData>> fileParser;

void processNodes(std::vector<NodeData>& nodes) {
for (auto& node : nodes) {
graph->addVertex(node);
}
}

/// Applies a batch of edge updates to the graph, one addEdges call per edge.
///
/// For each edge, the destination list, the edge payload and the
/// destination-node payload handed to addEdges are single-element vectors
/// describing that edge alone.
///
/// @tparam N node data type produced by the file parser.
/// @tparam E edge data type; must expose `src` and `dst`.
/// @param edges edges to insert, processed in order.
template <typename N = NodeData, typename E = EdgeData>
void processEdges(std::vector<E>& edges) {
  for (auto& edge : edges) {
    std::vector<uint64_t> dsts;
    dsts.push_back(edge.dst);
    std::vector<E> data;
    data.push_back(edge);
    // Request destination data for this edge only. Passing the whole batch
    // on every iteration rebuilt data for all edges each time and left
    // dstData out of step with the one-entry dsts list.
    // NOTE(review): assumes GetDstData returns one entry per input edge, in
    // input order - confirm against the parser implementation.
    std::vector<N> dstData = fileParser->GetDstData(data);
    graph->addEdges(edge.src, dsts, data, dstData);
  }
}

template <typename N = NodeData, typename E = EdgeData>
void processLine(const char* line, size_t len) {
galois::graphs::ParsedGraphStructure<N, E> value =
Expand All @@ -92,6 +79,73 @@ class graphUpdateManager {
}
}

/// Distributes a batch of parsed updates to the hosts selected by
/// graph->getHostID and applies the updates destined for this host.
///
/// Node updates are bucketed by the host of the node id, edge updates by the
/// host of the edge source. Node buckets are exchanged and applied first,
/// then edge buckets; each exchange uses the current evilPhase as its tag and
/// increments evilPhase afterwards. This host's own buckets are applied last.
///
/// @param updateVector parsed updates (nodes and edges) to route and apply.
template <typename N = NodeData, typename E = EdgeData>
void processUpdates(
    std::vector<galois::graphs::ParsedGraphStructure<N, E>>& updateVector) {
  auto& net = galois::runtime::getSystemNetworkInterface();

  // One bucket per host, indexed by the value returned from getHostID().
  std::vector<std::vector<EdgeData>> updateEdges(net.Num);
  std::vector<std::vector<NodeData>> updateNodes(net.Num);

  for (auto& parsed : updateVector) {
    if (!parsed.isNode) {
      for (auto& parsedEdge : parsed.edges) {
        updateEdges[graph->getHostID(parsedEdge.src)].push_back(parsedEdge);
      }
      continue;
    }
    updateNodes[graph->getHostID(parsed.node.id)].push_back(parsed.node);
  }

  // Node phase: ship every remote host its bucket (possibly empty) ...
  for (unsigned host = 0; host < net.Num; host++) {
    if (host != net.ID) {
      galois::runtime::SendBuffer nodeBuf;
      galois::runtime::gSerialize(nodeBuf, updateNodes[host]);
      net.sendTagged(host, galois::runtime::evilPhase, std::move(nodeBuf));
    }
  }

  // ... and spin until one message per remote host has been received.
  for (uint32_t received = 0; received < net.Num - 1; received++) {
    auto msg = net.recieveTagged(galois::runtime::evilPhase);
    while (!msg) {
      msg = net.recieveTagged(galois::runtime::evilPhase);
    }
    std::vector<N> recvNodes;
    galois::runtime::gDeserialize(msg->second, recvNodes);
    processNodes(recvNodes);
  }
  ++galois::runtime::evilPhase;

  // Edge phase: same exchange, under the next tag value.
  for (uint32_t host = 0; host < net.Num; host++) {
    if (host != net.ID) {
      galois::runtime::SendBuffer edgeBuf;
      galois::runtime::gSerialize(edgeBuf, updateEdges[host]);
      net.sendTagged(host, galois::runtime::evilPhase, std::move(edgeBuf));
    }
  }

  for (uint32_t received = 0; received < net.Num - 1; received++) {
    auto msg = net.recieveTagged(galois::runtime::evilPhase);
    while (!msg) {
      msg = net.recieveTagged(galois::runtime::evilPhase);
    }
    std::vector<E> recvEdges;
    galois::runtime::gDeserialize(msg->second, recvEdges);
    processEdges(recvEdges);
  }
  ++galois::runtime::evilPhase;

  // Finally apply the updates that were bucketed for this host.
  processNodes(updateNodes[net.ID]);
  processEdges(updateEdges[net.ID]);
}

template <typename N = NodeData, typename E = EdgeData>
void ingestFile() {
std::vector<std::string> files = fileParser->GetFiles();
Expand All @@ -104,53 +158,15 @@ class graphUpdateManager {

// Read each line from the stringstream
std::string line;
uint64_t lineNumber = 0;
std::vector<galois::graphs::ParsedGraphStructure<N, E>> parsedData;
while ((std::getline(inputFile, line))) {
processLine(line.c_str(), line.size());
lineNumber++;
if (lineNumber == batchSize) {
galois::runtime::getHostBarrier().wait();
std::this_thread::sleep_for(
std::chrono::milliseconds(periodForCheck));
graph->updateRanges();
lineNumber = 0;
}
parsedData.push_back(fileParser->ParseLine(
const_cast<char*>(line.c_str()), line.size()));
}
processUpdates(parsedData);
inputFile.close();
}
auto& net = galois::runtime::getSystemNetworkInterface();
net.flush();
stopIngest = true;
}

template <typename N = NodeData, typename E = EdgeData>
void checkForMessages() {
// check for messages
auto& net = galois::runtime::getSystemNetworkInterface();
while (!stopCheck) {
auto m = net.recieveTagged(galois::runtime::evilPhase);
if (m.has_value()) {
typename T::Task task;
galois::runtime::gDeserialize(m->second, task);
if (task == T::Task::ADD_VERTEX) {
std::vector<N> node;
galois::runtime::gDeserialize(m->second, node);
for (auto d : node)
graph->addVertex(d);
} else if (task == T::Task::ADD_EDGES) {
uint64_t src_node;
galois::runtime::gDeserialize(m->second, src_node);
std::vector<uint64_t> edge_dsts;
galois::runtime::gDeserialize(m->second, edge_dsts);
std::vector<E> edge_data;
galois::runtime::gDeserialize(m->second, edge_data);
std::vector<N> dst_data;
galois::runtime::gDeserialize(m->second, dst_data);
graph->addEdges(src_node, edge_dsts, edge_data, dst_data);
}
}
std::this_thread::sleep_for(
std::chrono::milliseconds(periodForCheck / (batchSize)));
}
}
};
7 changes: 1 addition & 6 deletions libwmd/test/wmd-graph-build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,8 @@ int main(int argc, char* argv[]) {
agile::workflow1::Edge>>(
10, filenames),
100, graph);
GUM.start();
// wait for GUM to finish
while (!GUM.stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
GUM.update();
galois::runtime::getHostBarrier().wait();
GUM.stop2();
}

// generate a file with sorted token of all nodes and its outgoing edge dst
Expand Down

0 comments on commit 74f886f

Please sign in to comment.