Skip to content

Commit

Permalink
changes to support edits (#56)
Browse files Browse the repository at this point in the history
* changes to support edits

* fixing pre-commit issue

* fixing pre-commit

* fixing minor bug
  • Loading branch information
divija95 authored Apr 19, 2024
1 parent 5835f9b commit bd29284
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
39 changes: 29 additions & 10 deletions libcusp/include/galois/graphs/DistributedLocalGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,8 @@ class DistLocalGraph {
*/
inline void determineThreadRangesMaster() {
// make sure this hasn't been called before
assert(masterRanges.size() == 0);
if (masterRanges.size() != 0)
masterRanges.clear();

// first check if we even need to do any work; if already calculated,
// use already calculated vector
Expand All @@ -780,7 +781,8 @@ class DistLocalGraph {
*/
inline void determineThreadRangesWithEdges() {
// make sure not called before
assert(withEdgeRanges.size() == 0);
if (withEdgeRanges.size() != 0)
withEdgeRanges.clear();

// first check if we even need to do any work; if already calculated,
// use already calculated vector
Expand All @@ -802,7 +804,8 @@ class DistLocalGraph {
* over the graph in different ways.
*/
void initializeSpecificRanges() {
assert(specificRanges.size() == 0);
if (specificRanges.size() != 0)
specificRanges.clear();

// TODO/FIXME assertion likely not safe if a host gets no nodes
// make sure the thread ranges have already been calculated
Expand Down Expand Up @@ -949,20 +952,27 @@ class DistLocalGraph {
host, galois::runtime::evilPhase, std::move(b));
}

void updateRanges() {
determineThreadRanges();
determineThreadRangesMaster();
determineThreadRangesWithEdges();
initializeSpecificRanges();
}

// Assumptions:
// 1. A vertex is added before any edges are added to it
// 2. No support for deleting edges/vertices yet
// 3. Only works for OEC
void
updateVariables(bool isVertex, uint64_t src,
std::optional<std::vector<uint64_t>> dsts = std::nullopt) {
std::optional<std::vector<uint64_t>> dsts = std::nullopt,
std::optional<std::vector<NodeTy>> dstData = std::nullopt) {

if (isVertex) {
if (globalToLocalMap.find(src) == globalToLocalMap.end()) {
localToGlobalVector.push_back(src);
globalToLocalMap[src] = localToGlobalVector.size() - 1;
numNodes++;
} else {
}
numOwned++;
} else {
Expand All @@ -971,14 +981,24 @@ class DistLocalGraph {
if (edge_begin(srcLID) == edge_end(srcLID)) {
numNodesWithEdges++;
}
uint32_t i = 0;
for (auto token : dsts.value()) {
if (globalToLocalMap.find(token) == globalToLocalMap.end()) {
localToGlobalVector.push_back(token);
globalToLocalMap[token] = localToGlobalVector.size() - 1;
numNodes++;
numNodesWithEdges++;
std::vector<NodeTy> data;
data.push_back(dstData.value()[i]);
graph->addVertices(data);
}
i++;
if (!isOwned(token)) {
mirrorNodes[getHostID(token)].push_back(token);
} else {
if (edge_begin(getLID(token)) == edge_end(getLID(token))) {
numNodesWithEdges++;
}
}
}
numEdges += dsts.value().size();
Expand All @@ -990,7 +1010,7 @@ class DistLocalGraph {
uint64_t belongsTo = getHostID(token);
if (belongsTo == id) {
updateVariables(true, token);
// graph->addVertexTopologyOnly();
graph->addVertexTopologyOnly();
} else {
sendModifyRequest(belongsTo, ADD_VERTEX_TOPOLOGY_ONLY, token);
}
Expand Down Expand Up @@ -1025,18 +1045,17 @@ class DistLocalGraph {
}

void addEdges(uint64_t src, std::vector<uint64_t> dsts,
std::vector<EdgeTy> data) {
std::vector<EdgeTy> data, std::vector<NodeTy> dstData) {
uint64_t belongsTo = getHostID(src);
if (belongsTo == id) {
updateVariables(false, src, dsts);
updateVariables(false, src, dsts, dstData);
std::vector<uint64_t> lids;
for (uint32_t i = 0; i < dsts.size(); i++) {
lids.push_back(getLID(dsts[i]));
}
graph->addEdges(getLID(src), lids, data);

} else {
sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data);
sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data, dstData);
}
}

Expand Down
9 changes: 7 additions & 2 deletions libgalois/include/galois/runtime/GraphUpdateManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class graphUpdateManager {
stopCheck = true;
while (!checkThread.joinable())
;
graph->updateRanges();
checkThread.join();
return stopIngest;
}
Expand Down Expand Up @@ -85,7 +86,8 @@ class graphUpdateManager {
dsts.push_back(edge.dst);
std::vector<E> data;
data.push_back(edge);
graph->addEdges(edge.src, dsts, data);
std::vector<N> dstData = fileParser->GetDstData(value.edges);
graph->addEdges(edge.src, dsts, data, dstData);
}
}
}
Expand All @@ -110,6 +112,7 @@ class graphUpdateManager {
galois::runtime::getHostBarrier().wait();
std::this_thread::sleep_for(
std::chrono::milliseconds(periodForCheck));
graph->updateRanges();
lineNumber = 0;
}
}
Expand Down Expand Up @@ -141,7 +144,9 @@ class graphUpdateManager {
galois::runtime::gDeserialize(m->second, edge_dsts);
std::vector<E> edge_data;
galois::runtime::gDeserialize(m->second, edge_data);
graph->addEdges(src_node, edge_dsts, edge_data);
std::vector<N> dst_data;
galois::runtime::gDeserialize(m->second, dst_data);
graph->addEdges(src_node, edge_dsts, edge_data, dst_data);
}
}
std::this_thread::sleep_for(
Expand Down
8 changes: 8 additions & 0 deletions libwmd/include/galois/wmd/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class FileParser {
virtual ~FileParser() {}
virtual ParsedGraphStructure<V, E> ParseLine(char* line,
uint64_t lineLength) = 0;
virtual std::vector<V> GetDstData(std::vector<E>& edges) = 0;
static std::vector<std::string> SplitLine(const char* line,
uint64_t lineLength, char delim,
uint64_t numTokens) {
Expand Down Expand Up @@ -171,6 +172,13 @@ class WMDParser : public FileParser<V, E> {
return ParsedGraphStructure<V, E>(edges);
}
}
std::vector<V> GetDstData(std::vector<E>& edges) override {
std::vector<V> dstData;
for (auto& edge : edges) {
dstData.emplace_back(edge.dst, 0, edge.dst_type);
}
return dstData;
}

private:
uint64_t csvFields_;
Expand Down

0 comments on commit bd29284

Please sign in to comment.