Skip to content

Commit

Permalink
Fixing race in collective operations
Browse files (browse the repository at this point in the history)
- adding test
  • Loading branch information
hkaiser committed Jan 22, 2024
1 parent b629cac commit 51f4d47
Show file tree
Hide file tree
Showing 3 changed files with 842 additions and 19 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -231,19 +231,22 @@ namespace hpx::collectives::detail {
std::size_t generation, std::size_t capacity, F&& f, Lock& l)
{
HPX_ASSERT_OWNS_LOCK(l);
auto sf = gate_.get_shared_future(l);

traits::detail::get_shared_state(sf)->reserve_callbacks(
get_num_sites(capacity));

auto fut = sf.then(hpx::launch::sync, HPX_FORWARD(F, f));

// Wait for the requested generation to be processed.
gate_.synchronize(generation == static_cast<std::size_t>(-1) ?
gate_.generation(l) :
generation,
l);

return fut;
// Get future from gate only after synchronization as otherwise we
// may get a future returned that does not belong to the requested
// generation.
auto sf = gate_.get_shared_future(l);

traits::detail::get_shared_state(sf)->reserve_callbacks(
get_num_sites(capacity));

return sf.then(hpx::launch::sync, HPX_FORWARD(F, f));
}

template <typename Lock>
Expand All @@ -262,9 +265,16 @@ namespace hpx::collectives::detail {
"collective operation {}, which {}, generation {}.",
basename_, operation, which, generation);
}
current_operation_ = operation;

if (generation == static_cast<std::size_t>(-1) ||
generation == gate_.generation(l))
{
current_operation_ = operation;
}

return true;
}

return false;
}

Expand All @@ -284,6 +294,11 @@ namespace hpx::collectives::detail {
// This callback will be invoked once for each participating
// site after all sites have checked in.

// On exit, keep track of number of invocations of this
// callback.
auto on_exit = hpx::experimental::scope_exit(
[this] { ++on_ready_count_; });

f.get(); // propagate any exceptions

// It does not matter whether the lock will be acquired here. It
Expand Down Expand Up @@ -323,11 +338,6 @@ namespace hpx::collectives::detail {
on_ready_count_, num_sites_);
}

// On exit, keep track of number of invocations of this
// callback.
auto on_exit = hpx::experimental::scope_exit(
[this] { ++on_ready_count_; });

if constexpr (!std::is_same_v<std::nullptr_t,
std::decay_t<Finalizer>>)
{
Expand All @@ -338,8 +348,6 @@ namespace hpx::collectives::detail {
else
{
HPX_UNUSED(this);
HPX_UNUSED(which);
HPX_UNUSED(generation);
HPX_UNUSED(num_values);
HPX_UNUSED(finalizer);
}
Expand Down Expand Up @@ -373,7 +381,7 @@ namespace hpx::collectives::detail {

if constexpr (!std::is_same_v<std::nullptr_t, std::decay_t<Step>>)
{
// call provided step function for each invocation site
// Call provided step function for each invocation site.
HPX_FORWARD(Step, step)(access_data<Data>(num_values), which);
}

Expand All @@ -399,7 +407,7 @@ namespace hpx::collectives::detail {
"been invoked at the end of the collective {} "
"operation. Expected count {}, received count {}, "
"which {}, generation {}.",
*operation, on_ready_count_, num_sites_, which,
operation, on_ready_count_, num_sites_, which,
generation);
return;
}
Expand All @@ -416,7 +424,7 @@ namespace hpx::collectives::detail {
return f;
}

// protect against vector<bool> idiosyncrasies
// Protect against vector<bool> idiosyncrasies.
template <typename ValueType, typename Data>
static constexpr decltype(auto) handle_bool(Data&& data) noexcept
{
Expand Down
3 changes: 2 additions & 1 deletion libs/full/collectives/tests/unit/CMakeLists.txt
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023 Hartmut Kaiser
# Copyright (c) 2019-2024 Hartmut Kaiser
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -23,6 +23,7 @@ if(HPX_WITH_NETWORKING)
set(tests
${tests}
broadcast_direct
concurrent_collectives
exclusive_scan_
gather
inclusive_scan_
Expand Down
Loading

0 comments on commit 51f4d47

Please sign in to comment.