Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ All in `std::` namespace.

### Other
* [`poolstl::iota_iter`](include/poolstl/iota_iter.hpp) - Iterate over integers. Same as iterating over output of [`std::iota`](https://en.cppreference.com/w/cpp/algorithm/iota) but without materializing anything. Iterator version of [`std::ranges::iota_view`](https://en.cppreference.com/w/cpp/ranges/iota_view).
* `poolstl::for_each_chunk` - Like `std::for_each`, but explicitly splits the input range into chunks and exposes that chunking to the caller. A user-specified chunk constructor is called once for each parallel chunk, and its output is passed to every loop iteration in that chunk. Useful for workloads that need an expensive workspace that can be reused between iterations, but cannot be shared by iterations that run in parallel.

## Usage

Expand Down
34 changes: 34 additions & 0 deletions include/poolstl/algorithm
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,38 @@ namespace std {
}
}

namespace poolstl {

/**
 * Sequential `for_each_chunk`: treats the whole range [first, last) as a single chunk.
 *
 * `construct` is invoked exactly once if the range is non-empty (and not at all if it is empty).
 * Its result is then handed, as an lvalue, to `f` together with each dereferenced element, in order.
 */
template <class RandIt, class ChunkConstructor, class UnaryFunction>
void for_each_chunk(RandIt first, RandIt last, ChunkConstructor construct, UnaryFunction f) {
    if (first != last) {
        // One workspace for the entire chunk; every iteration sees the same object.
        auto workspace = construct();
        while (first != last) {
            f(*first, workspace);
            ++first;
        }
    }
}

/**
 * Parallel `for_each_chunk`.
 *
 * NOTE: Iterators are expected to be random access.
 *
 * Behaves like `std::for_each`, except that the chunking of the range is made visible to the caller:
 * each chunk handed out by `parallel_chunk_for` is processed by the sequential `for_each_chunk`,
 * so `construct` is called once per non-empty chunk and its output is passed to `f` for every
 * element of that chunk.
 *
 * Intended for workloads with an expensive workspace that may be reused across iterations of one
 * chunk but must not be shared between chunks running in parallel.
 */
template <class ExecPolicy, class RandIt, class ChunkConstructor, class UnaryFunction>
poolstl::internal::enable_if_par<ExecPolicy, void>
for_each_chunk(ExecPolicy&& policy, RandIt first, RandIt last, ChunkConstructor construct, UnaryFunction f) {
    // Per-chunk work: delegate to the sequential overload. `construct` and `f` are captured by
    // reference here and copied into the sequential overload's by-value parameters on each call.
    auto run_chunk = [&construct, &f](RandIt chunk_first, RandIt chunk_last) {
        for_each_chunk(chunk_first, chunk_last, construct, f);
    };

    auto pending = poolstl::internal::parallel_chunk_for(std::forward<ExecPolicy>(policy), first, last,
                                                         std::move(run_chunk));

    // Hand the futures to get_futures before returning (presumably this blocks until every chunk
    // has finished, which is what keeps the reference captures above valid - confirm in utils.hpp).
    poolstl::internal::get_futures(pending);
}
}

#endif
4 changes: 2 additions & 2 deletions include/poolstl/internal/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

// Version macros.
#define POOLSTL_VERSION_MAJOR 0
#define POOLSTL_VERSION_MINOR 4
#define POOLSTL_VERSION_PATCH 0
#define POOLSTL_VERSION_MINOR 3
#define POOLSTL_VERSION_PATCH 1

#include <cstddef>
#include <functional>
Expand Down
76 changes: 40 additions & 36 deletions include/poolstl/seq_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
* Forward poolstl::seq to the native sequential (no policy) method.
*/

#define POOLSTL_DEFINE_SEQ_FWD(FNAME) \
#define POOLSTL_DEFINE_SEQ_FWD(NS, FNAME) \
template<class EP, typename...ARGS> \
auto FNAME(EP&&, ARGS&&...args) -> \
poolstl::internal::enable_if_seq<EP, decltype(std::FNAME(std::forward<ARGS>(args)...))> { \
return std::FNAME(std::forward<ARGS>(args)...); \
poolstl::internal::enable_if_seq<EP, decltype(NS::FNAME(std::forward<ARGS>(args)...))> { \
return NS::FNAME(std::forward<ARGS>(args)...); \
}

#define POOLSTL_DEFINE_SEQ_FWD_VOID(FNAME) \
#define POOLSTL_DEFINE_SEQ_FWD_VOID(NS, FNAME) \
template<class EP, typename...ARGS> \
poolstl::internal::enable_if_seq<EP, void> FNAME(EP&&, ARGS&&... args) { \
std::FNAME(std::forward<ARGS>(args)...); \
NS::FNAME(std::forward<ARGS>(args)...); \
}

#if POOLSTL_HAVE_CXX17
Expand All @@ -32,64 +32,68 @@
* Useful to choose between parallel and sequential policies at runtime via par_if.
*/

#define POOLSTL_DEFINE_PAR_IF_FWD_VOID(FNAME) \
#define POOLSTL_DEFINE_PAR_IF_FWD_VOID(NS, FNAME) \
template<class EP, typename...ARGS> \
poolstl::internal::enable_if_poolstl_variant<EP, void> FNAME(EP&& policy, ARGS&&...args) { \
std::visit([&](auto&& pol) { std::FNAME(pol, std::forward<ARGS>(args)...); }, policy.var); \
std::visit([&](auto&& pol) { NS::FNAME(pol, std::forward<ARGS>(args)...); }, policy.var); \
}

#define POOLSTL_DEFINE_PAR_IF_FWD(FNAME) \
#define POOLSTL_DEFINE_PAR_IF_FWD(NS, FNAME) \
template<class EP, typename...ARGS> \
auto FNAME(EP&& policy, ARGS&&...args) -> \
poolstl::internal::enable_if_poolstl_variant<EP, decltype(std::FNAME(std::forward<ARGS>(args)...))> { \
return std::visit([&](auto&& pol) { return std::FNAME(pol, std::forward<ARGS>(args)...); }, policy.var); \
poolstl::internal::enable_if_poolstl_variant<EP, decltype(NS::FNAME(std::forward<ARGS>(args)...))> { \
return std::visit([&](auto&& pol) { return NS::FNAME(pol, std::forward<ARGS>(args)...); }, policy.var); \
}

#else
#define POOLSTL_DEFINE_PAR_IF_FWD_VOID(FNAME)
#define POOLSTL_DEFINE_PAR_IF_FWD(FNAME)
#define POOLSTL_DEFINE_PAR_IF_FWD_VOID(NS, FNAME)
#define POOLSTL_DEFINE_PAR_IF_FWD(NS, FNAME)
#endif
/*
* Define both the sequential forward and dynamic chooser.
*/
#define POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(FNAME) \
POOLSTL_DEFINE_SEQ_FWD(FNAME) \
POOLSTL_DEFINE_PAR_IF_FWD(FNAME)
#define POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(NS, FNAME) \
POOLSTL_DEFINE_SEQ_FWD(NS, FNAME) \
POOLSTL_DEFINE_PAR_IF_FWD(NS, FNAME)

#define POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(FNAME) \
POOLSTL_DEFINE_SEQ_FWD_VOID(FNAME) \
POOLSTL_DEFINE_PAR_IF_FWD_VOID(FNAME)
#define POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(NS, FNAME) \
POOLSTL_DEFINE_SEQ_FWD_VOID(NS, FNAME) \
POOLSTL_DEFINE_PAR_IF_FWD_VOID(NS, FNAME)

namespace std {
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(all_of)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(any_of)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(none_of)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, all_of)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, any_of)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, none_of)

POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(count)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(count_if)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, count)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, count_if)

POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(copy)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(copy_n)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, copy)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, copy_n)

POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(fill)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(fill_n)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(std, fill)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, fill_n)

POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(find)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(find_if)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(find_if_not)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, find)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, find_if)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, find_if_not)

POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(for_each)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(std, for_each)
#if POOLSTL_HAVE_CXX17_LIB
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(for_each_n)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, for_each_n)
#endif

POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(transform)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, transform)

#if POOLSTL_HAVE_CXX17_LIB
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(exclusive_scan)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(reduce)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(transform_reduce)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, exclusive_scan)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, reduce)
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF(std, transform_reduce)
#endif
}

namespace poolstl {
// Generate the remaining policy overloads of the void-returning poolstl::for_each_chunk:
//  - a sequential-policy overload (enabled via enable_if_seq) that drops the policy argument and
//    calls poolstl::for_each_chunk with the remaining arguments, and
//  - when POOLSTL_HAVE_CXX17 is set, a variant-policy overload (enabled via
//    enable_if_poolstl_variant) that std::visit-s `policy.var` and re-dispatches to
//    poolstl::for_each_chunk with the selected policy.
POOLSTL_DEFINE_BOTH_SEQ_FWD_AND_PAR_IF_VOID(poolstl, for_each_chunk)
}

#endif
32 changes: 32 additions & 0 deletions tests/poolstl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,38 @@ TEST_CASE("for_each_n", "[alg][algorithm]") {
}
}

// Checks that poolstl::for_each_chunk visits every element exactly once and that the chunk
// constructor is invoked a plausible number of times, for both the par_if(false) path and the
// thread-pool path.
TEST_CASE("for_each_chunk", "[alg][algorithm][poolstl]") {
    // Atomic counters: both are incremented from inside callbacks passed to for_each_chunk.
    std::atomic<int> num_chunks{0};
    std::atomic<int> sum{0};

    for (auto num_threads : test_thread_counts) {
        ttp::task_thread_pool pool(num_threads);

        for (auto num_iters : test_arr_sizes) {
            auto v = iota_vector(num_iters);

            for (auto is_sequential : {true, false}) {
                // Reset the counters for this configuration.
                sum = 0;
                num_chunks = 0;

                // Chunk constructor: counts how many chunks were created; the workspace value is unused.
                auto make_chunk = [&]() { ++num_chunks; return 1; };
                // Per-element callback: counts how many elements were visited.
                auto visit = [&](auto, auto) { ++sum; };

                if (!is_sequential) {
                    poolstl::for_each_chunk(poolstl::par.on(pool), v.cbegin(), v.cend(), make_chunk, visit);
                    // The upper bound on the chunk count is only asserted for a non-zero thread count.
                    if (num_threads != 0) {
                        REQUIRE(num_chunks <= std::min((int)v.size(), num_threads));
                    }
                    if (!v.empty()) {
                        REQUIRE(num_chunks > 0);
                    }
                } else {
                    poolstl::for_each_chunk(poolstl::par_if(false), v.cbegin(), v.cend(), make_chunk, visit);
                    // Sequential path: one chunk for a non-empty range, none for an empty one.
                    REQUIRE(num_chunks == (v.empty() ? 0 : 1));
                }

                // Every element must have been visited exactly once.
                REQUIRE(sum == num_iters);
            }
        }
    }
}

TEST_CASE("sort", "[alg][algorithm]") {
for (auto num_threads : test_thread_counts) {
ttp::task_thread_pool pool(num_threads);
Expand Down