Skip to content

Commit

Permalink
adding vertices dynamically (#50)
Browse files Browse the repository at this point in the history
* adding vertices dynamically

* changing args
  • Loading branch information
divija95 authored Apr 17, 2024
1 parent 943c184 commit 7895677
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 43 deletions.
12 changes: 11 additions & 1 deletion inputs/wmd/dynamic0.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
Author,346401281431409585,,,1114502034902546550,,,,,
Sale,317248309514344163,1524681741257900519,,,,,8/3/2018,,
Publication,,,,,1535560814470027553,,8/1/2016,,
Publication,,,,,41837343332046478,,10/1/2014,,
Publication,,,,,1207612033585320625,,7/1/2013,,
Publication,,,,,1453261312384284172,,5/10/2016,,
Publication,,,,,622293910371914414,,7/6/2015,,
Publication,,,,,1305145439855521555,,5/18/2016,,
HasTopic,,,,618434247743641149,,3884230,,,
Publication,,,,,300559162266445330,,7/1/2016,,
Sale,1243472362254658420,205415260510814362,,,,,8/9/2018,,
Topic,,,,,,34433,,51.755,-1.255
Sale,1472154222902711100,529550602103217450,,,,185785,2/17/2019,,
Includes,,,1314315120197156050,803952155714850701,,,,,
Author,1262668194076216011,,,747423119260925972,,,,,
HasTopic,,,,833681012494554358,,787185,,,
HasTopic,,,,932362105613871012,,160409,,,
Topic,,,,,,8050277,,,
Sale,1125113326787431160,437201545096608055,,,,146,10/14/2018,,
HasTopic,,,,1424660009578332566,,334600,,,
HasTopic,,,451888058015735870,,,121765,,,
Expand All @@ -26,13 +35,14 @@ HasTopic,,,,740410432146852843,,5088838,,,
Includes,,,15133734353741126,1532662490035322233,,,,,
HasTopic,,,,817526874194673140,,18122778,,,
HasTopic,,,,440265285168056234,,102014,,,
HasTopic,,,,,1207612033585320625,8050277,,,
HasTopic,,,,186108460103013588,,732934,,,
Includes,,,1615340315424362057,209800678458482108,,,,,
Author,373641740834326257,,,116892402526543412,,,,,
HasTopic,,,,740410432146852843,,122113,,,
Author,1443434356636157084,,,,1535560814470027553,,,,
Includes,,,1615340315424362057,440265285168056234,,,,,
Author,719533111062900642,,,,1433303251800176474,,,,
HasTopic,,,,420762134340393550,,10289,,,
HasTopic,,,,1184855350262395542,,11348,,,
HasTopic,,,,82629615412640377,,247154,,,
Author,369370063627142227,,,1184855350262395542,,,,,
Expand Down
10 changes: 10 additions & 0 deletions inputs/wmd/dynamic1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ Sale,895197896920634500,1508332501512270227,,,,487,7/31/2018,,
HasTopic,,,,321724159614056152,,158668,,,
Includes,,,1427292001647224242,240337224527030225,,,,,
HasTopic,,,,1184855350262395542,,223155,,,
Publication,,,,,336942091506354249,,6/1/2016,,
Publication,,,,,476478857527116646,,9/1/2016,,
Topic,,,,,,154709,,,
Publication,,,,,732672612144374614,,1/1/2016,,
Publication,,,,,964200931450858755,,7/12/2016,,
Publication,,,,,525224971033019325,,1/1/2016,,
Publication,,,,,330281200641323539,,9/1/2016,,
Person,1443434356636157084,,,,,,,,
Includes,,,1615340315424362057,1184855350262395542,,,,,
HasTopic,,,,1532662490035322233,,1337691,,,
Author,1352636429150180228,,,186108460103013588,,,,,
Expand All @@ -34,11 +42,13 @@ Sale,351354309273100074,1073324208204442390,,,,206021,8/7/2018,,
Sale,1025135622623992536,1001287904525368324,,,,,10/2/2018,,
Author,12321118467056216,,,1512214307542520410,,,,,
HasTopic,,,,932362105613871012,,60,,,
HasOrg,,,,,732672612144374614,34433,,,
HasTopic,,,,803952155714850701,,5,,,
HasTopic,,,,1220295546212024391,,18031504,,,
HasTopic,,,,91431002216341149,,73843,,,
Author,437573095319558705,,,447169043921403064,,,,,
HasTopic,,,,1424263331858043042,,2566598,,,
Includes,,,15133734353741126,1285128710332882742,,,,,
HasTopic,,,,1513662032452523252,,466439,,,
HasTopic,,,,,300559162266445330,154709,,,
Includes,,,1314315120197156050,393285992310638641,,,,,
14 changes: 10 additions & 4 deletions libcusp/include/galois/graphs/DistributedLocalGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -962,9 +962,11 @@ class DistLocalGraph {
localToGlobalVector.push_back(src);
globalToLocalMap[src] = localToGlobalVector.size() - 1;
numNodes++;
} else {
}
numOwned++;
} else {
assert(globalToLocalMap.find(src) != globalToLocalMap.end());
uint64_t srcLID = globalToLocalMap[src];
if (edge_begin(srcLID) == edge_end(srcLID)) {
numNodesWithEdges++;
Expand Down Expand Up @@ -995,13 +997,16 @@ class DistLocalGraph {
}

template <typename T>
void addVertex(uint64_t token, T data) {
void addVertex(T data) {
uint64_t token = data.id;
uint64_t belongsTo = getHostID(token);
std::vector<T> dataVec;
dataVec.push_back(data);
if (belongsTo == id) {
updateVariables(true, token);
// graph->setData(getLID(token), data);
graph->addVertices(dataVec);
} else {
sendModifyRequest(belongsTo, ADD_VERTEX, token, data);
sendModifyRequest(belongsTo, ADD_VERTEX, dataVec);
}
}

Expand Down Expand Up @@ -1029,8 +1034,9 @@ class DistLocalGraph {
lids.push_back(getLID(dsts[i]));
}
graph->addEdges(getLID(src), lids, data);

} else {
sendModifyRequest(belongsTo, src, dsts, data);
sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data);
}
}

Expand Down
77 changes: 47 additions & 30 deletions libgalois/include/galois/runtime/GraphUpdateManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ class graphUpdateManager {
graphUpdateManager() = default;
graphUpdateManager(
std::unique_ptr<galois::graphs::FileParser<NodeData, EdgeData>> parser,
std::string inputFile, int period, T* distGraphPtr) {
int period, T* distGraphPtr) {
periodForCheck = period;
graphFile = inputFile;
graph = distGraphPtr;
fileParser = std::move(parser);
}
Expand Down Expand Up @@ -78,35 +77,44 @@ class graphUpdateManager {
void processLine(const char* line, size_t len) {
galois::graphs::ParsedGraphStructure<N, E> value =
fileParser->ParseLine(const_cast<char*>(line), len);
for (auto& edge : value.edges) {
std::vector<uint64_t> dsts;
dsts.push_back(edge.dst);
std::vector<E> data;
data.push_back(edge);
graph->addEdges(edge.src, dsts, data);
if (value.isNode)
graph->addVertex(value.node);
else {
for (auto& edge : value.edges) {
std::vector<uint64_t> dsts;
dsts.push_back(edge.dst);
std::vector<E> data;
data.push_back(edge);
graph->addEdges(edge.src, dsts, data);
}
}
}

template <typename N = NodeData, typename E = EdgeData>
void ingestFile() {
std::ifstream inputFile(graphFile);
if (!inputFile.is_open()) {
std::cerr << "Error opening file: " << graphFile << "\n";
return;
}
std::vector<std::string> files = fileParser->GetFiles();
for (auto& file : files) {
std::ifstream inputFile(file);
if (!inputFile.is_open()) {
std::cerr << "Error opening file: " << graphFile << "\n";
return;
}

// Read each line from the stringstream
std::string line;
uint64_t lineNumber = 0;
while ((std::getline(inputFile, line))) {
processLine(line.c_str(), line.size());
lineNumber++;
if (lineNumber == batchSize) {
std::this_thread::sleep_for(std::chrono::milliseconds(periodForCheck));
lineNumber = 0;
// Read each line from the stringstream
std::string line;
uint64_t lineNumber = 0;
while ((std::getline(inputFile, line))) {
processLine(line.c_str(), line.size());
lineNumber++;
if (lineNumber == batchSize) {
galois::runtime::getHostBarrier().wait();
std::this_thread::sleep_for(
std::chrono::milliseconds(periodForCheck));
lineNumber = 0;
}
}
inputFile.close();
}
inputFile.close();
auto& net = galois::runtime::getSystemNetworkInterface();
net.flush();
stopIngest = true;
Expand All @@ -119,13 +127,22 @@ class graphUpdateManager {
while (!stopCheck) {
auto m = net.recieveTagged(galois::runtime::evilPhase);
if (m.has_value()) {
uint64_t src_node;
galois::runtime::gDeserialize(m->second, src_node);
std::vector<uint64_t> edge_dsts;
galois::runtime::gDeserialize(m->second, edge_dsts);
std::vector<E> edge_data;
galois::runtime::gDeserialize(m->second, edge_data);
graph->addEdges(src_node, edge_dsts, edge_data);
typename T::Task task;
galois::runtime::gDeserialize(m->second, task);
if (task == T::Task::ADD_VERTEX) {
std::vector<N> node;
galois::runtime::gDeserialize(m->second, node);
for (auto d : node)
graph->addVertex(d);
} else if (task == T::Task::ADD_EDGES) {
uint64_t src_node;
galois::runtime::gDeserialize(m->second, src_node);
std::vector<uint64_t> edge_dsts;
galois::runtime::gDeserialize(m->second, edge_dsts);
std::vector<E> edge_data;
galois::runtime::gDeserialize(m->second, edge_data);
graph->addEdges(src_node, edge_dsts, edge_data);
}
}
std::this_thread::sleep_for(
std::chrono::milliseconds(periodForCheck / (batchSize)));
Expand Down
24 changes: 16 additions & 8 deletions libwmd/test/wmd-graph-build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ void insertEdge(

void parser(std::string line,
std::unordered_map<std::uint64_t,
std::pair<TYPES, std::vector<Edge>>>& vertices) {
std::pair<TYPES, std::vector<Edge>>>& vertices,
bool dynamic = false, bool readNodes = true) {
if (line.find("//") != std::string::npos ||
line.find("#") != std::string::npos) {
return;
Expand All @@ -65,7 +66,7 @@ void parser(std::string line,
bool isNode = tokens[0] == "Person" || tokens[0] == "ForumEvent" ||
tokens[0] == "Forum" || tokens[0] == "Publication" ||
tokens[0] == "Topic";
if (isNode) {
if ((isNode && !dynamic) || (isNode && readNodes)) {
uint64_t id = 0;
agile::workflow1::TYPES vertexType = agile::workflow1::TYPES::NONE;
if (tokens[0] == "Person") {
Expand All @@ -88,7 +89,7 @@ void parser(std::string line,
}
vertices[id] =
std::pair<TYPES, std::vector<Edge>>(vertexType, std::vector<Edge>());
} else {
} else if ((!isNode && !dynamic) || (!isNode && !readNodes)) {
Edge edge(tokens);
insertEdge(edge, vertices);
// Inverse edge
Expand Down Expand Up @@ -118,13 +119,14 @@ void parser(std::string line,
void getDataFromGraph(
std::string& filename,
std::unordered_map<std::uint64_t, std::pair<TYPES, std::vector<Edge>>>&
vertices) {
vertices,
bool dynamic = false, bool readNodes = true) {
// read file line by line
std::string line;
std::ifstream myfile(filename);
if (myfile.is_open()) {
while (getline(myfile, line)) {
parser(line, vertices);
parser(line, vertices, dynamic, readNodes);
}
myfile.close();
} else {
Expand Down Expand Up @@ -189,12 +191,13 @@ int main(int argc, char* argv[]) {

if (dynFile != "") {
std::string dynamicFile = dynFile + std::to_string(net.ID) + ".txt";

std::vector<std::string> filenames;
filenames.emplace_back(dynamicFile);
graphUpdateManager<agile::workflow1::Vertex, agile::workflow1::Edge> GUM(
std::make_unique<galois::graphs::WMDParser<agile::workflow1::Vertex,
agile::workflow1::Edge>>(
10, filenames),
dynamicFile, 100, graph);
100, graph);
GUM.start();
// wait for GUM to finish
while (!GUM.stop()) {
Expand Down Expand Up @@ -249,9 +252,14 @@ int main(int argc, char* argv[]) {
if (net.ID == 0) {
getDataFromGraph(file, vertices);
if (dynFile != "") {
// Read vertices only first, and then only edges
for (uint32_t i = 0; i < net.Num; i++) {
std::string dynamicFile = dynFile + std::to_string(i) + ".txt";
getDataFromGraph(dynamicFile, vertices, true, true);
}
for (uint32_t i = 0; i < net.Num; i++) {
std::string dynamicFile = dynFile + std::to_string(i) + ".txt";
getDataFromGraph(dynamicFile, vertices);
getDataFromGraph(dynamicFile, vertices, true, false);
}
}
// compare with vertices
Expand Down

0 comments on commit 7895677

Please sign in to comment.