
Use pika's transform_mpi and polling support #1125

Merged: 20 commits, Nov 29, 2024
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -163,7 +163,7 @@ if(DLAF_WITH_SCALAPACK)
endif()

# ----- pika
-find_package(pika 0.19.1 REQUIRED)
+find_package(pika 0.30.0 REQUIRED)

# ----- BLASPP/LAPACKPP
find_package(blaspp REQUIRED)
2 changes: 1 addition & 1 deletion ci/common-ci.yml
@@ -34,7 +34,7 @@ stages:
reports:
dotenv: build.env
variables:
-SPACK_SHA: develop-2024-10-06
+SPACK_SHA: develop-2024-11-10
SPACK_DLAF_REPO: ./spack
DOCKER_BUILD_ARGS: '[
"BASE_IMAGE",
2 changes: 2 additions & 0 deletions ci/cuda/gcc11_codecov.yml
@@ -22,6 +22,8 @@ cuda gcc11 codecov build:

cuda gcc11 codecov test:
extends: .run_common
+variables:
+  PIKA_MPI_ENABLE_POOL: 1
needs:
- cuda gcc11 codecov build
trigger:
2 changes: 2 additions & 0 deletions ci/cuda/gcc11_debug_scalapack.yml
@@ -20,6 +20,8 @@ cuda gcc11 debug scalapack build:

cuda gcc11 debug scalapack test:
extends: .run_common
+variables:
+  PIKA_MPI_ENABLE_POOL: 1
needs:
- cuda gcc11 debug scalapack build
trigger:
2 changes: 2 additions & 0 deletions ci/cuda/gcc11_release.yml
@@ -21,6 +21,8 @@ cuda gcc11 release build:

cuda gcc11 release test:
extends: .run_common
+variables:
+  PIKA_MPI_ENABLE_POOL: 1
needs:
- cuda gcc11 release build
trigger:
2 changes: 2 additions & 0 deletions ci/cuda/gcc11_release_scalapack.yml
@@ -21,6 +21,8 @@ cuda gcc11 release scalapack build:

cuda gcc11 release scalapack test:
extends: .run_common
+variables:
+  PIKA_MPI_ENABLE_POOL: 1
needs:
- cuda gcc11 release scalapack build
trigger:
2 changes: 1 addition & 1 deletion ci/docker/debug-cuda-scalapack.yaml
@@ -35,6 +35,6 @@ spack:
- '+fortran'
pika:
require:
-  - '@0.19.1'
+  - '@0.30.0'
- 'build_type=Debug'
- 'malloc=system'
7 changes: 0 additions & 7 deletions include/dlaf/init.h
@@ -50,7 +50,6 @@ struct configuration {
  double umpire_device_memory_pool_coalescing_reallocation_ratio = 1.0;
  std::size_t num_gpu_blas_handles = 16;
  std::size_t num_gpu_lapack_handles = 16;
-  std::string mpi_pool = "mpi";
};

std::ostream& operator<<(std::ostream& os, const configuration& cfg);

@@ -107,10 +106,4 @@ struct [[nodiscard]] ScopedInitializer {
  ScopedInitializer& operator=(ScopedInitializer&&) = delete;
  ScopedInitializer& operator=(const ScopedInitializer&) = delete;
};
-
-/// Initialize the MPI pool.
-///
-///
-void initResourcePartitionerHandler(pika::resource::partitioner& rp,
-                                    const pika::program_options::variables_map& vm);
}
5 changes: 0 additions & 5 deletions include/dlaf/schedulers.h
@@ -47,9 +47,4 @@ auto getBackendScheduler(
}
#endif
}
-
-inline auto getMPIScheduler() {
-  return pika::execution::experimental::thread_pool_scheduler{
-      &pika::resource::get_thread_pool(getConfiguration().mpi_pool)};
-}
} // namespace dlaf::internal
55 changes: 19 additions & 36 deletions include/dlaf/sender/transform_mpi.h
@@ -13,6 +13,7 @@
#include <utility>

#include <pika/execution.hpp>
+#include <pika/mpi.hpp>

#include <dlaf/common/consume_rvalues.h>
#include <dlaf/common/unwrap.h>

@@ -30,53 +31,35 @@ inline void consumeCommunicatorWrapper(CommunicatorPipelineExclusiveWrapper& com
  [[maybe_unused]] auto comm_wrapper_local = std::move(comm_wrapper);
}
-/// \overload consumeCommunicatorWrapper
+/// \overload consumeCommunicatorWrapper (for non communicator types)

Review comment (Collaborator): I was commenting here when I realised (thanks @msimberg for checking with me) that consumeCommunicatorWrapper is not currently doing exactly this. I just opened #1218 to change its behaviour to the expected and more correct one. In the meantime I would suggest not altering this comment.
template <typename T>
void consumeCommunicatorWrapper(T&) {}

/// Helper type for wrapping MPI calls.
///
-/// Wrapper type around calls to MPI functions. Provides a call operator that
-/// creates an MPI request and passes it as the last argument to the provided
-/// callable. The wrapper then waits for the request to complete with
-/// yield_while.
+/// The wrapper explicitly releases any dla communicator objects when the pika::transform_mpi
+/// function returns (e.g. a message has been sent/posted) to prevent blocking access to many
+/// queued mpi operations.
+/// The mpi operations can complete asynchronously later, but the communicator is
+/// released/made available once the mpi task has been safely initiated.
///
/// This could in theory be a lambda inside transformMPI. However, clang at
/// least until version 12 fails with an internal compiler error with a trailing
/// decltype for SFINAE. GCC has no problems with a lambda.

Review comment (Collaborator): Following up on my comment above about consumeCommunicatorWrapper, I would change this comment to reflect what actually happens right now. I will take care of fixing this comment in the other PR (which will come after this one) 😉

Suggested change:
-/// The wrapper explicitly releases any dla communicator objects when the pika::transform_mpi
+/// The wrapper explicitly releases any CommunicatorPipelineExclusiveWrapper object when the pika::transform_mpi
template <typename F>
struct MPICallHelper {
  std::decay_t<F> f;
-  template <typename... Ts>
-  auto operator()(Ts&&... ts) -> decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...,
-                                                       std::declval<MPI_Request*>())) {
-    MPI_Request req;
-    auto is_request_completed = [&req] {
-      int flag;
-      MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
-      return flag == 0;
-    };
-
  // Note:
  // Callables passed to transformMPI have their arguments passed by reference, but doing so
  // with PromiseGuard would keep the guard alive until the completion of the MPI operation,
  // whereas we are only looking to guard the submission of the MPI operation. We therefore
  // explicitly release CommunicatorPipelineExclusiveWrapper after submitting the MPI operation
  // with consumeCommunicatorWrapper.
  //
  // We also unwrap various types passed to the MPI operation, including PromiseGuards of
  // any type, to allow the MPI operation not to care whether a Communicator was wrapped in a
  // PromiseGuard or not.
-    using result_type = decltype(std::move(f)(dlaf::common::internal::unwrap(ts)..., &req));
+  template <typename... Ts>
+  auto operator()(Ts&&... ts) -> decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...)) {
+    using result_type = decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...));
    if constexpr (std::is_void_v<result_type>) {
-      std::move(f)(dlaf::common::internal::unwrap(ts)..., &req);
+      std::move(f)(dlaf::common::internal::unwrap(ts)...);
      (internal::consumeCommunicatorWrapper(ts), ...);
-      pika::util::yield_while(is_request_completed);
    }
    else {
-      auto r = std::move(f)(dlaf::common::internal::unwrap(ts)..., &req);
+      auto r = std::move(f)(dlaf::common::internal::unwrap(ts)...);
      (internal::consumeCommunicatorWrapper(ts), ...);
-      pika::util::yield_while(is_request_completed);
      return r;
    }
  }
@@ -85,16 +68,16 @@ struct MPICallHelper {
template <typename F>
MPICallHelper(F&&) -> MPICallHelper<std::decay_t<F>>;

-/// Lazy transformMPI. This does not submit the work and returns a sender.
+/// Lazy transformMPI. Returns a sender that will submit the work passed in.
template <typename F, typename Sender,
          typename = std::enable_if_t<pika::execution::experimental::is_sender_v<Sender>>>
[[nodiscard]] decltype(auto) transformMPI(F&& f, Sender&& sender) {
-  using dlaf::internal::continues_on;
-  namespace ex = pika::execution::experimental;
-
-  return continues_on(std::forward<Sender>(sender), dlaf::internal::getMPIScheduler()) |
-         ex::then(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::forward<F>(f)}}) |
-         ex::drop_operation_state();
+  using dlaf::common::internal::ConsumeRvalues;
+  using pika::execution::experimental::drop_operation_state;
+  using pika::mpi::experimental::transform_mpi;
+  return std::forward<Sender>(sender)  //
+         | transform_mpi(ConsumeRvalues{MPICallHelper{std::forward<F>(f)}})  //
+         | drop_operation_state();
}

template <typename F>
1 change: 0 additions & 1 deletion miniapp/miniapp_band_to_tridiag.cpp
@@ -209,6 +209,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_bt_band_to_tridiag.cpp
@@ -219,6 +219,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_bt_reduction_to_band.cpp
@@ -238,6 +238,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_cholesky.cpp
@@ -228,7 +228,6 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_communication.cpp
@@ -606,6 +606,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_eigensolver.cpp
@@ -257,7 +257,6 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_gen_eigensolver.cpp
@@ -286,7 +286,6 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_gen_to_std.cpp
@@ -216,6 +216,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_reduction_to_band.cpp
@@ -257,6 +257,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_triangular_multiplication.cpp
@@ -225,6 +225,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_triangular_solver.cpp
@@ -244,6 +244,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 0 additions & 1 deletion miniapp/miniapp_tridiag_solver.cpp
@@ -225,6 +225,5 @@ int main(int argc, char** argv) {

  pika::init_params p;
  p.desc_cmdline = desc_commandline;
-  p.rp_callback = dlaf::initResourcePartitionerHandler;
  return pika::init(pika_main, argc, argv, p);
}
1 change: 1 addition & 0 deletions spack/packages/dla-future/package.py
@@ -93,6 +93,7 @@ class DlaFuture(CMakePackage, CudaPackage, ROCmPackage):
    depends_on("[email protected]:", when="@0.3")
    depends_on("[email protected]:", when="@0.4.0:")
    conflicts("^[email protected]:", when="@:0.6")
+   depends_on("[email protected]:", when="@0.7.0:")
    depends_on("[email protected]:", when="@:0.2")
    depends_on("pika +mpi")
    depends_on("pika +cuda", when="+cuda")
1 change: 0 additions & 1 deletion src/c_api/init.cpp
@@ -26,7 +26,6 @@ void dlaf_initialize(int argc_pika, const char** argv_pika, int argc_dlaf,

  // pika initialization
  pika::init_params params;
-  params.rp_callback = dlaf::initResourcePartitionerHandler;
  params.desc_cmdline = desc;
  // After pika 0.21.0 pika::start reports errors only by exception and returns void
#if PIKA_VERSION_FULL >= 0x001500