From d64dbc29b86ca55893aa33a027b8e685cebbf8eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 28 Feb 2024 03:37:46 +0100 Subject: [PATCH] Parallel JSON (#1475) * Plumberwork for MPI communicator in JSON backend * Parallel reading * ... and writing * Set padding according to MPI rank * Write README.txt file * Bug fix: don't double prepend base dir * Test parallel output in openpmd-pipe test * Bug fix: use mpi_rank_%i.toml when writing to TOML * Refactor `if` statement * Add documentation --- CMakeLists.txt | 2 +- docs/source/backends/json.rst | 37 ++- include/openPMD/IO/JSON/JSONIOHandler.hpp | 15 +- include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp | 20 +- src/IO/AbstractIOHandlerHelper.cpp | 19 +- src/IO/JSON/JSONIOHandler.cpp | 18 +- src/IO/JSON/JSONIOHandlerImpl.cpp | 211 ++++++++++++++++-- src/auxiliary/Filesystem.cpp | 3 +- 8 files changed, 294 insertions(+), 31 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c21690f0c9..ef3f77bf9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1370,7 +1370,7 @@ if(openPMD_BUILD_TESTING) --outfile \ ../samples/git-sample/thetaMode/data_%T.bp && \ \ - ${Python_EXECUTABLE} \ + ${MPI_TEST_EXE} ${Python_EXECUTABLE} \ ${openPMD_RUNTIME_OUTPUT_DIRECTORY}/openpmd-pipe \ --infile ../samples/git-sample/thetaMode/data_%T.bp \ --outfile ../samples/git-sample/thetaMode/data%T.json \ diff --git a/docs/source/backends/json.rst b/docs/source/backends/json.rst index 48ec6b1f44..bbae92aaf6 100644 --- a/docs/source/backends/json.rst +++ b/docs/source/backends/json.rst @@ -92,7 +92,6 @@ propagate the exception thrown by Niels Lohmann's library. The (keys) names ``"attributes"``, ``"data"`` and ``"datatype"`` are reserved and must not be used for base/mesh/particles path, records and their components. -A parallel (i.e. MPI) implementation is *not* available. TOML Restrictions ----------------- @@ -106,7 +105,41 @@ TOML does not support null values. 
The (keys) names ``"attributes"``, ``"data"`` and ``"datatype"`` are reserved and must not be used for base/mesh/particles path, records and their components. -A parallel (i.e. MPI) implementation is *not* available. + +Using in parallel (MPI) +----------------------- + +Parallel I/O is not a first-class citizen in the JSON and TOML backends, and neither backend will "go out of its way" to support parallel workflows. + +However, there is a rudimentary form of read and write support in parallel: + +Parallel reading +................ + +In order not to overload the parallel filesystem with parallel reads, read access to JSON datasets is done by rank 0 and then broadcast to all other ranks. +Note that there is no granularity whatsoever in reading a JSON file. +A JSON file is always read into memory and broadcast to all other ranks in its entirety. + +Parallel writing +................ + +When executed in an MPI context, the JSON/TOML backends will not directly output a single text file, but instead a folder containing one file per MPI rank. +Neither backend will perform any data aggregation at all. + +.. note:: + + The parallel write support of the JSON/TOML backends is intended mainly for debugging and prototyping workflows. + +The folder will use the specified Series name, but append the postfix ``.parallel``. +(This is a deliberate indication that this folder cannot directly be opened again by the openPMD-api as a JSON/TOML dataset.) +This folder contains for each MPI rank *i* a file ``mpi_rank_<i>.json`` (resp. ``mpi_rank_<i>.toml``), containing the serial output of that rank. +A ``README.txt`` with basic usage instructions is also written. + +.. note:: + + There is no direct support in the openPMD-api to read a JSON/TOML dataset written in this parallel fashion. The single files (e.g. ``data.json.parallel/mpi_rank_0.json``) are each valid openPMD files and can be read separately, however. 
+ + Note that the auxiliary function ``json::merge()`` (or in Python ``openpmd_api.merge_json()``) is not adequate for merging the single JSON/TOML files back into one, since it does not merge anything below the array level. Example diff --git a/include/openPMD/IO/JSON/JSONIOHandler.hpp b/include/openPMD/IO/JSON/JSONIOHandler.hpp index 7fdea5b6f0..7cb6870f5b 100644 --- a/include/openPMD/IO/JSON/JSONIOHandler.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandler.hpp @@ -24,17 +24,30 @@ #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/JSON/JSONIOHandlerImpl.hpp" +#if openPMD_HAVE_MPI +#include +#endif + namespace openPMD { class JSONIOHandler : public AbstractIOHandler { public: JSONIOHandler( - std::string const &path, + std::string path, + Access at, + openPMD::json::TracingJSON config, + JSONIOHandlerImpl::FileFormat, + std::string originalExtension); +#if openPMD_HAVE_MPI + JSONIOHandler( + std::string path, Access at, + MPI_Comm, openPMD::json::TracingJSON config, JSONIOHandlerImpl::FileFormat, std::string originalExtension); +#endif ~JSONIOHandler() override; diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp index 5ce9d057c3..da7e296d59 100644 --- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp @@ -31,6 +31,9 @@ #include #include +#if openPMD_HAVE_MPI +#include +#endif #include #include @@ -70,6 +73,7 @@ struct File std::string name; bool valid = true; + bool printedReadmeWarningAlready = false; }; std::shared_ptr fileState; @@ -167,6 +171,15 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl FileFormat, std::string originalExtension); +#if openPMD_HAVE_MPI + JSONIOHandlerImpl( + AbstractIOHandler *, + MPI_Comm, + openPMD::json::TracingJSON config, + FileFormat, + std::string originalExtension); +#endif + ~JSONIOHandlerImpl() override; void @@ -230,6 +243,10 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl std::future flush(); 
private: +#if openPMD_HAVE_MPI + std::optional m_communicator; +#endif + using FILEHANDLE = std::fstream; // map each Writable to its associated file @@ -323,7 +340,8 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl // write to disk the json contents associated with the file // remove from m_dirty if unsetDirty == true - void putJsonContents(File const &, bool unsetDirty = true); + auto putJsonContents(File const &, bool unsetDirty = true) + -> decltype(m_jsonVals)::iterator; // figure out the file position of the writable // (preferring the parent's file position) and extend it diff --git a/src/IO/AbstractIOHandlerHelper.cpp b/src/IO/AbstractIOHandlerHelper.cpp index 699dfd3619..8576343e5d 100644 --- a/src/IO/AbstractIOHandlerHelper.cpp +++ b/src/IO/AbstractIOHandlerHelper.cpp @@ -125,8 +125,23 @@ std::unique_ptr createIOHandler( "ssc", std::move(originalExtension)); case Format::JSON: - throw error::WrongAPIUsage( - "JSON backend not available in parallel openPMD."); + return constructIOHandler( + "JSON", + path, + access, + comm, + std::move(options), + JSONIOHandlerImpl::FileFormat::Json, + std::move(originalExtension)); + case Format::TOML: + return constructIOHandler( + "JSON", + path, + access, + comm, + std::move(options), + JSONIOHandlerImpl::FileFormat::Toml, + std::move(originalExtension)); default: throw error::WrongAPIUsage( "Unknown file format! Did you specify a file ending? 
Specified " diff --git a/src/IO/JSON/JSONIOHandler.cpp b/src/IO/JSON/JSONIOHandler.cpp index 041b236340..d2a6217eb5 100644 --- a/src/IO/JSON/JSONIOHandler.cpp +++ b/src/IO/JSON/JSONIOHandler.cpp @@ -26,15 +26,29 @@ namespace openPMD JSONIOHandler::~JSONIOHandler() = default; JSONIOHandler::JSONIOHandler( - std::string const &path, + std::string path, Access at, openPMD::json::TracingJSON jsonCfg, JSONIOHandlerImpl::FileFormat format, std::string originalExtension) - : AbstractIOHandler{path, at} + : AbstractIOHandler{std::move(path), at} , m_impl{this, std::move(jsonCfg), format, std::move(originalExtension)} {} +#if openPMD_HAVE_MPI +JSONIOHandler::JSONIOHandler( + std::string path, + Access at, + MPI_Comm comm, + openPMD::json::TracingJSON jsonCfg, + JSONIOHandlerImpl::FileFormat format, + std::string originalExtension) + : AbstractIOHandler{std::move(path), at} + , m_impl{JSONIOHandlerImpl{ + this, comm, std::move(jsonCfg), format, std::move(originalExtension)}} +{} +#endif + std::future JSONIOHandler::flush(internal::ParsedFlushParams &) { return m_impl.flush(); diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 36d153de6a..657c15e5fb 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -23,12 +23,17 @@ #include "openPMD/Datatype.hpp" #include "openPMD/DatatypeHelpers.hpp" #include "openPMD/Error.hpp" +#include "openPMD/IO/AbstractIOHandler.hpp" +#include "openPMD/IO/AbstractIOHandlerImpl.hpp" #include "openPMD/auxiliary/Filesystem.hpp" +#include "openPMD/auxiliary/JSON_internal.hpp" #include "openPMD/auxiliary/Memory.hpp" #include "openPMD/auxiliary/StringManip.hpp" #include "openPMD/auxiliary/TypeTraits.hpp" #include "openPMD/backend/Writable.hpp" +#include +#include #include #include @@ -133,6 +138,21 @@ JSONIOHandlerImpl::JSONIOHandlerImpl( , m_originalExtension{std::move(originalExtension)} {} +#if openPMD_HAVE_MPI +JSONIOHandlerImpl::JSONIOHandlerImpl( + AbstractIOHandler *handler, 
+ MPI_Comm comm, + // NOLINTNEXTLINE(performance-unnecessary-value-param) + [[maybe_unused]] openPMD::json::TracingJSON config, + FileFormat format, + std::string originalExtension) + : AbstractIOHandlerImpl(handler) + , m_communicator{comm} + , m_fileFormat{format} + , m_originalExtension{std::move(originalExtension)} +{} +#endif + JSONIOHandlerImpl::~JSONIOHandlerImpl() = default; std::future JSONIOHandlerImpl::flush() @@ -618,7 +638,11 @@ void JSONIOHandlerImpl::closeFile( auto fileIterator = m_files.find(writable); if (fileIterator != m_files.end()) { - putJsonContents(fileIterator->second); + auto it = putJsonContents(fileIterator->second); + if (it != m_jsonVals.end()) + { + m_jsonVals.erase(it); + } m_dirty.erase(fileIterator->second); // do not invalidate the file // it still exists, it is just not open @@ -1250,20 +1274,64 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) return it->second; } // read from file - auto [fh, fh_with_precision, _] = getFilehandle(file, Access::READ_ONLY); - (void)_; - std::shared_ptr res = std::make_shared(); - switch (m_fileFormat) + auto serialImplementation = [&file, this]() { + auto [fh, fh_with_precision, _] = + getFilehandle(file, Access::READ_ONLY); + (void)_; + std::shared_ptr res = + std::make_shared(); + switch (m_fileFormat) + { + case FileFormat::Json: + *fh_with_precision >> *res; + break; + case FileFormat::Toml: + *res = openPMD::json::tomlToJson( + toml::parse(*fh_with_precision, *file)); + break; + } + VERIFY(fh->good(), "[JSON] Failed reading from a file."); + return res; + }; +#if openPMD_HAVE_MPI + auto parallelImplementation = [&file, this](MPI_Comm comm) { + auto path = fullPath(*file); + std::string collectivelyReadRawData = + auxiliary::collective_file_read(path, comm); + std::shared_ptr res = + std::make_shared(); + switch (m_fileFormat) + { + case FileFormat::Json: + *res = nlohmann::json::parse(collectivelyReadRawData); + break; + case FileFormat::Toml: + std::istringstream istream( + 
collectivelyReadRawData.c_str(), + std::ios_base::binary | std::ios_base::in); + auto as_toml = toml::parse( + istream >> std::setprecision( + std::numeric_limits::digits10 + 1), + *file); + *res = openPMD::json::tomlToJson(as_toml); + break; + } + return res; + }; + std::shared_ptr res; + if (m_communicator.has_value()) { - case FileFormat::Json: - *fh_with_precision >> *res; - break; - case FileFormat::Toml: - *res = - openPMD::json::tomlToJson(toml::parse(*fh_with_precision, *file)); - break; + res = parallelImplementation(m_communicator.value()); } - VERIFY(fh->good(), "[JSON] Failed reading from a file."); + else + { + res = serialImplementation(); + } + +#else + auto res = serialImplementation(); +#endif + m_jsonVals.emplace(file, res); return res; } @@ -1275,21 +1343,26 @@ nlohmann::json &JSONIOHandlerImpl::obtainJsonContents(Writable *writable) return (*obtainJsonContents(file))[filePosition->id]; } -void JSONIOHandlerImpl::putJsonContents( +auto JSONIOHandlerImpl::putJsonContents( File const &filename, bool unsetDirty // = true -) + ) -> decltype(m_jsonVals)::iterator { VERIFY_ALWAYS( filename.valid(), "[JSON] File has been overwritten/deleted before writing"); auto it = m_jsonVals.find(filename); - if (it != m_jsonVals.end()) + if (it == m_jsonVals.end()) { + return it; + } + + (*it->second)["platform_byte_widths"] = platformSpecifics(); + + auto writeSingleFile = [this, &it](std::string const &writeThisFile) { auto [fh, _, fh_with_precision] = - getFilehandle(filename, Access::CREATE); + getFilehandle(File(writeThisFile), Access::CREATE); (void)_; - (*it->second)["platform_byte_widths"] = platformSpecifics(); switch (m_fileFormat) { @@ -1303,12 +1376,108 @@ void JSONIOHandlerImpl::putJsonContents( } VERIFY(fh->good(), "[JSON] Failed writing data to disk.") - m_jsonVals.erase(it); - if (unsetDirty) + }; + + auto serialImplementation = [&filename, &writeSingleFile]() { + writeSingleFile(*filename); + }; + +#if openPMD_HAVE_MPI + auto num_digits = 
[](unsigned n) -> unsigned { + constexpr auto max = std::numeric_limits::max(); + unsigned base_10 = 1; + unsigned res = 1; + while (base_10 < max) { - m_dirty.erase(filename); + base_10 *= 10; + if (n / base_10 == 0) + { + return res; + } + ++res; } + return res; + }; + + auto parallelImplementation = + [this, &filename, &writeSingleFile, &num_digits](MPI_Comm comm) { + auto path = fullPath(*filename); + auto dirpath = path + ".parallel"; + if (!auxiliary::create_directories(dirpath)) + { + throw std::runtime_error( + "Failed creating directory '" + dirpath + + "' for parallel JSON output"); + } + int rank = 0, size = 0; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + std::stringstream subfilePath; + // writeSingleFile will prepend the base dir + subfilePath << *filename << ".parallel/mpi_rank_" + << std::setw(num_digits(size - 1)) << std::setfill('0') + << rank << [&]() { + switch (m_fileFormat) + { + case FileFormat::Json: + return ".json"; + case FileFormat::Toml: + return ".toml"; + } + throw std::runtime_error("Unreachable!"); + }(); + writeSingleFile(subfilePath.str()); + if (rank == 0) + { + constexpr char const *readme_msg = R"( +This folder has been created by a parallel instance of the JSON backend in +openPMD. There is one JSON file for each parallel writer MPI rank. +The parallel JSON backend performs no metadata or data aggregation at all. + +This functionality is intended mainly for debugging and prototyping workflows. +There is no support in the openPMD-api for reading this folder as a single +dataset. For reading purposes, either pick a single .json file and read that, or +merge the .json files somehow (no tooling provided for this (yet)). 
+)"; + std::fstream readme_file; + readme_file.open( + dirpath + "/README.txt", + std::ios_base::out | std::ios_base::trunc); + readme_file << readme_msg + 1; + readme_file.close(); + if (!readme_file.good() && + !filename.fileState->printedReadmeWarningAlready) + { + std::cerr + << "[Warning] Something went wrong in trying to create " + "README file at '" + << dirpath + << "/README.txt'. Will ignore and continue. The README " + "message would have been:\n----------\n" + << readme_msg + 1 << "----------" << std::endl; + filename.fileState->printedReadmeWarningAlready = true; + } + } + }; + + std::shared_ptr res; + if (m_communicator.has_value()) + { + parallelImplementation(m_communicator.value()); + } + else + { + serialImplementation(); + } + +#else + serialImplementation(); +#endif + if (unsetDirty) + { + m_dirty.erase(filename); } + return it; } std::shared_ptr JSONIOHandlerImpl::setAndGetFilePosition( diff --git a/src/auxiliary/Filesystem.cpp b/src/auxiliary/Filesystem.cpp index cce80b9d17..564d266ee3 100644 --- a/src/auxiliary/Filesystem.cpp +++ b/src/auxiliary/Filesystem.cpp @@ -195,7 +195,8 @@ std::string collective_file_read(std::string const &path, MPI_Comm comm) if (!handle.good()) { throw std::runtime_error( - "Failed reading JSON config from file " + path + "."); + "[collective_file_read] Failed acessing file '" + path + + "' on MPI rank 0."); } stringLength = res.size() + 1; }