diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index ff8b2fcccb..d253cd7731 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -84,7 +84,11 @@ enum class FlushLevel : unsigned char * CREATE_DATASET tasks. * Attributes may or may not be flushed yet. */ - SkeletonOnly + SkeletonOnly, + /** + * Only creates/opens files, nothing more + */ + CreateOrOpenFiles }; namespace internal diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index 3d8ccb4567..2a3429d439 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -72,6 +72,7 @@ namespace internal * containing this iteration. */ std::string filename; + bool beginStep = false; }; class IterationData : public AttributableData @@ -249,14 +250,16 @@ class Iteration : public Attributable void flush(internal::FlushParams const &); void deferParseAccess(internal::DeferredParseAccess); /* - * Control flow for read(), readFileBased(), readGroupBased() and - * read_impl(): - * read() is called as the entry point. File-based and group-based + * Control flow for runDeferredParseAccess(), readFileBased(), + * readGroupBased() and read_impl(): + * runDeferredParseAccess() is called as the entry point. + * File-based and group-based * iteration layouts need to be parsed slightly differently: * In file-based iteration layout, each iteration's file also contains * attributes for the /data group. In group-based layout, those have * already been parsed during opening of the Series. - * Hence, read() will call either readFileBased() or readGroupBased() to + * Hence, runDeferredParseAccess() will call either readFileBased() or + * readGroupBased() to * allow for those different control flows. * Finally, read_impl() is called which contains the common parsing * logic for an iteration. @@ -265,10 +268,10 @@ class Iteration : public Attributable * Calling it on an Iteration not yet parsed is an error. * */ - void read(); void reread(std::string const &path); - void readFileBased(std::string filePath, std::string const &groupPath); - void readGorVBased(std::string const &groupPath); + void readFileBased( + std::string filePath, std::string const &groupPath, bool beginStep); + void readGorVBased(std::string const &groupPath, bool beginStep); void read_impl(std::string const &groupPath); /** @@ -278,7 +281,7 @@ class Iteration : public Attributable * * @return AdvanceStatus */ - AdvanceStatus beginStep(); + AdvanceStatus beginStep(bool reread); /** * @brief End an IO step on the IO file (or file-like object) diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 5afa60628a..76e52ffab5 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -2614,6 +2614,7 @@ namespace detail case FlushLevel::InternalFlush: case FlushLevel::SkeletonOnly: + case FlushLevel::CreateOrOpenFiles: /* * Tasks have been given to ADIOS2, but we don't flush them * yet. So, move everything to m_alreadyEnqueued to avoid diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 73c8a46847..4ffadd9c67 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -148,13 +148,12 @@ Iteration &Iteration::open() if (it.m_closed == CloseStatus::ParseAccessDeferred) { it.m_closed = CloseStatus::Open; + runDeferredParseAccess(); } - runDeferredParseAccess(); Series s = retrieveSeries(); // figure out my iteration number auto begin = s.indexOf(*this); s.openIteration(begin->first, *this); - // @todo, maybe collective here IOHandler()->flush(internal::defaultFlushParams); return *this; } @@ -238,7 +237,10 @@ void Iteration::flushFileBased( s.openIteration(i, *this); } - flush(flushParams); + if (flushParams.flushLevel != FlushLevel::CreateOrOpenFiles) + { + flush(flushParams); + } } void Iteration::flushGroupBased( @@ -252,7 +254,10 @@ void Iteration::flushGroupBased( IOHandler()->enqueue(IOTask(this, pCreate)); } - flush(flushParams); + if (flushParams.flushLevel != FlushLevel::CreateOrOpenFiles) + { + flush(flushParams); + } } void Iteration::flushVariableBased( @@ -267,7 +272,10 @@ void Iteration::flushVariableBased( this->setAttribute("snapshot", i); } - flush(flushParams); + if (flushParams.flushLevel != FlushLevel::CreateOrOpenFiles) + { + flush(flushParams); + } } void Iteration::flush(internal::FlushParams const &flushParams) @@ -327,26 +335,6 @@ void Iteration::deferParseAccess(DeferredParseAccess dr) std::make_optional(std::move(dr)); } -void Iteration::read() -{ - auto &it = get(); - if (!it.m_deferredParseAccess.has_value()) - { - return; - } - auto const &deferred = it.m_deferredParseAccess.value(); - if (deferred.fileBased) - { - readFileBased(deferred.filename, deferred.path); - } - else - { - readGorVBased(deferred.path); - } - // reset this thing - it.m_deferredParseAccess = std::optional(); -} - void Iteration::reread(std::string const &path) { if (get().m_deferredParseAccess.has_value()) @@ -359,8 +347,15 @@ void Iteration::reread(std::string const &path) } void Iteration::readFileBased( - std::string filePath, std::string const &groupPath) + std::string filePath, std::string const &groupPath, bool doBeginStep) { + if (doBeginStep) + { + /* + * beginStep() must take care to open files + */ + beginStep(/* reread = */ false); + } auto series = retrieveSeries(); series.readOneIterationFileBased(filePath); @@ -368,9 +363,15 @@ void Iteration::readFileBased( read_impl(groupPath); } -void Iteration::readGorVBased(std::string const &groupPath) +void Iteration::readGorVBased(std::string const &groupPath, bool doBeginStep) { - + if (doBeginStep) + { + /* + * beginStep() must take care to open files + */ + beginStep(/* reread = */ false); + } read_impl(groupPath); } @@ -540,7 +541,7 @@ void Iteration::read_impl(std::string const &groupPath) readAttributes(ReadMode::FullyReread); } -AdvanceStatus Iteration::beginStep() +AdvanceStatus Iteration::beginStep(bool reread) { using IE = IterationEncoding; auto series = retrieveSeries(); @@ -565,7 +566,8 @@ AdvanceStatus Iteration::beginStep() } // re-read -> new datasets might be available - if ((series.iterationEncoding() == IE::groupBased || + if (reread && + (series.iterationEncoding() == IE::groupBased || series.iterationEncoding() == IE::variableBased) && (this->IOHandler()->m_frontendAccess == Access::READ_ONLY || this->IOHandler()->m_frontendAccess == Access::READ_WRITE)) @@ -679,18 +681,37 @@ void Iteration::runDeferredParseAccess() { return; } + + auto &it = get(); + if (!it.m_deferredParseAccess.has_value()) + { + return; + } + auto const &deferred = it.m_deferredParseAccess.value(); + auto oldAccess = IOHandler()->m_frontendAccess; auto newAccess = const_cast(&IOHandler()->m_frontendAccess); *newAccess = Access::READ_WRITE; try { - read(); + if (deferred.fileBased) + { + readFileBased(deferred.filename, deferred.path, deferred.beginStep); + } + else + { + readGorVBased(deferred.path, deferred.beginStep); + } } catch (...) { + // reset this thing + it.m_deferredParseAccess = std::optional(); *newAccess = oldAccess; throw; } + // reset this thing + it.m_deferredParseAccess = std::optional(); *newAccess = oldAccess; } diff --git a/src/ReadIterations.cpp b/src/ReadIterations.cpp index bd70a91366..b677d97535 100644 --- a/src/ReadIterations.cpp +++ b/src/ReadIterations.cpp @@ -63,7 +63,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) * the step after parsing the file is ok. */ openIteration(); - status = it->second.beginStep(); + status = it->second.beginStep(/* reread = */ true); break; case IterationEncoding::groupBased: case IterationEncoding::variableBased: @@ -72,7 +72,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) * access to the file until now. Better to begin a step right away, * otherwise we might get another step's data. */ - status = it->second.beginStep(); + status = it->second.beginStep(/* reread = */ true); openIteration(); break; } @@ -107,7 +107,8 @@ SeriesIterator &SeriesIterator::operator++() case IE::variableBased: { // since we are in group-based iteration layout, it does not // matter which iteration we begin a step upon - AdvanceStatus status = currentIteration.beginStep(); + AdvanceStatus status{}; + status = currentIteration.beginStep(/* reread = */ true); if (status == AdvanceStatus::OVER) { *this = end(); @@ -142,7 +143,8 @@ SeriesIterator &SeriesIterator::operator++() using IE = IterationEncoding; case IE::fileBased: { auto &iteration = series.iterations[m_currentIteration]; - AdvanceStatus status = iteration.beginStep(); + AdvanceStatus status{}; + status = iteration.beginStep(/* reread = */ true); if (status == AdvanceStatus::OVER) { *this = end(); diff --git a/src/Series.cpp b/src/Series.cpp index 68059e2af9..df86d27518 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -1046,7 +1046,8 @@ void Series::readGorVBased(bool do_init) auto readSingleIteration = [&series, &pOpen, this]( uint64_t index, std::string path, - bool guardAgainstRereading) { + bool guardAgainstRereading, + bool beginStep) { if (series.iterations.contains(index)) { // maybe re-read @@ -1068,7 +1069,7 @@ void Series::readGorVBased(bool do_init) { // parse for the first time, resp. delay the parsing process Iteration &i = series.iterations[index]; - i.deferParseAccess({path, index, false, ""}); + i.deferParseAccess({path, index, false, "", beginStep}); if (!series.m_parseLazily) { i.runDeferredParseAccess(); @@ -1091,7 +1092,12 @@ void Series::readGorVBased(bool do_init) for (auto const &it : *pList.paths) { uint64_t index = std::stoull(it); - readSingleIteration(index, it, true); + /* + * For now: parse a Series in RandomAccess mode. + * (beginStep = false) + * A streaming read mode might come in a future API addition. + */ + readSingleIteration(index, it, true, false); } break; case IterationEncoding::variableBased: { @@ -1100,7 +1106,11 @@ void Series::readGorVBased(bool do_init) { index = series.iterations.getAttribute("snapshot").get(); } - readSingleIteration(index, "", false); + /* + * Variable-based iteration encoding relies on steps, so parsing must + * happen after opening the first step. + */ + readSingleIteration(index, "", false, true); break; } } @@ -1241,8 +1251,24 @@ AdvanceStatus Series::advance( itData.m_closed = internal::CloseStatus::Open; } - // @todo really collective? - flush_impl(begin, end, flushParams, /* flushIOHandler = */ false); + switch (mode) + { + case AdvanceMode::ENDSTEP: + flush_impl(begin, end, flushParams, /* flushIOHandler = */ false); + break; + case AdvanceMode::BEGINSTEP: + /* + * When beginning a step, there is nothing to flush yet. + * Data is not written in between steps. + * So only make sure that files are accessed. + */ + flush_impl( + begin, + end, + {FlushLevel::CreateOrOpenFiles}, + /* flushIOHandler = */ false); + break; + } if (oldCloseStatus == internal::CloseStatus::ClosedInFrontend) { diff --git a/src/WriteIterations.cpp b/src/WriteIterations.cpp index be2e72f47f..597298b80e 100644 --- a/src/WriteIterations.cpp +++ b/src/WriteIterations.cpp @@ -68,7 +68,7 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key) auto &res = shared->iterations[std::move(key)]; if (res.getStepStatus() == StepStatus::NoStep) { - res.beginStep(); + res.beginStep(/* reread = */ false); res.setStepStatus(StepStatus::DuringStep); } return res;