From 6780fc6ade930b85d37c66d754b1cd9f68647ab0 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Fri, 12 Jul 2024 17:08:58 -0500 Subject: [PATCH] Adding optional handshakes to acknowledge the received data - this prevents sends to be posted before their corresponding receives - flyby: improved MPI error handling, now throws exceptions instead of assert --- .jenkins/lsu/env-clang-16.sh | 3 + .jenkins/lsu/env-gcc-12.sh | 3 + .jenkins/lsu/env-gcc-14.sh | 3 + libs/core/mpi_base/CMakeLists.txt | 4 +- .../include/hpx/mpi_base/mpi_environment.hpp | 9 + libs/core/mpi_base/src/mpi_environment.cpp | 112 +++++++-- .../include/hpx/parcelport_mpi/header.hpp | 84 ++++--- .../include/hpx/parcelport_mpi/receiver.hpp | 24 +- .../parcelport_mpi/receiver_connection.hpp | 232 ++++++++++++++---- .../include/hpx/parcelport_mpi/sender.hpp | 16 +- .../hpx/parcelport_mpi/sender_connection.hpp | 214 ++++++++++++---- .../hpx/parcelport_mpi/tag_provider.hpp | 8 +- .../parcelport_mpi/src/parcelport_mpi.cpp | 31 ++- 13 files changed, 558 insertions(+), 185 deletions(-) diff --git a/.jenkins/lsu/env-clang-16.sh b/.jenkins/lsu/env-clang-16.sh index 4a5d951bd1aa..c96b2d88dbaa 100644 --- a/.jenkins/lsu/env-clang-16.sh +++ b/.jenkins/lsu/env-clang-16.sh @@ -37,3 +37,6 @@ configure_extra_options+=" -DHPX_WITH_FETCH_EVE=ON" # Make sure HWLOC does not report 'cores'. This is purely an option to enable # testing the topology code under conditions close to those on FreeBSD. configure_extra_options+=" -DHPX_TOPOLOGY_WITH_ADDITIONAL_HWLOC_TESTING=ON" + +# enable additional handshaking in MPI parcelport +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1" diff --git a/.jenkins/lsu/env-gcc-12.sh b/.jenkins/lsu/env-gcc-12.sh index 06e288a62b7f..c751fe262caa 100644 --- a/.jenkins/lsu/env-gcc-12.sh +++ b/.jenkins/lsu/env-gcc-12.sh @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main" # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" + +# enable additional handshaking in MPI parcelport +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1" diff --git a/.jenkins/lsu/env-gcc-14.sh b/.jenkins/lsu/env-gcc-14.sh index a256e0651b18..d7d2ad183623 100644 --- a/.jenkins/lsu/env-gcc-14.sh +++ b/.jenkins/lsu/env-gcc-14.sh @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main" # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" + +# enable additional handshaking in MPI parcelport +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1" diff --git a/libs/core/mpi_base/CMakeLists.txt b/libs/core/mpi_base/CMakeLists.txt index cafd5d0ff5fe..29c70621859d 100644 --- a/libs/core/mpi_base/CMakeLists.txt +++ b/libs/core/mpi_base/CMakeLists.txt @@ -35,8 +35,8 @@ add_hpx_module( SOURCES ${mpi_base_sources} HEADERS ${mpi_base_headers} COMPAT_HEADERS ${mpi_base_compat_headers} - MODULE_DEPENDENCIES hpx_format hpx_logging hpx_runtime_configuration - hpx_string_util hpx_util + MODULE_DEPENDENCIES hpx_errors hpx_format hpx_logging + hpx_runtime_configuration hpx_string_util hpx_util DEPENDENCIES ${additional_dependencies} CMAKE_SUBDIRS examples tests ) diff --git a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp index 97423be36d66..dde6149131bc 100644 --- a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp +++ b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp @@ -86,8 +86,17 @@ namespace hpx::util { using mutex_type = hpx::spinlock; + static void check_mpi_error( + scoped_lock& l, hpx::source_location const& sl, int error); + static void check_mpi_error( + scoped_try_lock& l, hpx::source_location const& sl, int error); + + // The highest order bit is used for acknowledgement messages static int MPI_MAX_TAG; + constexpr static unsigned int MPI_ACK_MASK = 0xC000; + constexpr static unsigned int MPI_ACK_TAG = 0x4000; + private: static mutex_type mtx_; diff --git a/libs/core/mpi_base/src/mpi_environment.cpp b/libs/core/mpi_base/src/mpi_environment.cpp index 5b7e8325c726..a155bc262e45 100644 --- a/libs/core/mpi_base/src/mpi_environment.cpp +++ b/libs/core/mpi_base/src/mpi_environment.cpp @@ -8,6 +8,9 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #include +#include +#include +#include #include #include #include @@ -23,7 +26,7 @@ /////////////////////////////////////////////////////////////////////////////// namespace hpx::util { - int mpi_environment::MPI_MAX_TAG = 32767; + int mpi_environment::MPI_MAX_TAG = 8192; namespace detail { @@ -111,20 +114,24 @@ namespace hpx::util { int mpi_environment::is_initialized_ = -1; /////////////////////////////////////////////////////////////////////////// - [[noreturn]] static void throw_wrong_mpi_mode(int required, int provided) - { - std::map levels = { - {MPI_THREAD_SINGLE, "MPI_THREAD_SINGLE"}, - {MPI_THREAD_FUNNELED, "MPI_THREAD_FUNNELED"}, - {MPI_THREAD_SERIALIZED, "MPI_THREAD_SERIALIZED"}, - {MPI_THREAD_MULTIPLE, "MPI_THREAD_MULTIPLE"}}; - - HPX_THROW_EXCEPTION(hpx::error::invalid_status, - "hpx::util::mpi_environment::init", - "MPI doesn't implement minimal requested thread level, required " - "{}, provided {}", - levels[required], levels[provided]); - } + namespace { + + [[noreturn]] void throw_wrong_mpi_mode(int required, int provided) + { + std::map levels = { + {MPI_THREAD_SINGLE, "MPI_THREAD_SINGLE"}, + {MPI_THREAD_FUNNELED, "MPI_THREAD_FUNNELED"}, + {MPI_THREAD_SERIALIZED, "MPI_THREAD_SERIALIZED"}, + {MPI_THREAD_MULTIPLE, "MPI_THREAD_MULTIPLE"}}; + + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "hpx::util::mpi_environment::init", + "MPI doesn't implement minimal requested thread level, " + "required " + "{}, provided {}", + levels[required], levels[provided]); + } + } // namespace int mpi_environment::init( int*, char***, int const minimal, int const required, int& provided) @@ -231,7 +238,7 @@ namespace hpx::util { MPI_Comm_dup(MPI_COMM_WORLD, &communicator_); - // explicitly disable multi-threaded mpi if needed + // explicitly disable multithreaded mpi if needed if (provided_threading_flag_ <= MPI_THREAD_SERIALIZED) { rtcfg.add_entry("hpx.parcel.mpi.multithreaded", "0"); @@ -250,6 +257,10 @@ namespace hpx::util { "disable MPI multi-threading."); } + // let all errors be returned from MPI calls + MPI_Comm_set_errhandler(communicator_, MPI_ERRORS_RETURN); + + // initialize status this_rank = rank(); #if defined(HPX_HAVE_NETWORKING) @@ -269,18 +280,27 @@ namespace hpx::util { rtcfg.add_entry("hpx.parcel.mpi.rank", std::to_string(this_rank)); rtcfg.add_entry("hpx.parcel.mpi.processorname", get_processor_name()); - void* max_tag_p; - int flag; - MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag); + + scoped_lock l; + + void* max_tag_p = nullptr; + int flag = 0; + int const ret = + MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag); + check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret); + if (flag) - MPI_MAX_TAG = *(int*) max_tag_p; + { + MPI_MAX_TAG = + static_cast(*static_cast(max_tag_p) & ~MPI_ACK_MASK); + } } std::string mpi_environment::get_processor_name() { scoped_lock l; - char name[MPI_MAX_PROCESSOR_NAME + 1] = {'\0'}; + char name[MPI_MAX_PROCESSOR_NAME + 1] = {}; int len = 0; MPI_Get_processor_name(name, &len); @@ -397,6 +417,56 @@ namespace hpx::util { mtx_.unlock(); } } + + namespace { + + [[noreturn]] void report_error( + hpx::source_location const& sl, int error_code) + { + char error_string[MPI_MAX_ERROR_STRING + 1]; + int error_len = sizeof(error_string); + int const ret = + MPI_Error_string(error_code, error_string, &error_len); + if (ret != MPI_SUCCESS) + { + HPX_THROW_EXCEPTION(hpx::error::internal_server_error, + sl.function_name(), + "MPI error (%s/%d): couldn't retrieve error string for " + "code %d", + sl.file_name(), sl.line(), error_code); + } + + HPX_THROW_EXCEPTION(hpx::error::internal_server_error, + sl.function_name(), "MPI error (%s/%d): %s", sl.file_name(), + sl.line(), error_string); + } + } // namespace + + void mpi_environment::check_mpi_error( + scoped_lock& l, hpx::source_location const& sl, int error_code) + { + if (error_code == MPI_SUCCESS) + { + return; + } + + l.unlock(); + + report_error(sl, error_code); + } + + void mpi_environment::check_mpi_error( + scoped_try_lock& l, hpx::source_location const& sl, int error_code) + { + if (error_code == MPI_SUCCESS) + { + return; + } + + l.unlock(); + + report_error(sl, error_code); + } } // namespace hpx::util #endif diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp index de76bb9562f8..2c92a967d9d3 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2013-2023 Hartmut Kaiser +// Copyright (c) 2013-2024 Hartmut Kaiser // Copyright (c) 2013-2015 Thomas Heller // Copyright (c) 2023 Jiakun Yan // @@ -22,30 +22,33 @@ #include namespace hpx::parcelset::policies::mpi { + struct header { using value_type = int; enum data_pos { - // siguature for assert_valid + // signature for assert_valid pos_signature = 0, // tag pos_tag = 1 * sizeof(value_type), + // enable ack handshakes + pos_ack_handshakes = 2 * sizeof(value_type), // non-zero-copy chunk size - pos_numbytes_nonzero_copy = 2 * sizeof(value_type), + pos_numbytes_nonzero_copy = 3 * sizeof(value_type), // transmission chunk size - pos_numbytes_tchunk = 3 * sizeof(value_type), + pos_numbytes_tchunk = 4 * sizeof(value_type), // how many bytes in total (including zero-copy and non-zero-copy chunks) - pos_numbytes = 4 * sizeof(value_type), + pos_numbytes = 5 * sizeof(value_type), // zero-copy chunk number - pos_numchunks_zero_copy = 5 * sizeof(value_type), + pos_numchunks_zero_copy = 6 * sizeof(value_type), // non-zero-copy chunk number - pos_numchunks_nonzero_copy = 6 * sizeof(value_type), + pos_numchunks_nonzero_copy = 7 * sizeof(value_type), // whether piggyback data - pos_piggy_back_flag_data = 7 * sizeof(value_type), + pos_piggy_back_flag_data = 8 * sizeof(value_type), // whether piggyback transmission chunk - pos_piggy_back_flag_tchunk = 7 * sizeof(value_type) + 1, - pos_piggy_back_address = 7 * sizeof(value_type) + 2 + pos_piggy_back_flag_tchunk = 8 * sizeof(value_type) + 1, + pos_piggy_back_address = 8 * sizeof(value_type) + 2 }; template @@ -60,18 +63,21 @@ namespace hpx::parcelset::policies::mpi { { current_header_size += buffer.data_.size(); } - int num_zero_copy_chunks = buffer.num_chunks_.first; - [[maybe_unused]] int num_non_zero_copy_chunks = + + int const num_zero_copy_chunks = buffer.num_chunks_.first; + [[maybe_unused]] int const num_non_zero_copy_chunks = buffer.num_chunks_.second; if (num_zero_copy_chunks != 0) { HPX_ASSERT(buffer.transmission_chunks_.size() == - size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = + static_cast( + num_zero_copy_chunks + num_non_zero_copy_chunks)); + int const tchunk_size = static_cast(buffer.transmission_chunks_.size() * sizeof(typename parcel_buffer::transmission_chunk_type)); - if (tchunk_size <= int(max_header_size - current_header_size)) + if (tchunk_size <= + static_cast(max_header_size - current_header_size)) { current_header_size += tchunk_size; } @@ -86,21 +92,22 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(max_header_size >= pos_piggy_back_address); data_ = header_buffer; memset(data_, 0, pos_piggy_back_address); - std::int64_t size = static_cast(buffer.data_.size()); - std::int64_t numbytes = + + std::int64_t const size = + static_cast(buffer.data_.size()); + std::int64_t const numbytes = static_cast(buffer.data_size_); HPX_ASSERT(size <= (std::numeric_limits::max)()); HPX_ASSERT(numbytes <= (std::numeric_limits::max)()); - int num_zero_copy_chunks = buffer.num_chunks_.first; - int num_non_zero_copy_chunks = buffer.num_chunks_.second; + + int const num_zero_copy_chunks = buffer.num_chunks_.first; + int const num_non_zero_copy_chunks = buffer.num_chunks_.second; set(pos_signature, MAGIC_SIGNATURE); set(pos_numbytes_nonzero_copy, static_cast(size)); set(pos_numbytes, static_cast(numbytes)); - set(pos_numchunks_zero_copy, - static_cast(num_zero_copy_chunks)); - set(pos_numchunks_nonzero_copy, - static_cast(num_non_zero_copy_chunks)); + set(pos_numchunks_zero_copy, num_zero_copy_chunks); + set(pos_numchunks_nonzero_copy, num_non_zero_copy_chunks); data_[pos_piggy_back_flag_data] = 0; data_[pos_piggy_back_flag_tchunk] = 0; @@ -112,16 +119,19 @@ namespace hpx::parcelset::policies::mpi { &data_[current_header_size], &buffer.data_[0], size); current_header_size += size; } + if (num_zero_copy_chunks != 0) { HPX_ASSERT(buffer.transmission_chunks_.size() == - size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = + static_cast( + num_zero_copy_chunks + num_non_zero_copy_chunks)); + int const tchunk_size = static_cast(buffer.transmission_chunks_.size() * sizeof(typename parcel_buffer::transmission_chunk_type)); set(pos_numbytes_tchunk, static_cast(tchunk_size)); - if (tchunk_size <= int(max_header_size - current_header_size)) + if (tchunk_size <= + static_cast(max_header_size - current_header_size)) { data_[pos_piggy_back_flag_tchunk] = 1; std::memcpy(&data_[current_header_size], @@ -155,12 +165,12 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(valid()); } - [[nodiscard]] char* data() noexcept + [[nodiscard]] char* data() const noexcept { return data_; } - [[nodiscard]] size_t size() noexcept + [[nodiscard]] size_t size() const noexcept { return pos_piggy_back_address + piggy_back_size(); } @@ -175,11 +185,21 @@ namespace hpx::parcelset::policies::mpi { set(pos_tag, static_cast(tag)); } + void set_ack_handshakes(bool enable_ack_handshakes) noexcept + { + set(pos_ack_handshakes, enable_ack_handshakes ? 1 : 0); + } + [[nodiscard]] value_type get_tag() const noexcept { return get(pos_tag); } + [[nodiscard]] value_type get_ack_handshakes() const noexcept + { + return get(pos_ack_handshakes); + } + [[nodiscard]] value_type numbytes_nonzero_copy() const noexcept { return get(pos_numbytes_nonzero_copy); @@ -205,7 +225,7 @@ namespace hpx::parcelset::policies::mpi { return get(pos_numchunks_nonzero_copy); } - [[nodiscard]] constexpr char* piggy_back_address() noexcept + [[nodiscard]] constexpr char* piggy_back_address() const noexcept { if (data_[pos_piggy_back_flag_data] || data_[pos_piggy_back_flag_tchunk]) @@ -213,7 +233,7 @@ namespace hpx::parcelset::policies::mpi { return nullptr; } - [[nodiscard]] int piggy_back_size() noexcept + [[nodiscard]] int piggy_back_size() const noexcept { int result = 0; if (data_[pos_piggy_back_flag_data]) @@ -223,14 +243,14 @@ namespace hpx::parcelset::policies::mpi { return result; } - [[nodiscard]] constexpr char* piggy_back_data() noexcept + [[nodiscard]] constexpr char* piggy_back_data() const noexcept { if (data_[pos_piggy_back_flag_data]) return &data_[pos_piggy_back_address]; return nullptr; } - [[nodiscard]] constexpr char* piggy_back_tchunk() noexcept + [[nodiscard]] constexpr char* piggy_back_tchunk() const noexcept { size_t current_header_size = pos_piggy_back_address; if (!data_[pos_piggy_back_flag_tchunk]) diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp index 0ba6bf82650d..e3e23fb6cb51 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp @@ -30,7 +30,6 @@ namespace hpx::parcelset::policies::mpi { template struct receiver { - using handles_header_type = std::set>; using connection_type = receiver_connection; using connection_ptr = std::shared_ptr; using connection_list = std::deque; @@ -112,9 +111,11 @@ namespace hpx::parcelset::policies::mpi { if (request_done_locked(l, hdr_request_, &status)) { int recv_size = 0; - [[maybe_unused]] int const ret = + int const ret = MPI_Get_count(&status, MPI_CHAR, &recv_size); - HPX_ASSERT(ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); + std::vector recv_header(header_buffer_.begin(), header_buffer_.begin() + recv_size); @@ -139,11 +140,12 @@ namespace hpx::parcelset::policies::mpi { void post_new_header([[maybe_unused]] Lock& l) noexcept { HPX_ASSERT_OWNS_LOCK(l); - [[maybe_unused]] int const ret = MPI_Irecv(header_buffer_.data(), + int const ret = MPI_Irecv(header_buffer_.data(), static_cast(header_buffer_.size()), MPI_BYTE, MPI_ANY_SOURCE, 0, util::mpi_environment::communicator(), &hdr_request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); } Parcelport& pp_; @@ -152,21 +154,19 @@ namespace hpx::parcelset::policies::mpi { MPI_Request hdr_request_; std::vector header_buffer_; - hpx::spinlock handles_header_mtx_; - handles_header_type handles_header_; - hpx::spinlock connections_mtx_; connection_list connections_; template - static bool request_done_locked([[maybe_unused]] Lock& l, - MPI_Request& r, MPI_Status* status) noexcept + static bool request_done_locked( + Lock& l, MPI_Request& r, MPI_Status* status) noexcept { HPX_ASSERT_OWNS_LOCK(l); int completed = 0; - [[maybe_unused]] int const ret = MPI_Test(&r, &completed, status); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + int const ret = MPI_Test(&r, &completed, status); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); return completed ? true : false; } diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp index 3abba418bf00..bed4e218af3a 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014-2015 Thomas Heller -// Copyright (c) 2007-2023 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 @@ -32,41 +32,53 @@ namespace hpx::parcelset::policies::mpi { struct receiver_connection { private: - enum connection_state + enum class connection_state : std::uint8_t { - initialized, - rcvd_transmission_chunks, - rcvd_data, - rcvd_chunks, + initialized = 1, + rcvd_transmission_chunks = 2, + rcvd_data = 3, + rcvd_chunks = 4, + + acked_transmission_chunks = 5, + acked_data = 6 }; using data_type = std::vector; using buffer_type = parcel_buffer; + constexpr int ack_tag() const noexcept + { + return static_cast(tag_ | util::mpi_environment::MPI_ACK_TAG); + } + public: receiver_connection( - int src, std::vector header_buffer, Parcelport& pp) noexcept - : state_(initialized) + int src, std::vector header_buffer, Parcelport& pp) + : state_(connection_state::initialized) , src_(src) , request_(MPI_REQUEST_NULL) , request_ptr_(nullptr) , chunks_idx_(0) , zero_copy_chunks_idx_(0) + , needs_ack_handshake_(false) + , ack_(0) , pp_(pp) { header header_ = header(header_buffer.data()); header_.assert_valid(); + #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point& data = buffer_.data_point_; data.time_ = timer_.elapsed_nanoseconds(); data.bytes_ = static_cast(header_.numbytes()); #endif tag_ = header_.get_tag(); + needs_ack_handshake_ = header_.get_ack_handshakes(); + // decode data buffer_.data_.resize(header_.numbytes_nonzero_copy()); - char* piggy_back_data = header_.piggy_back_data(); - if (piggy_back_data) + if (char* piggy_back_data = header_.piggy_back_data()) { need_recv_data = false; memcpy(buffer_.data_.data(), piggy_back_data, @@ -76,29 +88,38 @@ namespace hpx::parcelset::policies::mpi { { need_recv_data = true; } + need_recv_tchunks = false; if (header_.num_zero_copy_chunks() != 0) { // decode transmission chunk - int num_zero_copy_chunks = header_.num_zero_copy_chunks(); - int num_non_zero_copy_chunks = + int const num_zero_copy_chunks = header_.num_zero_copy_chunks(); + int const num_non_zero_copy_chunks = header_.num_non_zero_copy_chunks(); buffer_.num_chunks_.first = num_zero_copy_chunks; buffer_.num_chunks_.second = num_non_zero_copy_chunks; + auto& tchunks = buffer_.transmission_chunks_; tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); - int tchunks_length = static_cast(tchunks.size() * - sizeof(buffer_type::transmission_chunk_type)); - char* piggy_back_tchunk = header_.piggy_back_tchunk(); - if (piggy_back_tchunk) + if (char* piggy_back_tchunk = header_.piggy_back_tchunk()) { - memcpy(static_cast(tchunks.data()), - piggy_back_tchunk, tchunks_length); +#if defined(HPX_GCC_VERSION) && !defined(HPX_CLANG_VERSION) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wclass-memaccess" +#endif + int const tchunks_length = static_cast(tchunks.size() * + sizeof(buffer_type::transmission_chunk_type)); + memcpy(tchunks.data(), piggy_back_tchunk, tchunks_length); + +#if defined(HPX_GCC_VERSION) && !defined(HPX_CLANG_VERSION) +#pragma GCC diagnostic pop +#endif } else { need_recv_tchunks = true; } + // zero-copy chunks buffer_.chunks_.resize(num_zero_copy_chunks); if (!pp_.allow_zero_copy_receive_optimizations()) @@ -112,18 +133,24 @@ namespace hpx::parcelset::policies::mpi { { switch (state_) { - case initialized: + case connection_state::initialized: return receive_transmission_chunks(num_thread); - case rcvd_transmission_chunks: - return receive_data(num_thread); + case connection_state::rcvd_transmission_chunks: + return ack_transmission_chunks(num_thread); - case rcvd_data: - return receive_chunks(num_thread); + case connection_state::rcvd_data: + return ack_data(num_thread); - case rcvd_chunks: + case connection_state::rcvd_chunks: return done(num_thread); + case connection_state::acked_transmission_chunks: + return receive_data(num_thread); + + case connection_state::acked_data: + return receive_chunks(num_thread); + default: HPX_ASSERT(false); } @@ -136,54 +163,148 @@ namespace hpx::parcelset::policies::mpi { { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = - MPI_Irecv(buffer_.transmission_chunks_.data(), - static_cast(buffer_.transmission_chunks_.size() * - sizeof(buffer_type::transmission_chunk_type)), - MPI_BYTE, src_, tag_, - util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + int const ret = MPI_Irecv(buffer_.transmission_chunks_.data(), + static_cast(buffer_.transmission_chunks_.size() * + sizeof(buffer_type::transmission_chunk_type)), + MPI_BYTE, src_, tag_, util::mpi_environment::communicator(), + &request_); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; + + state_ = connection_state::rcvd_transmission_chunks; + return ack_transmission_chunks(num_thread); + } + + // no need to acknowledge the transmission chunks + state_ = connection_state::rcvd_transmission_chunks; + return receive_data(num_thread); + } + + constexpr bool need_ack_transmission_chunks() const noexcept + { + return needs_ack_handshake_ && need_recv_tchunks; + } + + bool ack_transmission_chunks(std::size_t num_thread = -1) + { + if (!need_ack_transmission_chunks()) + { + return receive_data(num_thread); } - state_ = rcvd_transmission_chunks; + HPX_ASSERT(state_ == connection_state::rcvd_transmission_chunks); + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + ack_ = static_cast( + connection_state::acked_transmission_chunks); + int const ret = + MPI_Isend(&ack_, sizeof(ack_), MPI_BYTE, src_, ack_tag(), + util::mpi_environment::communicator(), &request_); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); + + request_ptr_ = &request_; + } + + state_ = connection_state::acked_transmission_chunks; return receive_data(num_thread); } + constexpr bool need_ack_data() const noexcept + { + return needs_ack_handshake_ && need_recv_data; + } + bool receive_data(std::size_t num_thread = -1) { + HPX_ASSERT( + (!need_ack_transmission_chunks() && + state_ == connection_state::rcvd_transmission_chunks) || + (need_ack_transmission_chunks() && + state_ == connection_state::acked_transmission_chunks)); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); if (need_recv_data) { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = MPI_Irecv(buffer_.data_.data(), + int const ret = MPI_Irecv(buffer_.data_.data(), static_cast(buffer_.data_.size()), MPI_BYTE, src_, tag_, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; + + state_ = connection_state::rcvd_data; + return ack_data(num_thread); + } + + // no need to acknowledge the data sent + state_ = connection_state::rcvd_data; + return receive_chunks(num_thread); + } + + bool ack_data(std::size_t num_thread = -1) + { + if (!need_ack_data()) + { + return receive_chunks(num_thread); } - state_ = rcvd_data; + HPX_ASSERT(state_ == connection_state::rcvd_data); + + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + ack_ = static_cast(connection_state::acked_data); + int const ret = + MPI_Isend(&ack_, sizeof(ack_), MPI_BYTE, src_, ack_tag(), + util::mpi_environment::communicator(), &request_); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); + request_ptr_ = &request_; + } + + state_ = connection_state::acked_data; return receive_chunks(num_thread); } bool receive_chunks(std::size_t num_thread = -1) { + HPX_ASSERT( + (!need_ack_data() && state_ == connection_state::rcvd_data) || + (need_ack_data() && state_ == connection_state::acked_data)); + if (pp_.allow_zero_copy_receive_optimizations()) { if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); // handle zero-copy receive, this should be done on the first entry // to receive_chunks only @@ -191,18 +312,18 @@ namespace hpx::parcelset::policies::mpi { { HPX_ASSERT(zero_copy_chunks_idx_ == 0); - auto const num_zero_copy_chunks = static_cast( - static_cast(buffer_.num_chunks_.first)); + auto const num_zero_copy_chunks = + static_cast(buffer_.num_chunks_.first); if (num_zero_copy_chunks != 0) { HPX_ASSERT( buffer_.chunks_.size() == num_zero_copy_chunks); // De-serialize the parcels such that all data but the - // zero-copy chunks are in place. This de-serialization also - // allocates all zero-chunk buffers and stores those in the - // chunks array for the subsequent networking to place the - // received data directly. + // zero-copy chunks are in place. This de-serialization + // also allocates all zero-chunk buffers and stores + // those in the chunks array for the subsequent + // networking to place the received data directly. for (std::size_t i = 0; i != num_zero_copy_chunks; ++i) { auto const chunk_size = static_cast( @@ -230,6 +351,7 @@ namespace hpx::parcelset::policies::mpi { { return false; } + HPX_ASSERT(request_ptr_ == nullptr); auto& c = buffer_.chunks_[chunks_idx_++]; if (c.type_ == serialization::chunk_type::chunk_type_index) @@ -239,9 +361,9 @@ namespace hpx::parcelset::policies::mpi { // the zero-copy chunks come first in the transmission_chunks_ // array - auto const chunk_size = static_cast( + auto const chunk_size = buffer_.transmission_chunks_[zero_copy_chunks_idx_++] - .second); + .second; // the parcel de-serialization above should have allocated the // correct amount of memory @@ -253,10 +375,11 @@ namespace hpx::parcelset::policies::mpi { { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = MPI_Irecv(c.data(), + int const ret = MPI_Irecv(c.data(), static_cast(chunk_size), MPI_BYTE, src_, tag_, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; } @@ -275,6 +398,7 @@ namespace hpx::parcelset::policies::mpi { { return false; } + HPX_ASSERT(request_ptr_ == nullptr); std::size_t const idx = chunks_idx_++; std::size_t const chunk_size = @@ -290,26 +414,30 @@ namespace hpx::parcelset::policies::mpi { { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = MPI_Irecv(c.data(), + int const ret = MPI_Irecv(c.data(), static_cast(c.size()), MPI_BYTE, src_, tag_, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; } } } - state_ = rcvd_chunks; + state_ = connection_state::rcvd_chunks; return done(num_thread); } bool done(std::size_t num_thread = -1) noexcept { + HPX_ASSERT(state_ == connection_state::rcvd_chunks); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point& data = buffer_.data_point_; @@ -350,9 +478,10 @@ namespace hpx::parcelset::policies::mpi { } int completed = 0; - [[maybe_unused]] int const ret = + int const ret = MPI_Test(request_ptr_, &completed, MPI_STATUS_IGNORE); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); if (completed) { @@ -378,6 +507,9 @@ namespace hpx::parcelset::policies::mpi { std::size_t chunks_idx_; std::size_t zero_copy_chunks_idx_; + bool needs_ack_handshake_; + char ack_; + Parcelport& pp_; std::vector parcels_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp index e1a8d0592ff6..409f6e45ddb6 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2021 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2023 Jiakun Yan // @@ -35,11 +35,13 @@ namespace hpx::parcelset::policies::mpi { using connection_ptr = std::shared_ptr; using connection_list = std::deque; - void run() noexcept {} + constexpr static void run() noexcept {} - connection_ptr create_connection(int dest, parcelset::parcelport* pp) + connection_ptr create_connection( + int dest, parcelset::parcelport* pp, bool enable_ack_handshakes) { - return std::make_shared(this, dest, pp); + return std::make_shared( + this, dest, pp, enable_ack_handshakes); } void add(connection_ptr const& ptr) @@ -101,12 +103,14 @@ namespace hpx::parcelset::policies::mpi { using parcel_buffer_type = parcel_buffer; using callback_fn_type = hpx::move_only_function; + bool send_immediate(parcelset::parcelport* pp, parcelset::locality const& dest, parcel_buffer_type buffer, - callback_fn_type&& callbackFn) + callback_fn_type&& callbackFn, bool enable_ack_handshakes) { int dest_rank = dest.get().rank(); - auto connection = create_connection(dest_rank, pp); + auto connection = + create_connection(dest_rank, pp, enable_ack_handshakes); connection->buffer_ = HPX_MOVE(buffer); connection->async_write(HPX_MOVE(callbackFn), nullptr); return true; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp index 27fd01dda9d8..7b80fec43900 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2021 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2023 Jiakun Yan // @@ -50,27 +50,32 @@ namespace hpx::parcelset::policies::mpi { using data_type = std::vector; - enum connection_state + enum class connection_state : std::uint8_t { - initialized, - sent_header, - sent_transmission_chunks, - sent_data, - sent_chunks + initialized = 0, + sent_header = 1, + sent_transmission_chunks = 2, + sent_data = 3, + sent_chunks = 4, + + acked_transmission_chunks = 5, + acked_data = 6 }; using base_type = parcelset::parcelport_connection; public: - sender_connection(sender_type* s, int dst, parcelset::parcelport* pp) - : state_(initialized) + sender_connection(sender_type* s, int dst, parcelset::parcelport* pp, + bool enable_ack_handshakes) + : state_(connection_state::initialized) , sender_(s) , tag_(-1) , dst_(dst) , request_(MPI_REQUEST_NULL) , request_ptr_(nullptr) , chunks_idx_(0) + , needs_ack_handshake_(enable_ack_handshakes) , ack_(0) , pp_(pp) , there_(parcelset::locality(locality(dst_))) @@ -87,6 +92,11 @@ namespace hpx::parcelset::policies::mpi { { } + constexpr int ack_tag() const noexcept + { + return static_cast(tag_ | util::mpi_environment::MPI_ACK_TAG); + } + using handler_type = hpx::move_only_function; using post_handler_type = hpx::move_only_functionget_zero_copy_serialization_threshold())); - header_ = header(buffer_, static_cast(header_buffer.data()), - header_buffer.size()); + + header_ = + header(buffer_, header_buffer.data(), header_buffer.size()); header_.set_tag(tag_); + header_.set_ack_handshakes(needs_ack_handshake_); header_.assert_valid(); - state_ = initialized; + state_ = connection_state::initialized; handler_ = HPX_MOVE(handler); @@ -133,21 +145,27 @@ namespace hpx::parcelset::policies::mpi { { switch (state_) { - case initialized: + case connection_state::initialized: return send_header(); - case sent_header: + case connection_state::sent_header: return send_transmission_chunks(); - case sent_transmission_chunks: - return send_data(); + case connection_state::sent_transmission_chunks: + return ack_transmission_chunks(); - case sent_data: - return send_chunks(); + case connection_state::sent_data: + return ack_data(); - case sent_chunks: + case connection_state::sent_chunks: return done(); + case connection_state::acked_transmission_chunks: + return send_data(); + + case connection_state::acked_data: + return send_chunks(); + default: HPX_ASSERT(false); } @@ -156,32 +174,33 @@ namespace hpx::parcelset::policies::mpi { bool send_header() { + HPX_ASSERT(state_ == connection_state::initialized); + HPX_ASSERT(request_ptr_ == nullptr); + { util::mpi_environment::scoped_lock l; - HPX_ASSERT(state_ == initialized); - HPX_ASSERT(request_ptr_ == nullptr); - [[maybe_unused]] int const ret = MPI_Isend(header_buffer.data(), - (int) header_buffer.size(), MPI_BYTE, dst_, 0, + int const ret = MPI_Isend(header_buffer.data(), + static_cast(header_buffer.size()), MPI_BYTE, dst_, 0, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; } - state_ = sent_header; + state_ = connection_state::sent_header; return send_transmission_chunks(); } bool send_transmission_chunks() { - HPX_ASSERT(state_ == sent_header); - HPX_ASSERT(request_ptr_ != nullptr); + HPX_ASSERT(state_ == connection_state::sent_header); + if (!request_done()) { return false; } - HPX_ASSERT(request_ptr_ == nullptr); auto const& chunks = buffer_.transmission_chunks_; @@ -189,47 +208,139 @@ namespace hpx::parcelset::policies::mpi { { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = MPI_Isend(chunks.data(), + int const ret = MPI_Isend(chunks.data(), static_cast(chunks.size() * sizeof(parcel_buffer_type::transmission_chunk_type)), MPI_BYTE, dst_, tag_, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); + + request_ptr_ = &request_; + + state_ = connection_state::sent_transmission_chunks; + return ack_transmission_chunks(); + } + + // no need to acknowledge the transmission chunks + state_ = connection_state::sent_transmission_chunks; + return send_data(); + } + + constexpr bool need_ack_transmission_chunks() const noexcept + { + auto const& chunks = buffer_.transmission_chunks_; + return needs_ack_handshake_ && !chunks.empty() && + !header_.piggy_back_tchunk(); + } + + bool ack_transmission_chunks() + { + if (!need_ack_transmission_chunks()) + { + return send_data(); + } + + HPX_ASSERT(state_ == connection_state::sent_transmission_chunks); + + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + int const ret = + MPI_Irecv(&ack_, sizeof(ack_), MPI_BYTE, dst_, ack_tag(), + util::mpi_environment::communicator(), &request_); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; } - state_ = sent_transmission_chunks; + state_ = connection_state::acked_transmission_chunks; return send_data(); } bool send_data() { - HPX_ASSERT(state_ == sent_transmission_chunks); + HPX_ASSERT( + (need_ack_transmission_chunks() && + state_ == connection_state::acked_transmission_chunks) || + (!need_ack_transmission_chunks() && + state_ == connection_state::sent_transmission_chunks)); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); if (!header_.piggy_back_data()) { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = MPI_Isend(buffer_.data_.data(), + int const ret = MPI_Isend(buffer_.data_.data(), static_cast(buffer_.data_.size()), MPI_BYTE, dst_, tag_, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); + + request_ptr_ = &request_; + + state_ = connection_state::sent_data; + return ack_data(); + } + + // no need to acknowledge the data sent + state_ = connection_state::sent_data; + return send_chunks(); + } + + constexpr bool need_ack_data() const noexcept + { + return needs_ack_handshake_ && !header_.piggy_back_data(); + } + + bool ack_data() + { + if (!need_ack_data()) + { + return send_chunks(); + } + + HPX_ASSERT(state_ == connection_state::sent_data); + + if (!request_done()) + { + return false; + } + HPX_ASSERT(request_ptr_ == nullptr); + + { + util::mpi_environment::scoped_lock l; + + int const ret = + MPI_Irecv(&ack_, sizeof(ack_), MPI_BYTE, dst_, ack_tag(), + util::mpi_environment::communicator(), &request_); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; } - state_ = sent_data; + state_ = connection_state::acked_data; return send_chunks(); } bool send_chunks() { - HPX_ASSERT(state_ == sent_data); + HPX_ASSERT( + (!need_ack_data() && state_ == connection_state::sent_data) || + (need_ack_data() && state_ == connection_state::acked_data)); while (chunks_idx_ < buffer_.chunks_.size()) { @@ -240,14 +351,15 @@ namespace hpx::parcelset::policies::mpi { { return false; } + HPX_ASSERT(request_ptr_ == nullptr); util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = - MPI_Isend(const_cast(c.data_.cpos_), - static_cast(c.size_), MPI_BYTE, dst_, tag_, - util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + int const ret = MPI_Isend(c.data_.cpos_, + static_cast(c.size_), MPI_BYTE, dst_, tag_, + util::mpi_environment::communicator(), &request_); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); request_ptr_ = &request_; } @@ -255,21 +367,24 @@ namespace hpx::parcelset::policies::mpi { ++chunks_idx_; } - state_ = sent_chunks; - + state_ = connection_state::sent_chunks; return done(); } bool done() { + HPX_ASSERT(state_ == connection_state::sent_chunks); + if (!request_done()) { return false; } + HPX_ASSERT(request_ptr_ == nullptr); error_code const ec(throwmode::lightweight); handler_(ec); handler_.reset(); + #if defined(HPX_HAVE_PARCELPORT_COUNTERS) buffer_.data_point_.time_ = static_cast( @@ -279,8 +394,7 @@ namespace hpx::parcelset::policies::mpi { #endif buffer_.clear(); - state_ = initialized; - + state_ = connection_state::initialized; return true; } @@ -291,16 +405,18 @@ namespace hpx::parcelset::policies::mpi { return true; } - util::mpi_environment::scoped_try_lock const l; + util::mpi_environment::scoped_try_lock l; if (!l.locked) { return false; } int completed = 0; - [[maybe_unused]] int const ret = + int const ret = MPI_Test(request_ptr_, &completed, MPI_STATUS_IGNORE); - HPX_ASSERT(ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); + if (completed) { request_ptr_ = nullptr; @@ -323,6 +439,8 @@ namespace hpx::parcelset::policies::mpi { MPI_Request request_; MPI_Request* request_ptr_; std::size_t chunks_idx_; + + bool needs_ack_handshake_; char ack_; parcelset::parcelport* pp_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp index dcb83334194a..14eaf26d91a3 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp @@ -10,19 +10,15 @@ #include #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_MPI) -#include -#include +#include #include -#include -#include -#include namespace hpx::parcelset::policies::mpi { struct tag_provider { - tag_provider() + constexpr tag_provider() noexcept : next_tag(0) { } diff --git a/libs/full/parcelport_mpi/src/parcelport_mpi.cpp b/libs/full/parcelport_mpi/src/parcelport_mpi.cpp index d1d703bcfee1..e30025899e76 100644 --- a/libs/full/parcelport_mpi/src/parcelport_mpi.cpp +++ b/libs/full/parcelport_mpi/src/parcelport_mpi.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Hartmut Kaiser +// Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2020 Google // @@ -138,6 +138,17 @@ namespace hpx::parcelset { return false; } + static bool enable_ack_handshakes( + util::runtime_configuration const& ini) + { + if (hpx::util::get_entry_as( + ini, "hpx.parcel.mpi.ack_handshake", 0) != 0) + { + return true; + } + return false; + } + public: using sender_type = sender; parcelport(util::runtime_configuration const& ini, @@ -148,6 +159,7 @@ namespace hpx::parcelset { , background_threads_(background_threads(ini)) , multi_threaded_mpi_(multi_threaded_mpi(ini)) , enable_send_immediate_(enable_send_immediate(ini)) + , enable_ack_handshakes_(enable_ack_handshakes(ini)) { } @@ -193,9 +205,10 @@ namespace hpx::parcelset { { util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = + int const ret = MPI_Barrier(util::mpi_environment::communicator()); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + util::mpi_environment::check_mpi_error( + l, HPX_CURRENT_SOURCE_LOCATION(), ret); } } @@ -209,7 +222,8 @@ namespace hpx::parcelset { parcelset::locality const& l, error_code&) { int const dest_rank = l.get().rank(); - return sender_.create_connection(dest_rank, this); + return sender_.create_connection( + dest_rank, this, enable_ack_handshakes_); } parcelset::locality agas_locality( @@ -245,7 +259,7 @@ namespace hpx::parcelset { return has_work; } - bool can_send_immediate() const + constexpr bool can_send_immediate() const noexcept { return enable_send_immediate_; } @@ -255,8 +269,8 @@ namespace hpx::parcelset { sender::parcel_buffer_type buffer, sender::callback_fn_type&& callbackFn) { - return sender_.send_immediate( - pp, dest, HPX_MOVE(buffer), HPX_MOVE(callbackFn)); + return sender_.send_immediate(pp, dest, HPX_MOVE(buffer), + HPX_MOVE(callbackFn), enable_ack_handshakes_); } template @@ -326,6 +340,7 @@ namespace hpx::parcelset { std::size_t background_threads_; bool multi_threaded_mpi_; bool enable_send_immediate_; + bool enable_ack_handshakes_; }; } // namespace policies::mpi } // namespace hpx::parcelset @@ -333,7 +348,7 @@ namespace hpx::parcelset { #include // Inject additional configuration data into the factory registry for this -// type. This information ends up in the system wide configuration database +// type. This information ends up in the system-wide configuration database // under the plugin specific section: // // [hpx.parcel.mpi]