Skip to content

Commit

Permalink
Using unique ids to identify collective operations
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jan 13, 2024
1 parent f53dec1 commit c2e4cb0
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 60 deletions.
11 changes: 10 additions & 1 deletion libs/full/collectives/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,23 @@ set(collectives_compat_headers

# Default location is $HPX_ROOT/libs/collectives/src
set(collectives_sources
all_gather.cpp
all_reduce.cpp
all_to_all.cpp
barrier.cpp
broadcast.cpp
create_communication_set.cpp
channel_communicator.cpp
create_communicator.cpp
latch.cpp
detail/barrier_node.cpp
detail/channel_communicator_server.cpp
detail/communication_set_node.cpp
exclusive_scan.cpp
gather.cpp
inclusive_scan.cpp
latch.cpp
reduce.cpp
scatter.cpp
)

include(HPX_AddModule)
Expand Down
16 changes: 11 additions & 5 deletions libs/full/collectives/include/hpx/collectives/all_gather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,19 @@ namespace hpx { namespace collectives {
namespace hpx::traits {

namespace communication {

struct all_gather_tag;

template <>
constexpr char const* communicator_name<all_gather_tag>() noexcept
struct communicator_data<all_gather_tag>
{
return "all_gather";
}
static constexpr char const* name() noexcept
{
return "all_gather";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

///////////////////////////////////////////////////////////////////////////
Expand All @@ -149,8 +155,8 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::all_gather_tag>(),
communication::communicator_data<
communication::all_gather_tag>::id(),
which, generation,
// step function (invoked for each get)
[&t](auto& data, std::size_t which) {
Expand Down
15 changes: 10 additions & 5 deletions libs/full/collectives/include/hpx/collectives/all_reduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,15 @@ namespace hpx::traits {
struct all_reduce_tag;

template <>
constexpr char const* communicator_name<all_reduce_tag>() noexcept
struct communicator_data<all_reduce_tag>
{
return "all_reduce";
}
static constexpr char const* name() noexcept
{
return "all_reduce";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

///////////////////////////////////////////////////////////////////////////
Expand All @@ -156,8 +161,8 @@ namespace hpx::traits {
std::size_t generation, T&& t, F&& op)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::all_reduce_tag>(),
communication::communicator_data<
communication::all_reduce_tag>::id(),
which, generation,
// step function (invoked for each get)
[&t](auto& data, std::size_t which) {
Expand Down
15 changes: 10 additions & 5 deletions libs/full/collectives/include/hpx/collectives/all_to_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,15 @@ namespace hpx::traits {
struct all_to_all_tag;

template <>
constexpr char const* communicator_name<all_to_all_tag>() noexcept
struct communicator_data<all_to_all_tag>
{
return "all_to_all";
}
static constexpr char const* name() noexcept
{
return "all_to_all";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

///////////////////////////////////////////////////////////////////////////
Expand All @@ -151,8 +156,8 @@ namespace hpx::traits {
std::size_t generation, std::vector<T>&& t)
{
return communicator.template handle_data<std::vector<T>>(
communication::communicator_name<
communication::all_to_all_tag>(),
communication::communicator_data<
communication::all_to_all_tag>::id(),
which, generation,
// step function (invoked for each get)
[&t](auto& data, std::size_t which) {
Expand Down
19 changes: 12 additions & 7 deletions libs/full/collectives/include/hpx/collectives/broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,15 @@ namespace hpx::traits {
struct broadcast_tag;

template <>
constexpr char const* communicator_name<broadcast_tag>() noexcept
struct communicator_data<broadcast_tag>
{
return "broadcast";
}
static constexpr char const* name() noexcept
{
return "broadcast";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

template <typename Communicator>
Expand All @@ -233,8 +238,8 @@ namespace hpx::traits {
using data_type = typename Result::result_type;

return communicator.template handle_data<data_type>(
communication::communicator_name<
communication::broadcast_tag>(),
communication::communicator_data<
communication::broadcast_tag>::id(),
which, generation,
// no step function
nullptr,
Expand All @@ -251,8 +256,8 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::broadcast_tag>(),
communication::communicator_data<
communication::broadcast_tag>::id(),
which, generation,
// step function (invoked once for set)
[&t](auto& data, std::size_t) { data[0] = HPX_FORWARD(T, t); },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <hpx/type_support/unused.hpp>

#include <cstddef>
#include <cstring>
#include <mutex>
#include <type_traits>
#include <utility>
Expand All @@ -35,12 +34,22 @@ namespace hpx::traits {

namespace communication {

using operation_id_type = void const*;

// Retrieve name of the current communicator
template <typename Operation>
constexpr char const* communicator_name() noexcept
struct communicator_data
{
return "<unknown>";
}
static constexpr char const* name() noexcept
{
return "<unknown>";
}

static constexpr operation_id_type id() noexcept
{
return nullptr;
}
};
} // namespace communication
} // namespace hpx::traits

Expand Down Expand Up @@ -70,15 +79,17 @@ namespace hpx::collectives::detail {
{
LHPX_(info, " [COL] ")
.format("{}(>>> {}): which({}), generation({})", op,
traits::communication::communicator_name<Operation>(),
traits::communication::communicator_data<
Operation>::name(),
which, generation);
}

~logging_helper()
{
LHPX_(info, " [COL] ")
.format("{}(<<< {}): which({}), generation({})", op_,
traits::communication::communicator_name<Operation>(),
traits::communication::communicator_data<
Operation>::name(),
which_, generation_);
}

Expand Down Expand Up @@ -259,9 +270,10 @@ namespace hpx::collectives::detail {
//
// Finalizer will be invoked under lock after all sites have checked in.
template <typename Data, typename Step, typename Finalizer>
auto handle_data(char const* operation, std::size_t which,
std::size_t generation, [[maybe_unused]] Step&& step,
Finalizer&& finalizer,
auto handle_data(
hpx::traits::communication::operation_id_type operation,
std::size_t which, std::size_t generation,
[[maybe_unused]] Step&& step, Finalizer&& finalizer,
std::size_t num_values = static_cast<std::size_t>(-1))
{
auto on_ready = [this, operation, which, num_values,
Expand All @@ -281,7 +293,7 @@ namespace hpx::collectives::detail {
// Verify that there is no overlap between different types of
// operations on the same communicator.
if (current_operation_ == nullptr ||
std::strcmp(current_operation_, operation) != 0)
current_operation_ != operation)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::error::invalid_status,
Expand Down Expand Up @@ -345,7 +357,7 @@ namespace hpx::collectives::detail {
}
current_operation_ = operation;
}
else if (std::strcmp(current_operation_, operation) != 0)
else if (current_operation_ != operation)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::error::invalid_status,
Expand Down Expand Up @@ -423,7 +435,8 @@ namespace hpx::collectives::detail {
hpx::lcos::local::and_gate gate_;
std::size_t const num_sites_;
std::size_t on_ready_count_ = 0;
char const* current_operation_ = nullptr;
hpx::traits::communication::operation_id_type current_operation_ =
nullptr;
bool needs_initialization_ = true;
bool data_available_ = false;
};
Expand Down
15 changes: 10 additions & 5 deletions libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,15 @@ namespace hpx::traits {
struct exclusive_scan_tag;

template <>
constexpr char const* communicator_name<exclusive_scan_tag>() noexcept
struct communicator_data<exclusive_scan_tag>
{
return "exclusive_scan";
}
static constexpr char const* name() noexcept
{
return "exclusive_scan";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

///////////////////////////////////////////////////////////////////////////
Expand All @@ -169,8 +174,8 @@ namespace hpx::traits {
std::size_t generation, T&& t, F&& op)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::exclusive_scan_tag>(),
communication::communicator_data<
communication::exclusive_scan_tag>::id(),
which, generation,
// step function (invoked for each get)
[&t](auto& data, std::size_t which) {
Expand Down
17 changes: 12 additions & 5 deletions libs/full/collectives/include/hpx/collectives/gather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,15 @@ namespace hpx::traits {
struct gather_tag;

template <>
constexpr char const* communicator_name<gather_tag>() noexcept
struct communicator_data<gather_tag>
{
return "gather";
}
static constexpr char const* name() noexcept
{
return "gather";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

template <typename Communicator>
Expand All @@ -250,7 +255,8 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<communication::gather_tag>(),
communication::communicator_data<
communication::gather_tag>::id(),
which, generation,
// step function (invoked once for get)
[&t](auto& data, std::size_t which) {
Expand All @@ -265,7 +271,8 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<communication::gather_tag>(),
communication::communicator_data<
communication::gather_tag>::id(),
which, generation,
// step function (invoked for each set)
[&t](auto& data, std::size_t which) {
Expand Down
15 changes: 10 additions & 5 deletions libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,15 @@ namespace hpx::traits {
struct inclusive_scan_tag;

template <>
constexpr char const* communicator_name<inclusive_scan_tag>() noexcept
struct communicator_data<inclusive_scan_tag>
{
return "inclusive_scan";
}
static constexpr char const* name() noexcept
{
return "inclusive_scan";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

///////////////////////////////////////////////////////////////////////////
Expand All @@ -157,8 +162,8 @@ namespace hpx::traits {
std::size_t generation, T&& t, F&& op)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::inclusive_scan_tag>(),
communication::communicator_data<
communication::inclusive_scan_tag>::id(),
which, generation,
// step function (invoked for each get)
[&t](auto& data, std::size_t which) {
Expand Down
17 changes: 12 additions & 5 deletions libs/full/collectives/include/hpx/collectives/reduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,15 @@ namespace hpx::traits {
struct reduce_tag;

template <>
constexpr char const* communicator_name<reduce_tag>() noexcept
struct communicator_data<reduce_tag>
{
return "reduce";
}
static constexpr char const* name() noexcept
{
return "reduce";
}

HPX_EXPORT static operation_id_type id() noexcept;
};
} // namespace communication

///////////////////////////////////////////////////////////////////////////
Expand All @@ -253,7 +258,8 @@ namespace hpx::traits {
std::size_t generation, T&& t, F&& op)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<communication::reduce_tag>(),
communication::communicator_data<
communication::reduce_tag>::id(),
which, generation,
// step function (invoked once for get)
[&t](auto& data, std::size_t which) {
Expand All @@ -280,7 +286,8 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<communication::reduce_tag>(),
communication::communicator_data<
communication::reduce_tag>::id(),
which, generation,
// step function (invoked for each set)
[t = HPX_FORWARD(T, t)](auto& data, std::size_t which) mutable {
Expand Down
Loading

0 comments on commit c2e4cb0

Please sign in to comment.