From 7895677e685251107c6150dbfaee2a2d5bee378b Mon Sep 17 00:00:00 2001 From: divija95 <60272396+divija95@users.noreply.github.com> Date: Wed, 17 Apr 2024 16:37:36 -0500 Subject: [PATCH] adding vertices dynamically (#50) * adding vertices dynamically * changing args --- inputs/wmd/dynamic0.txt | 12 ++- inputs/wmd/dynamic1.txt | 10 +++ .../galois/graphs/DistributedLocalGraph.h | 14 +++- .../galois/runtime/GraphUpdateManager.h | 77 +++++++++++-------- libwmd/test/wmd-graph-build.cpp | 24 ++++-- 5 files changed, 94 insertions(+), 43 deletions(-) diff --git a/inputs/wmd/dynamic0.txt b/inputs/wmd/dynamic0.txt index 37648532b..cd0bf997f 100644 --- a/inputs/wmd/dynamic0.txt +++ b/inputs/wmd/dynamic0.txt @@ -1,12 +1,21 @@ Author,346401281431409585,,,1114502034902546550,,,,, Sale,317248309514344163,1524681741257900519,,,,,8/3/2018,, +Publication,,,,,1535560814470027553,,8/1/2016,, +Publication,,,,,41837343332046478,,10/1/2014,, +Publication,,,,,1207612033585320625,,7/1/2013,, +Publication,,,,,1453261312384284172,,5/10/2016,, +Publication,,,,,622293910371914414,,7/6/2015,, +Publication,,,,,1305145439855521555,,5/18/2016,, HasTopic,,,,618434247743641149,,3884230,,, +Publication,,,,,300559162266445330,,7/1/2016,, Sale,1243472362254658420,205415260510814362,,,,,8/9/2018,, +Topic,,,,,,34433,,51.755,-1.255 Sale,1472154222902711100,529550602103217450,,,,185785,2/17/2019,, Includes,,,1314315120197156050,803952155714850701,,,,, Author,1262668194076216011,,,747423119260925972,,,,, HasTopic,,,,833681012494554358,,787185,,, HasTopic,,,,932362105613871012,,160409,,, +Topic,,,,,,8050277,,, Sale,1125113326787431160,437201545096608055,,,,146,10/14/2018,, HasTopic,,,,1424660009578332566,,334600,,, HasTopic,,,451888058015735870,,,121765,,, @@ -26,13 +35,14 @@ HasTopic,,,,740410432146852843,,5088838,,, Includes,,,15133734353741126,1532662490035322233,,,,, HasTopic,,,,817526874194673140,,18122778,,, HasTopic,,,,440265285168056234,,102014,,, +HasTopic,,,,,1207612033585320625,8050277,,, HasTopic,,,,186108460103013588,,732934,,, Includes,,,1615340315424362057,209800678458482108,,,,, Author,373641740834326257,,,116892402526543412,,,,, HasTopic,,,,740410432146852843,,122113,,, +Author,1443434356636157084,,,,1535560814470027553,,,, Includes,,,1615340315424362057,440265285168056234,,,,, Author,719533111062900642,,,,1433303251800176474,,,, -HasTopic,,,,420762134340393550,,10289,,, HasTopic,,,,1184855350262395542,,11348,,, HasTopic,,,,82629615412640377,,247154,,, Author,369370063627142227,,,1184855350262395542,,,,, diff --git a/inputs/wmd/dynamic1.txt b/inputs/wmd/dynamic1.txt index 8f13aa07e..c5b2ec61d 100644 --- a/inputs/wmd/dynamic1.txt +++ b/inputs/wmd/dynamic1.txt @@ -17,6 +17,14 @@ Sale,895197896920634500,1508332501512270227,,,,487,7/31/2018,, HasTopic,,,,321724159614056152,,158668,,, Includes,,,1427292001647224242,240337224527030225,,,,, HasTopic,,,,1184855350262395542,,223155,,, +Publication,,,,,336942091506354249,,6/1/2016,, +Publication,,,,,476478857527116646,,9/1/2016,, +Topic,,,,,,154709,,, +Publication,,,,,732672612144374614,,1/1/2016,, +Publication,,,,,964200931450858755,,7/12/2016,, +Publication,,,,,525224971033019325,,1/1/2016,, +Publication,,,,,330281200641323539,,9/1/2016,, +Person,1443434356636157084,,,,,,,, Includes,,,1615340315424362057,1184855350262395542,,,,, HasTopic,,,,1532662490035322233,,1337691,,, Author,1352636429150180228,,,186108460103013588,,,,, @@ -34,6 +42,7 @@ Sale,351354309273100074,1073324208204442390,,,,206021,8/7/2018,, Sale,1025135622623992536,1001287904525368324,,,,,10/2/2018,, Author,12321118467056216,,,1512214307542520410,,,,, HasTopic,,,,932362105613871012,,60,,, +HasOrg,,,,,732672612144374614,34433,,, HasTopic,,,,803952155714850701,,5,,, HasTopic,,,,1220295546212024391,,18031504,,, HasTopic,,,,91431002216341149,,73843,,, @@ -41,4 +50,5 @@ Author,437573095319558705,,,447169043921403064,,,,, HasTopic,,,,1424263331858043042,,2566598,,, Includes,,,15133734353741126,1285128710332882742,,,,, HasTopic,,,,1513662032452523252,,466439,,, +HasTopic,,,,,300559162266445330,154709,,, Includes,,,1314315120197156050,393285992310638641,,,,, diff --git a/libcusp/include/galois/graphs/DistributedLocalGraph.h b/libcusp/include/galois/graphs/DistributedLocalGraph.h index a4cd3f91a..91935ec55 100644 --- a/libcusp/include/galois/graphs/DistributedLocalGraph.h +++ b/libcusp/include/galois/graphs/DistributedLocalGraph.h @@ -962,9 +962,11 @@ class DistLocalGraph { localToGlobalVector.push_back(src); globalToLocalMap[src] = localToGlobalVector.size() - 1; numNodes++; + } else { } numOwned++; } else { + assert(globalToLocalMap.find(src) != globalToLocalMap.end()); uint64_t srcLID = globalToLocalMap[src]; if (edge_begin(srcLID) == edge_end(srcLID)) { numNodesWithEdges++; @@ -995,13 +997,16 @@ class DistLocalGraph { } template - void addVertex(uint64_t token, T data) { + void addVertex(T data) { + uint64_t token = data.id; uint64_t belongsTo = getHostID(token); + std::vector dataVec; + dataVec.push_back(data); if (belongsTo == id) { updateVariables(true, token); - // graph->setData(getLID(token), data); + graph->addVertices(dataVec); } else { - sendModifyRequest(belongsTo, ADD_VERTEX, token, data); + sendModifyRequest(belongsTo, ADD_VERTEX, dataVec); } } @@ -1029,8 +1034,9 @@ class DistLocalGraph { lids.push_back(getLID(dsts[i])); } graph->addEdges(getLID(src), lids, data); + } else { - sendModifyRequest(belongsTo, src, dsts, data); + sendModifyRequest(belongsTo, ADD_EDGES, src, dsts, data); } } diff --git a/libgalois/include/galois/runtime/GraphUpdateManager.h b/libgalois/include/galois/runtime/GraphUpdateManager.h index cb563cae6..612ade11d 100644 --- a/libgalois/include/galois/runtime/GraphUpdateManager.h +++ b/libgalois/include/galois/runtime/GraphUpdateManager.h @@ -18,9 +18,8 @@ class graphUpdateManager { graphUpdateManager() = default; graphUpdateManager( std::unique_ptr> parser, - std::string inputFile, int period, T* distGraphPtr) { + int period, T* distGraphPtr) { periodForCheck = period; - graphFile = inputFile; graph = distGraphPtr; fileParser = std::move(parser); } @@ -78,35 +77,44 @@ class graphUpdateManager { void processLine(const char* line, size_t len) { galois::graphs::ParsedGraphStructure value = fileParser->ParseLine(const_cast(line), len); - for (auto& edge : value.edges) { - std::vector dsts; - dsts.push_back(edge.dst); - std::vector data; - data.push_back(edge); - graph->addEdges(edge.src, dsts, data); + if (value.isNode) + graph->addVertex(value.node); + else { + for (auto& edge : value.edges) { + std::vector dsts; + dsts.push_back(edge.dst); + std::vector data; + data.push_back(edge); + graph->addEdges(edge.src, dsts, data); + } } } template void ingestFile() { - std::ifstream inputFile(graphFile); - if (!inputFile.is_open()) { - std::cerr << "Error opening file: " << graphFile << "\n"; - return; - } + std::vector files = fileParser->GetFiles(); + for (auto& file : files) { + std::ifstream inputFile(file); + if (!inputFile.is_open()) { + std::cerr << "Error opening file: " << graphFile << "\n"; + return; + } - // Read each line from the stringstream - std::string line; - uint64_t lineNumber = 0; - while ((std::getline(inputFile, line))) { - processLine(line.c_str(), line.size()); - lineNumber++; - if (lineNumber == batchSize) { - std::this_thread::sleep_for(std::chrono::milliseconds(periodForCheck)); - lineNumber = 0; + // Read each line from the stringstream + std::string line; + uint64_t lineNumber = 0; + while ((std::getline(inputFile, line))) { + processLine(line.c_str(), line.size()); + lineNumber++; + if (lineNumber == batchSize) { + galois::runtime::getHostBarrier().wait(); + std::this_thread::sleep_for( + std::chrono::milliseconds(periodForCheck)); + lineNumber = 0; + } } + inputFile.close(); } - inputFile.close(); auto& net = galois::runtime::getSystemNetworkInterface(); net.flush(); stopIngest = true; @@ -119,13 +127,22 @@ class graphUpdateManager { while (!stopCheck) { auto m = net.recieveTagged(galois::runtime::evilPhase); if (m.has_value()) { - uint64_t src_node; - galois::runtime::gDeserialize(m->second, src_node); - std::vector edge_dsts; - galois::runtime::gDeserialize(m->second, edge_dsts); - std::vector edge_data; - galois::runtime::gDeserialize(m->second, edge_data); - graph->addEdges(src_node, edge_dsts, edge_data); + typename T::Task task; + galois::runtime::gDeserialize(m->second, task); + if (task == T::Task::ADD_VERTEX) { + std::vector node; + galois::runtime::gDeserialize(m->second, node); + for (auto d : node) + graph->addVertex(d); + } else if (task == T::Task::ADD_EDGES) { + uint64_t src_node; + galois::runtime::gDeserialize(m->second, src_node); + std::vector edge_dsts; + galois::runtime::gDeserialize(m->second, edge_dsts); + std::vector edge_data; + galois::runtime::gDeserialize(m->second, edge_data); + graph->addEdges(src_node, edge_dsts, edge_data); + } } std::this_thread::sleep_for( std::chrono::milliseconds(periodForCheck / (batchSize))); diff --git a/libwmd/test/wmd-graph-build.cpp b/libwmd/test/wmd-graph-build.cpp index 8b5742d2d..15d76a1fe 100644 --- a/libwmd/test/wmd-graph-build.cpp +++ b/libwmd/test/wmd-graph-build.cpp @@ -42,7 +42,8 @@ void insertEdge( void parser(std::string line, std::unordered_map>>& vertices) { + std::pair>>& vertices, + bool dynamic = false, bool readNodes = true) { if (line.find("//") != std::string::npos || line.find("#") != std::string::npos) { return; @@ -65,7 +66,7 @@ void parser(std::string line, bool isNode = tokens[0] == "Person" || tokens[0] == "ForumEvent" || tokens[0] == "Forum" || tokens[0] == "Publication" || tokens[0] == "Topic"; - if (isNode) { + if ((isNode && !dynamic) || (isNode && readNodes)) { uint64_t id = 0; agile::workflow1::TYPES vertexType = agile::workflow1::TYPES::NONE; if (tokens[0] == "Person") { @@ -88,7 +89,7 @@ void parser(std::string line, } vertices[id] = std::pair>(vertexType, std::vector()); - } else { + } else if ((!isNode && !dynamic) || (!isNode && !readNodes)) { Edge edge(tokens); insertEdge(edge, vertices); // Inverse edge @@ -118,13 +119,14 @@ void parser(std::string line, void getDataFromGraph( std::string& filename, std::unordered_map>>& - vertices) { + vertices, + bool dynamic = false, bool readNodes = true) { // read file line by line std::string line; std::ifstream myfile(filename); if (myfile.is_open()) { while (getline(myfile, line)) { - parser(line, vertices); + parser(line, vertices, dynamic, readNodes); } myfile.close(); } else { @@ -189,12 +191,13 @@ int main(int argc, char* argv[]) { if (dynFile != "") { std::string dynamicFile = dynFile + std::to_string(net.ID) + ".txt"; - + std::vector filenames; + filenames.emplace_back(dynamicFile); graphUpdateManager GUM( std::make_unique>( 10, filenames), - dynamicFile, 100, graph); + 100, graph); GUM.start(); // wait for GUM to finish while (!GUM.stop()) { @@ -249,9 +252,14 @@ int main(int argc, char* argv[]) { if (net.ID == 0) { getDataFromGraph(file, vertices); if (dynFile != "") { + // Read vertices only first, and then only edges + for (uint32_t i = 0; i < net.Num; i++) { + std::string dynamicFile = dynFile + std::to_string(i) + ".txt"; + getDataFromGraph(dynamicFile, vertices, true, true); + } for (uint32_t i = 0; i < net.Num; i++) { std::string dynamicFile = dynFile + std::to_string(i) + ".txt"; - getDataFromGraph(dynamicFile, vertices); + getDataFromGraph(dynamicFile, vertices, true, false); } } // compare with vertices