Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding optional handshakes to acknowledge the received data #6520

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-clang-16.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ configure_extra_options+=" -DHPX_WITH_FETCH_EVE=ON"
# Make sure HWLOC does not report 'cores'. This is purely an option to enable
# testing the topology code under conditions close to those on FreeBSD.
configure_extra_options+=" -DHPX_TOPOLOGY_WITH_ADDITIONAL_HWLOC_TESTING=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-12.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-14.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ configure_extra_options+=" -DHPX_WITH_EVE_TAG=main"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

# enable additional handshaking in MPI parcelport
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:ini=hpx.parcel.mpi.ack_handshake!=1"
4 changes: 2 additions & 2 deletions libs/core/mpi_base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ add_hpx_module(
SOURCES ${mpi_base_sources}
HEADERS ${mpi_base_headers}
COMPAT_HEADERS ${mpi_base_compat_headers}
MODULE_DEPENDENCIES hpx_format hpx_logging hpx_runtime_configuration
hpx_string_util hpx_util
MODULE_DEPENDENCIES hpx_errors hpx_format hpx_logging
hpx_runtime_configuration hpx_string_util hpx_util
DEPENDENCIES ${additional_dependencies}
CMAKE_SUBDIRS examples tests
)
9 changes: 9 additions & 0 deletions libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,17 @@ namespace hpx::util {

using mutex_type = hpx::spinlock;

static void check_mpi_error(
scoped_lock& l, hpx::source_location const& sl, int error);
static void check_mpi_error(
scoped_try_lock& l, hpx::source_location const& sl, int error);

// The highest order bit is used for acknowledgement messages
static int MPI_MAX_TAG;

constexpr static unsigned int MPI_ACK_MASK = 0xC000;
constexpr static unsigned int MPI_ACK_TAG = 0x4000;

private:
static mutex_type mtx_;

Expand Down
112 changes: 91 additions & 21 deletions libs/core/mpi_base/src/mpi_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/concepts/has_xxx.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/mpi_base.hpp>
#include <hpx/modules/runtime_configuration.hpp>
Expand All @@ -23,7 +26,7 @@
///////////////////////////////////////////////////////////////////////////////
namespace hpx::util {

int mpi_environment::MPI_MAX_TAG = 32767;
int mpi_environment::MPI_MAX_TAG = 8192;

namespace detail {

Expand Down Expand Up @@ -111,20 +114,24 @@ namespace hpx::util {
int mpi_environment::is_initialized_ = -1;

///////////////////////////////////////////////////////////////////////////
[[noreturn]] static void throw_wrong_mpi_mode(int required, int provided)
{
std::map<int, char const*> levels = {
{MPI_THREAD_SINGLE, "MPI_THREAD_SINGLE"},
{MPI_THREAD_FUNNELED, "MPI_THREAD_FUNNELED"},
{MPI_THREAD_SERIALIZED, "MPI_THREAD_SERIALIZED"},
{MPI_THREAD_MULTIPLE, "MPI_THREAD_MULTIPLE"}};

HPX_THROW_EXCEPTION(hpx::error::invalid_status,
"hpx::util::mpi_environment::init",
"MPI doesn't implement minimal requested thread level, required "
"{}, provided {}",
levels[required], levels[provided]);
}
namespace {

[[noreturn]] void throw_wrong_mpi_mode(int required, int provided)
{
std::map<int, char const*> levels = {
{MPI_THREAD_SINGLE, "MPI_THREAD_SINGLE"},
{MPI_THREAD_FUNNELED, "MPI_THREAD_FUNNELED"},
{MPI_THREAD_SERIALIZED, "MPI_THREAD_SERIALIZED"},
{MPI_THREAD_MULTIPLE, "MPI_THREAD_MULTIPLE"}};

HPX_THROW_EXCEPTION(hpx::error::invalid_status,
"hpx::util::mpi_environment::init",
"MPI doesn't implement minimal requested thread level, "
"required "
"{}, provided {}",
levels[required], levels[provided]);
}
} // namespace

int mpi_environment::init(
int*, char***, int const minimal, int const required, int& provided)
Expand Down Expand Up @@ -231,7 +238,7 @@ namespace hpx::util {

MPI_Comm_dup(MPI_COMM_WORLD, &communicator_);

// explicitly disable multi-threaded mpi if needed
// explicitly disable multithreaded mpi if needed
if (provided_threading_flag_ <= MPI_THREAD_SERIALIZED)
{
rtcfg.add_entry("hpx.parcel.mpi.multithreaded", "0");
Expand All @@ -250,6 +257,10 @@ namespace hpx::util {
"disable MPI multi-threading.");
}

// let all errors be returned from MPI calls
MPI_Comm_set_errhandler(communicator_, MPI_ERRORS_RETURN);

// initialize status
this_rank = rank();

#if defined(HPX_HAVE_NETWORKING)
Expand All @@ -269,18 +280,27 @@ namespace hpx::util {

rtcfg.add_entry("hpx.parcel.mpi.rank", std::to_string(this_rank));
rtcfg.add_entry("hpx.parcel.mpi.processorname", get_processor_name());
void* max_tag_p;
int flag;
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag);

scoped_lock l;

void* max_tag_p = nullptr;
int flag = 0;
int const ret =
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag);
check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret);

if (flag)
MPI_MAX_TAG = *(int*) max_tag_p;
{
MPI_MAX_TAG =
static_cast<int>(*static_cast<int*>(max_tag_p) & ~MPI_ACK_MASK);
}
}

std::string mpi_environment::get_processor_name()
{
scoped_lock l;

char name[MPI_MAX_PROCESSOR_NAME + 1] = {'\0'};
char name[MPI_MAX_PROCESSOR_NAME + 1] = {};
int len = 0;
MPI_Get_processor_name(name, &len);

Expand Down Expand Up @@ -397,6 +417,56 @@ namespace hpx::util {
mtx_.unlock();
}
}

namespace {

[[noreturn]] void report_error(
hpx::source_location const& sl, int error_code)
{
char error_string[MPI_MAX_ERROR_STRING + 1];
int error_len = sizeof(error_string);
int const ret =
MPI_Error_string(error_code, error_string, &error_len);
if (ret != MPI_SUCCESS)
{
HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
sl.function_name(),
"MPI error (%s/%d): couldn't retrieve error string for "
"code %d",
sl.file_name(), sl.line(), error_code);
}

HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
sl.function_name(), "MPI error (%s/%d): %s", sl.file_name(),
sl.line(), error_string);
}
} // namespace

void mpi_environment::check_mpi_error(
scoped_lock& l, hpx::source_location const& sl, int error_code)
{
if (error_code == MPI_SUCCESS)
{
return;
}

l.unlock();

report_error(sl, error_code);
}

void mpi_environment::check_mpi_error(
scoped_try_lock& l, hpx::source_location const& sl, int error_code)
{
if (error_code == MPI_SUCCESS)
{
return;
}

l.unlock();

report_error(sl, error_code);
}
} // namespace hpx::util

#endif
Loading
Loading