Skip to content

Commit

Permalink
Don't flush when opening an iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Apr 1, 2022
1 parent 1b83ca0 commit fb4dc64
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 51 deletions.
6 changes: 5 additions & 1 deletion include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ enum class FlushLevel : unsigned char
* CREATE_DATASET tasks.
* Attributes may or may not be flushed yet.
*/
SkeletonOnly
SkeletonOnly,
/**
* Only creates/opens files, nothing more
*/
CreateOrOpenFiles
};

namespace internal
Expand Down
19 changes: 11 additions & 8 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace internal
* containing this iteration.
*/
std::string filename;
bool beginStep = false;
};

class IterationData : public AttributableData
Expand Down Expand Up @@ -249,14 +250,16 @@ class Iteration : public Attributable
void flush(internal::FlushParams const &);
void deferParseAccess(internal::DeferredParseAccess);
/*
* Control flow for read(), readFileBased(), readGroupBased() and
* read_impl():
* read() is called as the entry point. File-based and group-based
* Control flow for runDeferredParseAccess(), readFileBased(),
* readGroupBased() and read_impl():
* runDeferredParseAccess() is called as the entry point.
* File-based and group-based
* iteration layouts need to be parsed slightly differently:
* In file-based iteration layout, each iteration's file also contains
* attributes for the /data group. In group-based layout, those have
* already been parsed during opening of the Series.
* Hence, read() will call either readFileBased() or readGroupBased() to
* Hence, runDeferredParseAccess() will call either readFileBased() or
* readGroupBased() to
* allow for those different control flows.
* Finally, read_impl() is called which contains the common parsing
* logic for an iteration.
Expand All @@ -265,10 +268,10 @@ class Iteration : public Attributable
* Calling it on an Iteration not yet parsed is an error.
*
*/
void read();
void reread(std::string const &path);
void readFileBased(std::string filePath, std::string const &groupPath);
void readGorVBased(std::string const &groupPath);
void readFileBased(
std::string filePath, std::string const &groupPath, bool beginStep);
void readGorVBased(std::string const &groupPath, bool beginStep);
void read_impl(std::string const &groupPath);

/**
Expand All @@ -278,7 +281,7 @@ class Iteration : public Attributable
*
* @return AdvanceStatus
*/
AdvanceStatus beginStep();
AdvanceStatus beginStep(bool reread);

/**
* @brief End an IO step on the IO file (or file-like object)
Expand Down
1 change: 1 addition & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2614,6 +2614,7 @@ namespace detail

case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
/*
* Tasks have been given to ADIOS2, but we don't flush them
* yet. So, move everything to m_alreadyEnqueued to avoid
Expand Down
83 changes: 52 additions & 31 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,12 @@ Iteration &Iteration::open()
if (it.m_closed == CloseStatus::ParseAccessDeferred)
{
it.m_closed = CloseStatus::Open;
runDeferredParseAccess();
}
runDeferredParseAccess();
Series s = retrieveSeries();
// figure out my iteration number
auto begin = s.indexOf(*this);
s.openIteration(begin->first, *this);
// @todo, maybe collective here
IOHandler()->flush(internal::defaultFlushParams);
return *this;
}
Expand Down Expand Up @@ -238,7 +237,10 @@ void Iteration::flushFileBased(
s.openIteration(i, *this);
}

flush(flushParams);
if (flushParams.flushLevel != FlushLevel::CreateOrOpenFiles)
{
flush(flushParams);
}
}

void Iteration::flushGroupBased(
Expand All @@ -252,7 +254,10 @@ void Iteration::flushGroupBased(
IOHandler()->enqueue(IOTask(this, pCreate));
}

flush(flushParams);
if (flushParams.flushLevel != FlushLevel::CreateOrOpenFiles)
{
flush(flushParams);
}
}

void Iteration::flushVariableBased(
Expand All @@ -267,7 +272,10 @@ void Iteration::flushVariableBased(
this->setAttribute("snapshot", i);
}

flush(flushParams);
if (flushParams.flushLevel != FlushLevel::CreateOrOpenFiles)
{
flush(flushParams);
}
}

void Iteration::flush(internal::FlushParams const &flushParams)
Expand Down Expand Up @@ -327,26 +335,6 @@ void Iteration::deferParseAccess(DeferredParseAccess dr)
std::make_optional<DeferredParseAccess>(std::move(dr));
}

void Iteration::read()
{
auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();
if (deferred.fileBased)
{
readFileBased(deferred.filename, deferred.path);
}
else
{
readGorVBased(deferred.path);
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
}

void Iteration::reread(std::string const &path)
{
if (get().m_deferredParseAccess.has_value())
Expand All @@ -359,18 +347,31 @@ void Iteration::reread(std::string const &path)
}

void Iteration::readFileBased(
std::string filePath, std::string const &groupPath)
std::string filePath, std::string const &groupPath, bool doBeginStep)
{
if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
auto series = retrieveSeries();

series.readOneIterationFileBased(filePath);

read_impl(groupPath);
}

void Iteration::readGorVBased(std::string const &groupPath)
void Iteration::readGorVBased(std::string const &groupPath, bool doBeginStep)
{

if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
read_impl(groupPath);
}

Expand Down Expand Up @@ -540,7 +541,7 @@ void Iteration::read_impl(std::string const &groupPath)
readAttributes(ReadMode::FullyReread);
}

AdvanceStatus Iteration::beginStep()
AdvanceStatus Iteration::beginStep(bool reread)
{
using IE = IterationEncoding;
auto series = retrieveSeries();
Expand All @@ -565,7 +566,8 @@ AdvanceStatus Iteration::beginStep()
}

// re-read -> new datasets might be available
if ((series.iterationEncoding() == IE::groupBased ||
if (reread &&
(series.iterationEncoding() == IE::groupBased ||
series.iterationEncoding() == IE::variableBased) &&
(this->IOHandler()->m_frontendAccess == Access::READ_ONLY ||
this->IOHandler()->m_frontendAccess == Access::READ_WRITE))
Expand Down Expand Up @@ -679,18 +681,37 @@ void Iteration::runDeferredParseAccess()
{
return;
}

auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();

auto oldAccess = IOHandler()->m_frontendAccess;
auto newAccess = const_cast<Access *>(&IOHandler()->m_frontendAccess);
*newAccess = Access::READ_WRITE;
try
{
read();
if (deferred.fileBased)
{
readFileBased(deferred.filename, deferred.path, deferred.beginStep);
}
else
{
readGorVBased(deferred.path, deferred.beginStep);
}
}
catch (...)
{
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
*newAccess = oldAccess;
throw;
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
*newAccess = oldAccess;
}

Expand Down
10 changes: 6 additions & 4 deletions src/ReadIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series))
* the step after parsing the file is ok.
*/
openIteration();
status = it->second.beginStep();
status = it->second.beginStep(/* reread = */ true);
break;
case IterationEncoding::groupBased:
case IterationEncoding::variableBased:
Expand All @@ -72,7 +72,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series))
* access to the file until now. Better to begin a step right away,
* otherwise we might get another step's data.
*/
status = it->second.beginStep();
status = it->second.beginStep(/* reread = */ true);
openIteration();
break;
}
Expand Down Expand Up @@ -107,7 +107,8 @@ SeriesIterator &SeriesIterator::operator++()
case IE::variableBased: {
// since we are in group-based iteration layout, it does not
// matter which iteration we begin a step upon
AdvanceStatus status = currentIteration.beginStep();
AdvanceStatus status{};
status = currentIteration.beginStep(/* reread = */ true);
if (status == AdvanceStatus::OVER)
{
*this = end();
Expand Down Expand Up @@ -142,7 +143,8 @@ SeriesIterator &SeriesIterator::operator++()
using IE = IterationEncoding;
case IE::fileBased: {
auto &iteration = series.iterations[m_currentIteration];
AdvanceStatus status = iteration.beginStep();
AdvanceStatus status{};
status = iteration.beginStep(/* reread = */ true);
if (status == AdvanceStatus::OVER)
{
*this = end();
Expand Down
38 changes: 32 additions & 6 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,8 @@ void Series::readGorVBased(bool do_init)
auto readSingleIteration = [&series, &pOpen, this](
uint64_t index,
std::string path,
bool guardAgainstRereading) {
bool guardAgainstRereading,
bool beginStep) {
if (series.iterations.contains(index))
{
// maybe re-read
Expand All @@ -1068,7 +1069,7 @@ void Series::readGorVBased(bool do_init)
{
// parse for the first time, resp. delay the parsing process
Iteration &i = series.iterations[index];
i.deferParseAccess({path, index, false, ""});
i.deferParseAccess({path, index, false, "", beginStep});
if (!series.m_parseLazily)
{
i.runDeferredParseAccess();
Expand All @@ -1091,7 +1092,12 @@ void Series::readGorVBased(bool do_init)
for (auto const &it : *pList.paths)
{
uint64_t index = std::stoull(it);
readSingleIteration(index, it, true);
/*
* For now: parse a Series in RandomAccess mode.
* (beginStep = false)
* A streaming read mode might come in a future API addition.
*/
readSingleIteration(index, it, true, false);
}
break;
case IterationEncoding::variableBased: {
Expand All @@ -1100,7 +1106,11 @@ void Series::readGorVBased(bool do_init)
{
index = series.iterations.getAttribute("snapshot").get<uint64_t>();
}
readSingleIteration(index, "", false);
/*
* Variable-based iteration encoding relies on steps, so parsing must
* happen after opening the first step.
*/
readSingleIteration(index, "", false, true);
break;
}
}
Expand Down Expand Up @@ -1241,8 +1251,24 @@ AdvanceStatus Series::advance(
itData.m_closed = internal::CloseStatus::Open;
}

// @todo really collective?
flush_impl(begin, end, flushParams, /* flushIOHandler = */ false);
switch (mode)
{
case AdvanceMode::ENDSTEP:
flush_impl(begin, end, flushParams, /* flushIOHandler = */ false);
break;
case AdvanceMode::BEGINSTEP:
/*
* When beginning a step, there is nothing to flush yet.
* Data is not written in between steps.
* So only make sure that files are accessed.
*/
flush_impl(
begin,
end,
{FlushLevel::CreateOrOpenFiles},
/* flushIOHandler = */ false);
break;
}

if (oldCloseStatus == internal::CloseStatus::ClosedInFrontend)
{
Expand Down
2 changes: 1 addition & 1 deletion src/WriteIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key)
auto &res = shared->iterations[std::move(key)];
if (res.getStepStatus() == StepStatus::NoStep)
{
res.beginStep();
res.beginStep(/* reread = */ false);
res.setStepStatus(StepStatus::DuringStep);
}
return res;
Expand Down

0 comments on commit fb4dc64

Please sign in to comment.