From 788004cda9c50417adfcb608923b169409a55457 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Fri, 12 Jul 2024 17:08:58 -0500 Subject: [PATCH] Adding optional handshakes to acknowledge the received data - this prevents sends to be posted before their corresponding receives --- .jenkins/lsu/env-clang-16.sh | 3 + .jenkins/lsu/env-gcc-12.sh | 3 + .jenkins/lsu/env-gcc-14.sh | 3 + .../include/hpx/mpi_base/mpi_environment.hpp | 4 + libs/core/mpi_base/src/mpi_environment.cpp | 18 +- .../include/hpx/parcelport_mpi/header.hpp | 84 +++++--- .../include/hpx/parcelport_mpi/receiver.hpp | 11 +- .../parcelport_mpi/receiver_connection.hpp | 179 ++++++++++++++--- .../include/hpx/parcelport_mpi/sender.hpp | 16 +- .../hpx/parcelport_mpi/sender_connection.hpp | 186 ++++++++++++++---- .../hpx/parcelport_mpi/tag_provider.hpp | 8 +- .../parcelport_mpi/src/parcelport_mpi.cpp | 26 ++- 12 files changed, 413 insertions(+), 128 deletions(-) diff --git a/.jenkins/lsu/env-clang-16.sh b/.jenkins/lsu/env-clang-16.sh index 4a5d951bd1aa..c96b2d88dbaa 100644 --- a/.jenkins/lsu/env-clang-16.sh +++ b/.jenkins/lsu/env-clang-16.sh @@ -37,3 +37,6 @@ configure_extra_options+=" -DHPX_WITH_FETCH_EVE=ON" # Make sure HWLOC does not report 'cores'. This is purely an option to enable # testing the topology code under conditions close to those on FreeBSD. configure_extra_options+=" -DHPX_TOPOLOGY_WITH_ADDITIONAL_HWLOC_TESTING=ON" + +# enable additional handshaking in MPI parcelport +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1" diff --git a/.jenkins/lsu/env-gcc-12.sh b/.jenkins/lsu/env-gcc-12.sh index 06e288a62b7f..c751fe262caa 100644 --- a/.jenkins/lsu/env-gcc-12.sh +++ b/.jenkins/lsu/env-gcc-12.sh @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main" # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" + +# enable additional handshaking in MPI parcelport +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1" diff --git a/.jenkins/lsu/env-gcc-14.sh b/.jenkins/lsu/env-gcc-14.sh index a256e0651b18..d7d2ad183623 100644 --- a/.jenkins/lsu/env-gcc-14.sh +++ b/.jenkins/lsu/env-gcc-14.sh @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main" # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" + +# enable additional handshaking in MPI parcelport +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1" diff --git a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp index 97423be36d66..dfa34a70877d 100644 --- a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp +++ b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp @@ -86,8 +86,12 @@ namespace hpx::util { using mutex_type = hpx::spinlock; + // The highest order bit is used for acknowledgement messages static int MPI_MAX_TAG; + constexpr static unsigned int MPI_ACK_MASK = 0xC000; + constexpr static unsigned int MPI_ACK_TAG = 0x4000; + private: static mutex_type mtx_; diff --git a/libs/core/mpi_base/src/mpi_environment.cpp b/libs/core/mpi_base/src/mpi_environment.cpp index 5b7e8325c726..283b7b0fbd16 100644 --- a/libs/core/mpi_base/src/mpi_environment.cpp +++ b/libs/core/mpi_base/src/mpi_environment.cpp @@ -8,6 +8,7 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #include +#include #include #include #include @@ -23,7 +24,7 @@ /////////////////////////////////////////////////////////////////////////////// namespace hpx::util { - int mpi_environment::MPI_MAX_TAG = 32767; + int mpi_environment::MPI_MAX_TAG = 8192; namespace detail { @@ -269,11 +270,18 @@ namespace hpx::util { rtcfg.add_entry("hpx.parcel.mpi.rank", std::to_string(this_rank)); rtcfg.add_entry("hpx.parcel.mpi.processorname", get_processor_name()); - void* max_tag_p; - int flag; - MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag); + + void* max_tag_p = nullptr; + int flag = 0; + [[maybe_unused]] int ret = + MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag); + HPX_ASSERT(ret == MPI_SUCCESS); + if (flag) - MPI_MAX_TAG = *(int*) max_tag_p; + { + MPI_MAX_TAG = + static_cast(*static_cast(max_tag_p) & ~MPI_ACK_MASK); + } } std::string mpi_environment::get_processor_name() diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp index de76bb9562f8..2c92a967d9d3 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2013-2023 Hartmut Kaiser +// Copyright (c) 2013-2024 Hartmut Kaiser // Copyright (c) 2013-2015 Thomas Heller // Copyright (c) 2023 Jiakun Yan // @@ -22,30 +22,33 @@ #include namespace hpx::parcelset::policies::mpi { + struct header { using value_type = int; enum data_pos { - // siguature for assert_valid + // signature for assert_valid pos_signature = 0, // tag pos_tag = 1 * sizeof(value_type), + // enable ack handshakes + pos_ack_handshakes = 2 * sizeof(value_type), // non-zero-copy chunk size - pos_numbytes_nonzero_copy = 2 * sizeof(value_type), + pos_numbytes_nonzero_copy = 3 * sizeof(value_type), // transmission chunk size - pos_numbytes_tchunk = 3 * sizeof(value_type), + pos_numbytes_tchunk = 4 * sizeof(value_type), // how many bytes in total (including zero-copy and non-zero-copy chunks) - pos_numbytes = 4 * sizeof(value_type), + pos_numbytes = 5 * sizeof(value_type), // zero-copy chunk number - pos_numchunks_zero_copy = 5 * sizeof(value_type), + pos_numchunks_zero_copy = 6 * sizeof(value_type), // non-zero-copy chunk number - pos_numchunks_nonzero_copy = 6 * sizeof(value_type), + pos_numchunks_nonzero_copy = 7 * sizeof(value_type), // whether piggyback data - pos_piggy_back_flag_data = 7 * sizeof(value_type), + pos_piggy_back_flag_data = 8 * sizeof(value_type), // whether piggyback transmission chunk - pos_piggy_back_flag_tchunk = 7 * sizeof(value_type) + 1, - pos_piggy_back_address = 7 * sizeof(value_type) + 2 + pos_piggy_back_flag_tchunk = 8 * sizeof(value_type) + 1, + pos_piggy_back_address = 8 * sizeof(value_type) + 2 }; template @@ -60,18 +63,21 @@ namespace hpx::parcelset::policies::mpi { { current_header_size += buffer.data_.size(); } - int num_zero_copy_chunks = buffer.num_chunks_.first; - [[maybe_unused]] int num_non_zero_copy_chunks = + + int const num_zero_copy_chunks = buffer.num_chunks_.first; + [[maybe_unused]] int const num_non_zero_copy_chunks = buffer.num_chunks_.second; if (num_zero_copy_chunks != 0) { HPX_ASSERT(buffer.transmission_chunks_.size() == - size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = + static_cast( + num_zero_copy_chunks + num_non_zero_copy_chunks)); + int const tchunk_size = static_cast(buffer.transmission_chunks_.size() * sizeof(typename parcel_buffer::transmission_chunk_type)); - if (tchunk_size <= int(max_header_size - current_header_size)) + if (tchunk_size <= + static_cast(max_header_size - current_header_size)) { current_header_size += tchunk_size; } @@ -86,21 +92,22 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(max_header_size >= pos_piggy_back_address); data_ = header_buffer; memset(data_, 0, pos_piggy_back_address); - std::int64_t size = static_cast(buffer.data_.size()); - std::int64_t numbytes = + + std::int64_t const size = + static_cast(buffer.data_.size()); + std::int64_t const numbytes = static_cast(buffer.data_size_); HPX_ASSERT(size <= (std::numeric_limits::max)()); HPX_ASSERT(numbytes <= (std::numeric_limits::max)()); - int num_zero_copy_chunks = buffer.num_chunks_.first; - int num_non_zero_copy_chunks = buffer.num_chunks_.second; + + int const num_zero_copy_chunks = buffer.num_chunks_.first; + int const num_non_zero_copy_chunks = buffer.num_chunks_.second; set(pos_signature, MAGIC_SIGNATURE); set(pos_numbytes_nonzero_copy, static_cast(size)); set(pos_numbytes, static_cast(numbytes)); - set(pos_numchunks_zero_copy, - static_cast(num_zero_copy_chunks)); - set(pos_numchunks_nonzero_copy, - static_cast(num_non_zero_copy_chunks)); + set(pos_numchunks_zero_copy, num_zero_copy_chunks); + set(pos_numchunks_nonzero_copy, num_non_zero_copy_chunks); data_[pos_piggy_back_flag_data] = 0; data_[pos_piggy_back_flag_tchunk] = 0; @@ -112,16 +119,19 @@ namespace hpx::parcelset::policies::mpi { &data_[current_header_size], &buffer.data_[0], size); current_header_size += size; } + if (num_zero_copy_chunks != 0) { HPX_ASSERT(buffer.transmission_chunks_.size() == - size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = + static_cast( + num_zero_copy_chunks + num_non_zero_copy_chunks)); + int const tchunk_size = static_cast(buffer.transmission_chunks_.size() * sizeof(typename parcel_buffer::transmission_chunk_type)); set(pos_numbytes_tchunk, static_cast(tchunk_size)); - if (tchunk_size <= int(max_header_size - current_header_size)) + if (tchunk_size <= + static_cast(max_header_size - current_header_size)) { data_[pos_piggy_back_flag_tchunk] = 1; std::memcpy(&data_[current_header_size], @@ -155,12 +165,12 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(valid()); } - [[nodiscard]] char* data() noexcept + [[nodiscard]] char* data() const noexcept { return data_; } - [[nodiscard]] size_t size() noexcept + [[nodiscard]] size_t size() const noexcept { return pos_piggy_back_address + piggy_back_size(); } @@ -175,11 +185,21 @@ namespace hpx::parcelset::policies::mpi { set(pos_tag, static_cast(tag)); } + void set_ack_handshakes(bool enable_ack_handshakes) noexcept + { + set(pos_ack_handshakes, enable_ack_handshakes ? 1 : 0); + } + [[nodiscard]] value_type get_tag() const noexcept { return get(pos_tag); } + [[nodiscard]] value_type get_ack_handshakes() const noexcept + { + return get(pos_ack_handshakes); + } + [[nodiscard]] value_type numbytes_nonzero_copy() const noexcept { return get(pos_numbytes_nonzero_copy); @@ -205,7 +225,7 @@ namespace hpx::parcelset::policies::mpi { return get(pos_numchunks_nonzero_copy); } - [[nodiscard]] constexpr char* piggy_back_address() noexcept + [[nodiscard]] constexpr char* piggy_back_address() const noexcept { if (data_[pos_piggy_back_flag_data] || data_[pos_piggy_back_flag_tchunk]) @@ -213,7 +233,7 @@ namespace hpx::parcelset::policies::mpi { return nullptr; } - [[nodiscard]] int piggy_back_size() noexcept + [[nodiscard]] int piggy_back_size() const noexcept { int result = 0; if (data_[pos_piggy_back_flag_data]) @@ -223,14 +243,14 @@ namespace hpx::parcelset::policies::mpi { return result; } - [[nodiscard]] constexpr char* piggy_back_data() noexcept + [[nodiscard]] constexpr char* piggy_back_data() const noexcept { if (data_[pos_piggy_back_flag_data]) return &data_[pos_piggy_back_address]; return nullptr; } - [[nodiscard]] constexpr char* piggy_back_tchunk() noexcept + [[nodiscard]] constexpr char* piggy_back_tchunk() const noexcept { size_t current_header_size = pos_piggy_back_address; if (!data_[pos_piggy_back_flag_tchunk]) diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp index 0ba6bf82650d..e75b471fa8ec 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp @@ -30,7 +30,6 @@ namespace hpx::parcelset::policies::mpi { template struct receiver { - using handles_header_type = std::set>; using connection_type = receiver_connection; using connection_ptr = std::shared_ptr; using connection_list = std::deque; @@ -104,6 +103,10 @@ namespace hpx::parcelset::policies::mpi { #if defined(HPX_MSVC) #pragma warning(push) #pragma warning(disable : 26110) +#endif +#if defined(HPX_GCC_VERSION) +#pragma GCC diagnostic push +#pragma GCC diagnostic error "-Werror=class-memaccess" #endif if (l.locked) @@ -128,6 +131,9 @@ namespace hpx::parcelset::policies::mpi { } } +#if defined(HPX_GCC_VERSION) +#pragma GCC diagnostic pop +#endif #if defined(HPX_MSVC) #pragma warning(pop) #endif @@ -152,9 +158,6 @@ namespace hpx::parcelset::policies::mpi { MPI_Request hdr_request_; std::vector header_buffer_; - hpx::spinlock handles_header_mtx_; - handles_header_type handles_header_; - hpx::spinlock connections_mtx_; connection_list connections_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp index 3abba418bf00..042784cdcd52 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014-2015 Thomas Heller -// Copyright (c) 2007-2023 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 @@ -32,41 +32,53 @@ namespace hpx::parcelset::policies::mpi { struct receiver_connection { private: - enum connection_state + enum class connection_state : std::uint8_t { - initialized, - rcvd_transmission_chunks, - rcvd_data, - rcvd_chunks, + initialized = 1, + rcvd_transmission_chunks = 2, + rcvd_data = 3, + rcvd_chunks = 4, + + acked_transmission_chunks = 5, + acked_data = 6 }; using data_type = std::vector; using buffer_type = parcel_buffer; + constexpr int ack_tag() const noexcept + { + return static_cast(tag_ | util::mpi_environment::MPI_ACK_TAG); + } + public: receiver_connection( - int src, std::vector header_buffer, Parcelport& pp) noexcept - : state_(initialized) + int src, std::vector header_buffer, Parcelport& pp) + : state_(connection_state::initialized) , src_(src) , request_(MPI_REQUEST_NULL) , request_ptr_(nullptr) , chunks_idx_(0) , zero_copy_chunks_idx_(0) + , needs_ack_handshake_(false) + , ack_(0) , pp_(pp) { header header_ = header(header_buffer.data()); header_.assert_valid(); + #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point& data = buffer_.data_point_; data.time_ = timer_.elapsed_nanoseconds(); data.bytes_ = static_cast(header_.numbytes()); #endif tag_ = header_.get_tag(); + needs_ack_handshake_ = header_.get_ack_handshakes(); + // decode data buffer_.data_.resize(header_.numbytes_nonzero_copy()); - char* piggy_back_data = header_.piggy_back_data(); - if (piggy_back_data) + if (char* piggy_back_data = header_.piggy_back_data()) { need_recv_data = false; memcpy(buffer_.data_.data(), piggy_back_data, @@ -76,6 +88,7 @@ namespace hpx::parcelset::policies::mpi { { need_recv_data = true; } + need_recv_tchunks = false; if (header_.num_zero_copy_chunks() != 0) { @@ -89,16 +102,15 @@ namespace hpx::parcelset::policies::mpi { tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); int tchunks_length = static_cast(tchunks.size() * sizeof(buffer_type::transmission_chunk_type)); - char* piggy_back_tchunk = header_.piggy_back_tchunk(); - if (piggy_back_tchunk) + if (char* piggy_back_tchunk = header_.piggy_back_tchunk()) { - memcpy(static_cast(tchunks.data()), - piggy_back_tchunk, tchunks_length); + memcpy(tchunks.data(), piggy_back_tchunk, tchunks_length); } else { need_recv_tchunks = true; } + // zero-copy chunks buffer_.chunks_.resize(num_zero_copy_chunks); if (!pp_.allow_zero_copy_receive_optimizations()) @@ -112,18 +124,24 @@ namespace hpx::parcelset::policies::mpi { { switch (state_) { - case initialized: + case connection_state::initialized: return receive_transmission_chunks(num_thread); - case rcvd_transmission_chunks: - return receive_data(num_thread); + case connection_state::rcvd_transmission_chunks: + return ack_transmission_chunks(num_thread); - case rcvd_data: - return receive_chunks(num_thread); + case connection_state::rcvd_data: + return ack_data(num_thread); - case rcvd_chunks: + case connection_state::rcvd_chunks: return done(num_thread); + case connection_state::acked_transmission_chunks: + return receive_data(num_thread); + + case connection_state::acked_data: + return receive_chunks(num_thread); + default: HPX_ASSERT(false); } @@ -145,19 +163,71 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); request_ptr_ = &request_; + + state_ = connection_state::rcvd_transmission_chunks; + return ack_transmission_chunks(num_thread); + } + + // no need to acknowledge the transmission chunks + state_ = connection_state::rcvd_transmission_chunks; + return receive_data(num_thread); + } + + constexpr bool need_ack_transmission_chunks() const noexcept + { + return needs_ack_handshake_ && need_recv_tchunks; + } + + bool ack_transmission_chunks(std::size_t num_thread = -1) + { + if (!need_ack_transmission_chunks()) + { + return receive_data(num_thread); + } + + HPX_ASSERT(state_ == connection_state::rcvd_transmission_chunks); + + if (!request_done()) + { + return false; } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + ack_ = static_cast( + connection_state::acked_transmission_chunks); + [[maybe_unused]] int const ret = + MPI_Isend(&ack_, sizeof(ack_), MPI_BYTE, src_, ack_tag(), + util::mpi_environment::communicator(), &request_); + HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); - state_ = rcvd_transmission_chunks; + request_ptr_ = &request_; + } + state_ = connection_state::acked_transmission_chunks; return receive_data(num_thread); } + constexpr bool need_ack_data() const noexcept + { + return needs_ack_handshake_ && need_recv_data; + } + bool receive_data(std::size_t num_thread = -1) { + HPX_ASSERT( + (!need_ack_transmission_chunks() && + state_ == connection_state::rcvd_transmission_chunks) || + (need_ack_transmission_chunks() && + state_ == connection_state::acked_transmission_chunks)); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); if (need_recv_data) { @@ -169,21 +239,60 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); request_ptr_ = &request_; + + state_ = connection_state::rcvd_data; + return ack_data(num_thread); + } + + // no need to acknowledge the data sent + state_ = connection_state::rcvd_data; + return receive_chunks(num_thread); + } + + bool ack_data(std::size_t num_thread = -1) + { + if (!need_ack_data()) + { + return receive_chunks(num_thread); } - state_ = rcvd_data; + HPX_ASSERT(state_ == connection_state::rcvd_data); + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + ack_ = static_cast(connection_state::acked_data); + [[maybe_unused]] int const ret = + MPI_Isend(&ack_, sizeof(ack_), MPI_BYTE, src_, ack_tag(), + util::mpi_environment::communicator(), &request_); + HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + + request_ptr_ = &request_; + } + + state_ = connection_state::acked_data; return receive_chunks(num_thread); } bool receive_chunks(std::size_t num_thread = -1) { + HPX_ASSERT( + (!need_ack_data() && state_ == connection_state::rcvd_data) || + (need_ack_data() && state_ == connection_state::acked_data)); + if (pp_.allow_zero_copy_receive_optimizations()) { if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); // handle zero-copy receive, this should be done on the first entry // to receive_chunks only @@ -191,18 +300,18 @@ namespace hpx::parcelset::policies::mpi { { HPX_ASSERT(zero_copy_chunks_idx_ == 0); - auto const num_zero_copy_chunks = static_cast( - static_cast(buffer_.num_chunks_.first)); + auto const num_zero_copy_chunks = + static_cast(buffer_.num_chunks_.first); if (num_zero_copy_chunks != 0) { HPX_ASSERT( buffer_.chunks_.size() == num_zero_copy_chunks); // De-serialize the parcels such that all data but the - // zero-copy chunks are in place. This de-serialization also - // allocates all zero-chunk buffers and stores those in the - // chunks array for the subsequent networking to place the - // received data directly. + // zero-copy chunks are in place. This de-serialization + // also allocates all zero-chunk buffers and stores + // those in the chunks array for the subsequent + // networking to place the received data directly. for (std::size_t i = 0; i != num_zero_copy_chunks; ++i) { auto const chunk_size = static_cast( @@ -230,6 +339,7 @@ namespace hpx::parcelset::policies::mpi { { return false; } + HPX_ASSERT(request_ptr_ == nullptr); auto& c = buffer_.chunks_[chunks_idx_++]; if (c.type_ == serialization::chunk_type::chunk_type_index) @@ -239,9 +349,9 @@ namespace hpx::parcelset::policies::mpi { // the zero-copy chunks come first in the transmission_chunks_ // array - auto const chunk_size = static_cast( + auto const chunk_size = buffer_.transmission_chunks_[zero_copy_chunks_idx_++] - .second); + .second; // the parcel de-serialization above should have allocated the // correct amount of memory @@ -275,6 +385,7 @@ namespace hpx::parcelset::policies::mpi { { return false; } + HPX_ASSERT(request_ptr_ == nullptr); std::size_t const idx = chunks_idx_++; std::size_t const chunk_size = @@ -299,17 +410,20 @@ namespace hpx::parcelset::policies::mpi { } } } - state_ = rcvd_chunks; + state_ = connection_state::rcvd_chunks; return done(num_thread); } bool done(std::size_t num_thread = -1) noexcept { + HPX_ASSERT(state_ == connection_state::rcvd_chunks); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point& data = buffer_.data_point_; @@ -378,6 +492,9 @@ namespace hpx::parcelset::policies::mpi { std::size_t chunks_idx_; std::size_t zero_copy_chunks_idx_; + bool needs_ack_handshake_; + char ack_; + Parcelport& pp_; std::vector parcels_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp index e1a8d0592ff6..409f6e45ddb6 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2021 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2023 Jiakun Yan // @@ -35,11 +35,13 @@ namespace hpx::parcelset::policies::mpi { using connection_ptr = std::shared_ptr; using connection_list = std::deque; - void run() noexcept {} + constexpr static void run() noexcept {} - connection_ptr create_connection(int dest, parcelset::parcelport* pp) + connection_ptr create_connection( + int dest, parcelset::parcelport* pp, bool enable_ack_handshakes) { - return std::make_shared(this, dest, pp); + return std::make_shared( + this, dest, pp, enable_ack_handshakes); } void add(connection_ptr const& ptr) @@ -101,12 +103,14 @@ namespace hpx::parcelset::policies::mpi { using parcel_buffer_type = parcel_buffer; using callback_fn_type = hpx::move_only_function; + bool send_immediate(parcelset::parcelport* pp, parcelset::locality const& dest, parcel_buffer_type buffer, - callback_fn_type&& callbackFn) + callback_fn_type&& callbackFn, bool enable_ack_handshakes) { int dest_rank = dest.get().rank(); - auto connection = create_connection(dest_rank, pp); + auto connection = + create_connection(dest_rank, pp, enable_ack_handshakes); connection->buffer_ = HPX_MOVE(buffer); connection->async_write(HPX_MOVE(callbackFn), nullptr); return true; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp index 27fd01dda9d8..74a98b7ec4d1 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2021 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2023 Jiakun Yan // @@ -50,27 +50,32 @@ namespace hpx::parcelset::policies::mpi { using data_type = std::vector; - enum connection_state + enum class connection_state : std::uint8_t { - initialized, - sent_header, - sent_transmission_chunks, - sent_data, - sent_chunks + initialized = 0, + sent_header = 1, + sent_transmission_chunks = 2, + sent_data = 3, + sent_chunks = 4, + + acked_transmission_chunks = 5, + acked_data = 6 }; using base_type = parcelset::parcelport_connection; public: - sender_connection(sender_type* s, int dst, parcelset::parcelport* pp) - : state_(initialized) + sender_connection(sender_type* s, int dst, parcelset::parcelport* pp, + bool enable_ack_handshakes) + : state_(connection_state::initialized) , sender_(s) , tag_(-1) , dst_(dst) , request_(MPI_REQUEST_NULL) , request_ptr_(nullptr) , chunks_idx_(0) + , needs_ack_handshake_(enable_ack_handshakes) , ack_(0) , pp_(pp) , there_(parcelset::locality(locality(dst_))) @@ -87,6 +92,11 @@ namespace hpx::parcelset::policies::mpi { { } + constexpr int ack_tag() const noexcept + { + return static_cast(tag_ | util::mpi_environment::MPI_ACK_TAG); + } + using handler_type = hpx::move_only_function; using post_handler_type = hpx::move_only_functionget_zero_copy_serialization_threshold())); - header_ = header(buffer_, static_cast(header_buffer.data()), - header_buffer.size()); + + header_ = + header(buffer_, header_buffer.data(), header_buffer.size()); header_.set_tag(tag_); + header_.set_ack_handshakes(needs_ack_handshake_); header_.assert_valid(); - state_ = initialized; + state_ = connection_state::initialized; handler_ = HPX_MOVE(handler); @@ -133,21 +145,27 @@ namespace hpx::parcelset::policies::mpi { { switch (state_) { - case initialized: + case connection_state::initialized: return send_header(); - case sent_header: + case connection_state::sent_header: return send_transmission_chunks(); - case sent_transmission_chunks: - return send_data(); + case connection_state::sent_transmission_chunks: + return ack_transmission_chunks(); - case sent_data: - return send_chunks(); + case connection_state::sent_data: + return ack_data(); - case sent_chunks: + case connection_state::sent_chunks: return done(); + case connection_state::acked_transmission_chunks: + return send_data(); + + case connection_state::acked_data: + return send_chunks(); + default: HPX_ASSERT(false); } @@ -156,32 +174,32 @@ namespace hpx::parcelset::policies::mpi { bool send_header() { + HPX_ASSERT(state_ == connection_state::initialized); + HPX_ASSERT(request_ptr_ == nullptr); + { util::mpi_environment::scoped_lock l; - HPX_ASSERT(state_ == initialized); - HPX_ASSERT(request_ptr_ == nullptr); [[maybe_unused]] int const ret = MPI_Isend(header_buffer.data(), - (int) header_buffer.size(), MPI_BYTE, dst_, 0, + static_cast(header_buffer.size()), MPI_BYTE, dst_, 0, util::mpi_environment::communicator(), &request_); HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); request_ptr_ = &request_; } - state_ = sent_header; + state_ = connection_state::sent_header; return send_transmission_chunks(); } bool send_transmission_chunks() { - HPX_ASSERT(state_ == sent_header); - HPX_ASSERT(request_ptr_ != nullptr); + HPX_ASSERT(state_ == connection_state::sent_header); + if (!request_done()) { return false; } - HPX_ASSERT(request_ptr_ == nullptr); auto const& chunks = buffer_.transmission_chunks_; @@ -197,19 +215,66 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); request_ptr_ = &request_; + + state_ = connection_state::sent_transmission_chunks; + return ack_transmission_chunks(); } - state_ = sent_transmission_chunks; + // no need to acknowledge the transmission chunks + state_ = connection_state::sent_transmission_chunks; + return send_data(); + } + + constexpr bool need_ack_transmission_chunks() const noexcept + { + auto const& chunks = buffer_.transmission_chunks_; + return needs_ack_handshake_ && !chunks.empty() && + !header_.piggy_back_tchunk(); + } + + bool ack_transmission_chunks() + { + if (!need_ack_transmission_chunks()) + { + return send_data(); + } + + HPX_ASSERT(state_ == connection_state::sent_transmission_chunks); + + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + [[maybe_unused]] int const ret = + MPI_Irecv(&ack_, sizeof(ack_), MPI_BYTE, dst_, ack_tag(), + util::mpi_environment::communicator(), &request_); + HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + + request_ptr_ = &request_; + } + + state_ = connection_state::acked_transmission_chunks; return send_data(); } bool send_data() { - HPX_ASSERT(state_ == sent_transmission_chunks); + HPX_ASSERT( + (need_ack_transmission_chunks() && + state_ == connection_state::acked_transmission_chunks) || + (!need_ack_transmission_chunks() && + state_ == connection_state::sent_transmission_chunks)); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); if (!header_.piggy_back_data()) { @@ -221,15 +286,56 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); request_ptr_ = &request_; + + state_ = connection_state::sent_data; + return ack_data(); + } + + // no need to acknowledge the data sent + state_ = connection_state::sent_data; + return send_chunks(); + } + + constexpr bool need_ack_data() const noexcept + { + return needs_ack_handshake_ && !header_.piggy_back_data(); + } + + bool ack_data() + { + if (!need_ack_data()) + { + return send_chunks(); + } + + HPX_ASSERT(state_ == connection_state::sent_data); + + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + [[maybe_unused]] int const ret = + MPI_Irecv(&ack_, sizeof(ack_), MPI_BYTE, dst_, ack_tag(), + util::mpi_environment::communicator(), &request_); + HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + + request_ptr_ = &request_; } - state_ = sent_data; + state_ = connection_state::acked_data; return send_chunks(); } bool send_chunks() { - HPX_ASSERT(state_ == sent_data); + HPX_ASSERT( + (!need_ack_data() && state_ == connection_state::sent_data) || + (need_ack_data() && state_ == connection_state::acked_data)); while (chunks_idx_ < buffer_.chunks_.size()) { @@ -240,13 +346,13 @@ namespace hpx::parcelset::policies::mpi { { return false; } + HPX_ASSERT(request_ptr_ == nullptr); util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = - MPI_Isend(const_cast(c.data_.cpos_), - static_cast(c.size_), MPI_BYTE, dst_, tag_, - util::mpi_environment::communicator(), &request_); + [[maybe_unused]] int const ret = MPI_Isend(c.data_.cpos_, + static_cast(c.size_), MPI_BYTE, dst_, tag_, + util::mpi_environment::communicator(), &request_); HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); request_ptr_ = &request_; @@ -255,21 +361,24 @@ namespace hpx::parcelset::policies::mpi { ++chunks_idx_; } - state_ = sent_chunks; - + state_ = connection_state::sent_chunks; return done(); } bool done() { + HPX_ASSERT(state_ == connection_state::sent_chunks); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); error_code const ec(throwmode::lightweight); handler_(ec); handler_.reset(); + #if defined(HPX_HAVE_PARCELPORT_COUNTERS) buffer_.data_point_.time_ = static_cast( @@ -279,8 +388,7 @@ namespace hpx::parcelset::policies::mpi { #endif buffer_.clear(); - state_ = initialized; - + state_ = connection_state::initialized; return true; } @@ -323,6 +431,8 @@ namespace hpx::parcelset::policies::mpi { MPI_Request request_; MPI_Request* request_ptr_; std::size_t chunks_idx_; + + bool needs_ack_handshake_; char ack_; parcelset::parcelport* pp_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp index dcb83334194a..14eaf26d91a3 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp @@ -10,19 +10,15 @@ #include #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_MPI) -#include -#include +#include #include -#include -#include -#include namespace hpx::parcelset::policies::mpi { struct tag_provider { - tag_provider() + constexpr tag_provider() noexcept : next_tag(0) { } diff --git a/libs/full/parcelport_mpi/src/parcelport_mpi.cpp b/libs/full/parcelport_mpi/src/parcelport_mpi.cpp index d1d703bcfee1..8d98b6c27355 100644 --- a/libs/full/parcelport_mpi/src/parcelport_mpi.cpp +++ b/libs/full/parcelport_mpi/src/parcelport_mpi.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2020 Google // @@ -138,6 +138,17 @@ namespace hpx::parcelset { return false; } + static bool enable_ack_handshakes( + util::runtime_configuration const& ini) + { + if (hpx::util::get_entry_as( + ini, "hpx.parcel.mpi.ack_handshake", 0) != 0) + { + return true; + } + return false; + } + public: using sender_type = sender; parcelport(util::runtime_configuration const& ini, @@ -148,6 +159,7 @@ namespace hpx::parcelset { , background_threads_(background_threads(ini)) , multi_threaded_mpi_(multi_threaded_mpi(ini)) , enable_send_immediate_(enable_send_immediate(ini)) + , enable_ack_handshakes_(enable_ack_handshakes(ini)) { } @@ -209,7 +221,8 @@ namespace hpx::parcelset { parcelset::locality const& l, error_code&) { int const dest_rank = l.get().rank(); - return sender_.create_connection(dest_rank, this); + return sender_.create_connection( + dest_rank, this, enable_ack_handshakes_); } parcelset::locality agas_locality( @@ -245,7 +258,7 @@ namespace hpx::parcelset { return has_work; } - bool can_send_immediate() const + constexpr bool can_send_immediate() const { return enable_send_immediate_; } @@ -255,8 +268,8 @@ namespace hpx::parcelset { sender::parcel_buffer_type buffer, sender::callback_fn_type&& callbackFn) { - return sender_.send_immediate( - pp, dest, HPX_MOVE(buffer), HPX_MOVE(callbackFn)); + return sender_.send_immediate(pp, dest, HPX_MOVE(buffer), + HPX_MOVE(callbackFn), enable_ack_handshakes_); } template @@ -326,6 +339,7 @@ namespace hpx::parcelset { std::size_t background_threads_; bool multi_threaded_mpi_; bool enable_send_immediate_; + bool enable_ack_handshakes_; }; } // namespace policies::mpi } // namespace hpx::parcelset @@ -333,7 +347,7 @@ namespace hpx::parcelset { #include // Inject additional configuration data into the factory registry for this -// type. This information ends up in the system wide configuration database +// type. This information ends up in the system-wide configuration database // under the plugin specific section: // // [hpx.parcel.mpi]