diff --git a/docs/source/usage/workflow.rst b/docs/source/usage/workflow.rst index 61ef593a2e..ec44e2e70f 100644 --- a/docs/source/usage/workflow.rst +++ b/docs/source/usage/workflow.rst @@ -3,6 +3,49 @@ Workflow ======== +Storing and reading chunks +-------------------------- + +1. **Chunks within an n-dimensional dataset** + + Most commonly, chunks within an n-dimensional dataset are identified by their offset and extent. + The extent is the size of the chunk in each dimension, NOT the absolute coordinate within the entire dataset. + + In the Python API, this is modeled to conform to the conventional ``__setitem__``/``__getitem__`` protocol. + +2. **Joined arrays (write only)** + + (Currently) only supported in ADIOS2 no older than v2.9.0 under the conditions listed in the `ADIOS2 documentation on joined arrays `_. + + In some cases, the concrete chunk within a dataset does not matter and the computation of indexes is a needless computational and mental overhead. + This commonly occurs for particle data which the openPMD-standard models as a list of particles. + The order of particles does not matter greatly, and making different parallel processes agree on indexing is error-prone boilerplate. + + In such a case, at most one *joined dimension* can be specified in the Dataset, e.g. ``{Dataset::JOINED_DIMENSION, 128, 128}`` (3D for the sake of explanation, particle data would normally be 1D). + The chunk is then stored by specifying an empty offset vector ``{}``. + The chunk extent vector must be equivalent to the global extent in all non-joined dimensions (i.e. joined arrays allow no further sub-chunking other than concatenation along the joined dimension). + The joined dimension of the extent vector specifies the extent that this piece should have along the joined dimension. + In the Python API, the slice-based setter syntax can be used as an abbreviation since the necessary information is determined from the passed array, e.g. ``record_component[()] = local_data``. + The global extent of the dataset along the joined dimension will then be the sum of all local chunk extents along the joined dimension. + + Since openPMD follows a struct-of-array layout of data, it is important not to lose correlation of data between components. E.g., joining an array must take care that ``particles/e/position/x`` and ``particles/e/position/y`` are joined in uniform way. + + The openPMD-api makes the **following guarantee**: + + Consider a Series written from ``N`` parallel processes between two (collective) flush points. For each parallel process ``n`` and dataset ``D``, let: + + * ``chunk(D, n, i)`` be the ``i``'th chunk written to dataset ``D`` on process ``n`` + * ``num_chunks(D, n)`` be the count of chunks written by ``n`` to ``D`` + * ``joined_index(D, c)`` be the index of chunk ``c`` in the joining order of ``D`` + + Then for any two datasets ``x`` and ``y``: + + * If for any parallel process ``n`` the condition holds that ``num_chunks(x, n) = num_chunks(y, n)`` (between the two flush points!)... + * ...then for any parallel process ``n`` and chunk index ``i`` less than ``num_chunks(x, n)``: ``joined_index(x, chunk(x, n, i)) = joined_index(y, chunk(y, n, i))``. + + **TLDR:** Writing chunks to two joined arrays in synchronous way (**1.** same order of store operations and **2.** between the same flush operations) will result in the same joining order in both arrays. + + Access modes ------------ diff --git a/examples/5_write_parallel.py b/examples/5_write_parallel.py index ace0cd6e63..8574c1d66e 100644 --- a/examples/5_write_parallel.py +++ b/examples/5_write_parallel.py @@ -14,13 +14,21 @@ import numpy as np import openpmd_api as io +try: + import adios2 + from packaging import version + USE_JOINED_DIMENSION = \ + version.parse(adios2.__version__) >= version.parse('2.9.0') +except ImportError: + USE_JOINED_DIMENSION = False + if __name__ == "__main__": # also works with any other MPI communicator comm = MPI.COMM_WORLD # global data set to write: [MPI_Size * 10, 300] # each rank writes a 10x300 slice with its MPI rank as values - local_value = comm.size + local_value = comm.rank local_data = np.ones(10 * 300, dtype=np.double).reshape(10, 300) * local_value if 0 == comm.rank: @@ -29,7 +37,9 @@ # open file for writing series = io.Series( - "../samples/5_parallel_write_py.h5", + "../samples/5_parallel_write_py.bp" + if USE_JOINED_DIMENSION + else "../samples/5_parallel_write_py.h5", io.Access.create, comm ) @@ -51,7 +61,9 @@ meshes["mymesh"] # example 1D domain decomposition in first index - global_extent = [comm.size * 10, 300] + global_extent = [io.Dataset.JOINED_DIMENSION, 300] \ + if USE_JOINED_DIMENSION else [comm.size * 10, 300] + dataset = io.Dataset(local_data.dtype, global_extent) if 0 == comm.rank: @@ -64,7 +76,15 @@ "mymesh in iteration 1") # example shows a 1D domain decomposition in first index - mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data + + if USE_JOINED_DIMENSION: + # explicit API + # mymesh.store_chunk(local_data, [], [10, 300]) + mymesh[:, :] = local_data + # or short: + # mymesh[()] = local_data + else: + mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data if 0 == comm.rank: print("Registered a single chunk per MPI rank containing its " "contribution, ready to write content to disk") diff --git a/include/openPMD/Dataset.hpp b/include/openPMD/Dataset.hpp index 8757a3cf0a..0032888541 100644 --- a/include/openPMD/Dataset.hpp +++ b/include/openPMD/Dataset.hpp @@ -22,7 +22,9 @@ #include "openPMD/Datatype.hpp" +#include #include +#include #include #include #include @@ -37,6 +39,11 @@ class Dataset friend class RecordComponent; public: + enum : std::uint64_t + { + JOINED_DIMENSION = std::numeric_limits::max() + }; + Dataset(Datatype, Extent, std::string options = "{}"); /** @@ -53,5 +60,9 @@ class Dataset Datatype dtype; uint8_t rank; std::string options = "{}"; //!< backend-dependent JSON configuration + + bool empty() const; + + std::optional joinedDimension() const; }; } // namespace openPMD diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index 66c8c7a466..c23fb81d49 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -60,6 +60,8 @@ namespace openPMD { #if openPMD_HAVE_ADIOS2 +std::optional joinedDimension(adios2::Dims const &dims); + class ADIOS2IOHandler; namespace detail @@ -443,12 +445,35 @@ class ADIOS2IOHandlerImpl std::to_string(actualDim) + ")"); } } - for (unsigned int i = 0; i < actualDim; i++) + auto joinedDim = joinedDimension(shape); + if (joinedDim.has_value()) { - if (offset[i] + extent[i] > shape[i]) + if (!offset.empty()) { throw std::runtime_error( - "[ADIOS2] Dataset access out of bounds."); + "[ADIOS2] Offset must be an empty vector in case of joined " + "array."); + } + for (unsigned int i = 0; i < actualDim; i++) + { + if (*joinedDim != i && extent[i] != shape[i]) + { + throw std::runtime_error( + "[ADIOS2] store_chunk extent of non-joined dimensions " + "must be equivalent to the total extent."); + } + } + } + else + { + for (unsigned int i = 0; i < actualDim; i++) + { + if (!(joinedDim.has_value() && *joinedDim == i) && + offset[i] + extent[i] > shape[i]) + { + throw std::runtime_error( + "[ADIOS2] Dataset access out of bounds."); + } } } diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index d2fc05f379..8334c67314 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -326,6 +326,7 @@ struct OPENPMDAPI_EXPORT Parameter Extent extent = {}; Datatype dtype = Datatype::UNDEFINED; std::string options = "{}"; + std::optional joinedDimension; /** Warn about unused JSON paramters * diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp index 3a924dfcc9..deec871d9c 100644 --- a/include/openPMD/RecordComponent.hpp +++ b/include/openPMD/RecordComponent.hpp @@ -537,6 +537,11 @@ OPENPMD_protected } void readBase(bool require_unit_si); + + template + void verifyChunk(Offset const &, Extent const &) const; + + void verifyChunk(Datatype, Offset const &, Extent const &) const; }; // RecordComponent } // namespace openPMD diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index e8ba6006ab..91498b05eb 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -259,8 +259,17 @@ RecordComponent::storeChunk(T_ContiguousContainer &data, Offset o, Extent e) // default arguments // offset = {0u}: expand to right dim {0u, 0u, ...} Offset offset = o; - if (o.size() == 1u && o.at(0) == 0u && dim > 1u) - offset = Offset(dim, 0u); + if (o.size() == 1u && o.at(0) == 0u) + { + if (joinedDimension().has_value()) + { + offset.clear(); + } + else if (dim > 1u) + { + offset = Offset(dim, 0u); + } + } // extent = {-1u}: take full size Extent extent(dim, 1u); @@ -278,38 +287,7 @@ template inline DynamicMemoryView RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer) { - if (constant()) - throw std::runtime_error( - "Chunks cannot be written for a constant RecordComponent."); - if (empty()) - throw std::runtime_error( - "Chunks cannot be written for an empty RecordComponent."); - Datatype dtype = determineDatatype(); - if (dtype != getDatatype()) - { - std::ostringstream oss; - oss << "Datatypes of chunk data (" << dtype - << ") and record component (" << getDatatype() << ") do not match."; - throw std::runtime_error(oss.str()); - } - uint8_t dim = getDimensionality(); - if (e.size() != dim || o.size() != dim) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << o.size() << "D, " - << "extent=" << e.size() << "D) " - << "and record component (" << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } - Extent dse = getExtent(); - for (uint8_t i = 0; i < dim; ++i) - if (dse[i] < o[i] + e[i]) - throw std::runtime_error( - "Chunk does not reside inside dataset (Dimension on index " + - std::to_string(i) + ". DS: " + std::to_string(dse[i]) + - " - Chunk: " + std::to_string(o[i] + e[i]) + ")"); + verifyChunk(o, e); /* * The openPMD backend might not yet know about this dataset. @@ -334,6 +312,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer) dCreate.name = rc.m_name; dCreate.extent = getExtent(); dCreate.dtype = getDatatype(); + dCreate.joinedDimension = joinedDimension(); if (!rc.m_dataset.has_value()) { throw error::WrongAPIUsage( @@ -407,4 +386,9 @@ auto RecordComponent::visit(Args &&...args) getDatatype(), *this, std::forward(args)...); } +template +void RecordComponent::verifyChunk(Offset const &o, Extent const &e) const +{ + verifyChunk(determineDatatype(), o, e); +} } // namespace openPMD diff --git a/include/openPMD/backend/BaseRecordComponent.hpp b/include/openPMD/backend/BaseRecordComponent.hpp index 0288e9bb9a..fe4490830d 100644 --- a/include/openPMD/backend/BaseRecordComponent.hpp +++ b/include/openPMD/backend/BaseRecordComponent.hpp @@ -143,6 +143,8 @@ class BaseRecordComponent : virtual public Attributable */ bool constant() const; + std::optional joinedDimension() const; + /** * Get data chunks that are available to be loaded from the backend. * Note that this is backend-dependent information and the returned diff --git a/include/openPMD/backend/PatchRecordComponent.hpp b/include/openPMD/backend/PatchRecordComponent.hpp index ff8b330c2f..4620f9be0b 100644 --- a/include/openPMD/backend/PatchRecordComponent.hpp +++ b/include/openPMD/backend/PatchRecordComponent.hpp @@ -20,6 +20,7 @@ */ #pragma once +#include "openPMD/Error.hpp" #include "openPMD/RecordComponent.hpp" #include "openPMD/auxiliary/ShareRawInternal.hpp" #include "openPMD/backend/BaseRecordComponent.hpp" @@ -85,6 +86,9 @@ class PatchRecordComponent : public RecordComponent template void store(uint64_t idx, T); + template + void store(T); + // clang-format off OPENPMD_private // clang-format on @@ -180,4 +184,33 @@ inline void PatchRecordComponent::store(uint64_t idx, T data) auto &rc = get(); rc.m_chunks.push(IOTask(this, std::move(dWrite))); } + +template +inline void PatchRecordComponent::store(T data) +{ + Datatype dtype = determineDatatype(); + if (dtype != getDatatype()) + { + std::ostringstream oss; + oss << "Datatypes of patch data (" << dtype << ") and dataset (" + << getDatatype() << ") do not match."; + throw std::runtime_error(oss.str()); + } + + if (!joinedDimension().has_value()) + { + throw error::WrongAPIUsage( + "[PatchRecordComponent::store] API call without explicit " + "specification of index only allowed when a joined dimension is " + "specified."); + } + + Parameter dWrite; + dWrite.offset = {}; + dWrite.extent = {1}; + dWrite.dtype = dtype; + dWrite.data = std::make_shared(data); + auto &rc = get(); + rc.m_chunks.push(IOTask(this, std::move(dWrite))); +} } // namespace openPMD diff --git a/src/Dataset.cpp b/src/Dataset.cpp index 662bd2d29f..c1546e9ef0 100644 --- a/src/Dataset.cpp +++ b/src/Dataset.cpp @@ -19,6 +19,7 @@ * If not, see . */ #include "openPMD/Dataset.hpp" +#include "openPMD/Error.hpp" #include #include @@ -30,6 +31,9 @@ Dataset::Dataset(Datatype d, Extent e, std::string options_in) { // avoid initialization order issues rank = static_cast(extent.size()); + // Call this in order to have early error message in case of wrong + // specification of joined dimensions + joinedDimension(); } Dataset::Dataset(Extent e) : Dataset(Datatype::UNDEFINED, std::move(e)) @@ -49,4 +53,41 @@ Dataset &Dataset::extend(Extent newExtents) extent = newExtents; return *this; } + +bool Dataset::empty() const +{ + auto jd = joinedDimension(); + for (size_t i = 0; i < extent.size(); ++i) + { + if (extent[i] == 0 && (!jd.has_value() || jd.value() != i)) + { + return true; + } + } + return false; +} + +std::optional Dataset::joinedDimension() const +{ + std::optional res; + for (size_t i = 0; i < extent.size(); ++i) + { + if (extent[i] == JOINED_DIMENSION) + { + if (res.has_value()) + { + throw error::WrongAPIUsage( + "Must specify JOINED_DIMENSION at most once (found at " + "indices " + + std::to_string(res.value()) + " and " + std::to_string(i) + + ")"); + } + else + { + res = i; + } + } + } + return res; +} } // namespace openPMD diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 08fac073cf..839f59dd03 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -68,6 +68,18 @@ namespace openPMD #if openPMD_HAVE_ADIOS2 +std::optional joinedDimension(adios2::Dims const &dims) +{ + for (size_t i = 0; i < dims.size(); ++i) + { + if (dims[i] == adios2::JoinedDim) + { + return i; + } + } + return std::nullopt; +} + #if openPMD_HAVE_MPI ADIOS2IOHandlerImpl::ADIOS2IOHandlerImpl( @@ -708,6 +720,13 @@ void ADIOS2IOHandlerImpl::createDataset( "[ADIOS2] Creating a dataset in a file opened as read " "only is not possible."); } +#if !openPMD_HAS_ADIOS_2_9 + if (parameters.joinedDimension.has_value()) + { + error::throwOperationUnsupportedInBackend( + "ADIOS1", "Joined Arrays require ADIOS2 >= v2.9"); + } +#endif if (!writable->written) { /* Sanitize name */ @@ -741,8 +760,11 @@ void ADIOS2IOHandlerImpl::createDataset( varName + "' remain unused:\n"); // cast from openPMD::Extent to adios2::Dims - adios2::Dims const shape( - parameters.extent.begin(), parameters.extent.end()); + adios2::Dims shape(parameters.extent.begin(), parameters.extent.end()); + if (auto jd = parameters.joinedDimension; jd.has_value()) + { + shape[jd.value()] = adios2::JoinedDim; + } auto &fileData = getFileData(file, IfFileNotOpen::ThrowError); diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index ac4fcaddf9..7caac73776 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -446,6 +446,12 @@ void HDF5IOHandlerImpl::createDataset( "[HDF5] Creating a dataset in a file opened as read only is not " "possible."); + if (parameters.joinedDimension.has_value()) + { + error::throwOperationUnsupportedInBackend( + "ADIOS1", "Joined Arrays currently only supported in ADIOS2"); + } + if (!writable->written) { /* Sanitize name */ diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index a4e1bb39ab..36d153de6a 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -260,6 +260,12 @@ void JSONIOHandlerImpl::createDataset( "[JSON] Creating a dataset in a file opened as read only is not " "possible."); } + if (parameter.joinedDimension.has_value()) + { + error::throwOperationUnsupportedInBackend( + "ADIOS1", "Joined Arrays currently only supported in ADIOS2"); + } + if (!writable->written) { /* Sanitize name */ diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp index 3cca47106a..a8f7d734ba 100644 --- a/src/RecordComponent.cpp +++ b/src/RecordComponent.cpp @@ -95,10 +95,7 @@ RecordComponent &RecordComponent::resetDataset(Dataset d) } // if( d.extent.empty() ) // throw std::runtime_error("Dataset extent must be at least 1D."); - if (std::any_of( - d.extent.begin(), d.extent.end(), [](Extent::value_type const &i) { - return i == 0u; - })) + if (d.empty()) return makeEmpty(std::move(d)); rc.m_isEmpty = false; @@ -299,6 +296,7 @@ void RecordComponent::flush( dCreate.extent = getExtent(); dCreate.dtype = getDatatype(); dCreate.options = rc.m_dataset.value().options; + dCreate.joinedDimension = joinedDimension(); IOHandler()->enqueue(IOTask(this, dCreate)); } } @@ -452,6 +450,21 @@ bool RecordComponent::dirtyRecursive() const void RecordComponent::storeChunk( auxiliary::WriteBuffer buffer, Datatype dtype, Offset o, Extent e) +{ + verifyChunk(dtype, o, e); + + Parameter dWrite; + dWrite.offset = std::move(o); + dWrite.extent = std::move(e); + dWrite.dtype = dtype; + /* std::static_pointer_cast correctly reference-counts the pointer */ + dWrite.data = std::move(buffer); + auto &rc = get(); + rc.m_chunks.push(IOTask(this, std::move(dWrite))); +} + +void RecordComponent::verifyChunk( + Datatype dtype, Offset const &o, Extent const &e) const { if (constant()) throw std::runtime_error( @@ -467,32 +480,59 @@ void RecordComponent::storeChunk( throw std::runtime_error(oss.str()); } uint8_t dim = getDimensionality(); - if (e.size() != dim || o.size() != dim) - { - std::ostringstream oss; - oss << "Dimensionality of chunk (" - << "offset=" << o.size() << "D, " - << "extent=" << e.size() << "D) " - << "and record component (" << int(dim) << "D) " - << "do not match."; - throw std::runtime_error(oss.str()); - } Extent dse = getExtent(); - for (uint8_t i = 0; i < dim; ++i) - if (dse[i] < o[i] + e[i]) - throw std::runtime_error( - "Chunk does not reside inside dataset (Dimension on index " + - std::to_string(i) + ". DS: " + std::to_string(dse[i]) + - " - Chunk: " + std::to_string(o[i] + e[i]) + ")"); - Parameter dWrite; - dWrite.offset = o; - dWrite.extent = e; - dWrite.dtype = dtype; - /* std::static_pointer_cast correctly reference-counts the pointer */ - dWrite.data = std::move(buffer); - auto &rc = get(); - rc.m_chunks.push(IOTask(this, std::move(dWrite))); + if (auto jd = joinedDimension(); jd.has_value()) + { + if (o.size() != 0) + { + std::ostringstream oss; + oss << "Joined array: Must specify an empty offset (given: " + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D)."; + throw std::runtime_error(oss.str()); + } + if (e.size() != dim) + { + std::ostringstream oss; + oss << "Joined array: Dimensionalities of chunk extent and dataset " + "extent must be equivalent (given: " + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D)."; + throw std::runtime_error(oss.str()); + } + for (size_t i = 0; i < dim; ++i) + { + if (i != jd.value() && e[i] != dse[i]) + { + throw std::runtime_error( + "Joined array: Chunk extent on non-joined dimensions must " + "be equivalent to dataset extents (Dimension on index " + + std::to_string(i) + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + ")"); + } + } + } + else + { + if (e.size() != dim || o.size() != dim) + { + std::ostringstream oss; + oss << "Dimensionality of chunk (" + << "offset=" << o.size() << "D, " + << "extent=" << e.size() << "D) " + << "and record component (" << int(dim) << "D) " + << "do not match."; + throw std::runtime_error(oss.str()); + } + for (uint8_t i = 0; i < dim; ++i) + if (dse[i] < o[i] + e[i]) + throw std::runtime_error( + "Chunk does not reside inside dataset (Dimension on " + "index " + + std::to_string(i) + ". DS: " + std::to_string(dse[i]) + + " - Chunk: " + std::to_string(o[i] + e[i]) + ")"); + } } namespace diff --git a/src/backend/BaseRecordComponent.cpp b/src/backend/BaseRecordComponent.cpp index 96b38beed5..839cdc55a6 100644 --- a/src/backend/BaseRecordComponent.cpp +++ b/src/backend/BaseRecordComponent.cpp @@ -65,6 +65,19 @@ bool BaseRecordComponent::constant() const return get().m_isConstant; } +std::optional BaseRecordComponent::joinedDimension() const +{ + auto &rc = get(); + if (rc.m_dataset.has_value()) + { + return rc.m_dataset.value().joinedDimension(); + } + else + { + return false; + } +} + ChunkTable BaseRecordComponent::availableChunks() { auto &rc = get(); diff --git a/src/backend/PatchRecordComponent.cpp b/src/backend/PatchRecordComponent.cpp index 748f1e1bd9..9252eb19ad 100644 --- a/src/backend/PatchRecordComponent.cpp +++ b/src/backend/PatchRecordComponent.cpp @@ -42,14 +42,11 @@ PatchRecordComponent &PatchRecordComponent::resetDataset(Dataset d) "written."); if (d.extent.empty()) throw std::runtime_error("Dataset extent must be at least 1D."); - if (std::any_of( - d.extent.begin(), d.extent.end(), [](Extent::value_type const &i) { - return i == 0u; - })) + if (d.empty()) throw std::runtime_error( "Dataset extent must not be zero in any dimension."); - get().m_dataset = d; + get().m_dataset = std::move(d); dirty() = true; return *this; } diff --git a/src/binding/python/Dataset.cpp b/src/binding/python/Dataset.cpp index 656cd59ea8..70d85721f2 100644 --- a/src/binding/python/Dataset.cpp +++ b/src/binding/python/Dataset.cpp @@ -27,58 +27,66 @@ void init_Dataset(py::module &m) { - py::class_(m, "Dataset") + auto pyDataset = + py::class_(m, "Dataset") + .def( + py::init(), + py::arg("dtype"), + py::arg("extent")) + .def(py::init(), py::arg("extent")) + .def( + py::init([](py::dtype dt, Extent const &e) { + auto const d = dtype_from_numpy(std::move(dt)); + return new Dataset{d, e}; + }), + py::arg("dtype"), + py::arg("extent")) + .def( + py::init(), + py::arg("dtype"), + py::arg("extent"), + py::arg("options")) + .def( + py::init([](py::dtype dt, Extent e, std::string options) { + auto const d = dtype_from_numpy(std::move(dt)); + return new Dataset{d, std::move(e), std::move(options)}; + }), + py::arg("dtype"), + py::arg("extent"), + py::arg("options")) - .def(py::init(), py::arg("dtype"), py::arg("extent")) - .def(py::init(), py::arg("extent")) - .def( - py::init([](py::dtype dt, Extent const &e) { - auto const d = dtype_from_numpy(std::move(dt)); - return new Dataset{d, e}; - }), - py::arg("dtype"), - py::arg("extent")) - .def( - py::init(), - py::arg("dtype"), - py::arg("extent"), - py::arg("options")) - .def( - py::init([](py::dtype dt, Extent const &e, std::string options) { - auto const d = dtype_from_numpy(std::move(dt)); - return new Dataset{d, e, std::move(options)}; - }), - py::arg("dtype"), - py::arg("extent"), - py::arg("options")) - - .def( - "__repr__", - [](const Dataset &d) { - std::stringstream stream; - stream << ""; - } - else - { - auto begin = d.extent.begin(); - stream << '[' << *begin++; - for (; begin != d.extent.end(); ++begin) + .def( + "__repr__", + [](const Dataset &d) { + std::stringstream stream; + stream << ""; + } + else { - stream << ", " << *begin; + auto begin = d.extent.begin(); + stream << '[' << *begin++; + for (; begin != d.extent.end(); ++begin) + { + stream << ", " << *begin; + } + stream << "]>"; } - stream << "]>"; - } - return stream.str(); - }) + return stream.str(); + }) - .def_readonly("extent", &Dataset::extent) - .def("extend", &Dataset::extend) - .def_readonly("rank", &Dataset::rank) - .def_property_readonly( - "dtype", [](const Dataset &d) { return dtype_to_numpy(d.dtype); }) - .def_readwrite("options", &Dataset::options); + .def_property_readonly( + "joined_dimension", &Dataset::joinedDimension) + .def_readonly("extent", &Dataset::extent) + .def("extend", &Dataset::extend) + .def_readonly("rank", &Dataset::rank) + .def_property_readonly( + "dtype", + [](const Dataset &d) { return dtype_to_numpy(d.dtype); }) + .def_readwrite("options", &Dataset::options); + pyDataset.attr("JOINED_DIMENSION") = + py::int_(uint64_t(Dataset::JOINED_DIMENSION)); } diff --git a/src/binding/python/PatchRecordComponent.cpp b/src/binding/python/PatchRecordComponent.cpp index 3454d76222..311272f5b6 100644 --- a/src/binding/python/PatchRecordComponent.cpp +++ b/src/binding/python/PatchRecordComponent.cpp @@ -189,12 +189,14 @@ void init_PatchRecordComponent(py::module &m) // allowed python intrinsics, after (!) buffer matching .def( "store", - &PatchRecordComponent::store, + py::overload_cast( + &PatchRecordComponent::store), py::arg("idx"), py::arg("data")) .def( "store", - &PatchRecordComponent::store, + py::overload_cast( + &PatchRecordComponent::store), py::arg("idx"), py::arg("data")) diff --git a/src/binding/python/RecordComponent.cpp b/src/binding/python/RecordComponent.cpp index 37ad9a7cff..af58f0d2c8 100644 --- a/src/binding/python/RecordComponent.cpp +++ b/src/binding/python/RecordComponent.cpp @@ -18,12 +18,15 @@ * and the GNU Lesser General Public License along with openPMD-api. * If not, see . */ +#include +#include #include #include #include #include "openPMD/DatatypeHelpers.hpp" #include "openPMD/Error.hpp" +#include "openPMD/RecordComponent.hpp" #include "openPMD/Series.hpp" #include "openPMD/backend/BaseRecordComponent.hpp" @@ -40,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -111,14 +115,48 @@ inline std::tuple> parseTupleSlices( py::slice slice = py::cast(slices[i]); size_t start, stop, step, slicelength; + auto mocked_extent = full_extent.at(curAxis); + // py::ssize_t is a signed type, so we will need to use another + // magic number for JOINED_DIMENSION in this computation, since the + // C++ API's JOINED_DIMENSION would be interpreted as a negative + // index + bool undo_mocked_extent = false; + constexpr auto PYTHON_JOINED_DIMENSION = + std::numeric_limits::max() - 1; + if (mocked_extent == Dataset::JOINED_DIMENSION) + { + undo_mocked_extent = true; + mocked_extent = PYTHON_JOINED_DIMENSION; + } if (!slice.compute( - full_extent.at(curAxis), - &start, - &stop, - &step, - &slicelength)) + mocked_extent, &start, &stop, &step, &slicelength)) throw py::error_already_set(); + if (undo_mocked_extent) + { + // do the same calculation again, but with another global extent + // (that is not smaller than the previous in order to avoid + // cutting off the range) + // this is to avoid the unlikely case + // that the mocked alternative value is actually the intended + // one + size_t start2, stop2, step2, slicelength2; + if (!slice.compute( + mocked_extent + 1, + &start2, + &stop2, + &step2, + &slicelength2)) + throw py::error_already_set(); + if (slicelength == slicelength2) + { + // slicelength was given as an absolute value and + // accidentally hit our mocked value + // --> keep that value + undo_mocked_extent = false; + } + } + // TODO PySlice_AdjustIndices: Python 3.6.1+ // Adjust start/end slice indices assuming a sequence of the // specified length. Out of bounds indices are clipped in a @@ -132,7 +170,10 @@ inline std::tuple> parseTupleSlices( // verified for size later in C++ API offset.at(curAxis) = start; - extent.at(curAxis) = slicelength; // stop - start; + extent.at(curAxis) = + undo_mocked_extent && slicelength == PYTHON_JOINED_DIMENSION + ? Dataset::JOINED_DIMENSION + : slicelength; // stop - start; continue; } @@ -187,6 +228,59 @@ inline std::tuple> parseTupleSlices( return std::make_tuple(offset, extent, flatten); } +inline std::tuple> parseJoinedTupleSlices( + uint8_t const ndim, + Extent const &full_extent, + py::tuple const &slices, + size_t joined_dim, + py::array const &a) +{ + + std::vector flatten; + Offset offset; + Extent extent; + std::tie(offset, extent, flatten) = + parseTupleSlices(ndim, full_extent, slices); + for (size_t i = 0; i < ndim; ++i) + { + if (offset.at(i) != 0) + { + throw std::runtime_error( + "Joined array: Cannot use non-zero offset in store_chunk " + "(offset[" + + std::to_string(i) + "] = " + std::to_string(offset[i]) + ")."); + } + if (flatten.at(i)) + { + throw std::runtime_error( + "Flattened slices unimplemented for joined arrays."); + } + + if (i == joined_dim) + { + if (extent.at(i) == 0 || extent.at(i) == Dataset::JOINED_DIMENSION) + { + extent[i] = a.shape()[i]; + } + } + else + { + if (extent.at(i) != full_extent.at(i)) + { + throw std::runtime_error( + "Joined array: Must use full extent in store_chunk for " + "non-joined dimension " + "(local_extent[" + + std::to_string(i) + "] = " + std::to_string(extent[i]) + + " != global_extent[" + std::to_string(i) + + "] = " + std::to_string(full_extent[i]) + ")."); + } + } + } + offset.clear(); + return std::make_tuple(offset, extent, flatten); +} + /** Check an array is a contiguous buffer * * Required are contiguous buffers for store and load @@ -265,23 +359,52 @@ inline void store_chunk( "in record component (") + std::to_string(r_shape.size()) + std::string("D)")); - for (auto d = 0; d < a.ndim(); ++d) + if (auto joined_dim = r.joinedDimension(); joined_dim.has_value()) { - // selection causes overflow of r - if (offset.at(d) + extent.at(d) > r_shape.at(d)) - throw py::index_error( - std::string("slice ") + std::to_string(offset.at(d)) + - std::string(":") + std::to_string(extent.at(d)) + - std::string(" is out of bounds for axis ") + std::to_string(d) + - std::string(" with size ") + std::to_string(r_shape.at(d))); - // underflow of selection in r for given a - if (s_shape.at(d) != std::uint64_t(a.shape()[d])) - throw py::index_error( - std::string("size of chunk (") + std::to_string(a.shape()[d]) + - std::string(") for axis ") + std::to_string(d) + - std::string(" does not match selection ") + - std::string("size in record component (") + - std::to_string(s_extent.at(d)) + std::string(")")); + for (py::ssize_t d = 0; d < a.ndim(); ++d) + { + // selection causes overflow of r + if (d != py::ssize_t(*joined_dim) && extent.at(d) != r_shape.at(d)) + throw py::index_error( + std::string("selection for axis ") + std::to_string(d) + + " of record component with joined dimension " + + std::to_string(*joined_dim) + + " must be equivalent to its global extent " + + std::to_string(extent.at(d)) + ", but was " + + std::to_string(r_shape.at(d)) + "."); + // underflow of selection in r for given a + if (s_shape.at(d) != std::uint64_t(a.shape()[d])) + throw py::index_error( + std::string("size of chunk (") + + std::to_string(a.shape()[d]) + std::string(") for axis ") + + std::to_string(d) + + std::string(" does not match selection ") + + std::string("size in record component (") + + std::to_string(s_extent.at(d)) + std::string(")")); + } + } + else + { + for (auto d = 0; d < a.ndim(); ++d) + { + // selection causes overflow of r + if (offset.at(d) + extent.at(d) > r_shape.at(d)) + throw py::index_error( + std::string("slice ") + std::to_string(offset.at(d)) + + std::string(":") + std::to_string(extent.at(d)) + + std::string(" is out of bounds for axis ") + + std::to_string(d) + std::string(" with size ") + + std::to_string(r_shape.at(d))); + // underflow of selection in r for given a + if (s_shape.at(d) != std::uint64_t(a.shape()[d])) + throw py::index_error( + std::string("size of chunk (") + + std::to_string(a.shape()[d]) + std::string(") for axis ") + + std::to_string(d) + + std::string(" does not match selection ") + + std::string("size in record component (") + + std::to_string(s_extent.at(d)) + std::string(")")); + } } check_buffer_is_contiguous(a); @@ -359,8 +482,17 @@ store_chunk(RecordComponent &r, py::array &a, py::tuple const &slices) Offset offset; Extent extent; std::vector flatten; - std::tie(offset, extent, flatten) = - parseTupleSlices(ndim, full_extent, slices); + if (auto joined_dimension = r.joinedDimension(); + joined_dimension.has_value()) + { + std::tie(offset, extent, flatten) = parseJoinedTupleSlices( + ndim, full_extent, slices, *joined_dimension, a); + } + else + { + std::tie(offset, extent, flatten) = + parseTupleSlices(ndim, full_extent, slices); + } store_chunk(r, a, offset, extent, flatten); } diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 0b7f3e672a..84f868a059 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1785,4 +1785,160 @@ TEST_CASE("unavailable_backend", "[core][parallel]") } #endif } + +void joined_dim(std::string const &ext) +{ + using type = float; + using patchType = uint64_t; + constexpr size_t patches_per_rank = 5; + constexpr size_t length_of_patch = 10; + + int size{-1}; + int rank{-1}; + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + { + Series s( + "../samples/joinedDimParallel." + ext, + Access::CREATE, + MPI_COMM_WORLD); + std::vector> writeFrom(patches_per_rank); + + auto it = s.writeIterations()[100]; + + Dataset numParticlesDS( + determineDatatype(), {Dataset::JOINED_DIMENSION}); + auto numParticles = + it.particles["e"] + .particlePatches["numParticles"][RecordComponent::SCALAR]; + auto numParticlesOffset = + it.particles["e"] + .particlePatches["numParticlesOffset"][RecordComponent::SCALAR]; + numParticles.resetDataset(numParticlesDS); + numParticlesOffset.resetDataset(numParticlesDS); + + auto patchOffset = it.particles["e"].particlePatches["offset"]["x"]; + auto patchExtent = it.particles["e"].particlePatches["extent"]["x"]; + Dataset particlePatchesDS( + determineDatatype(), {Dataset::JOINED_DIMENSION}); + patchOffset.resetDataset(particlePatchesDS); + patchExtent.resetDataset(particlePatchesDS); + + float start_value = rank * patches_per_rank * length_of_patch; + for (size_t i = 0; i < 5; ++i) + { + writeFrom[i] = UniquePtrWithLambda( + new type[length_of_patch], + [](auto const *ptr) { delete[] ptr; }); + std::iota( + writeFrom[i].get(), + writeFrom[i].get() + 10, + start_value + length_of_patch * i); + patchOffset.store(start_value + length_of_patch * i); + } + + auto epx = it.particles["e"]["position"]["x"]; + Dataset ds(determineDatatype(), {Dataset::JOINED_DIMENSION}); + epx.resetDataset(ds); + + size_t counter = 0; + for (auto &chunk : writeFrom) + { + epx.storeChunk(std::move(chunk), {}, {length_of_patch}); + numParticles.store(length_of_patch); + /* + * For the sake of the test case, we know that the + * numParticlesOffset has this value. In general, the purpose of the + * joined array is that we don't need to know these values, so the + * specification of particle patches is somewhat difficult. + */ + numParticlesOffset.store( + start_value + counter++ * length_of_patch); + patchExtent.store(10); + } + writeFrom.clear(); + it.close(); + s.close(); + } + + { + Series s( + "../samples/joinedDimParallel." + ext, + Access::READ_ONLY, + MPI_COMM_WORLD); + auto it = s.iterations[100]; + auto e = it.particles["e"]; + + auto particleData = e["position"]["x"].loadChunk(); + auto numParticles = + e.particlePatches["numParticles"][RecordComponent::SCALAR] + .load(); + auto numParticlesOffset = + e.particlePatches["numParticlesOffset"][RecordComponent::SCALAR] + .load(); + auto patchOffset = e.particlePatches["offset"]["x"].load(); + auto patchExtent = e.particlePatches["extent"]["x"].load(); + + it.close(); + + // check validity of particle patches + auto numPatches = + e.particlePatches["numParticlesOffset"][RecordComponent::SCALAR] + .getExtent()[0]; + REQUIRE( + e.particlePatches["numParticles"][RecordComponent::SCALAR] + .getExtent()[0] == numPatches); + for (size_t i = 0; i < numPatches; ++i) + { + for (size_t j = 0; j < numParticles.get()[i]; ++j) + { + REQUIRE( + patchOffset.get()[i] <= + particleData.get()[numParticlesOffset.get()[i] + j]); + REQUIRE( + particleData.get()[numParticlesOffset.get()[i] + j] < + patchOffset.get()[i] + patchExtent.get()[i]); + } + } + + /* + * Check that joined array joins early writes before later writes from + * the same rank + */ + for (size_t i = 0; i < size * length_of_patch * patches_per_rank; ++i) + { + REQUIRE(float(i) == particleData.get()[i]); + } + for (size_t i = 0; i < size * patches_per_rank; ++i) + { + REQUIRE(length_of_patch * i == numParticlesOffset.get()[i]); + REQUIRE(type(length_of_patch * i) == patchOffset.get()[i]); + } + } +} + +TEST_CASE("joined_dim", "[parallel]") +{ +#if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ + 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ + 209000000 + constexpr char const *supportsJoinedDims[] = {"bp", "bp4", "bp5"}; +#else + // no zero-size arrays + std::vector supportsJoinedDims; +#endif + for (auto const &t : testedFileExtensions()) + { + for (auto const supported : supportsJoinedDims) + { + if (t == supported) + { + joined_dim(t); + break; + } + } + } +} + #endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index e0d4c75348..f74a81a490 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -7371,3 +7371,147 @@ TEST_CASE("groupbased_read_write", "[serial]") groupbased_read_write("toml"); } } + +void joined_dim(std::string const &ext) +{ + using type = float; + using patchType = uint64_t; + constexpr size_t patches_per_rank = 5; + constexpr size_t length_of_patch = 10; + + { + Series s("../samples/joinedDimParallel." + ext, Access::CREATE); + std::vector> writeFrom(patches_per_rank); + + auto it = s.writeIterations()[100]; + + Dataset numParticlesDS( + determineDatatype(), {Dataset::JOINED_DIMENSION}); + auto numParticles = + it.particles["e"] + .particlePatches["numParticles"][RecordComponent::SCALAR]; + auto numParticlesOffset = + it.particles["e"] + .particlePatches["numParticlesOffset"][RecordComponent::SCALAR]; + numParticles.resetDataset(numParticlesDS); + numParticlesOffset.resetDataset(numParticlesDS); + + auto patchOffset = it.particles["e"].particlePatches["offset"]["x"]; + auto patchExtent = it.particles["e"].particlePatches["extent"]["x"]; + Dataset particlePatchesDS( + determineDatatype(), {Dataset::JOINED_DIMENSION}); + patchOffset.resetDataset(particlePatchesDS); + patchExtent.resetDataset(particlePatchesDS); + + for (size_t i = 0; i < 5; ++i) + { + writeFrom[i] = UniquePtrWithLambda( + new type[length_of_patch], + [](auto const *ptr) { delete[] ptr; }); + std::iota( + writeFrom[i].get(), + writeFrom[i].get() + 10, + length_of_patch * i); + patchOffset.store(length_of_patch * i); + } + + auto epx = it.particles["e"]["position"]["x"]; + Dataset ds(determineDatatype(), {Dataset::JOINED_DIMENSION}); + epx.resetDataset(ds); + + size_t counter = 0; + for (auto &chunk : writeFrom) + { + epx.storeChunk(std::move(chunk), {}, {length_of_patch}); + numParticles.store(length_of_patch); + /* + * For the sake of the test case, we know that the + * numParticlesOffset has this value. In general, the purpose of the + * joined array is that we don't need to know these values, so the + * specification of particle patches is somewhat difficult. + */ + numParticlesOffset.store(counter++ * length_of_patch); + patchExtent.store(10); + } + writeFrom.clear(); + it.close(); + s.close(); + } + + { + Series s("../samples/joinedDimParallel." + ext, Access::READ_ONLY); + auto it = s.iterations[100]; + auto e = it.particles["e"]; + + auto particleData = e["position"]["x"].loadChunk(); + auto numParticles = + e.particlePatches["numParticles"][RecordComponent::SCALAR] + .load(); + auto numParticlesOffset = + e.particlePatches["numParticlesOffset"][RecordComponent::SCALAR] + .load(); + auto patchOffset = e.particlePatches["offset"]["x"].load(); + auto patchExtent = e.particlePatches["extent"]["x"].load(); + + it.close(); + + // check validity of particle patches + auto numPatches = + e.particlePatches["numParticlesOffset"][RecordComponent::SCALAR] + .getExtent()[0]; + REQUIRE( + e.particlePatches["numParticles"][RecordComponent::SCALAR] + .getExtent()[0] == numPatches); + for (size_t i = 0; i < numPatches; ++i) + { + for (size_t j = 0; j < numParticles.get()[i]; ++j) + { + REQUIRE( + patchOffset.get()[i] <= + particleData.get()[numParticlesOffset.get()[i] + j]); + REQUIRE( + particleData.get()[numParticlesOffset.get()[i] + j] < + patchOffset.get()[i] + patchExtent.get()[i]); + } + } + + /* + * Check that: + * 1. Joined array joins writes from lower ranks before higher ranks + * 2. Joined array joins early writes before later writes from the same + * rank + */ + for (size_t i = 0; i < length_of_patch * patches_per_rank; ++i) + { + REQUIRE(float(i) == particleData.get()[i]); + } + for (size_t i = 0; i < patches_per_rank; ++i) + { + REQUIRE(length_of_patch * i == numParticlesOffset.get()[i]); + REQUIRE(type(length_of_patch * i) == patchOffset.get()[i]); + } + } +} + +TEST_CASE("joined_dim", "[serial]") +{ +#if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ + 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ + 209000000 + constexpr char const *supportsJoinedDims[] = {"bp", "bp4", "bp5"}; +#else + // no zero-size arrays + std::vector supportsJoinedDims; +#endif + for (auto const &t : testedFileExtensions()) + { + for (auto const supported : supportsJoinedDims) + { + if (t == supported) + { + joined_dim(t); + break; + } + } + } +}