|
| 1 | +/* |
| 2 | + * Copyright (c) 2023 Maikel Nadolski |
| 3 | + * Copyright (c) 2023 NVIDIA Corporation |
| 4 | + * |
| 5 | + * Licensed under the Apache License Version 2.0 with LLVM Exceptions |
| 6 | + * (the "License"); you may not use this file except in compliance with |
| 7 | + * the License. You may obtain a copy of the License at |
| 8 | + * |
| 9 | + * https://llvm.org/LICENSE.txt |
| 10 | + * |
| 11 | + * Unless required by applicable law or agreed to in writing, software |
| 12 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | + * See the License for the specific language governing permissions and |
| 15 | + * limitations under the License. |
| 16 | + */ |
| 17 | +#pragma once |
| 18 | + |
| 19 | +#include "../../stdexec/concepts.hpp" |
| 20 | +#include "../../stdexec/execution.hpp" |
| 21 | +#include "../sequence_senders.hpp" |
| 22 | + |
| 23 | +#include "../__detail/__basic_sequence.hpp" |
| 24 | +#include "./transform_each.hpp" |
| 25 | +#include "./ignore_all_values.hpp" |
| 26 | +#include "stdexec/__detail/__execution_fwd.hpp" |
| 27 | +#include "stdexec/__detail/__meta.hpp" |
| 28 | +#include "stdexec/__detail/__senders_core.hpp" |
| 29 | +#include "stdexec/__detail/__transform_completion_signatures.hpp" |
| 30 | + |
| 31 | +namespace exec { |
| 32 | + namespace __merge { |
| 33 | + using namespace stdexec; |
| 34 | + |
| 35 | + template <class _Receiver> |
| 36 | + struct __operation_base { |
| 37 | + _Receiver __receiver_; |
| 38 | + }; |
| 39 | + |
| 40 | + template <class _ReceiverId> |
| 41 | + struct __result_receiver { |
| 42 | + using _Receiver = stdexec::__t<_ReceiverId>; |
| 43 | + |
| 44 | + struct __t { |
| 45 | + using receiver_concept = stdexec::receiver_t; |
| 46 | + using __id = __result_receiver; |
| 47 | + |
| 48 | + __operation_base<_Receiver>* __op_; |
| 49 | + |
| 50 | + void set_value() noexcept { |
| 51 | + stdexec::set_value(static_cast<_Receiver&&>(__op_->__receiver_)); |
| 52 | + } |
| 53 | + |
| 54 | + template <class _Error> |
| 55 | + void set_error(_Error&& __error) noexcept { |
| 56 | + stdexec::set_error( |
| 57 | + static_cast<_Receiver&&>(__op_->__receiver_), static_cast<_Error&&>(__error)); |
| 58 | + } |
| 59 | + |
| 60 | + void set_stopped() noexcept { |
| 61 | + stdexec::set_stopped(static_cast<_Receiver&&>(__op_->__receiver_)); |
| 62 | + } |
| 63 | + |
| 64 | + auto get_env() const noexcept -> env_of_t<_Receiver> { |
| 65 | + return stdexec::get_env(__op_->__receiver_); |
| 66 | + } |
| 67 | + }; |
| 68 | + }; |
| 69 | + |
| 70 | + template <class _ReceiverId> |
| 71 | + struct __merge_each_fn { |
| 72 | + using _Receiver = stdexec::__t<_ReceiverId>; |
| 73 | + |
| 74 | + template <stdexec::sender _Item> |
| 75 | + auto operator()(_Item&& __item, __operation_base<_Receiver>* __op) const |
| 76 | + noexcept(__nothrow_callable<set_next_t, _Receiver&, _Item>) |
| 77 | + -> next_sender_of_t<_Receiver, _Item> { |
| 78 | + return exec::set_next(__op->__receiver_, static_cast<_Item&&>(__item)); |
| 79 | + } |
| 80 | + }; |
| 81 | + |
| 82 | + struct __combine { |
| 83 | + template <class _ReceiverId> |
| 84 | + using merge_each_fn_t = |
| 85 | + __binder_back<__merge_each_fn<_ReceiverId>, __operation_base<__t<_ReceiverId>>*>; |
| 86 | + |
| 87 | + template <class _Sequence, class _ReceiverId> |
| 88 | + using transform_sender_t = |
| 89 | + __call_result_t<exec::transform_each_t, _Sequence, merge_each_fn_t<_ReceiverId>>; |
| 90 | + template <class _Sequence, class _ReceiverId> |
| 91 | + using ignored_sender_t = |
| 92 | + __call_result_t<exec::ignore_all_values_t, transform_sender_t<_Sequence, _ReceiverId>>; |
| 93 | + |
| 94 | + template <class _ReceiverId, class... _Sequences> |
| 95 | + using result_sender_t = |
| 96 | + __call_result_t<when_all_t, ignored_sender_t<_Sequences, _ReceiverId>...>; |
| 97 | + }; |
| 98 | + |
| 99 | + template <class _ReceiverId, class... _Sequences> |
| 100 | + struct __operation { |
| 101 | + using _Receiver = stdexec::__t<_ReceiverId>; |
| 102 | + |
| 103 | + using merge_each_fn_t = typename __combine::merge_each_fn_t<_ReceiverId>; |
| 104 | + |
| 105 | + template <class _ReceiverIdDependent> |
| 106 | + using result_sender_t = |
| 107 | + typename __combine::result_sender_t<_ReceiverIdDependent, _Sequences...>; |
| 108 | + |
| 109 | + struct __t : __operation_base<_Receiver> { |
| 110 | + using __id = __operation; |
| 111 | + |
| 112 | + connect_result_t<result_sender_t<_ReceiverId>, stdexec::__t<__result_receiver<_ReceiverId>>> |
| 113 | + __op_result_; |
| 114 | + |
| 115 | + __t(_Receiver __rcvr, _Sequences... __sequences) |
| 116 | + : __operation_base<_Receiver>{static_cast<_Receiver&&>(__rcvr)} |
| 117 | + , __op_result_{stdexec::connect( |
| 118 | + stdexec::when_all( |
| 119 | + exec::ignore_all_values( |
| 120 | + exec::transform_each( |
| 121 | + static_cast<_Sequences&&>(__sequences), |
| 122 | + merge_each_fn_t{{this}, {}, {}}))...), |
| 123 | + stdexec::__t<__result_receiver<_ReceiverId>>{this})} { |
| 124 | + } |
| 125 | + |
| 126 | + void start() & noexcept { |
| 127 | + stdexec::start(__op_result_); |
| 128 | + } |
| 129 | + }; |
| 130 | + }; |
| 131 | + |
| 132 | + template <class _Receiver> |
| 133 | + struct __subscribe_fn { |
| 134 | + _Receiver& __rcvr_; |
| 135 | + |
| 136 | + template <class... _Sequences> |
| 137 | + auto operator()(__ignore, __ignore, _Sequences... __sequences) noexcept( |
| 138 | + (__nothrow_decay_copyable<_Sequences> && ...) && __nothrow_move_constructible<_Receiver>) |
| 139 | + -> __t<__operation<__id<_Receiver>, _Sequences...>> { |
| 140 | + return {static_cast<_Receiver&&>(__rcvr_), static_cast<_Sequences&&>(__sequences)...}; |
| 141 | + } |
| 142 | + }; |
| 143 | + |
| 144 | + struct merge_t { |
| 145 | + template <class... _Sequences> |
| 146 | + auto operator()(_Sequences&&... __sequences) const |
| 147 | + noexcept((__nothrow_decay_copyable<_Sequences> && ...)) -> __well_formed_sequence_sender |
| 148 | + auto { |
| 149 | + auto __domain = __common_domain_t<_Sequences...>(); |
| 150 | + return transform_sender( |
| 151 | + __domain, make_sequence_expr<merge_t>(__(), static_cast<_Sequences&&>(__sequences)...)); |
| 152 | + } |
| 153 | + |
| 154 | + template <class _Error> |
| 155 | + using __set_error_t = completion_signatures<set_error_t(__decay_t<_Error>)>; |
| 156 | + |
| 157 | + struct _INVALID_ARGUMENTS_TO_MERGE_ { }; |
| 158 | + |
| 159 | + template <class _Self, class _Env> |
| 160 | + using __error_t = __mexception< |
| 161 | + _INVALID_ARGUMENTS_TO_MERGE_, |
| 162 | + __children_of<_Self, __q<_WITH_SEQUENCES_>>, |
| 163 | + _WITH_ENVIRONMENT_<_Env> |
| 164 | + >; |
| 165 | + |
| 166 | + template <class... _Env> |
| 167 | + struct __completions_fn_t { |
| 168 | + |
| 169 | + template <class... _Sequences> |
| 170 | + using __f = __meval< |
| 171 | + __concat_completion_signatures, |
| 172 | + completion_signatures<set_stopped_t()>, |
| 173 | + __sequence_completion_signatures_of_t<_Sequences, _Env...>... |
| 174 | + >; |
| 175 | + }; |
| 176 | + |
| 177 | + template <class _Self, class... _Env> |
| 178 | + using __completions_t = __children_of<_Self, __completions_fn_t<_Env...>>; |
| 179 | + |
| 180 | + template <sender_expr_for<merge_t> _Self, class... _Env> |
| 181 | + static auto get_completion_signatures(_Self&&, _Env&&...) noexcept { |
| 182 | + return __minvoke<__mtry_catch<__q<__completions_t>, __q<__error_t>>, _Self, _Env...>(); |
| 183 | + } |
| 184 | + |
| 185 | + template <class... _Env> |
| 186 | + struct __items_fn_t { |
| 187 | + |
| 188 | + template <class... _Sequences> |
| 189 | + using __f = stdexec::__mapply< |
| 190 | + stdexec::__munique<stdexec::__q<exec::item_types>>, |
| 191 | + stdexec::__minvoke< |
| 192 | + stdexec::__mconcat<stdexec::__qq<exec::item_types>>, |
| 193 | + __item_types_of_t<_Sequences, _Env...>... |
| 194 | + > |
| 195 | + >; |
| 196 | + }; |
| 197 | + |
| 198 | + template <class _Self, class... _Env> |
| 199 | + using __items_t = __children_of<_Self, __items_fn_t<_Env...>>; |
| 200 | + |
| 201 | + template <sender_expr_for<merge_t> _Self, class... _Env> |
| 202 | + static auto get_item_types(_Self&&, _Env&&...) noexcept { |
| 203 | + return __minvoke<__mtry_catch<__q<__items_t>, __q<__error_t>>, _Self, _Env...>(); |
| 204 | + } |
| 205 | + |
| 206 | + template <sender_expr_for<merge_t> _Self, receiver _Receiver> |
| 207 | + static auto subscribe(_Self&& __self, _Receiver __rcvr) |
| 208 | + noexcept(__nothrow_callable<__sexpr_apply_t, _Self, __subscribe_fn<_Receiver>>) |
| 209 | + -> __sexpr_apply_result_t<_Self, __subscribe_fn<_Receiver>> { |
| 210 | + return __sexpr_apply(static_cast<_Self&&>(__self), __subscribe_fn<_Receiver>{__rcvr}); |
| 211 | + } |
| 212 | + }; |
| 213 | + } // namespace __merge |
| 214 | + |
| 215 | + using __merge::merge_t; |
| 216 | + inline constexpr merge_t merge{}; |
| 217 | +} // namespace exec |
0 commit comments