Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for opening an iteration #1239

Merged
merged 6 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class unsupported_data_error : public std::runtime_error
* @brief Determine what items should be flushed upon Series::flush()
*
*/
enum class FlushLevel : unsigned char
// do not write `enum class FlushLevel : unsigned char` here since NVHPC
// does not compile it correctly
enum class FlushLevel
{
/**
* Flush operation that was triggered by user code.
Expand All @@ -84,7 +86,11 @@ enum class FlushLevel : unsigned char
* CREATE_DATASET tasks.
* Attributes may or may not be flushed yet.
*/
SkeletonOnly
SkeletonOnly,
/**
* Only creates/opens files, nothing more
*/
CreateOrOpenFiles
};

namespace internal
Expand Down
19 changes: 11 additions & 8 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace internal
* containing this iteration.
*/
std::string filename;
bool beginStep = false;
};

class IterationData : public AttributableData
Expand Down Expand Up @@ -249,14 +250,16 @@ class Iteration : public Attributable
void flush(internal::FlushParams const &);
void deferParseAccess(internal::DeferredParseAccess);
/*
* Control flow for read(), readFileBased(), readGroupBased() and
* read_impl():
* read() is called as the entry point. File-based and group-based
* Control flow for runDeferredParseAccess(), readFileBased(),
* readGroupBased() and read_impl():
* runDeferredParseAccess() is called as the entry point.
* File-based and group-based
* iteration layouts need to be parsed slightly differently:
* In file-based iteration layout, each iteration's file also contains
* attributes for the /data group. In group-based layout, those have
* already been parsed during opening of the Series.
* Hence, read() will call either readFileBased() or readGroupBased() to
* Hence, runDeferredParseAccess() will call either readFileBased() or
* readGroupBased() to
* allow for those different control flows.
* Finally, read_impl() is called which contains the common parsing
* logic for an iteration.
Expand All @@ -265,10 +268,10 @@ class Iteration : public Attributable
* Calling it on an Iteration not yet parsed is an error.
*
*/
void read();
void reread(std::string const &path);
void readFileBased(std::string filePath, std::string const &groupPath);
void readGorVBased(std::string const &groupPath);
void readFileBased(
std::string filePath, std::string const &groupPath, bool beginStep);
void readGorVBased(std::string const &groupPath, bool beginStep);
void read_impl(std::string const &groupPath);

/**
Expand All @@ -278,7 +281,7 @@ class Iteration : public Attributable
*
* @return AdvanceStatus
*/
AdvanceStatus beginStep();
AdvanceStatus beginStep(bool reread);

/**
* @brief End an IO step on the IO file (or file-like object)
Expand Down
12 changes: 6 additions & 6 deletions include/openPMD/backend/Writable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ OPENPMD_private
* These members need to be shared pointers since distinct instances of
* Writable may share them.
*/
std::shared_ptr<AbstractFilePosition> abstractFilePosition;
std::shared_ptr<AbstractIOHandler> IOHandler;
internal::AttributableData *attributable;
Writable *parent;
bool dirty;
std::shared_ptr<AbstractFilePosition> abstractFilePosition = nullptr;
std::shared_ptr<AbstractIOHandler> IOHandler = nullptr;
internal::AttributableData *attributable = nullptr;
Writable *parent = nullptr;
bool dirty = true;
/**
* If parent is not null, then this is a vector of keys such that:
* &(*parent)[key_1]...[key_n] == this
Expand All @@ -146,6 +146,6 @@ OPENPMD_private
* Writable and its meaning within the current dataset.
*
*/
bool written;
bool written = false;
};
} // namespace openPMD
1 change: 1 addition & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,7 @@ namespace detail

case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
/*
* Tasks have been given to ADIOS2, but we don't flush them
* yet. So, move everything to m_alreadyEnqueued to avoid
Expand Down
101 changes: 70 additions & 31 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,12 @@ Iteration &Iteration::open()
if (it.m_closed == CloseStatus::ParseAccessDeferred)
{
it.m_closed = CloseStatus::Open;
runDeferredParseAccess();
}
runDeferredParseAccess();
Series s = retrieveSeries();
// figure out my iteration number
auto begin = s.indexOf(*this);
s.openIteration(begin->first, *this);
// @todo, maybe collective here
IOHandler()->flush(internal::defaultFlushParams);
return *this;
}
Expand Down Expand Up @@ -238,7 +237,16 @@ void Iteration::flushFileBased(
s.openIteration(i, *this);
}

flush(flushParams);
switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flushGroupBased(
Expand All @@ -252,7 +260,16 @@ void Iteration::flushGroupBased(
IOHandler()->enqueue(IOTask(this, pCreate));
}

flush(flushParams);
switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flushVariableBased(
Expand All @@ -267,7 +284,16 @@ void Iteration::flushVariableBased(
this->setAttribute("snapshot", i);
}

flush(flushParams);
switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flush(internal::FlushParams const &flushParams)
Expand Down Expand Up @@ -327,26 +353,6 @@ void Iteration::deferParseAccess(DeferredParseAccess dr)
std::make_optional<DeferredParseAccess>(std::move(dr));
}

void Iteration::read()
{
auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();
if (deferred.fileBased)
{
readFileBased(deferred.filename, deferred.path);
}
else
{
readGorVBased(deferred.path);
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
}

void Iteration::reread(std::string const &path)
{
if (get().m_deferredParseAccess.has_value())
Expand All @@ -359,18 +365,31 @@ void Iteration::reread(std::string const &path)
}

void Iteration::readFileBased(
std::string filePath, std::string const &groupPath)
std::string filePath, std::string const &groupPath, bool doBeginStep)
{
if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
auto series = retrieveSeries();

series.readOneIterationFileBased(filePath);

read_impl(groupPath);
}

void Iteration::readGorVBased(std::string const &groupPath)
void Iteration::readGorVBased(std::string const &groupPath, bool doBeginStep)
{

if (doBeginStep)
{
/*
* beginStep() must take care to open files
*/
beginStep(/* reread = */ false);
}
read_impl(groupPath);
}

Expand Down Expand Up @@ -540,7 +559,7 @@ void Iteration::read_impl(std::string const &groupPath)
readAttributes(ReadMode::FullyReread);
}

AdvanceStatus Iteration::beginStep()
AdvanceStatus Iteration::beginStep(bool reread)
{
using IE = IterationEncoding;
auto series = retrieveSeries();
Expand All @@ -565,7 +584,8 @@ AdvanceStatus Iteration::beginStep()
}

// re-read -> new datasets might be available
if ((series.iterationEncoding() == IE::groupBased ||
if (reread &&
(series.iterationEncoding() == IE::groupBased ||
series.iterationEncoding() == IE::variableBased) &&
(this->IOHandler()->m_frontendAccess == Access::READ_ONLY ||
this->IOHandler()->m_frontendAccess == Access::READ_WRITE))
Expand Down Expand Up @@ -679,18 +699,37 @@ void Iteration::runDeferredParseAccess()
{
return;
}

auto &it = get();
if (!it.m_deferredParseAccess.has_value())
{
return;
}
auto const &deferred = it.m_deferredParseAccess.value();

auto oldAccess = IOHandler()->m_frontendAccess;
auto newAccess = const_cast<Access *>(&IOHandler()->m_frontendAccess);
*newAccess = Access::READ_WRITE;
try
{
read();
if (deferred.fileBased)
{
readFileBased(deferred.filename, deferred.path, deferred.beginStep);
}
else
{
readGorVBased(deferred.path, deferred.beginStep);
}
}
catch (...)
{
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
*newAccess = oldAccess;
throw;
}
// reset this thing
it.m_deferredParseAccess = std::optional<DeferredParseAccess>();
*newAccess = oldAccess;
}

Expand Down
10 changes: 6 additions & 4 deletions src/ReadIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series))
* the step after parsing the file is ok.
*/
openIteration();
status = it->second.beginStep();
status = it->second.beginStep(/* reread = */ true);
break;
case IterationEncoding::groupBased:
case IterationEncoding::variableBased:
Expand All @@ -72,7 +72,7 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series))
* access to the file until now. Better to begin a step right away,
* otherwise we might get another step's data.
*/
status = it->second.beginStep();
status = it->second.beginStep(/* reread = */ true);
openIteration();
break;
}
Expand Down Expand Up @@ -107,7 +107,8 @@ SeriesIterator &SeriesIterator::operator++()
case IE::variableBased: {
// since we are in group-based iteration layout, it does not
// matter which iteration we begin a step upon
AdvanceStatus status = currentIteration.beginStep();
AdvanceStatus status{};
status = currentIteration.beginStep(/* reread = */ true);
if (status == AdvanceStatus::OVER)
{
*this = end();
Expand Down Expand Up @@ -142,7 +143,8 @@ SeriesIterator &SeriesIterator::operator++()
using IE = IterationEncoding;
case IE::fileBased: {
auto &iteration = series.iterations[m_currentIteration];
AdvanceStatus status = iteration.beginStep();
AdvanceStatus status{};
status = iteration.beginStep(/* reread = */ true);
if (status == AdvanceStatus::OVER)
{
*this = end();
Expand Down
Loading