Skip to content

Commit 07cbea6

Browse files
committed
added exec::reduce including a customization for exec::static_thread_pool
1 parent 4f3e79f commit 07cbea6

File tree

4 files changed

+693
-4
lines changed

4 files changed

+693
-4
lines changed

include/exec/reduce.hpp

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright (c) 2023 NVIDIA Corporation
3+
*
4+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
5+
* (the "License"); you may not use this file except in compliance with
6+
* the License. You may obtain a copy of the License at
7+
*
8+
* https://llvm.org/LICENSE.txt
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "../stdexec/execution.hpp"
19+
20+
#include <numeric>
21+
#include <ranges>
22+
23+
namespace exec {
24+
25+
namespace __reduce {
26+
27+
template <class _ReceiverId, class _InitT, class _RedOp>
28+
struct __receiver {
29+
using _Receiver = stdexec::__t<_ReceiverId>;
30+
31+
struct __data {
32+
_Receiver __rcvr_;
33+
STDEXEC_NO_UNIQUE_ADDRESS _InitT __init_;
34+
STDEXEC_NO_UNIQUE_ADDRESS _RedOp __redop_;
35+
};
36+
37+
struct __t {
38+
using __id = __receiver;
39+
__data* __op_;
40+
41+
template <
42+
stdexec::__same_as<stdexec::set_value_t> _Tag,
43+
class _Range,
44+
class _Value = stdexec::range_value_t<_Range>>
45+
requires stdexec::invocable<_RedOp, _InitT, _Value>
46+
&& stdexec::__receiver_of_invoke_result<_Receiver, _RedOp, _InitT, _Value>
47+
friend void tag_invoke(_Tag, __t&& __self, _Range&& __range) noexcept {
48+
auto result = std::reduce(
49+
std::ranges::begin(__range), std::ranges::end(__range), __self.__op_->__init_, __self.__op_->__redop_);
50+
51+
stdexec::set_value((_Receiver&&) __self.__op_->__rcvr_, std::move(result));
52+
}
53+
54+
template <stdexec::__one_of<stdexec::set_error_t, stdexec::set_stopped_t> _Tag, class... _As>
55+
requires stdexec::__callable<_Tag, _Receiver, _As...>
56+
friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept {
57+
__tag((_Receiver&&) __self.__op_->__rcvr_, (_As&&) __as...);
58+
}
59+
60+
friend auto tag_invoke(stdexec::get_env_t, const __t& __self) noexcept
61+
-> stdexec::env_of_t<const _Receiver&> {
62+
return stdexec::get_env(__self.__op_->__rcvr_);
63+
}
64+
};
65+
};
66+
67+
template <class _Sender, class _ReceiverId, class _InitT, class _RedOp>
68+
struct __operation {
69+
using _Receiver = stdexec::__t<_ReceiverId>;
70+
using __receiver_id = __receiver<_ReceiverId, _InitT, _RedOp>;
71+
using __receiver_t = stdexec::__t<__receiver_id>;
72+
73+
struct __t : stdexec::__immovable {
74+
using __id = __operation;
75+
typename __receiver_id::__data __data_;
76+
stdexec::connect_result_t<_Sender, __receiver_t> __op_;
77+
78+
__t(_Sender&& __sndr, _Receiver __rcvr, _InitT __init, _RedOp __redop) noexcept(
79+
stdexec::__nothrow_decay_copyable<_Receiver> && stdexec::__nothrow_decay_copyable<_RedOp>
80+
&& stdexec::__nothrow_connectable<_Sender, __receiver_t>)
81+
: __data_{(_Receiver&&) __rcvr, (_InitT&&) __init, (_RedOp&&) __redop}
82+
, __op_(stdexec::connect((_Sender&&) __sndr, __receiver_t{&__data_})) {
83+
}
84+
85+
friend void tag_invoke(stdexec::start_t, __t& __self) noexcept {
86+
stdexec::start(__self.__op_);
87+
}
88+
};
89+
};
90+
91+
template <class _SenderId, class _InitT, class _RedOp>
92+
struct __sender {
93+
using _Sender = stdexec::__t<_SenderId>;
94+
template <class _Receiver>
95+
using __receiver = stdexec::__t<__receiver<stdexec::__id<_Receiver>, _InitT, _RedOp>>;
96+
template <class _Self, class _Receiver>
97+
using __operation = stdexec::__t<
98+
__operation<stdexec::__copy_cvref_t<_Self, _Sender>, stdexec::__id<_Receiver>, _InitT, _RedOp>>;
99+
100+
struct __t {
101+
using __id = __sender;
102+
using is_sender = void;
103+
STDEXEC_NO_UNIQUE_ADDRESS _Sender __sndr_;
104+
STDEXEC_NO_UNIQUE_ADDRESS _InitT __init_;
105+
STDEXEC_NO_UNIQUE_ADDRESS _RedOp __redop_;
106+
107+
template <stdexec::__decays_to<__t> _Self, stdexec::receiver _Receiver>
108+
requires stdexec::sender_to<stdexec::__copy_cvref_t<_Self, _Sender>, __receiver<_Receiver>>
109+
friend auto tag_invoke(stdexec::connect_t, _Self&& __self, _Receiver __rcvr) noexcept(
110+
stdexec::__nothrow_constructible_from<
111+
__operation<_Self, _Receiver>,
112+
stdexec::__copy_cvref_t<_Self, _Sender>,
113+
_Receiver&&,
114+
stdexec::__copy_cvref_t<_Self, _InitT>,
115+
stdexec::__copy_cvref_t<_Self, _RedOp>>) -> __operation<_Self, _Receiver> {
116+
return {
117+
((_Self&&) __self).__sndr_,
118+
(_Receiver&&) __rcvr,
119+
((_Self&&) __self).__init_,
120+
((_Self&&) __self).__redop_};
121+
}
122+
123+
template <stdexec::__decays_to<__t> _Self, class _Env>
124+
friend auto tag_invoke(stdexec::get_completion_signatures_t, _Self&&, _Env&&)
125+
-> stdexec::dependent_completion_signatures<_Env>;
126+
127+
template <stdexec::__decays_to<__t> _Self, class _Env>
128+
friend auto tag_invoke(stdexec::get_completion_signatures_t, _Self&&, _Env&&)
129+
-> stdexec::completion_signatures<stdexec::set_value_t(_InitT)>
130+
requires true;
131+
132+
friend auto tag_invoke(stdexec::get_env_t, const __t& __self) noexcept
133+
-> stdexec::env_of_t<const _Sender&> {
134+
return get_env(__self.__sndr_);
135+
}
136+
};
137+
};
138+
139+
struct reduce_t {
140+
template <stdexec::sender _Sender, class _InitT, class _RedOp>
141+
using __sender =
142+
stdexec::__t<__sender<stdexec::__id<stdexec::__decay_t<_Sender>>, _InitT, _RedOp>>;
143+
144+
template <stdexec::sender _Sender, class _InitT, stdexec::__movable_value _RedOp>
145+
requires stdexec::__tag_invocable_with_completion_scheduler<
146+
reduce_t,
147+
stdexec::set_value_t,
148+
_Sender,
149+
_InitT,
150+
_RedOp>
151+
stdexec::sender auto operator()(_Sender&& __sndr, _InitT __init, _RedOp __redop) const noexcept(
152+
stdexec::nothrow_tag_invocable<
153+
reduce_t,
154+
stdexec::__completion_scheduler_for<_Sender, stdexec::set_value_t>,
155+
_Sender,
156+
_InitT,
157+
_RedOp>) {
158+
auto __sched = stdexec::get_completion_scheduler<stdexec::set_value_t>(stdexec::get_env(__sndr));
159+
return tag_invoke(
160+
reduce_t{}, std::move(__sched), (_Sender&&) __sndr, (_InitT&&) __init, (_RedOp&&) __redop);
161+
}
162+
163+
template <stdexec::sender _Sender, class _InitT, stdexec::__movable_value _RedOp>
164+
requires(!stdexec::__tag_invocable_with_completion_scheduler<
165+
reduce_t,
166+
stdexec::set_value_t,
167+
_Sender,
168+
_InitT,
169+
_RedOp>)
170+
&& stdexec::tag_invocable<reduce_t, _Sender, _InitT, _RedOp>
171+
stdexec::sender auto operator()(_Sender&& __sndr, _InitT __init, _RedOp __redop) const
172+
noexcept(stdexec::nothrow_tag_invocable<reduce_t, _Sender, _InitT, _RedOp>) {
173+
return tag_invoke(reduce_t{}, (_Sender&&) __sndr, (_InitT&&) __init, (_RedOp&&) __redop);
174+
}
175+
176+
template <stdexec::sender _Sender, class _InitT, stdexec::__movable_value _RedOp>
177+
requires(!stdexec::__tag_invocable_with_completion_scheduler<
178+
reduce_t,
179+
stdexec::set_value_t,
180+
_Sender,
181+
_InitT,
182+
_RedOp>)
183+
&& (!stdexec::tag_invocable<reduce_t, _Sender, _InitT, _RedOp>)
184+
STDEXEC_DETAIL_CUDACC_HOST_DEVICE __sender<_Sender, _InitT, _RedOp>
185+
operator()(_Sender&& __sndr, _InitT __init, _RedOp __redop) const {
186+
return __sender<_Sender, _InitT, _RedOp>{(_Sender&&) __sndr, __init, (_RedOp&&) __redop};
187+
}
188+
189+
template <class _InitT, class _RedOp = std::plus<>>
190+
stdexec::__binder_back<reduce_t, _InitT, _RedOp> operator()(_InitT __init, _RedOp __redop = {}) const {
191+
return {
192+
{},
193+
{},
194+
{(_InitT&&) __init, (_RedOp&&) __redop}
195+
};
196+
}
197+
};
198+
199+
}
200+
201+
using __reduce::reduce_t;
202+
inline constexpr reduce_t reduce{};
203+
}

0 commit comments

Comments
 (0)