Skip to content

Commit

Permalink
Changes to support Edgelist datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
divija95 committed Sep 4, 2024
1 parent cf65e84 commit b7bd738
Show file tree
Hide file tree
Showing 9 changed files with 10,975 additions and 56 deletions.
10,447 changes: 10,447 additions & 0 deletions inputs/rmat10.el

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions libcusp/include/galois/graphs/DistributedLocalGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,15 @@ class DistLocalGraph {
* @param mflag access flag for edge data
* @returns The edge data for the requested edge
*/
inline EdgeTy& getEdgeData(GraphNode src, edge_iterator ni) {
inline typename std::enable_if<!std::is_void<EdgeTy>::value, edge_iterator>
getEdgeData(GraphNode src, edge_iterator ni) {
GraphNode dst = getEdgeDst(ni);
auto& r = graph->getEdgeData(std::make_pair(src, dst));
return r;
}

inline EdgeTy& getEdgeData(edge_iterator ni) {
inline typename std::enable_if<!std::is_void<EdgeTy>::value, edge_iterator>
getEdgeData(edge_iterator ni) {
auto& r = graph->getEdgeData(*ni);
return r;
}
Expand Down Expand Up @@ -942,12 +944,6 @@ class DistLocalGraph {
graph.setEdgeData(eh, data);
}

template <typename T = NodeTy>
typename std::enable_if<!std::is_void<T>::value, EdgeTy&>::type
getEdgeData(edge_iterator eh) {
return graph.getEdgeData(eh);
}

enum Task {
ADD_VERTEX,
ADD_VERTEX_TOPOLOGY_ONLY,
Expand Down
46 changes: 28 additions & 18 deletions libgalois/include/galois/runtime/GraphUpdateManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@

using namespace agile::workflow1;

template <typename NodeData, typename EdgeData>
template <typename NodeData, typename EdgeData,
typename NodeTy = agile::workflow1::Vertex,
typename EdgeTy = agile::workflow1::Edge,
typename OECPolicy = OECPolicy>
class graphUpdateManager {
public:
using T = galois::graphs::DistLocalGraph<NodeData, EdgeData>;
using T = galois::graphs::WMDGraph<NodeData, EdgeData, NodeTy, EdgeTy, OECPolicy>;
graphUpdateManager() = default;
graphUpdateManager(
std::unique_ptr<galois::graphs::FileParser<NodeData, EdgeData>> parser,
Expand Down Expand Up @@ -43,14 +46,28 @@ class graphUpdateManager {
T* graph;
std::unique_ptr<galois::graphs::FileParser<NodeData, EdgeData>> fileParser;

void processNodes(std::vector<NodeData>& nodes) {
template <typename N = NodeData>
typename std::enable_if<std::is_same<N, agile::workflow1::Vertex>::value,
void>::type
processNodes(std::vector<NodeData>& nodes) {
for (auto& node : nodes) {
graph->addVertex(node);
}
}

template <typename N = NodeData>
typename std::enable_if<!std::is_same<N, agile::workflow1::Vertex>::value,
void>::type
processNodes(std::vector<N>& nodes) {
for (auto& node : nodes) {
graph->addVertexTopologyOnly(node.id);
}
}

template <typename N = NodeData, typename E = EdgeData>
void processEdges(std::vector<E>& edges) {
typename std::enable_if<std::is_same<N, agile::workflow1::Vertex>::value,
void>::type
processEdges(std::vector<E>& edges) {
for (auto& edge : edges) {
std::vector<uint64_t> dsts;
dsts.push_back(edge.dst);
Expand All @@ -62,20 +79,13 @@ class graphUpdateManager {
}

template <typename N = NodeData, typename E = EdgeData>
void processLine(const char* line, size_t len) {
galois::graphs::ParsedGraphStructure<N, E> value =
fileParser->ParseLine(const_cast<char*>(line), len);
if (value.isNode)
graph->addVertex(value.node);
else {
for (auto& edge : value.edges) {
std::vector<uint64_t> dsts;
dsts.push_back(edge.dst);
std::vector<E> data;
data.push_back(edge);
std::vector<N> dstData = fileParser->GetDstData(value.edges);
graph->addEdges(edge.src, dsts, data, dstData);
}
typename std::enable_if<!std::is_same<N, agile::workflow1::Vertex>::value,
void>::type
processEdges(std::vector<E>& edges) {
for (auto& edge : edges) {
std::vector<uint64_t> dsts;
dsts.push_back(edge.dst);
graph->addEdgesTopologyOnly(edge.src, dsts);
}
}

Expand Down
218 changes: 205 additions & 13 deletions libwmd/include/galois/wmd/WMDGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@
namespace galois {
namespace graphs {

struct ELVertex {
std::uint64_t id;
constexpr operator std::uint64_t() { return id; }
};

struct ELEdge {
std::uint64_t src;
std::uint64_t dst;
constexpr operator std::uint64_t() { return src; }
bool operator<(const ELEdge& other) const {
return (src == other.src) ? (dst < other.dst) : (src < other.src);
}
};

void inline increment_evilPhase() {
++galois::runtime::evilPhase;
if (galois::runtime::evilPhase >=
Expand Down Expand Up @@ -671,8 +685,6 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
// number of edges in the entire graph (not just locallly loaded portion)
uint64_t globalEdgeSize = 0;

// number of nodes loaded into this graph
uint32_t numLocalNodes = 0;
// number of edges loaded into this graph
uint64_t numLocalEdges = 0;
// offset of local to global node id
Expand All @@ -685,7 +697,49 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
std::vector<uint64_t> offsets; // offsets[numLocalNodes] point to end of edges
std::vector<EdgeDataType> edges;

void
template <typename T = EdgeDataType>
typename std::enable_if<std::is_same<T, ELEdge>::value>::type
exchangeLocalNodeSize(WMDOfflineGraph<NodeDataType, EdgeDataType>& srcGraph) {
auto& net = galois::runtime::getSystemNetworkInterface();
globalNodeOffset.resize(numHosts);
localNodeSize.resize(numHosts);
numNodes = 0;

// send vertex size to other hosts
for (uint32_t h = 0; h < numHosts; ++h) {
if (h == hostID) {
continue;
}
// serialize size_t
galois::runtime::SendBuffer sendBuffer;
galois::runtime::gSerialize(sendBuffer, numLocalNodes);
net.sendTagged(h, galois::runtime::evilPhase, std::move(sendBuffer));
}

for (uint32_t h = 0; h < numHosts - 1; h++) {
decltype(net.recieveTagged(galois::runtime::evilPhase)) p;
do {
p = net.recieveTagged(galois::runtime::evilPhase);
} while (!p);
// deserialize local_node_size
galois::runtime::gDeserialize(p->second, localNodeSize[p->first]);
}

localNodeSize[hostID] = numLocalNodes;
numNodes = localNodeSize[hostID];
// compute prefix sum to get offset
globalNodeOffset[0] = 0;
for (size_t h = 1; h < numHosts; h++) {
globalNodeOffset[h] = localNodeSize[h - 1] + globalNodeOffset[h - 1];
}
srcGraph.setSize(globalNodeOffset[numHosts - 1] +
localNodeSize[numHosts - 1]);

increment_evilPhase();
}

template <typename T = EdgeDataType>
typename std::enable_if<!std::is_same<T, ELEdge>::value>::type
exchangeLocalNodeSize(WMDOfflineGraph<NodeDataType, EdgeDataType>& srcGraph) {
auto& net = galois::runtime::getSystemNetworkInterface();
globalNodeOffset.resize(numHosts);
Expand Down Expand Up @@ -781,7 +835,8 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
* Exchanges vertex ids to form a global id to local id map before exchanging
* edges so that using the map edges can be inserted into the edgelist
*/
void
template <typename T = EdgeDataType>
typename std::enable_if<!std::is_same<T, ELEdge>::value>::type
gatherVerticesAndEdges(std::vector<std::vector<EdgeDataType>>& localEdges,
std::vector<NodeDataType>& localNodes) {
auto& net = galois::runtime::getSystemNetworkInterface();
Expand Down Expand Up @@ -932,6 +987,148 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
increment_evilPhase();
}

/**
* Exchanges vertex ids to form a global id to local id map before exchanging
* edges so that using the map edges can be inserted into the edgelist
*/
template <typename T = EdgeDataType>
typename std::enable_if<std::is_same<T, ELEdge>::value>::type
gatherVerticesAndEdges(std::vector<std::vector<EdgeDataType>>& localEdges,
std::vector<NodeDataType>&) {
auto& net = galois::runtime::getSystemNetworkInterface();
uint32_t activeThreads = galois::getActiveThreads();

// create per thread map
std::vector<std::unordered_map<uint64_t, uint64_t>> perThreadGIDtoLID(
activeThreads);
galois::on_each([&](unsigned tid, unsigned nthreads) {
uint32_t beginNode;
uint32_t endNode;
std::tie(beginNode, endNode) =
galois::block_range((uint32_t)0, globalSize, tid, nthreads);
uint32_t id = 0;
for (uint32_t i = beginNode; i < endNode; ++i) {
uint64_t src = i;
uint32_t host = virtualToPhyMapping[src % numVirtualHosts];
if (host == hostID) {
perThreadGIDtoLID[tid].emplace(src, id);
id++;
}
}
});

// prefix sum to get offset
uint64_t offset = 0;
std::vector<uint64_t> prefixSum(activeThreads, 0);
galois::do_all(
galois::iterate((size_t)0, (size_t)activeThreads),
[&](size_t h) { prefixSum[h] = perThreadGIDtoLID[h].size(); });
for (uint32_t i = 0; i < activeThreads; i++) {
for (uint32_t j = 0; j < i; j++) {
offset += prefixSum[j];
}
prefixSum[i] = offset;
}
using map = phmap::parallel_flat_hash_map_m<uint64_t, uint64_t>;
// parallel insert
galois::do_all(
galois::iterate((size_t)0, (size_t)activeThreads), [&](size_t h) {
for (auto& kv : perThreadGIDtoLID[h]) {
GIDtoLID.lazy_emplace_l(
kv.first, [&](map::value_type&) {},
[&](const map::constructor& ctor) {
ctor(std::pair(kv.first, kv.second + prefixSum[h]));
});
LIDtoGID.lazy_emplace_l(
kv.second + prefixSum[h], [&](map::value_type&) {},
[&](const map::constructor& ctor) {
ctor(std::pair(kv.second + prefixSum[h], kv.first));
});
}
});

std::vector<std::vector<std::vector<EdgeDataType>>> edgesToSend(
numHosts, std::vector<std::vector<EdgeDataType>>());
galois::PerThreadVector<std::vector<std::vector<EdgeDataType>>>
threadEdgesToSend;
for (uint32_t i = 0; i < activeThreads; i++) {
threadEdgesToSend[i].resize(numHosts);
}
// send edge data to other hosts
// Prepare edgeList and Vertex ID list to send to other hosts
galois::on_each([&](unsigned tid, unsigned nthreads) {
uint64_t beginNode;
uint64_t endNode;
std::tie(beginNode, endNode) =
galois::block_range((uint64_t)0, localEdges.size(), tid, nthreads);
for (uint64_t i = beginNode; i < endNode; ++i) {
uint64_t src = localEdges[i][0].src;
int host = virtualToPhyMapping[src % numVirtualHosts];
threadEdgesToSend.get()[host].emplace_back((localEdges[i]));
}
});
localEdges.clear();
for (uint32_t tid = 0; tid < activeThreads; tid++) {
for (uint32_t h = 0; h < numHosts; h++) {
edgesToSend[h].insert(edgesToSend[h].end(),
threadEdgesToSend[tid][h].begin(),
threadEdgesToSend[tid][h].end());
}
}
threadEdgesToSend.clear_all_parallel();
// Send Edgelist
for (uint32_t h = 0; h < numHosts; h++) {
if (h == hostID)
continue;
galois::runtime::SendBuffer sendBuffer;
galois::runtime::gSerialize(sendBuffer, edgesToSend[h]);
galois::gInfo("[", hostID, "] ", "send to ", h,
" edgesToSend size: ", edgesToSend[h].size());
net.sendTagged(h, galois::runtime::evilPhase, std::move(sendBuffer));
}
// Appending edges in each host that belong to self
localEdges.resize(GIDtoLID.size());
// Receiving edges from other hosts and populating edgelist
for (uint32_t h = 0; h < (numHosts - 1); h++) {
decltype(net.recieveTagged(galois::runtime::evilPhase)) p;
do {
p = net.recieveTagged(galois::runtime::evilPhase);
} while (!p);
uint32_t sendingHost = p->first;
std::vector<std::vector<EdgeDataType>> edgeList;
galois::runtime::gDeserialize(p->second, edgeList);
galois::gInfo("[", hostID, "] recv from ", sendingHost,
" edgeList size: ", edgeList.size());
galois::on_each([&](unsigned tid, unsigned nthreads) {
size_t beginNode;
size_t endNode;
std::tie(beginNode, endNode) =
galois::block_range((size_t)0, edgeList.size(), tid, nthreads);
for (size_t j = beginNode; j < endNode; j++) {
auto lid = GIDtoLID[edgeList[j][0].src];
localEdges[lid].insert(std::end(localEdges[lid]),
std::begin(edgeList[j]),
std::end(edgeList[j]));
}
});
edgeList.clear();
}
galois::on_each([&](unsigned tid, unsigned nthreads) {
size_t beginNode;
size_t endNode;
std::tie(beginNode, endNode) = galois::block_range(
(size_t)0, edgesToSend[hostID].size(), tid, nthreads);
for (size_t j = beginNode; j < endNode; j++) {
auto lid = GIDtoLID[edgesToSend[hostID][j][0].src];
localEdges[lid].insert(std::end(localEdges[lid]),
std::begin(edgesToSend[hostID][j]),
std::end(edgesToSend[hostID][j]));
}
});
edgesToSend.clear();
increment_evilPhase();
}

/**
* Flatten the 2D vector localEdges into a CSR edge list
* Will compute edge size and build CSR edge offset mapping
Expand Down Expand Up @@ -979,10 +1176,12 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
uint32_t scaleFactor;
uint64_t numNodes;
uint32_t numVirtualHosts;
uint32_t numLocalNodes;
std::vector<uint64_t> localNodeSize; // number of local nodes in each hosts
std::vector<uint64_t>
globalNodeOffset; // each hosts' local ID offset wrt global ID
std::vector<uint32_t> virtualToPhyMapping;
void setSize(uint64_t size) { globalSize = size; }
/**
* Gets the number of global nodes in the graph
* @returns the total number of nodes in the graph (not just local loaded
Expand Down Expand Up @@ -1037,9 +1236,10 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
}

// build local buffered graph
exchangeLocalNodeSize(srcGraph);
galois::gDebug("[", hostID, "] gatherVerticesAndEdges!");
gatherVerticesAndEdges(srcGraph.localEdges, srcGraph.localNodes);
numLocalNodes = GIDtoLID.size();
exchangeLocalNodeSize(srcGraph);
galois::gDebug("[", hostID, "] ", "flattenEdges!");
flattenEdges(srcGraph.localEdges);

Expand Down Expand Up @@ -1072,11 +1272,7 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
void gatherNodes(WMDOfflineGraph<NodeDataType, EdgeDataType>& srcGraph,
GraphTy& dstGraph,
std::vector<std::vector<uint64_t>>& proxiesOnHosts,
uint64_t totalLocalNodes,
std::unordered_map<uint64_t, uint32_t> globalToLocalMap) {
#ifdef NDEBUG
(void)totalLocalNodes;
#endif
auto& net = galois::runtime::getSystemNetworkInterface();
auto& localNodes = srcGraph.localNodes;

Expand Down Expand Up @@ -1223,10 +1419,6 @@ class WMDBufferedGraph : public BufferedGraph<EdgeDataType> {
nodeRecv.clear();
IDofNodeRecv.clear();
}
#ifndef NDEBUG
// assert(addedData == totalLocalNodes);
#endif

increment_evilPhase();

// clean unused memory
Expand Down
Loading

0 comments on commit b7bd738

Please sign in to comment.