Skip to content

Commit

Permalink
Fixes for opening an iteration (#1239)
Browse files Browse the repository at this point in the history
* Test
* Don't flush when opening an iteration
* CI fixes
* FlushLevel: Use default base class in NVC++
* clang-tidy: Define member defaults of Writable in-class
  • Loading branch information
franzpoeschel authored May 4, 2022
1 parent feefb0e commit 1f467a3
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 66 deletions.
10 changes: 8 additions & 2 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class unsupported_data_error : public std::runtime_error
* @brief Determine what items should be flushed upon Series::flush()
*
*/
enum class FlushLevel : unsigned char
// do not write `enum class FlushLevel : unsigned char` here since NVHPC
// does not compile it correctly
enum class FlushLevel
{
/**
* Flush operation that was triggered by user code.
Expand All @@ -84,7 +86,11 @@ enum class FlushLevel : unsigned char
* CREATE_DATASET tasks.
* Attributes may or may not be flushed yet.
*/
SkeletonOnly
SkeletonOnly,
/**
* Only creates/opens files, nothing more
*/
CreateOrOpenFiles
};

namespace internal
Expand Down
19 changes: 11 additions & 8 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace internal
* containing this iteration.
*/
std::string filename;
bool beginStep = false;
};

class IterationData : public AttributableData
Expand Down Expand Up @@ -257,14 +258,16 @@ class Iteration : public Attributable
void flush(internal::FlushParams const &);
void deferParseAccess(internal::DeferredParseAccess);
/*
* Control flow for read(), readFileBased(), readGroupBased() and
* read_impl():
* read() is called as the entry point. File-based and group-based
* Control flow for runDeferredParseAccess(), readFileBased(),
* readGroupBased() and read_impl():
* runDeferredParseAccess() is called as the entry point.
* File-based and group-based
* iteration layouts need to be parsed slightly differently:
* In file-based iteration layout, each iteration's file also contains
* attributes for the /data group. In group-based layout, those have
* already been parsed during opening of the Series.
* Hence, read() will call either readFileBased() or readGroupBased() to
* Hence, runDeferredParseAccess() will call either readFileBased() or
* readGroupBased() to
* allow for those different control flows.
* Finally, read_impl() is called which contains the common parsing
* logic for an iteration.
Expand All @@ -273,10 +276,10 @@ class Iteration : public Attributable
* Calling it on an Iteration not yet parsed is an error.
*
*/
void read();
void reread(std::string const &path);
void readFileBased(std::string filePath, std::string const &groupPath);
void readGorVBased(std::string const &groupPath);
void readFileBased(
std::string filePath, std::string const &groupPath, bool beginStep);
void readGorVBased(std::string const &groupPath, bool beginStep);
void read_impl(std::string const &groupPath);

/**
Expand All @@ -286,7 +289,7 @@ class Iteration : public Attributable
*
* @return AdvanceStatus
*/
AdvanceStatus beginStep();
AdvanceStatus beginStep(bool reread);

/**
* @brief End an IO step on the IO file (or file-like object)
Expand Down
12 changes: 6 additions & 6 deletions include/openPMD/backend/Writable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ OPENPMD_private
* These members need to be shared pointers since distinct instances of
* Writable may share them.
*/
std::shared_ptr<AbstractFilePosition> abstractFilePosition;
std::shared_ptr<AbstractIOHandler> IOHandler;
internal::AttributableData *attributable;
Writable *parent;
bool dirty;
std::shared_ptr<AbstractFilePosition> abstractFilePosition = nullptr;
std::shared_ptr<AbstractIOHandler> IOHandler = nullptr;
internal::AttributableData *attributable = nullptr;
Writable *parent = nullptr;
bool dirty = true;
/**
* If parent is not null, then this is a vector of keys such that:
* &(*parent)[key_1]...[key_n] == this
Expand All @@ -146,6 +146,6 @@ OPENPMD_private
* Writable and its meaning within the current dataset.
*
*/
bool written;
bool written = false;
};
} // namespace openPMD
1 change: 1 addition & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,7 @@ namespace detail

case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
/*
* Tasks have been given to ADIOS2, but we don't flush them
* yet. So, move everything to m_alreadyEnqueued to avoid
Expand Down
101 changes: 70 additions & 31 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,12 @@ Iteration &Iteration::open()
if (it.m_closed == CloseStatus::ParseAccessDeferred)
{
it.m_closed = CloseStatus::Open;
runDeferredParseAccess();
}
runDeferredParseAccess();
Series s = retrieveSeries();
// figure out my iteration number
auto begin = s.indexOf(*this);
s.openIteration(begin->first, *this);
// @todo, maybe collective here
IOHandler()->flush(internal::defaultFlushParams);
return *this;
}
Expand Down Expand Up @@ -238,7 +237,16 @@ void Iteration::flushFileBased(
s.openIteration(i, *this);
}

flush(flushParams);
switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flushGroupBased(
Expand All @@ -252,7 +260,16 @@ void Iteration::flushGroupBased(
IOHandler()->enqueue(IOTask(this, pCreate));
}

flush(flushParams);
switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flushVariableBased(
Expand All @@ -267,7 +284,16 @@ void Iteration::flushVariableBased(
this->setAttribute("snapshot", i);
}

flush(flushParams);
switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flush(internal::FlushParams const &flushParams)
Expand Down Expand Up @@ -327,26 +353,6 @@ void Iteration::deferParseAccess(DeferredParseAccess dr)
std::make_optional<DeferredParseAccess>(std::move(dr));
}

void Iteration::read()
{
auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();
if (deferred.fileBased)
{
readFileBased(deferred.filename, deferred.path);
}
else
{
readGorVBased(deferred.path);
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
}

void Iteration::reread(std::string const &path)
{
if (get().m_deferredParseAccess.has_value())
Expand All @@ -359,8 +365,15 @@ void Iteration::reread(std::string const &path)
}

void Iteration::readFileBased(
std::string filePath, std::string const &groupPath)
std::string filePath, std::string const &groupPath, bool doBeginStep)
{
if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
auto series = retrieveSeries();

series.readOneIterationFileBased(filePath);
Expand All @@ -369,9 +382,15 @@ void Iteration::readFileBased(
read_impl(groupPath);
}

void Iteration::readGorVBased(std::string const &groupPath)
void Iteration::readGorVBased(std::string const &groupPath, bool doBeginStep)
{

if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
read_impl(groupPath);
}

Expand Down Expand Up @@ -541,7 +560,7 @@ void Iteration::read_impl(std::string const &groupPath)
readAttributes(ReadMode::FullyReread);
}

AdvanceStatus Iteration::beginStep()
AdvanceStatus Iteration::beginStep(bool reread)
{
using IE = IterationEncoding;
auto series = retrieveSeries();
Expand All @@ -566,7 +585,8 @@ AdvanceStatus Iteration::beginStep()
}

// re-read -> new datasets might be available
if ((series.iterationEncoding() == IE::groupBased ||
if (reread &&
(series.iterationEncoding() == IE::groupBased ||
series.iterationEncoding() == IE::variableBased) &&
(this->IOHandler()->m_frontendAccess == Access::READ_ONLY ||
this->IOHandler()->m_frontendAccess == Access::READ_WRITE))
Expand Down Expand Up @@ -680,18 +700,37 @@ void Iteration::runDeferredParseAccess()
{
return;
}

auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();

auto oldAccess = IOHandler()->m_frontendAccess;
auto newAccess = const_cast<Access *>(&IOHandler()->m_frontendAccess);
*newAccess = Access::READ_WRITE;
try
{
read();
if (deferred.fileBased)
{
readFileBased(deferred.filename, deferred.path, deferred.beginStep);
}
else
{
readGorVBased(deferred.path, deferred.beginStep);
}
}
catch (...)
{
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
*newAccess = oldAccess;
throw;
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
*newAccess = oldAccess;
}

Expand Down
10 changes: 6 additions & 4 deletions src/ReadIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series))
* the step after parsing the file is ok.
*/
openIteration();
status = it->second.beginStep();
status = it->second.beginStep(/* reread = */ true);
break;
case IterationEncoding::groupBased:
case IterationEncoding::variableBased:
Expand All @@ -72,7 +72,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series))
* access to the file until now. Better to begin a step right away,
* otherwise we might get another step's data.
*/
status = it->second.beginStep();
status = it->second.beginStep(/* reread = */ true);
openIteration();
break;
}
Expand Down Expand Up @@ -107,7 +107,8 @@ SeriesIterator &SeriesIterator::operator++()
case IE::variableBased: {
// since we are in group-based iteration layout, it does not
// matter which iteration we begin a step upon
AdvanceStatus status = currentIteration.beginStep();
AdvanceStatus status{};
status = currentIteration.beginStep(/* reread = */ true);
if (status == AdvanceStatus::OVER)
{
*this = end();
Expand Down Expand Up @@ -142,7 +143,8 @@ SeriesIterator &SeriesIterator::operator++()
using IE = IterationEncoding;
case IE::fileBased: {
auto &iteration = series.iterations[m_currentIteration];
AdvanceStatus status = iteration.beginStep();
AdvanceStatus status{};
status = iteration.beginStep(/* reread = */ true);
if (status == AdvanceStatus::OVER)
{
*this = end();
Expand Down
Loading

0 comments on commit 1f467a3

Please sign in to comment.