diff --git a/CMakeLists.txt b/CMakeLists.txt index a626a64676..4e6131659f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -163,7 +163,7 @@ if(DLAF_WITH_SCALAPACK) endif() # ----- pika -find_package(pika 0.19.1 REQUIRED) +find_package(pika 0.30.0 REQUIRED) # ----- BLASPP/LAPACKPP find_package(blaspp REQUIRED) diff --git a/README.md b/README.md index 9a6cee3be7..79d909ae33 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,8 @@ export DLAF_INSTALL_PREFIX=`spack location -i dla-future` Then, you can configure your project with one of the following: ```bash -# By appending the value to the CMAKE_INSTALL_PREFIX -cmake -DCMAKE_INSTALL_PREFIX=${DLAF_INSTALL_PREFIX} .. +# By appending the value to the CMAKE_PREFIX_PATH +cmake -DCMAKE_PREFIX_PATH=${DLAF_INSTALL_PREFIX} .. # ... or by setting DLAF_DIR cmake -DDLAF_DIR="$DLAF_INSTALL_PREFIX/lib/cmake" .. diff --git a/ci/common-ci.yml b/ci/common-ci.yml index f070765240..5ecf679332 100644 --- a/ci/common-ci.yml +++ b/ci/common-ci.yml @@ -34,7 +34,7 @@ stages: reports: dotenv: build.env variables: - SPACK_SHA: develop-2024-10-06 + SPACK_SHA: develop-2024-11-10 SPACK_DLAF_REPO: ./spack DOCKER_BUILD_ARGS: '[ "BASE_IMAGE", diff --git a/ci/cuda/gcc11_codecov.yml b/ci/cuda/gcc11_codecov.yml index 31fcea1150..2011db1756 100644 --- a/ci/cuda/gcc11_codecov.yml +++ b/ci/cuda/gcc11_codecov.yml @@ -22,6 +22,8 @@ cuda gcc11 codecov build: cuda gcc11 codecov test: extends: .run_common + variables: + PIKA_MPI_ENABLE_POOL: 1 needs: - cuda gcc11 codecov build trigger: diff --git a/ci/cuda/gcc11_debug_scalapack.yml b/ci/cuda/gcc11_debug_scalapack.yml index 98b07a1d03..87443ef852 100644 --- a/ci/cuda/gcc11_debug_scalapack.yml +++ b/ci/cuda/gcc11_debug_scalapack.yml @@ -20,6 +20,8 @@ cuda gcc11 debug scalapack build: cuda gcc11 debug scalapack test: extends: .run_common + variables: + PIKA_MPI_ENABLE_POOL: 1 needs: - cuda gcc11 debug scalapack build trigger: diff --git a/ci/cuda/gcc11_release.yml b/ci/cuda/gcc11_release.yml index 
40d2b20bf2..93799b4b66 100644 --- a/ci/cuda/gcc11_release.yml +++ b/ci/cuda/gcc11_release.yml @@ -21,6 +21,8 @@ cuda gcc11 release build: cuda gcc11 release test: extends: .run_common + variables: + PIKA_MPI_ENABLE_POOL: 1 needs: - cuda gcc11 release build trigger: diff --git a/ci/cuda/gcc11_release_scalapack.yml b/ci/cuda/gcc11_release_scalapack.yml index 5a668e4439..f222db5849 100644 --- a/ci/cuda/gcc11_release_scalapack.yml +++ b/ci/cuda/gcc11_release_scalapack.yml @@ -21,6 +21,8 @@ cuda gcc11 release scalapack build: cuda gcc11 release scalapack test: extends: .run_common + variables: + PIKA_MPI_ENABLE_POOL: 1 needs: - cuda gcc11 release scalapack build trigger: diff --git a/ci/docker/debug-cuda-scalapack.yaml b/ci/docker/debug-cuda-scalapack.yaml index 24d0785896..d2a5c5ef85 100644 --- a/ci/docker/debug-cuda-scalapack.yaml +++ b/ci/docker/debug-cuda-scalapack.yaml @@ -35,6 +35,6 @@ spack: - '+fortran' pika: require: - - '@0.19.1' + - '@0.30.0' - 'build_type=Debug' - 'malloc=system' diff --git a/include/dlaf/init.h b/include/dlaf/init.h index fe21d84ac5..31d78e4331 100644 --- a/include/dlaf/init.h +++ b/include/dlaf/init.h @@ -50,7 +50,6 @@ struct configuration { double umpire_device_memory_pool_coalescing_reallocation_ratio = 1.0; std::size_t num_gpu_blas_handles = 16; std::size_t num_gpu_lapack_handles = 16; - std::string mpi_pool = "mpi"; }; std::ostream& operator<<(std::ostream& os, const configuration& cfg); @@ -107,10 +106,4 @@ struct [[nodiscard]] ScopedInitializer { ScopedInitializer& operator=(ScopedInitializer&&) = delete; ScopedInitializer& operator=(const ScopedInitializer&) = delete; }; - -/// Initialize the MPI pool. 
-/// -/// -void initResourcePartitionerHandler(pika::resource::partitioner& rp, -                                    const pika::program_options::variables_map& vm); } diff --git a/include/dlaf/schedulers.h b/include/dlaf/schedulers.h index 1b6aae04fa..4e1e7db0c1 100644 --- a/include/dlaf/schedulers.h +++ b/include/dlaf/schedulers.h @@ -47,9 +47,4 @@ auto getBackendScheduler( } #endif } - -inline auto getMPIScheduler() { - return pika::execution::experimental::thread_pool_scheduler{ - &pika::resource::get_thread_pool(getConfiguration().mpi_pool)}; -} } // namespace dlaf::internal diff --git a/include/dlaf/sender/transform_mpi.h b/include/dlaf/sender/transform_mpi.h index f5c2ac728a..4a07488d25 100644 --- a/include/dlaf/sender/transform_mpi.h +++ b/include/dlaf/sender/transform_mpi.h @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -30,16 +31,17 @@ inline void consumeCommunicatorWrapper(CommunicatorPipelineExclusiveWrapper& com [[maybe_unused]] auto comm_wrapper_local = std::move(comm_wrapper); } -/// \overload consumeCommunicatorWrapper +/// \overload consumeCommunicatorWrapper (for non-communicator types) template void consumeCommunicatorWrapper(T&) {} /// Helper type for wrapping MPI calls. /// -/// Wrapper type around calls to MPI functions. Provides a call operator that -/// creates an MPI request and passes it as the last argument to the provided -/// callable. The wrapper then waits for the request to complete with -/// yield_while. +/// The wrapper explicitly releases any DLA-Future communicator objects when the pika::transform_mpi +/// function returns (e.g. a message has been sent/posted) to prevent blocking access to many +/// queued MPI operations. +/// The MPI operations can complete asynchronously later, but the communicator is +/// released/made available once the MPI task has been safely initiated. /// /// This could in theory be a lambda inside transformMPI.
However, clang at /// least until version 12 fails with an internal compiler error with a trailing @@ -47,36 +49,17 @@ void consumeCommunicatorWrapper(T&) {} template struct MPICallHelper { std::decay_t f; - template - auto operator()(Ts&&... ts) -> decltype(std::move(f)(dlaf::common::internal::unwrap(ts)..., - std::declval())) { - MPI_Request req; - auto is_request_completed = [&req] { - int flag; - MPI_Test(&req, &flag, MPI_STATUS_IGNORE); - return flag == 0; - }; - // Note: - // Callables passed to transformMPI have their arguments passed by reference, but doing so - // with PromiseGuard would keep the guard alive until the completion of the MPI operation, - // whereas we are only looking to guard the submission of the MPI operation. We therefore - // explicitly release CommunicatorPipelineExclusiveWrapper after submitting the MPI operation - // with consumeCommunicatorWrapper. - // - // We also use unwrap various types passed to the MPI operation, including PromiseGuards of - // any type, to allow the MPI operation not to care whether a Communicator was wrapped in a - // PromiseGuard or not. - using result_type = decltype(std::move(f)(dlaf::common::internal::unwrap(ts)..., &req)); + template + auto operator()(Ts&&... 
ts) -> decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...)) { + using result_type = decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...)); if constexpr (std::is_void_v) { - std::move(f)(dlaf::common::internal::unwrap(ts)..., &req); + std::move(f)(dlaf::common::internal::unwrap(ts)...); (internal::consumeCommunicatorWrapper(ts), ...); - pika::util::yield_while(is_request_completed); } else { - auto r = std::move(f)(dlaf::common::internal::unwrap(ts)..., &req); + auto r = std::move(f)(dlaf::common::internal::unwrap(ts)...); (internal::consumeCommunicatorWrapper(ts), ...); - pika::util::yield_while(is_request_completed); return r; } } @@ -85,16 +68,16 @@ struct MPICallHelper { template MPICallHelper(F&&) -> MPICallHelper>; -/// Lazy transformMPI. This does not submit the work and returns a sender. +/// Lazy transformMPI. Returns a sender that will submit the work passed in template >> [[nodiscard]] decltype(auto) transformMPI(F&& f, Sender&& sender) { - using dlaf::internal::continues_on; - namespace ex = pika::execution::experimental; - - return continues_on(std::forward(sender), dlaf::internal::getMPIScheduler()) | - ex::then(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::forward(f)}}) | - ex::drop_operation_state(); + using dlaf::common::internal::ConsumeRvalues; + using pika::execution::experimental::drop_operation_state; + using pika::mpi::experimental::transform_mpi; + return std::forward(sender) // + | transform_mpi(ConsumeRvalues{MPICallHelper{std::forward(f)}}) // + | drop_operation_state(); } template diff --git a/miniapp/miniapp_band_to_tridiag.cpp b/miniapp/miniapp_band_to_tridiag.cpp index b820515453..b272c6f3e9 100644 --- a/miniapp/miniapp_band_to_tridiag.cpp +++ b/miniapp/miniapp_band_to_tridiag.cpp @@ -209,6 +209,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git 
a/miniapp/miniapp_bt_band_to_tridiag.cpp b/miniapp/miniapp_bt_band_to_tridiag.cpp index 5b2830436a..93dd17248f 100644 --- a/miniapp/miniapp_bt_band_to_tridiag.cpp +++ b/miniapp/miniapp_bt_band_to_tridiag.cpp @@ -219,6 +219,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_bt_reduction_to_band.cpp b/miniapp/miniapp_bt_reduction_to_band.cpp index 8fcc85d846..b237f7cde3 100644 --- a/miniapp/miniapp_bt_reduction_to_band.cpp +++ b/miniapp/miniapp_bt_reduction_to_band.cpp @@ -238,6 +238,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_cholesky.cpp b/miniapp/miniapp_cholesky.cpp index 362a96aad1..3db45e8957 100644 --- a/miniapp/miniapp_cholesky.cpp +++ b/miniapp/miniapp_cholesky.cpp @@ -228,7 +228,6 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_communication.cpp b/miniapp/miniapp_communication.cpp index dd147d6d26..21bfdb6570 100644 --- a/miniapp/miniapp_communication.cpp +++ b/miniapp/miniapp_communication.cpp @@ -606,6 +606,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_eigensolver.cpp b/miniapp/miniapp_eigensolver.cpp index 296f52d471..c12b259de2 100644 --- a/miniapp/miniapp_eigensolver.cpp +++ b/miniapp/miniapp_eigensolver.cpp @@ -257,7 +257,6 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = 
dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_gen_eigensolver.cpp b/miniapp/miniapp_gen_eigensolver.cpp index 20bc04e590..5b96110894 100644 --- a/miniapp/miniapp_gen_eigensolver.cpp +++ b/miniapp/miniapp_gen_eigensolver.cpp @@ -286,7 +286,6 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_gen_to_std.cpp b/miniapp/miniapp_gen_to_std.cpp index 6b96e98252..9052ac40bc 100644 --- a/miniapp/miniapp_gen_to_std.cpp +++ b/miniapp/miniapp_gen_to_std.cpp @@ -216,6 +216,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_reduction_to_band.cpp b/miniapp/miniapp_reduction_to_band.cpp index 306bb473a8..3dc33cbdaa 100644 --- a/miniapp/miniapp_reduction_to_band.cpp +++ b/miniapp/miniapp_reduction_to_band.cpp @@ -257,6 +257,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_triangular_multiplication.cpp b/miniapp/miniapp_triangular_multiplication.cpp index 4078a0fd48..3fbf47540e 100644 --- a/miniapp/miniapp_triangular_multiplication.cpp +++ b/miniapp/miniapp_triangular_multiplication.cpp @@ -225,6 +225,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_triangular_solver.cpp b/miniapp/miniapp_triangular_solver.cpp index 149c54567d..c4cd6bb514 100644 --- a/miniapp/miniapp_triangular_solver.cpp +++ 
b/miniapp/miniapp_triangular_solver.cpp @@ -244,6 +244,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_tridiag_solver.cpp b/miniapp/miniapp_tridiag_solver.cpp index 16d0789c84..038b91875b 100644 --- a/miniapp/miniapp_tridiag_solver.cpp +++ b/miniapp/miniapp_tridiag_solver.cpp @@ -225,6 +225,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/scripts/systems.py b/scripts/systems.py index 8d88bb471d..2df5194c56 100644 --- a/scripts/systems.py +++ b/scripts/systems.py @@ -88,6 +88,7 @@ export MPICH_MAX_THREAD_SAFETY=multiple export MIMALLOC_EAGER_COMMIT_DELAY=0 export MIMALLOC_ALLOW_LARGE_OS_PAGES=1 +export DLAF_BT_BAND_TO_TRIDIAG_HH_APPLY_GROUP_SIZE=128 # Debug module list &> modules_{bs_name}.txt diff --git a/spack/packages/dla-future/package.py b/spack/packages/dla-future/package.py index 927279f9ab..b88cc16488 100644 --- a/spack/packages/dla-future/package.py +++ b/spack/packages/dla-future/package.py @@ -93,6 +93,7 @@ class DlaFuture(CMakePackage, CudaPackage, ROCmPackage): depends_on("pika@0.18:", when="@0.3") depends_on("pika@0.19.1:", when="@0.4.0:") conflicts("^pika@0.28:", when="@:0.6") + depends_on("pika@0.30:", when="@0.7.0:") depends_on("pika-algorithms@0.1:", when="@:0.2") depends_on("pika +mpi") depends_on("pika +cuda", when="+cuda") diff --git a/src/c_api/init.cpp b/src/c_api/init.cpp index 0a36a70226..bd34b57ccf 100644 --- a/src/c_api/init.cpp +++ b/src/c_api/init.cpp @@ -26,7 +26,6 @@ void dlaf_initialize(int argc_pika, const char** argv_pika, int argc_dlaf, // pika initialization pika::init_params params; - params.rp_callback = dlaf::initResourcePartitionerHandler; params.desc_cmdline = desc; // After pika 0.21.0 
pika::start reports errors only by exception and returns void #if PIKA_VERSION_FULL >= 0x001500 diff --git a/src/init.cpp b/src/init.cpp index 0d4366e15e..ff5d79824c 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -28,6 +29,7 @@ #include namespace dlaf { + std::ostream& operator<<(std::ostream& os, const configuration& cfg) { // clang-format off os << " num_np_gpu_streams_per_thread = " << cfg.num_np_gpu_streams_per_thread << std::endl; @@ -44,7 +46,7 @@ std::ostream& operator<<(std::ostream& os, const configuration& cfg) { os << " umpire_device_memory_pool_coalescing_reallocation_ratio = " << cfg.umpire_device_memory_pool_coalescing_reallocation_ratio << std::endl; os << " num_gpu_blas_handles = " << cfg.num_gpu_blas_handles << std::endl; os << " num_gpu_lapack_handles = " << cfg.num_gpu_lapack_handles << std::endl; - os << " mpi_pool = " << cfg.mpi_pool << std::endl; + os << " mpi_pool = " << pika::mpi::experimental::get_pool_name() << std::endl; // clang-format on return os; } @@ -55,6 +57,11 @@ bool& initialized() { return i; } +int& mpi_initialized() { + static int i = 0; + return i; +} + template struct Init { // Initialization and finalization does nothing by default. Behaviour can be @@ -258,22 +265,6 @@ void updateConfiguration(const pika::program_options::variables_map& vm, configu warnUnusedConfigurationOption(vm, "NUM_GPU_BLAS_HANDLES", "num-gpu-blas-handles", "only supported with pika 0.29.0 or newer"); warnUnusedConfigurationOption(vm, "NUM_GPU_LAPACK_HANDLES", "num-gpu-lapack-handles", "only supported with pika 0.29.0 or newer"); #endif - // clang-format on - cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? 
"mpi" : "default"; - - // Warn if not using MPI pool without --dlaf:no-mpi-pool - int mpi_initialized; - DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized)); - if (mpi_initialized) { - int ntasks; - DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks)); - if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as()) { - std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for " - "MPI communication but --dlaf:no-mpi-pool is not set. This may " - "indicate a bug in DLA-Future or pika. Performance may be degraded." - << std::endl; - } - } // update tune parameters // @@ -369,15 +360,15 @@ void initialize(const pika::program_options::variables_map& vm, const configurat std::exit(0); } - int mpi_initialized; - DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized)); - if (mpi_initialized) { + DLAF_MPI_CHECK_ERROR(MPI_Initialized(&dlaf::internal::mpi_initialized())); + if (dlaf::internal::mpi_initialized()) { int provided; DLAF_MPI_CHECK_ERROR(MPI_Query_thread(&provided)); if (provided < MPI_THREAD_MULTIPLE) { std::cerr << "MPI must be initialized to `MPI_THREAD_MULTIPLE` for DLA-Future!\n"; MPI_Abort(MPI_COMM_WORLD, 1); } + pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler); } DLAF_ASSERT(!internal::initialized(), ""); @@ -404,6 +395,9 @@ void finalize() { #ifdef DLAF_WITH_GPU internal::Init::finalize(); #endif + if (dlaf::internal::mpi_initialized()) { + pika::mpi::experimental::stop_polling(); + } internal::getConfiguration() = {}; internal::initialized() = false; } @@ -420,31 +414,4 @@ ScopedInitializer::ScopedInitializer(int argc, const char* const argv[], const c ScopedInitializer::~ScopedInitializer() { finalize(); } - -void initResourcePartitionerHandler(pika::resource::partitioner& rp, - const pika::program_options::variables_map& vm) { - // Don't create the MPI pool if the user disabled it - if (vm["dlaf:no-mpi-pool"].as()) - return; - - // Don't create the MPI pool if 
there is a single process - int ntasks; - DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks)); - if (ntasks == 1) - return; - - // Disable idle backoff on the MPI pool - using pika::threads::scheduler_mode; - auto mode = scheduler_mode::default_mode; - mode = scheduler_mode(mode & ~scheduler_mode::enable_idle_backoff); - - // Create a thread pool with a single core that we will use for all - // communication related tasks - rp.create_thread_pool("mpi", pika::resource::scheduling_policy::static_priority, mode); -#if PIKA_VERSION_FULL >= 0x001C00 // >= 0.28.0 - rp.add_resource(rp.sockets()[0].cores()[0].pus()[0], "mpi"); -#else - rp.add_resource(rp.numa_domains()[0].cores()[0].pus()[0], "mpi"); -#endif -} } diff --git a/test/src/gtest_mpipika_main.cpp b/test/src/gtest_mpipika_main.cpp index a7f9b23e09..2c40f6e14b 100644 --- a/test/src/gtest_mpipika_main.cpp +++ b/test/src/gtest_mpipika_main.cpp @@ -103,7 +103,6 @@ GTEST_API_ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; // Initialize pika auto ret = pika::init(test_main, argc, argv, p); diff --git a/test/unit/communication/test_comm_sender.cpp b/test/unit/communication/test_comm_sender.cpp index 3083301660..df8d12622a 100644 --- a/test/unit/communication/test_comm_sender.cpp +++ b/test/unit/communication/test_comm_sender.cpp @@ -43,10 +43,7 @@ void test_transform_mpi() { auto send = just(send_buf.data(), size, dtype, send_rank, tag, comm) | transformMPI(MPI_Isend); auto recv = just(recv_buf.data(), size, dtype, recv_rank, tag, comm) | transformMPI(MPI_Irecv); - sync_wait(when_all(std::move(send), std::move(recv)) | then([](int e1, int e2) { - DLAF_MPI_CHECK_ERROR(e1); - DLAF_MPI_CHECK_ERROR(e2); - })); + sync_wait(when_all(std::move(send), std::move(recv))); std::vector expected_recv_buf(static_cast(size), recv_rank); @@ -65,8 +62,7 @@ TEST(Bcast, Polling) { double val = (comm.rank() == root_rank) ? 
4.2 : 1.2; std::vector buf(static_cast(size), val); - sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast) | - then([](int e) { DLAF_MPI_CHECK_ERROR(e); })); + sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast)); std::vector expected_buf(static_cast(size), 4.2); ASSERT_TRUE(expected_buf == buf); diff --git a/test/unit/communication/test_transform_mpi.cpp b/test/unit/communication/test_transform_mpi.cpp index bf9f857e0d..86fe68ed35 100644 --- a/test/unit/communication/test_transform_mpi.cpp +++ b/test/unit/communication/test_transform_mpi.cpp @@ -65,11 +65,7 @@ TEST_F(TransformMPITest, PromiseGuardManagement) { int message; whenAllLift(&message, 1, MPI_INT, 1, 0, chain.exclusive()) | transformMPI(MPI_Irecv) | - ex::then([&sent_guard](auto mpi_err_code) { - EXPECT_EQ(MPI_SUCCESS, mpi_err_code); - sent_guard = true; - }) | - ex::ensure_started(); + ex::then([&sent_guard]() { sent_guard = true; }) | ex::ensure_started(); // Note: // At this point IRecv is (getting) posted but it won't complete until this Rank 0 will trigger