Skip to content

Commit 5146387

Browse files
authored
Merge branch 'main' into p3892
2 parents 531d86a + 712953b commit 5146387

File tree

3 files changed

+38
-20
lines changed

3 files changed

+38
-20
lines changed

include/stdexec/__detail/__when_all.hpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,12 @@ namespace stdexec {
137137
using __f = __meval<
138138
__concat_completion_signatures,
139139
__meval<__eptr_completion_if_t, __all_nothrow_decay_copyable_results<_Senders...>>,
140-
completion_signatures<set_stopped_t()>,
141140
__minvoke<__with_default<__qq<__set_values_sig_t>, completion_signatures<>>, _Senders...>,
142141
__transform_completion_signatures<
143142
__completion_signatures_of_t<_Senders, _Env...>,
144143
__mconst<completion_signatures<>>::__f,
145144
__set_error_t,
146-
completion_signatures<>,
145+
completion_signatures<set_stopped_t()>,
147146
__concat_completion_signatures
148147
>...
149148
>;
@@ -191,7 +190,7 @@ namespace stdexec {
191190

192191
struct _INVALID_ARGUMENTS_TO_WHEN_ALL_ { };
193192

194-
template <class _ErrorsVariant, class _ValuesTuple, class _StopToken>
193+
template <class _ErrorsVariant, class _ValuesTuple, class _StopToken, bool _SendsStopped>
195194
struct __when_all_state {
196195
using __stop_callback_t = stop_callback_for_t<_StopToken, __on_stop_request>;
197196

@@ -222,7 +221,11 @@ namespace stdexec {
222221
}
223222
break;
224223
case __stopped:
225-
stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
224+
if constexpr (_SendsStopped) {
225+
stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
226+
} else {
227+
STDEXEC_UNREACHABLE();
228+
}
226229
break;
227230
default:;
228231
}
@@ -243,7 +246,11 @@ namespace stdexec {
243246
using _Traits = __traits<_Env, _Child...>;
244247
using _ErrorsVariant = _Traits::__errors_variant;
245248
using _ValuesTuple = _Traits::__values_tuple;
246-
using _State = __when_all_state<_ErrorsVariant, _ValuesTuple, stop_token_of_t<_Env>>;
249+
using _State = __when_all_state<
250+
_ErrorsVariant,
251+
_ValuesTuple,
252+
stop_token_of_t<_Env>,
253+
(sends_stopped<_Child, _Env> || ...)>;
247254
return _State{sizeof...(_Child)};
248255
};
249256
}
@@ -309,15 +316,9 @@ namespace stdexec {
309316
// register stop callback:
310317
__state.__on_stop_.emplace(
311318
get_stop_token(stdexec::get_env(__rcvr)), __on_stop_request{__state.__stop_source_});
312-
if (__state.__stop_source_.stop_requested()) {
313-
// Stop has already been requested. Don't bother starting
314-
// the child operations.
315-
stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
316-
} else {
317-
(stdexec::start(__child_ops), ...);
318-
if constexpr (sizeof...(__child_ops) == 0) {
319-
__state.__complete(__rcvr);
320-
}
319+
(stdexec::start(__child_ops), ...);
320+
if constexpr (sizeof...(__child_ops) == 0) {
321+
__state.__complete(__rcvr);
321322
}
322323
};
323324

test/exec/test_fork.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace {
3434
STATIC_REQUIRE(
3535
set_equivalent<
3636
completion_signatures_of_t<decltype(sndr), env<>>,
37-
completion_signatures<set_value_t(), set_error_t(std::exception_ptr), set_stopped_t()>
37+
completion_signatures<set_value_t(), set_error_t(std::exception_ptr)>
3838
>);
3939
}
4040

@@ -65,7 +65,7 @@ namespace {
6565
STATIC_REQUIRE(
6666
set_equivalent<
6767
completion_signatures_of_t<decltype(sndr), env<>>,
68-
completion_signatures<set_value_t(int, int), set_error_t(std::exception_ptr), set_stopped_t()>
68+
completion_signatures<set_value_t(int, int), set_error_t(std::exception_ptr)>
6969
>);
7070

7171
auto [i1, i2] = sync_wait(sndr).value();

test/stdexec/algos/adaptors/test_when_all.cpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,14 @@ namespace {
232232
ex::when_all(ex::just(13), ex::just_error(std::exception_ptr{}), ex::just_stopped()));
233233
}
234234

235-
TEST_CASE("when_all has the sends_stopped == true", "[adaptors][when_all]") {
236-
check_sends_stopped<true>(ex::when_all(ex::just(13)));
237-
check_sends_stopped<true>(ex::when_all(ex::just_error(-1)));
235+
TEST_CASE(
236+
"when_all has sends_stopped == true if and only if at least one child sends stopped",
237+
"[adaptors][when_all]") {
238+
check_sends_stopped<false>(ex::when_all(ex::just(13)));
239+
check_sends_stopped<false>(ex::when_all(ex::just_error(-1)));
238240
check_sends_stopped<true>(ex::when_all(ex::just_stopped()));
239241

240-
check_sends_stopped<true>(ex::when_all(ex::just(3), ex::just(0.14)));
242+
check_sends_stopped<false>(ex::when_all(ex::just(3), ex::just(0.14)));
241243
check_sends_stopped<true>(ex::when_all(ex::just(3), ex::just_error(-1), ex::just_stopped()));
242244
}
243245

@@ -381,4 +383,19 @@ namespace {
381383
wait_for_value(std::move(snd), std::string{"hello world"});
382384
}
383385
}
386+
387+
TEST_CASE("when_all defers stop handling to its children", "[adaptors][when_all]") {
388+
ex::inplace_stop_source source;
389+
source.request_stop();
390+
auto snd = ex::when_all(ex::just(), ex::just());
391+
static_assert(set_equivalent<
392+
ex::completion_signatures_of_t<decltype(snd), ex::env<>>,
393+
ex::completion_signatures<ex::set_value_t()>>);
394+
auto env = ex::prop(ex::get_stop_token, source.get_token());
395+
static_assert(set_equivalent<
396+
ex::completion_signatures_of_t<decltype(snd), decltype(env)>,
397+
ex::completion_signatures<ex::set_value_t()>>);
398+
auto op = ex::connect(snd, expect_void_receiver{});
399+
ex::start(op);
400+
}
384401
} // namespace

0 commit comments

Comments
 (0)