diff --git a/libs/full/collectives/include/hpx/collectives/all_gather.hpp b/libs/full/collectives/include/hpx/collectives/all_gather.hpp index 0e83de83217f..d9224414bc5b 100644 --- a/libs/full/collectives/include/hpx/collectives/all_gather.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_gather.hpp @@ -136,12 +136,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "all_gather"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -156,7 +151,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::all_gather_tag>::id(), + communication::all_gather_tag>::name(), which, generation, // step function (invoked for each get) [&t](auto& data, std::size_t which) { diff --git a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp index 43bdc4ce0d01..1fd006794ffa 100644 --- a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp @@ -142,12 +142,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "all_reduce"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -162,7 +157,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::all_reduce_tag>::id(), + communication::all_reduce_tag>::name(), which, generation, // step function (invoked for each get) [&t](auto& data, std::size_t which) { diff --git a/libs/full/collectives/include/hpx/collectives/all_to_all.hpp b/libs/full/collectives/include/hpx/collectives/all_to_all.hpp index 055f326e7be3..e4d7648542da 100644 --- a/libs/full/collectives/include/hpx/collectives/all_to_all.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_to_all.hpp @@ -137,12 +137,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "all_to_all"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -157,7 +152,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::all_to_all_tag>::id(), + communication::all_to_all_tag>::name(), which, generation, // step function (invoked for each get) [&t](auto& data, std::size_t which) { diff --git a/libs/full/collectives/include/hpx/collectives/broadcast.hpp b/libs/full/collectives/include/hpx/collectives/broadcast.hpp index b70fa309ca28..bd5c12ba47f5 100644 --- a/libs/full/collectives/include/hpx/collectives/broadcast.hpp +++ b/libs/full/collectives/include/hpx/collectives/broadcast.hpp @@ -219,12 +219,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "broadcast"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -239,7 +234,7 @@ namespace hpx::traits { return communicator.template handle_data( communication::communicator_data< - communication::broadcast_tag>::id(), + communication::broadcast_tag>::name(), which, generation, // no step function nullptr, @@ -257,7 +252,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::broadcast_tag>::id(), + communication::broadcast_tag>::name(), which, generation, // step function (invoked once for set) [&t](auto& data, std::size_t) { data[0] = HPX_FORWARD(T, t); }, diff --git a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp index bfc2d895ea40..a8821eea3c6f 100644 --- a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp +++ b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp @@ -35,8 +35,6 @@ namespace hpx::traits { namespace communication { - using operation_id_type = void const*; - // Retrieve name of the current communicator template struct communicator_data @@ -45,11 +43,6 @@ namespace hpx::traits { { return ""; } - - static constexpr operation_id_type id() noexcept - { - return nullptr; - } }; } // namespace communication } // namespace hpx::traits @@ -65,7 +58,15 @@ namespace hpx::collectives::detail { public: HPX_EXPORT communicator_server() noexcept; - HPX_EXPORT explicit communicator_server(std::size_t num_sites) noexcept; + HPX_EXPORT explicit communicator_server( + std::size_t num_sites, char const* basename) noexcept; + + communicator_server(communicator_server const&) = delete; + communicator_server(communicator_server&&) = delete; + communicator_server& operator=(communicator_server const&) = delete; + communicator_server& operator=(communicator_server&&) = delete; + + HPX_EXPORT ~communicator_server(); private: template @@ -245,18 +246,39 @@ namespace hpx::collectives::detail { return fut; } + template + bool set_operation_and_check_sequencing(Lock& l, char const* operation, + std::size_t which, std::size_t generation) + { + if (current_operation_ == nullptr) + { + if (on_ready_count_ != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data", + "communicator: {}: sequencing error, on_ready callback " + "was already invoked before the start of the " + "collective operation {}, which {}, generation {}.", + basename_, operation, which, generation); + } + current_operation_ = operation; + return true; + } + return false; + } + // Step will be invoked under lock for each site that checks in (either // set or get). // // Finalizer will be invoked under lock after all sites have checked in. template - auto handle_data( - hpx::traits::communication::operation_id_type operation, - std::size_t which, std::size_t generation, - [[maybe_unused]] Step&& step, Finalizer&& finalizer, + auto handle_data(char const* operation, std::size_t which, + std::size_t generation, [[maybe_unused]] Step&& step, + Finalizer&& finalizer, std::size_t num_values = static_cast(-1)) { - auto on_ready = [this, operation, which, num_values, + auto on_ready = [this, operation, which, generation, num_values, finalizer = HPX_FORWARD(Finalizer, finalizer)]( shared_future&& f) mutable { // This callback will be invoked once for each participating @@ -278,10 +300,12 @@ namespace hpx::collectives::detail { l.unlock(); HPX_THROW_EXCEPTION(hpx::error::invalid_status, "communicator::handle_data::on_ready", - "sequencing error, operation type mismatch: invoked " - "for {}, ongoing operation {}", - operation, - current_operation_ ? current_operation_ : "unknown"); + "communicator {}: sequencing error, operation type " + "mismatch: invoked for {}, ongoing operation {}, which " + "{}, generation {}.", + basename_, operation, + current_operation_ ? current_operation_ : "unknown", + which, generation); } // Verify that the number of invocations of this callback is in @@ -291,11 +315,12 @@ namespace hpx::collectives::detail { l.unlock(); HPX_THROW_EXCEPTION(hpx::error::invalid_status, "communicator::handle_data::on_ready", - "sequencing error, an excessive number of on_ready " - "callbacks have been invoked before the end of the " - "collective {} operation. Expected count {}, received " - "count {}.", - operation, on_ready_count_, num_sites_); + "communictor {}: sequencing error, an excessive " + "number of on_ready callbacks have been invoked before " + "the end of the collective operation {}, which {}, " + "generation {}. Expected count {}, received count {}.", + basename_, operation, which, generation, + on_ready_count_, num_sites_); } // On exit, keep track of number of invocations of this @@ -314,6 +339,7 @@ namespace hpx::collectives::detail { { HPX_UNUSED(this); HPX_UNUSED(which); + HPX_UNUSED(generation); HPX_UNUSED(num_values); HPX_UNUSED(finalizer); } @@ -324,33 +350,27 @@ namespace hpx::collectives::detail { // Verify that there is no overlap between different types of // operations on the same communicator. - if (current_operation_ == nullptr) - { - if (on_ready_count_ != 0) - { - l.unlock(); - HPX_THROW_EXCEPTION(hpx::error::invalid_status, - "communicator::handle_data", - "sequencing error, on_ready callback was already " - "invoked before the start of the collective {} " - "operation", - operation); - } - current_operation_ = operation; - } - else if (current_operation_ != operation) + set_operation_and_check_sequencing(l, operation, which, generation); + + auto f = get_future_and_synchronize( + generation, num_values, HPX_MOVE(on_ready), l); + + // We may have just finished a different operation, thus we have to + // possibly reset the operation type stored in this communicator. + if (current_operation_ != operation && + !set_operation_and_check_sequencing( + l, operation, which, generation)) { l.unlock(); HPX_THROW_EXCEPTION(hpx::error::invalid_status, "communicator::handle_data", - "sequencing error, operation type mismatch: invoked for " - "{}, ongoing operation {}", - operation, current_operation_); + "communicator {}: sequencing error, operation type " + "mismatch: invoked for {}, ongoing operation {}, which {}, " + "generation {}.", + basename_, operation, current_operation_, which, + generation); } - auto f = get_future_and_synchronize( - generation, num_values, HPX_MOVE(on_ready), l); - if constexpr (!std::is_same_v>) { // call provided step function for each invocation site @@ -360,7 +380,7 @@ namespace hpx::collectives::detail { // Make sure next generation is enabled only after previous // generation has finished executing. gate_.set(which, l, - [this, operation, generation]( + [this, operation, which, generation]( auto& l, auto& gate, error_code& ec) { // This callback is invoked synchronously once for each // collective operation after all data has been received and @@ -377,8 +397,10 @@ namespace hpx::collectives::detail { "communicator::handle_data", "sequencing error, not all on_ready callbacks have " "been invoked at the end of the collective {} " - "operation. Expected count {}, received count {}.", - operation, on_ready_count_, num_sites_); + "operation. Expected count {}, received count {}, " + "which {}, generation {}.", + *operation, on_ready_count_, num_sites_, which, + generation); return; } @@ -416,10 +438,10 @@ namespace hpx::collectives::detail { hpx::lcos::local::and_gate gate_; std::size_t const num_sites_; std::size_t on_ready_count_ = 0; - hpx::traits::communication::operation_id_type current_operation_ = - nullptr; + char const* current_operation_ = nullptr; bool needs_initialization_ = true; bool data_available_ = false; + char const* basename_ = nullptr; }; } // namespace hpx::collectives::detail diff --git a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp index f1b593f2045c..929aad09f55b 100644 --- a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp @@ -154,12 +154,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "exclusive_scan"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -175,7 +170,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::exclusive_scan_tag>::id(), + communication::exclusive_scan_tag>::name(), which, generation, // step function (invoked for each get) [&t](auto& data, std::size_t which) { diff --git a/libs/full/collectives/include/hpx/collectives/gather.hpp b/libs/full/collectives/include/hpx/collectives/gather.hpp index 76a7f3acbf9f..79b4f48329c8 100644 --- a/libs/full/collectives/include/hpx/collectives/gather.hpp +++ b/libs/full/collectives/include/hpx/collectives/gather.hpp @@ -238,12 +238,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "gather"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -256,7 +251,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::gather_tag>::id(), + communication::gather_tag>::name(), which, generation, // step function (invoked once for get) [&t](auto& data, std::size_t which) { @@ -272,7 +267,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::gather_tag>::id(), + communication::gather_tag>::name(), which, generation, // step function (invoked for each set) [&t](auto& data, std::size_t which) { diff --git a/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp index e7387cc1299d..9e1458afbd47 100644 --- a/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp @@ -142,12 +142,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "inclusive_scan"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -163,7 +158,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::inclusive_scan_tag>::id(), + communication::inclusive_scan_tag>::name(), which, generation, // step function (invoked for each get) [&t](auto& data, std::size_t which) { diff --git a/libs/full/collectives/include/hpx/collectives/reduce.hpp b/libs/full/collectives/include/hpx/collectives/reduce.hpp index bf1684f31361..6b1725954fab 100644 --- a/libs/full/collectives/include/hpx/collectives/reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/reduce.hpp @@ -239,12 +239,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "reduce"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -259,7 +254,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::reduce_tag>::id(), + communication::reduce_tag>::name(), which, generation, // step function (invoked once for get) [&t](auto& data, std::size_t which) { @@ -287,7 +282,7 @@ namespace hpx::traits { { return communicator.template handle_data>( communication::communicator_data< - communication::reduce_tag>::id(), + communication::reduce_tag>::name(), which, generation, // step function (invoked for each set) [t = HPX_FORWARD(T, t)](auto& data, std::size_t which) mutable { diff --git a/libs/full/collectives/include/hpx/collectives/scatter.hpp b/libs/full/collectives/include/hpx/collectives/scatter.hpp index fb2ed7609678..b24f2844bac9 100644 --- a/libs/full/collectives/include/hpx/collectives/scatter.hpp +++ b/libs/full/collectives/include/hpx/collectives/scatter.hpp @@ -231,12 +231,7 @@ namespace hpx::traits { template <> struct communicator_data { - static constexpr char const* name() noexcept - { - return "scatter"; - } - - HPX_EXPORT static operation_id_type id() noexcept; + HPX_EXPORT static char const* name() noexcept; }; } // namespace communication @@ -251,7 +246,7 @@ namespace hpx::traits { return communicator.template handle_data( communication::communicator_data< - communication::scatter_tag>::id(), + communication::scatter_tag>::name(), which, generation, // step function (invoked once for get) nullptr, @@ -268,7 +263,7 @@ namespace hpx::traits { { return communicator.template handle_data( communication::communicator_data< - communication::scatter_tag>::id(), + communication::scatter_tag>::name(), which, generation, // step function (invoked once for set) [&t](auto& data, std::size_t) { data = HPX_MOVE(t); }, diff --git a/libs/full/collectives/src/all_gather.cpp b/libs/full/collectives/src/all_gather.cpp index cdcf847e29a4..b84d877d7dec 100644 --- a/libs/full/collectives/src/all_gather.cpp +++ b/libs/full/collectives/src/all_gather.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "all_gather"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/all_reduce.cpp b/libs/full/collectives/src/all_reduce.cpp index 8847c05f6532..409e17a9c1a0 100644 --- a/libs/full/collectives/src/all_reduce.cpp +++ b/libs/full/collectives/src/all_reduce.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "all_reduce"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/all_to_all.cpp b/libs/full/collectives/src/all_to_all.cpp index 482b968a98e3..4037cbcfadcc 100644 --- a/libs/full/collectives/src/all_to_all.cpp +++ b/libs/full/collectives/src/all_to_all.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "all_to_all"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/broadcast.cpp b/libs/full/collectives/src/broadcast.cpp index 87ddffa8b85f..578692fe31de 100644 --- a/libs/full/collectives/src/broadcast.cpp +++ b/libs/full/collectives/src/broadcast.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static const char* name = "broadcast"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/create_communicator.cpp b/libs/full/collectives/src/create_communicator.cpp index c2f97c9cb0e5..cdf70006b2ea 100644 --- a/libs/full/collectives/src/create_communicator.cpp +++ b/libs/full/collectives/src/create_communicator.cpp @@ -54,13 +54,17 @@ namespace hpx::collectives { HPX_ASSERT(false); // shouldn't ever be called } - communicator_server::communicator_server(std::size_t num_sites) noexcept + communicator_server::communicator_server( + std::size_t num_sites, char const* basename) noexcept : gate_(num_sites) , num_sites_(num_sites) + , basename_(basename) { HPX_ASSERT( num_sites != 0 && num_sites != static_cast(-1)); } + + communicator_server::~communicator_server() = default; } // namespace detail /////////////////////////////////////////////////////////////////////////// @@ -120,7 +124,7 @@ namespace hpx::collectives { if (this_site == root_site) { // create a new communicator - auto c = hpx::local_new(num_sites); + auto c = hpx::local_new(num_sites, basename); // register the communicator's id using the given basename, this // keeps the communicator alive @@ -173,7 +177,7 @@ namespace hpx::collectives { if (this_site == root_site) { // create a new communicator - auto c = hpx::local_new(num_sites); + auto c = hpx::local_new(num_sites, basename); // register the communicator's id using the given basename, this // keeps the communicator alive diff --git a/libs/full/collectives/src/exclusive_scan.cpp b/libs/full/collectives/src/exclusive_scan.cpp index 8a8d43864bfc..7eeb77ecee46 100644 --- a/libs/full/collectives/src/exclusive_scan.cpp +++ b/libs/full/collectives/src/exclusive_scan.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "exclusive_scan"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/gather.cpp b/libs/full/collectives/src/gather.cpp index badfd86c0beb..9e74e2bf6c73 100644 --- a/libs/full/collectives/src/gather.cpp +++ b/libs/full/collectives/src/gather.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "gather"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/inclusive_scan.cpp b/libs/full/collectives/src/inclusive_scan.cpp index b75e77e3b9c8..77e08c04e54a 100644 --- a/libs/full/collectives/src/inclusive_scan.cpp +++ b/libs/full/collectives/src/inclusive_scan.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "inclusive_scan"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/reduce.cpp b/libs/full/collectives/src/reduce.cpp index 444ca2a2d965..23561defa064 100644 --- a/libs/full/collectives/src/reduce.cpp +++ b/libs/full/collectives/src/reduce.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "reduce"; + return name; } } // namespace hpx::traits::communication diff --git a/libs/full/collectives/src/scatter.cpp b/libs/full/collectives/src/scatter.cpp index 4c7c9c78991c..b8e61757395a 100644 --- a/libs/full/collectives/src/scatter.cpp +++ b/libs/full/collectives/src/scatter.cpp @@ -10,16 +10,14 @@ #include -#include - namespace hpx::traits::communication { // This is explicitly instantiated to ensure that the id is stable across // shared libraries. - operation_id_type communicator_data::id() noexcept + char const* communicator_data::name() noexcept { - static std::uint8_t id = 0; - return &id; + static char const* name = "scatter"; + return name; } } // namespace hpx::traits::communication