Skip to content

Commit

Permalink
wip: parallel impl. for readAttributeAllsteps
Browse files Browse the repository at this point in the history
huh this works??
  • Loading branch information
franzpoeschel committed Dec 18, 2024
1 parent 4091776 commit d0b4562
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 13 deletions.
5 changes: 3 additions & 2 deletions include/openPMD/auxiliary/Mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ namespace
}
else
{
static_assert(
dependent_false_v<T>, "openPMD_MPI_type: Unsupported type.");
throw std::runtime_error("Unimplemented");
// static_assert(
// dependent_false_v<T>, "openPMD_MPI_type: Unsupported type.");
}
}
} // namespace
Expand Down
197 changes: 186 additions & 11 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/Filesystem.hpp"
#include "openPMD/auxiliary/JSON_internal.hpp"
#include "openPMD/auxiliary/Mpi.hpp"
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/auxiliary/TypeTraits.hpp"
#include "openPMD/auxiliary/Variant.hpp"
Expand All @@ -43,6 +44,8 @@
#include <iostream>
#include <iterator>
#include <memory>
#include <mpi.h>
#include <numeric>
#include <set>
#include <sstream>
#include <stdexcept>
Expand Down Expand Up @@ -1417,6 +1420,143 @@ namespace

static constexpr char const *errorMsg = "ReadAttributeAllsteps";
};

template <typename Vec>
auto vec_as_string(Vec const &vec) -> std::string
{
if (vec.empty())
{
return "[]";
}
else
{
std::stringstream res;
res << '[';
auto it = vec.begin();
res << *it++;
auto end = vec.end();
for (; it != end; ++it)
{
res << ", " << *it;
}
res << ']';
return res.str();
}
}

#if openPMD_HAVE_MPI
struct DistributeToAllRanks
{
template <typename T>
static void call(
Parameter<Operation::READ_ATT_ALLSTEPS>::result_type
&put_result_here_in,
MPI_Comm comm,
int rank)
{
if (rank != 0)
{
put_result_here_in = std::vector<T>{};
}
std::vector<T> &put_result_here =
std::get<std::vector<T>>(put_result_here_in);
std::cout << "INIT VECTOR SIZE: " << put_result_here.size() << '\n';
size_t num_items = put_result_here.size();
MPI_Bcast(
&num_items, 1, auxiliary::openPMD_MPI_type<size_t>(), 0, comm);
std::cout << "WILL_COMMUNICATE: " << num_items << '\n';
if constexpr (
std::is_same_v<T, std::string> ||
std::is_same_v<T, std::vector<std::string>> ||
std::is_same_v<T, bool> ||
std::is_same_v<T, std::vector<bool>> || auxiliary::IsArray_v<T>)
{
throw error::OperationUnsupportedInBackend(
"ADIOS2",
"[readAttributeAllsteps] No support for attributes of type "
"string, bool or std::array in parallel.");
}
else if constexpr (
// auxiliary::IsArray_v<T> ||
auxiliary::IsVector_v<T>)
{
std::vector<size_t> sizes;
sizes.reserve(num_items);
if (rank == 0)
{
std::transform(
put_result_here.begin(),
put_result_here.end(),
std::back_inserter(sizes),
[](T const &arr) { return arr.size(); });
}
sizes.resize(num_items);
MPI_Bcast(
sizes.data(),
num_items,
auxiliary::openPMD_MPI_type<size_t>(),
0,
comm);
size_t total_flat_size =
std::accumulate(sizes.begin(), sizes.end(), size_t(0));
using flat_type = typename T::value_type;
std::vector<flat_type> flat_vector;
flat_vector.reserve(total_flat_size);
if (rank == 0)
{
for (auto const &arr : put_result_here)
{
for (auto val : arr)
{
flat_vector.push_back(val);
}
}
}
flat_vector.resize(total_flat_size);
MPI_Bcast(
flat_vector.data(),
total_flat_size,
auxiliary::openPMD_MPI_type<flat_type>(),
0,
comm);
if (rank != 0)
{
std::cout << "RECEIVED: " << vec_as_string(flat_vector)
<< '\n';
size_t offset = 0;
put_result_here.reserve(num_items);
for (size_t current_extent : sizes)
{
put_result_here.emplace_back(
flat_vector.begin() + offset,
flat_vector.begin() + offset + current_extent);
offset += current_extent;
}
}
}
else
{
std::vector<T> receive;
if (rank != 0)
{
receive.resize(num_items);
}
MPI_Bcast(
rank == 0 ? put_result_here.data() : receive.data(),
num_items,
auxiliary::openPMD_MPI_type<T>(),
0,
comm);
if (rank != 0)
{
put_result_here = std::move(receive);
std::cout << "RECEIVED: " << vec_as_string(receive) << '\n';
}
}
}
static constexpr char const *errorMsg = "DistributeToAllRanks";
};
#endif
} // namespace

void ADIOS2IOHandlerImpl::readAttributeAllsteps(
Expand All @@ -1427,17 +1567,52 @@ void ADIOS2IOHandlerImpl::readAttributeAllsteps(
auto pos = setAndGetFilePosition(writable);
auto name = nameOfAttribute(writable, param.name);

adios2::ADIOS adios;
auto IO = adios.DeclareIO("PreparseSnapshots");
// @todo check engine type
// @todo MPI implementation
IO.SetEngine(realEngineType());
auto engine = IO.Open(fullPath(*file), adios2::Mode::Read);
auto status = engine.BeginStep();
auto type = detail::attributeInfo(IO, name, /* verbose = */ true);
switchType<ReadAttributeAllsteps>(
type, IO, engine, name, status, *param.resource);
engine.Close();
auto read_from_file_in_serial = [&]() {
adios2::ADIOS adios;
auto IO = adios.DeclareIO("PreparseSnapshots");
// @todo check engine type
IO.SetEngine(realEngineType());
auto engine = IO.Open(fullPath(*file), adios2::Mode::Read);
auto status = engine.BeginStep();
auto type = detail::attributeInfo(IO, name, /* verbose = */ true);
switchType<ReadAttributeAllsteps>(
type, IO, engine, name, status, *param.resource);
engine.Close();
return type;
};
#if openPMD_HAVE_MPI
if (!m_communicator.has_value())
{
read_from_file_in_serial();
return;
}
int rank, size;
MPI_Comm_rank(*m_communicator, &rank);
MPI_Comm_size(*m_communicator, &size);
Datatype type;
constexpr size_t max_string_len = 30;
char distribute_string[max_string_len]{};
if (rank == 0)
{
type = read_from_file_in_serial();
auto as_string = datatypeToString(type);
if (as_string.size() + 1 > max_string_len)
{
throw error::Internal(
"String size hardcoded too low for MPI_Bcast");
}
std::copy(as_string.begin(), as_string.end(), distribute_string);
}
MPI_Bcast(distribute_string, max_string_len, MPI_CHAR, 0, *m_communicator);
if (rank != 0)
{
type = stringToDatatype(std::string(distribute_string));
}
switchType<DistributeToAllRanks>(
type, *param.resource, *m_communicator, rank);
#else
read_from_file_in_serial();
#endif
}

void ADIOS2IOHandlerImpl::listPaths(
Expand Down

0 comments on commit d0b4562

Please sign in to comment.