Skip to content

Commit

Permalink
Adding optional handshakes to acknowledge the received data
Browse files Browse the repository at this point in the history
- this prevents sends to be posted before their corresponding receives
- flyby: improved MPI error handling, now throws exceptions instead of assert
  • Loading branch information
hkaiser committed Jul 13, 2024
1 parent 7e3e582 commit f5d0b72
Show file tree
Hide file tree
Showing 13 changed files with 552 additions and 184 deletions.
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-clang-16.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ configure_extra_options+=" -DHPX_WITH_FETCH_EVE=ON"
# Make sure HWLOC does not report 'cores'. This is purely an option to enable
# testing the topology code under conditions close to those on FreeBSD.
configure_extra_options+=" -DHPX_TOPOLOGY_WITH_ADDITIONAL_HWLOC_TESTING=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-12.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-14.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
4 changes: 2 additions & 2 deletions libs/core/mpi_base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ add_hpx_module(
SOURCES ${mpi_base_sources}
HEADERS ${mpi_base_headers}
COMPAT_HEADERS ${mpi_base_compat_headers}
MODULE_DEPENDENCIES hpx_format hpx_logging hpx_runtime_configuration
hpx_string_util hpx_util
MODULE_DEPENDENCIES hpx_errors hpx_format hpx_logging
hpx_runtime_configuration hpx_string_util hpx_util
DEPENDENCIES ${additional_dependencies}
CMAKE_SUBDIRS examples tests
)
9 changes: 9 additions & 0 deletions libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,17 @@ namespace hpx::util {

using mutex_type = hpx::spinlock;

static void check_mpi_error(
scoped_lock& l, hpx::source_location const& sl, int error);
static void check_mpi_error(
scoped_try_lock& l, hpx::source_location const& sl, int error);

// The highest order bit is used for acknowledgement messages
static int MPI_MAX_TAG;

constexpr static unsigned int MPI_ACK_MASK = 0xC000;
constexpr static unsigned int MPI_ACK_TAG = 0x4000;

private:
static mutex_type mtx_;

Expand Down
108 changes: 87 additions & 21 deletions libs/core/mpi_base/src/mpi_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/concepts/has_xxx.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/mpi_base.hpp>
#include <hpx/modules/runtime_configuration.hpp>
Expand All @@ -23,7 +26,7 @@
///////////////////////////////////////////////////////////////////////////////
namespace hpx::util {

int mpi_environment::MPI_MAX_TAG = 32767;
int mpi_environment::MPI_MAX_TAG = 8192;

namespace detail {

Expand Down Expand Up @@ -111,20 +114,24 @@ namespace hpx::util {
int mpi_environment::is_initialized_ = -1;

///////////////////////////////////////////////////////////////////////////
[[noreturn]] static void throw_wrong_mpi_mode(int required, int provided)
{
std::map<int, char const*> levels = {
{MPI_THREAD_SINGLE, "MPI_THREAD_SINGLE"},
{MPI_THREAD_FUNNELED, "MPI_THREAD_FUNNELED"},
{MPI_THREAD_SERIALIZED, "MPI_THREAD_SERIALIZED"},
{MPI_THREAD_MULTIPLE, "MPI_THREAD_MULTIPLE"}};

HPX_THROW_EXCEPTION(hpx::error::invalid_status,
"hpx::util::mpi_environment::init",
"MPI doesn't implement minimal requested thread level, required "
"{}, provided {}",
levels[required], levels[provided]);
}
namespace {

[[noreturn]] void throw_wrong_mpi_mode(int required, int provided)
{
std::map<int, char const*> levels = {
{MPI_THREAD_SINGLE, "MPI_THREAD_SINGLE"},
{MPI_THREAD_FUNNELED, "MPI_THREAD_FUNNELED"},
{MPI_THREAD_SERIALIZED, "MPI_THREAD_SERIALIZED"},
{MPI_THREAD_MULTIPLE, "MPI_THREAD_MULTIPLE"}};

HPX_THROW_EXCEPTION(hpx::error::invalid_status,
"hpx::util::mpi_environment::init",
"MPI doesn't implement minimal requested thread level, "
"required "
"{}, provided {}",
levels[required], levels[provided]);
}
} // namespace

int mpi_environment::init(
int*, char***, int const minimal, int const required, int& provided)
Expand Down Expand Up @@ -231,7 +238,7 @@ namespace hpx::util {

MPI_Comm_dup(MPI_COMM_WORLD, &communicator_);

// explicitly disable multi-threaded mpi if needed
// explicitly disable multithreaded mpi if needed
if (provided_threading_flag_ <= MPI_THREAD_SERIALIZED)
{
rtcfg.add_entry("hpx.parcel.mpi.multithreaded", "0");
Expand Down Expand Up @@ -269,18 +276,27 @@ namespace hpx::util {

rtcfg.add_entry("hpx.parcel.mpi.rank", std::to_string(this_rank));
rtcfg.add_entry("hpx.parcel.mpi.processorname", get_processor_name());
void* max_tag_p;
int flag;
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag);

scoped_lock l;

void* max_tag_p = nullptr;
int flag = 0;
int const ret =
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag);
check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret);

if (flag)
MPI_MAX_TAG = *(int*) max_tag_p;
{
MPI_MAX_TAG =
static_cast<int>(*static_cast<int*>(max_tag_p) & ~MPI_ACK_MASK);
}
}

std::string mpi_environment::get_processor_name()
{
scoped_lock l;

char name[MPI_MAX_PROCESSOR_NAME + 1] = {'\0'};
char name[MPI_MAX_PROCESSOR_NAME + 1] = {};
int len = 0;
MPI_Get_processor_name(name, &len);

Expand Down Expand Up @@ -397,6 +413,56 @@ namespace hpx::util {
mtx_.unlock();
}
}

namespace {

[[noreturn]] void report_error(
hpx::source_location const& sl, int error_code)
{
char error_string[MPI_MAX_ERROR_STRING];
int error_len = sizeof(error_string);
int const ret =
MPI_Error_string(error_code, error_string, &error_len);
if (ret != MPI_SUCCESS)
{
HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
sl.function_name(),
"MPI error (%s/%d): couldn't retrieve error string for "
"code %d",
sl.file_name(), sl.line(), error_code);
}

HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
sl.function_name(), "MPI error (%s/%d): %s", sl.file_name(),
sl.line(), error_string);
}
} // namespace

void mpi_environment::check_mpi_error(
scoped_lock& l, hpx::source_location const& sl, int error_code)
{
if (error_code == MPI_SUCCESS)
{
return;
}

l.unlock();

report_error(sl, error_code);
}

void mpi_environment::check_mpi_error(
scoped_try_lock& l, hpx::source_location const& sl, int error_code)
{
if (error_code == MPI_SUCCESS)
{
return;
}

l.unlock();

report_error(sl, error_code);
}
} // namespace hpx::util

#endif
Loading

0 comments on commit f5d0b72

Please sign in to comment.