Skip to content

Commit

Permalink
Adding optional handshakes to acknowledge the received data
Browse files Browse the repository at this point in the history
- this prevents sends to be posted before their corresponding receives
  • Loading branch information
hkaiser committed Jul 13, 2024
1 parent 7e3e582 commit 6493863
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 128 deletions.
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-clang-16.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ configure_extra_options+=" -DHPX_WITH_FETCH_EVE=ON"
# Make sure HWLOC does not report 'cores'. This is purely an option to enable
# testing the topology code under conditions close to those on FreeBSD.
configure_extra_options+=" -DHPX_TOPOLOGY_WITH_ADDITIONAL_HWLOC_TESTING=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-12.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-14.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
4 changes: 4 additions & 0 deletions libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ namespace hpx::util {

using mutex_type = hpx::spinlock;

// The highest order bit is used for acknowledgement messages
static int MPI_MAX_TAG;

constexpr static unsigned int MPI_ACK_MASK = 0xC000;
constexpr static unsigned int MPI_ACK_TAG = 0x4000;

private:
static mutex_type mtx_;

Expand Down
18 changes: 13 additions & 5 deletions libs/core/mpi_base/src/mpi_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/mpi_base.hpp>
#include <hpx/modules/runtime_configuration.hpp>
Expand All @@ -23,7 +24,7 @@
///////////////////////////////////////////////////////////////////////////////
namespace hpx::util {

int mpi_environment::MPI_MAX_TAG = 32767;
int mpi_environment::MPI_MAX_TAG = 8192;

namespace detail {

Expand Down Expand Up @@ -269,11 +270,18 @@ namespace hpx::util {

rtcfg.add_entry("hpx.parcel.mpi.rank", std::to_string(this_rank));
rtcfg.add_entry("hpx.parcel.mpi.processorname", get_processor_name());
void* max_tag_p;
int flag;
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag);

void* max_tag_p = nullptr;
int flag = 0;
[[maybe_unused]] int ret =
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag);
HPX_ASSERT(ret == MPI_SUCCESS);

if (flag)
MPI_MAX_TAG = *(int*) max_tag_p;
{
MPI_MAX_TAG =
static_cast<int>(*static_cast<int*>(max_tag_p) & ~MPI_ACK_MASK);
}
}

std::string mpi_environment::get_processor_name()
Expand Down
84 changes: 52 additions & 32 deletions libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2013-2023 Hartmut Kaiser
// Copyright (c) 2013-2024 Hartmut Kaiser
// Copyright (c) 2013-2015 Thomas Heller
// Copyright (c) 2023 Jiakun Yan
//
Expand All @@ -22,30 +22,33 @@
#include <utility>

namespace hpx::parcelset::policies::mpi {

struct header
{
using value_type = int;
enum data_pos
{
// siguature for assert_valid
// signature for assert_valid
pos_signature = 0,
// tag
pos_tag = 1 * sizeof(value_type),
// enable ack handshakes
pos_ack_handshakes = 2 * sizeof(value_type),
// non-zero-copy chunk size
pos_numbytes_nonzero_copy = 2 * sizeof(value_type),
pos_numbytes_nonzero_copy = 3 * sizeof(value_type),
// transmission chunk size
pos_numbytes_tchunk = 3 * sizeof(value_type),
pos_numbytes_tchunk = 4 * sizeof(value_type),
// how many bytes in total (including zero-copy and non-zero-copy chunks)
pos_numbytes = 4 * sizeof(value_type),
pos_numbytes = 5 * sizeof(value_type),
// zero-copy chunk number
pos_numchunks_zero_copy = 5 * sizeof(value_type),
pos_numchunks_zero_copy = 6 * sizeof(value_type),
// non-zero-copy chunk number
pos_numchunks_nonzero_copy = 6 * sizeof(value_type),
pos_numchunks_nonzero_copy = 7 * sizeof(value_type),
// whether piggyback data
pos_piggy_back_flag_data = 7 * sizeof(value_type),
pos_piggy_back_flag_data = 8 * sizeof(value_type),
// whether piggyback transmission chunk
pos_piggy_back_flag_tchunk = 7 * sizeof(value_type) + 1,
pos_piggy_back_address = 7 * sizeof(value_type) + 2
pos_piggy_back_flag_tchunk = 8 * sizeof(value_type) + 1,
pos_piggy_back_address = 8 * sizeof(value_type) + 2
};

template <typename buffer_type, typename ChunkType>
Expand All @@ -60,18 +63,21 @@ namespace hpx::parcelset::policies::mpi {
{
current_header_size += buffer.data_.size();
}
int num_zero_copy_chunks = buffer.num_chunks_.first;
[[maybe_unused]] int num_non_zero_copy_chunks =

int const num_zero_copy_chunks = buffer.num_chunks_.first;
[[maybe_unused]] int const num_non_zero_copy_chunks =
buffer.num_chunks_.second;
if (num_zero_copy_chunks != 0)
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<size_t>(
num_zero_copy_chunks + num_non_zero_copy_chunks));
int const tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <=
static_cast<int>(max_header_size - current_header_size))
{
current_header_size += tchunk_size;
}
Expand All @@ -86,21 +92,22 @@ namespace hpx::parcelset::policies::mpi {
HPX_ASSERT(max_header_size >= pos_piggy_back_address);
data_ = header_buffer;
memset(data_, 0, pos_piggy_back_address);
std::int64_t size = static_cast<std::int64_t>(buffer.data_.size());
std::int64_t numbytes =

std::int64_t const size =
static_cast<std::int64_t>(buffer.data_.size());
std::int64_t const numbytes =
static_cast<std::int64_t>(buffer.data_size_);
HPX_ASSERT(size <= (std::numeric_limits<value_type>::max)());
HPX_ASSERT(numbytes <= (std::numeric_limits<value_type>::max)());
int num_zero_copy_chunks = buffer.num_chunks_.first;
int num_non_zero_copy_chunks = buffer.num_chunks_.second;

int const num_zero_copy_chunks = buffer.num_chunks_.first;
int const num_non_zero_copy_chunks = buffer.num_chunks_.second;

set(pos_signature, MAGIC_SIGNATURE);
set(pos_numbytes_nonzero_copy, static_cast<value_type>(size));
set(pos_numbytes, static_cast<value_type>(numbytes));
set(pos_numchunks_zero_copy,
static_cast<value_type>(num_zero_copy_chunks));
set(pos_numchunks_nonzero_copy,
static_cast<value_type>(num_non_zero_copy_chunks));
set(pos_numchunks_zero_copy, num_zero_copy_chunks);
set(pos_numchunks_nonzero_copy, num_non_zero_copy_chunks);
data_[pos_piggy_back_flag_data] = 0;
data_[pos_piggy_back_flag_tchunk] = 0;

Expand All @@ -112,16 +119,19 @@ namespace hpx::parcelset::policies::mpi {
&data_[current_header_size], &buffer.data_[0], size);
current_header_size += size;
}

if (num_zero_copy_chunks != 0)
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<size_t>(
num_zero_copy_chunks + num_non_zero_copy_chunks));
int const tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
set(pos_numbytes_tchunk, static_cast<value_type>(tchunk_size));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <=
static_cast<int>(max_header_size - current_header_size))
{
data_[pos_piggy_back_flag_tchunk] = 1;
std::memcpy(&data_[current_header_size],
Expand Down Expand Up @@ -155,12 +165,12 @@ namespace hpx::parcelset::policies::mpi {
HPX_ASSERT(valid());
}

[[nodiscard]] char* data() noexcept
[[nodiscard]] char* data() const noexcept
{
return data_;
}

[[nodiscard]] size_t size() noexcept
[[nodiscard]] size_t size() const noexcept
{
return pos_piggy_back_address + piggy_back_size();
}
Expand All @@ -175,11 +185,21 @@ namespace hpx::parcelset::policies::mpi {
set(pos_tag, static_cast<value_type>(tag));
}

void set_ack_handshakes(bool enable_ack_handshakes) noexcept
{
set(pos_ack_handshakes, enable_ack_handshakes ? 1 : 0);
}

[[nodiscard]] value_type get_tag() const noexcept
{
return get(pos_tag);
}

[[nodiscard]] value_type get_ack_handshakes() const noexcept
{
return get(pos_ack_handshakes);
}

[[nodiscard]] value_type numbytes_nonzero_copy() const noexcept
{
return get(pos_numbytes_nonzero_copy);
Expand All @@ -205,15 +225,15 @@ namespace hpx::parcelset::policies::mpi {
return get(pos_numchunks_nonzero_copy);
}

[[nodiscard]] constexpr char* piggy_back_address() noexcept
[[nodiscard]] constexpr char* piggy_back_address() const noexcept
{
if (data_[pos_piggy_back_flag_data] ||
data_[pos_piggy_back_flag_tchunk])
return &data_[pos_piggy_back_address];
return nullptr;
}

[[nodiscard]] int piggy_back_size() noexcept
[[nodiscard]] int piggy_back_size() const noexcept
{
int result = 0;
if (data_[pos_piggy_back_flag_data])
Expand All @@ -223,14 +243,14 @@ namespace hpx::parcelset::policies::mpi {
return result;
}

[[nodiscard]] constexpr char* piggy_back_data() noexcept
[[nodiscard]] constexpr char* piggy_back_data() const noexcept
{
if (data_[pos_piggy_back_flag_data])
return &data_[pos_piggy_back_address];
return nullptr;
}

[[nodiscard]] constexpr char* piggy_back_tchunk() noexcept
[[nodiscard]] constexpr char* piggy_back_tchunk() const noexcept
{
size_t current_header_size = pos_piggy_back_address;
if (!data_[pos_piggy_back_flag_tchunk])
Expand Down
11 changes: 7 additions & 4 deletions libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ namespace hpx::parcelset::policies::mpi {
template <typename Parcelport>
struct receiver
{
using handles_header_type = std::set<std::pair<int, int>>;
using connection_type = receiver_connection<Parcelport>;
using connection_ptr = std::shared_ptr<connection_type>;
using connection_list = std::deque<connection_ptr>;
Expand Down Expand Up @@ -104,6 +103,10 @@ namespace hpx::parcelset::policies::mpi {
#if defined(HPX_MSVC)
#pragma warning(push)
#pragma warning(disable : 26110)
#endif
#if defined(HPX_GCC_VERSION)
#pragma GCC diagnostic push
#pragma GCC diagnostic error "-Werror=class-memaccess"
#endif

if (l.locked)
Expand All @@ -128,6 +131,9 @@ namespace hpx::parcelset::policies::mpi {
}
}

#if defined(HPX_GCC_VERSION)
#pragma GCC diagnostic pop
#endif
#if defined(HPX_MSVC)
#pragma warning(pop)
#endif
Expand All @@ -152,9 +158,6 @@ namespace hpx::parcelset::policies::mpi {
MPI_Request hdr_request_;
std::vector<char> header_buffer_;

hpx::spinlock handles_header_mtx_;
handles_header_type handles_header_;

hpx::spinlock connections_mtx_;
connection_list connections_;

Expand Down
Loading

0 comments on commit 6493863

Please sign in to comment.