ADIOS2 variable shape LocalValue #1601

Open. Wants to merge 16 commits into base: dev
11 changes: 11 additions & 0 deletions docs/source/details/backendconfig.rst
@@ -190,6 +190,17 @@ Explanation of the single keys:

* ``type`` supported ADIOS operator type, e.g. zfp, sz
* ``parameters`` is an associative map of string parameters for the operator (e.g. compression levels)
* ``adios2.dataset.shape`` (advanced): Specify the `dataset shape <https://adios2.readthedocs.io/en/v2.10.0/components/components.html#shapes>`_ for the ADIOS2 variable.
Note that a variable shape generally implies a different way of interacting with a variable, and some variable shapes (such as *joined arrays*) may not be accessible via this parameter, but via different API calls instead.
The purpose of this parameter is to select between different implementations of the same API call.
Supported values for this parameter are:

* ``"global_array"`` (default): The variable is an (n-dimensional) array with a globally defined size. Local blocks, i.e. subsets of the global region, are written by parallel writers.
* ``"local_value"``: Each parallel writer contributes one single value to the dataset; the values are joined into a 1-dimensional array.
There is (currently) no dedicated API call for this shape in the openPMD-api, so this setting is only useful as an optimization: "local value" variables participate in ADIOS2 metadata aggregation.
It can only be applied if the global shape (a 1-dimensional array whose length equals the number of parallel instances) and the local blocks (a single data item each) are specified correctly.
Use global or joined arrays otherwise.

* ``adios2.use_span_based_put``: The openPMD-api exposes the `span-based Put() API <https://adios2.readthedocs.io/en/latest/components/components.html#put-modes-and-memory-contracts>`_ of ADIOS2 via an overload of ``RecordComponent::storeChunk()``.
This API is incompatible with compression operators as described above.
The openPMD-api will automatically use a fallback implementation for the span-based Put() API if any operator is added to a dataset.
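
The constraints that the documentation above places on ``"local_value"`` datasets can be sketched as a small standalone check. This is only an illustration, not code from this PR: ``checkLocalValueChunk`` is a hypothetical helper, the ``Extent``/``Offset`` aliases merely mimic the openPMD-api types, and the requirement that a writer places its item at its own rank index is an assumption taken from the streaming example in this PR rather than from the documentation text.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Stand-ins for the openPMD-api types of the same name (assumption).
using Extent = std::vector<std::uint64_t>;
using Offset = std::vector<std::uint64_t>;

// Hypothetical helper, not part of the openPMD-api: returns an empty string
// if the chunk is a valid contribution to a "local_value" dataset, otherwise
// a description of the violated constraint.
std::string checkLocalValueChunk(
    Extent const &globalExtent,
    Offset const &chunkOffset,
    Extent const &chunkExtent,
    std::uint64_t rank,
    std::uint64_t numRanks)
{
    // The global shape must be a 1-dimensional array with one slot per writer.
    if (globalExtent != Extent{numRanks})
        return "global extent must be 1-dimensional with one entry per rank";
    // Each writer contributes exactly one item.
    if (chunkExtent != Extent{1})
        return "each rank must write exactly one item";
    // Assumption from the example code: the item sits at the writer's rank.
    if (chunkOffset != Offset{rank})
        return "the item must be placed at the writer's own rank index";
    return "";
}
```

With four writers, ``checkLocalValueChunk({4}, {2}, {1}, 2, 4)`` returns an empty string, while a chunk extent of ``{2}`` is rejected. Datasets that fail such a check would fall under "Use global or joined arrays otherwise".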
20 changes: 20 additions & 0 deletions examples/10_streaming_read.cpp
@@ -1,3 +1,4 @@
#include <openPMD/auxiliary/StringManip.hpp>
#include <openPMD/openPMD.hpp>

#include <algorithm>
@@ -55,6 +56,25 @@ int main()
extents[i] = rc.getExtent();
}

auto e_patches = iteration.particles["e"].particlePatches;
for (auto key :
{"numParticles", "numParticlesOffset", "offset", "extent"})
{
for (auto &rc : e_patches[key])
{
std::cout << "Chunks for '" << rc.second.myPath().openPMDPath()
<< "':";
for (auto const &chunk : rc.second.availableChunks())
{
std::cout << "\n\tRank " << chunk.sourceID << "\t"
<< auxiliary::vec_as_string(chunk.offset)
<< "\t– "
<< auxiliary::vec_as_string(chunk.extent);
}
std::cout << std::endl;
}
}

// The iteration can be closed in order to help free up resources.
// The iteration's content will be flushed automatically.
// An iteration once closed cannot (yet) be reopened.
90 changes: 83 additions & 7 deletions examples/10_streaming_write.cpp
@@ -1,12 +1,14 @@
#include "openPMD/Series.hpp"
#include "openPMD/snapshots/Snapshots.hpp"
#include <openPMD/openPMD.hpp>

#include <algorithm>
#include <iostream>
#include <memory>
#include <numeric> // std::iota

#if openPMD_HAVE_MPI
#include <mpi.h>
#endif

using std::cout;
using namespace openPMD;

@@ -21,8 +23,22 @@ int main()
return 0;
}

int mpi_rank{0}, mpi_size{1};

#if openPMD_HAVE_MPI
MPI_Init(nullptr, nullptr);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
#endif

// open file for writing
Series series = Series("electrons.sst", Access::CREATE, R"(
Series series = Series(
"electrons.sst",
Access::CREATE,
#if openPMD_HAVE_MPI
MPI_COMM_WORLD,
#endif
R"(
{
"adios2": {
"engine": {
@@ -31,11 +47,13 @@ int main()
}
}
}
})");
})"

);

Datatype datatype = determineDatatype<position_t>();
constexpr unsigned long length = 10ul;
Extent global_extent = {length};
Extent global_extent = {mpi_size * length};
Dataset dataset = Dataset(datatype, global_extent);
std::shared_ptr<position_t> local_data(
new position_t[length], [](position_t const *ptr) { delete[] ptr; });
@@ -51,13 +69,67 @@ int main()
Iteration iteration = iterations[i];
Record electronPositions = iteration.particles["e"]["position"];

std::iota(local_data.get(), local_data.get() + length, i * length);
std::iota(
local_data.get(),
local_data.get() + length,
i * length * mpi_size + mpi_rank * length);
for (auto const &dim : {"x", "y", "z"})
{
RecordComponent pos = electronPositions[dim];
pos.resetDataset(dataset);
pos.storeChunk(local_data, Offset{0}, global_extent);
pos.storeChunk(local_data, Offset{length * mpi_rank}, {length});
}

// Use the `local_value` ADIOS2 dataset shape to send a dataset not via
// the data plane, but the control plane of ADIOS2 SST. This is
// advisable for datasets where each rank contributes only a single item
// since the control plane performs data aggregation, thus avoiding
// fully interconnected communication meshes for data that needs to be
// read by each reader. A local value dataset can only contain a single
// item per MPI rank, forming an array of length equal to the MPI size.
// https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes

auto e_patches = iteration.particles["e"].particlePatches;
auto numParticles = e_patches["numParticles"];
auto numParticlesOffset = e_patches["numParticlesOffset"];
for (auto rc : {&numParticles, &numParticlesOffset})
{
rc->resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
}
numParticles.storeChunk(
std::make_unique<unsigned long>(10), {size_t(mpi_rank)}, {1});
numParticlesOffset.storeChunk(
std::make_unique<unsigned long>(10 * ((unsigned long)mpi_rank)),
{size_t(mpi_rank)},
{1});
auto offset = e_patches["offset"];
for (auto const &dim : {"x", "y", "z"})
{
auto rc = offset[dim];
rc.resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
rc.storeChunk(
std::make_unique<unsigned long>((unsigned long)mpi_rank),
{size_t(mpi_rank)},
{1});
}
auto extent = e_patches["extent"];
for (auto const &dim : {"x", "y", "z"})
{
auto rc = extent[dim];
rc.resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
rc.storeChunk(
std::make_unique<unsigned long>(1), {size_t(mpi_rank)}, {1});
}

iteration.close();
}

@@ -69,6 +141,10 @@ int main()
*/
series.close();

#if openPMD_HAVE_MPI
MPI_Finalize();
#endif

return 0;
#else
std::cout << "The streaming example requires that openPMD has been built "
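
The example above hard-codes 10 particles per rank, so ``numParticlesOffset`` is simply ``10 * mpi_rank``. For patches of varying size, the offsets form an exclusive prefix sum over ``numParticles``. The following standalone sketch shows that relation on a plain vector holding one entry per rank; ``patchOffsets`` is a hypothetical name, and in a real MPI program each rank would more likely obtain its own offset through a collective such as ``MPI_Exscan`` instead of gathering all counts.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: given the number of particles in each patch (one patch
// per MPI rank in the example), compute the index at which each patch starts
// in the global particle list.
std::vector<std::uint64_t>
patchOffsets(std::vector<std::uint64_t> const &numParticles)
{
    std::vector<std::uint64_t> offsets(numParticles.size());
    std::uint64_t running = 0;
    for (std::size_t rank = 0; rank < numParticles.size(); ++rank)
    {
        // Exclusive prefix sum: the offset counts only the preceding patches.
        offsets[rank] = running;
        running += numParticles[rank];
    }
    return offsets;
}
```

For three ranks with 10 particles each, this yields the offsets 0, 10 and 20, matching the values written by the example.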
24 changes: 20 additions & 4 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
@@ -38,7 +38,6 @@
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/backend/Writable.hpp"
#include "openPMD/config.hpp"
#include <stdexcept>

#if openPMD_HAVE_ADIOS2
#include <adios2.h>
@@ -304,9 +303,12 @@ class ADIOS2IOHandlerImpl
adios2::Params params;
};

std::vector<ParameterizedOperator> defaultOperators;
// read operators can (currently) not be specified per dataset, so parse
// them once and then buffer them
std::vector<ParameterizedOperator> readOperators;

json::TracingJSON m_config;
std::optional<nlohmann::json> m_buffered_dataset_config;
static json::TracingJSON nullvalue;

template <typename Callback>
@@ -345,9 +347,19 @@ class ADIOS2IOHandlerImpl
// use m_config
std::optional<std::vector<ParameterizedOperator>> getOperators();

enum class Shape
{
GlobalArray,
LocalValue
};

template <typename Parameter>
std::vector<ParameterizedOperator> getDatasetOperators(
Parameter const &, Writable *, std::string const &varName);
auto parseDatasetConfig(
Parameter const &,
Writable *,
std::string const &varName,
std::vector<ParameterizedOperator> default_operators = {})
-> std::tuple<std::vector<ParameterizedOperator>, Shape>;

std::string fileSuffix(bool verbose = true) const;

@@ -546,6 +558,10 @@ class ADIOS2IOHandlerImpl
}
// TODO leave this check to ADIOS?
adios2::Dims shape = var.Shape();
if (shape == adios2::Dims{adios2::LocalValueDim})
{
return var;
}
auto actualDim = shape.size();
{
auto requiredDim = extent.size();
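
The body of ``parseDatasetConfig()`` is not part of the hunks shown here. As a rough illustration of how the documented string values could map onto the new ``Shape`` enum, consider the sketch below. It is an assumption about the general idea, not the PR's implementation: ``parseShape`` is a hypothetical function, and the enum is redeclared only to keep the snippet self-contained.

```cpp
#include <optional>
#include <string>

// Mirrors the enum added to ADIOS2IOHandlerImpl in this PR.
enum class Shape
{
    GlobalArray,
    LocalValue
};

// Hypothetical parser for the value of `adios2.dataset.shape`.
// Returns std::nullopt for unknown values so that the caller can decide
// whether to raise an error or fall back to the default.
std::optional<Shape> parseShape(std::string const &value)
{
    if (value == "global_array")
    {
        return Shape::GlobalArray;
    }
    if (value == "local_value")
    {
        return Shape::LocalValue;
    }
    return std::nullopt;
}
```

Returning an empty optional instead of throwing keeps the sketch neutral about error handling, which the diff does not show.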
15 changes: 15 additions & 0 deletions include/openPMD/auxiliary/StringManip.hpp
@@ -243,6 +243,14 @@ namespace auxiliary
return std::forward<S>(s);
}

/** Write a string representation of a vector or another iterable
* container to a stream.
*
* @param s The stream to write to.
* @param vec The vector or other iterable container.
* @return The modified stream. Each item is
* formatted using the default definition for operator<<().
*/
template <typename Stream, typename Vec>
auto write_vec_to_stream(Stream &&s, Vec const &vec) -> Stream &&
{
@@ -265,6 +273,13 @@ namespace auxiliary
return std::forward<Stream>(s);
}

/** Create a string representation of a vector or another iterable
* container.
*
* @param vec The vector or other iterable container.
* @return A string that shows the items of the container. Each item is
* formatted using the default definition for operator<<().
*/
template <typename Vec>
auto vec_as_string(Vec const &vec) -> std::string
{
39 changes: 37 additions & 2 deletions src/IO/ADIOS/ADIOS2File.cpp
@@ -109,7 +109,25 @@ void WriteDataset::call(ADIOS2File &ba, detail::BufferedPut &bp)
std::nullopt,
ba.variables());

engine.Put(var, ptr);
// https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes
if (var.Shape() == adios2::Dims{adios2::LocalValueDim})
{
if (bp.param.extent != Extent{1})
{
throw error::OperationUnsupportedInBackend(
"ADIOS2",
"Can only write a single element to LocalValue "
"variables (extent == Extent{1}), but extent of '" +
bp.name + "' was " +
auxiliary::vec_as_string(bp.param.extent) + ".");
}
engine.Put(var, *ptr);
}
else
{
engine.Put(var, ptr);
}
}
else if constexpr (std::is_same_v<
ptr_type,
@@ -180,7 +198,24 @@ struct RunUniquePtrPut
bufferedPut.name,
std::nullopt,
ba.variables());
engine.Put(var, ptr);
// https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes
if (var.Shape() == adios2::Dims{adios2::LocalValueDim})
{
if (bufferedPut.extent != Extent{1})
{
throw error::OperationUnsupportedInBackend(
"ADIOS2",
"Can only write a single element to LocalValue "
"variables (extent == Extent{1}), but extent of '" +
bufferedPut.name + "' was " +
auxiliary::vec_as_string(bufferedPut.extent) + ".");
}
engine.Put(var, *ptr);
}
else
{
engine.Put(var, ptr);
}
}

static constexpr char const *errorMsg = "RunUniquePtrPut";