Skip to content

Commit 091214d

Browse files
authored
Add parallel std::partition and use it to improve sort (#31)
This version of partition only uses two threads (spawns and waits for one additional task).
1 parent d99c5a6 commit 091214d

File tree

7 files changed

+208
-51
lines changed

7 files changed

+208
-51
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Algorithms are added on an as-needed basis. If you need one [open an issue](http
4747
* [`fill`](https://en.cppreference.com/w/cpp/algorithm/fill), [`fill_n`](https://en.cppreference.com/w/cpp/algorithm/fill_n)
4848
* [`find`](https://en.cppreference.com/w/cpp/algorithm/find), [`find_if`](https://en.cppreference.com/w/cpp/algorithm/find_if), [`find_if_not`](https://en.cppreference.com/w/cpp/algorithm/find_if_not)
4949
* [`for_each`](https://en.cppreference.com/w/cpp/algorithm/for_each), [`for_each_n`](https://en.cppreference.com/w/cpp/algorithm/for_each_n)
50+
* [`partition`](https://en.cppreference.com/w/cpp/algorithm/partition)
5051
* [`sort`](https://en.cppreference.com/w/cpp/algorithm/sort), [`stable_sort`](https://en.cppreference.com/w/cpp/algorithm/stable_sort)
5152
* [`transform`](https://en.cppreference.com/w/cpp/algorithm/transform)
5253
@@ -197,9 +198,9 @@ for_each()/real_time 94.6 ms
197198
for_each(poolstl::par)/real_time 18.7 ms 0.044 ms 36
198199
for_each(std::execution::par)/real_time 15.3 ms 12.9 ms 46
199200
sort()/real_time 603 ms 602 ms 1
200-
sort(poolstl::par)/real_time 137 ms 11.8 ms 5
201+
sort(poolstl::par)/real_time 112 ms 6.64 ms 6
201202
sort(std::execution::par)/real_time 113 ms 102 ms 6
202-
pluggable_sort(poolstl::par, ..., pdqsort)/real_time 91.8 ms 11.9 ms 7
203+
pluggable_sort(poolstl::par, ..., pdqsort)/real_time 71.7 ms 6.67 ms 10
203204
transform()/real_time 95.0 ms 94.9 ms 7
204205
transform(poolstl::par)/real_time 17.4 ms 0.037 ms 38
205206
transform(std::execution::par)/real_time 15.3 ms 13.2 ms 45

benchmark/algorithm_bench.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,35 @@ BENCHMARK(for_each<std_par>)->Name("for_each(std::execution::par)")->UseRealTime
9393

9494
////////////////////////////////
9595

96+
template <class ExecPolicy>
97+
void partition(benchmark::State& state) {
98+
auto values = iota_vector<int>(arr_length);
99+
std::vector<int> haystack(arr_length);
100+
101+
double pivot_frac = ((double)state.range(0)) / 100;
102+
int pivot_val = (int)((double)values.size() * pivot_frac);
103+
auto pred = [pivot_val] (const int& em) { return em < pivot_val; };
104+
105+
for ([[maybe_unused]] auto _ : state) {
106+
if constexpr (is_policy<ExecPolicy>::value) {
107+
auto res = std::partition(policy<ExecPolicy>::get(), values.begin(), values.end(), pred);
108+
benchmark::DoNotOptimize(res);
109+
} else {
110+
auto res = std::partition(values.begin(), values.end(), pred);
111+
benchmark::DoNotOptimize(res);
112+
}
113+
benchmark::ClobberMemory();
114+
}
115+
}
116+
117+
BENCHMARK(partition<seq>)->Name("partition()")->UseRealTime()->ArgName("pivot_percentile")->Arg(50);
118+
BENCHMARK(partition<poolstl_par>)->Name("partition(poolstl::par)")->UseRealTime()->ArgName("pivot_percentile")->Arg(50);
119+
#ifdef POOLSTL_BENCH_STD_PAR
120+
BENCHMARK(partition<std_par>)->Name("partition(std::execution::par)")->UseRealTime()->ArgName("pivot_percentile")->Arg(50);
121+
#endif
122+
123+
////////////////////////////////
124+
96125
template <class ExecPolicy>
97126
void sort(benchmark::State& state) {
98127
auto source = random_vector<int>(arr_length / 10);

include/poolstl/algorithm

Lines changed: 77 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,66 @@
1212
#include "internal/ttp_impl.hpp"
1313
#include "internal/thread_impl.hpp"
1414

15+
namespace poolstl {
16+
/**
17+
* NOTE: Iterators are expected to be random access.
18+
*
19+
* Like `std::sort`, but allows specifying the sequential sort method, which must have the
20+
* same signature as the comparator version of `std::sort`.
21+
*
22+
* Implemented as a high-level quicksort that delegates to `sort_func`, in parallel, once the range has been
23+
* sufficiently partitioned.
24+
*/
25+
template <class ExecPolicy, class RandIt, class Compare>
26+
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
27+
pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp,
28+
void (sort_func)(RandIt, RandIt, Compare) = std::sort) {
29+
if (poolstl::internal::is_seq<ExecPolicy>(policy)) {
30+
sort_func(first, last, comp);
31+
return;
32+
}
33+
34+
// Parallel partition.
35+
// The partition_p2 method spawns and waits for its own child task. A deadlock is possible if all worker
36+
// threads are waiting for tasks that in turn have no workers to execute them. This is only an issue because
37+
// our thread pool does not have the concept of dependencies.
38+
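// So ensure that at most half of the worker threads can be blocked in a parallel partition; further partitions run sequentially.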
39+
auto& task_pool = *policy.pool();
40+
std::atomic<int> allowed_parallel_partitions{(int)task_pool.get_num_threads() / 2};
41+
42+
auto part_func = [&task_pool, &allowed_parallel_partitions](RandIt chunk_first, RandIt chunk_last,
43+
poolstl::internal::pivot_predicate<Compare,
44+
typename std::iterator_traits<RandIt>::value_type> pred) {
45+
if (allowed_parallel_partitions.fetch_sub(1) > 0) {
46+
return poolstl::internal::partition_p2(task_pool, chunk_first, chunk_last, pred);
47+
} else {
48+
return std::partition(chunk_first, chunk_last, pred);
49+
}
50+
};
51+
52+
poolstl::internal::parallel_quicksort(std::forward<ExecPolicy>(policy), first, last, comp, sort_func, part_func,
53+
poolstl::internal::quicksort_pivot<RandIt>);
54+
}
55+
56+
/**
57+
* NOTE: Iterators are expected to be random access.
58+
*
59+
* Like `std::sort`, but allows specifying the sequential sort method, which must have the
60+
* same signature as the comparator version of `std::sort`.
61+
*
62+
* Implemented as a parallel high-level quicksort that delegates to `sort_func` once the range has been
63+
* sufficiently partitioned.
64+
*/
65+
template <class ExecPolicy, class RandIt>
66+
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
67+
pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last,
68+
void (sort_func)(RandIt, RandIt,
69+
std::less<typename std::iterator_traits<RandIt>::value_type>) = std::sort){
70+
using T = typename std::iterator_traits<RandIt>::value_type;
71+
pluggable_sort(std::forward<ExecPolicy>(policy), first, last, std::less<T>(), sort_func);
72+
}
73+
}
74+
1575
namespace std {
1676

1777
/**
@@ -209,6 +269,22 @@ namespace std {
209269
return last;
210270
}
211271

272+
/**
273+
* NOTE: Iterators are expected to be random access.
274+
* See std::partition https://en.cppreference.com/w/cpp/algorithm/partition
275+
*
276+
* Current implementation uses at most 2 threads.
277+
*/
278+
template <class ExecPolicy, class RandIt, class Predicate>
279+
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, RandIt>
280+
partition(ExecPolicy &&policy, RandIt first, RandIt last, Predicate pred) {
281+
if (poolstl::internal::is_seq<ExecPolicy>(policy)) {
282+
return std::partition(first, last, pred);
283+
}
284+
285+
return poolstl::internal::partition_p2(*policy.pool(), first, last, pred);
286+
}
287+
212288
/**
213289
* NOTE: Iterators are expected to be random access.
214290
* See std::sort https://en.cppreference.com/w/cpp/algorithm/sort
@@ -221,11 +297,7 @@ namespace std {
221297
return;
222298
}
223299

224-
poolstl::internal::parallel_quicksort(std::forward<ExecPolicy>(policy), first, last, comp,
225-
std::sort<RandIt, Compare>,
226-
std::partition<RandIt, poolstl::internal::pivot_predicate<Compare,
227-
typename std::iterator_traits<RandIt>::value_type>>,
228-
poolstl::internal::quicksort_pivot<RandIt>);
300+
poolstl::pluggable_sort(std::forward<ExecPolicy>(policy), first, last, comp, std::sort<RandIt, Compare>);
229301
}
230302

231303
/**
@@ -377,48 +449,6 @@ namespace poolstl {
377449
(void*)nullptr, 1, construct, f);
378450
}
379451

380-
/**
381-
* NOTE: Iterators are expected to be random access.
382-
*
383-
* Like `std::sort`, but allows specifying the sequential sort method, which must have the
384-
* same signature as the comparator version of `std::sort`.
385-
*
386-
* Implemented as a high-level quicksort that delegates to `sort_func`, in parallel, once the range has been
387-
* sufficiently partitioned.
388-
*/
389-
template <class ExecPolicy, class RandIt, class Compare>
390-
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
391-
pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last, Compare comp,
392-
void (sort_func)(RandIt, RandIt, Compare) = std::sort) {
393-
if (poolstl::internal::is_seq<ExecPolicy>(policy)) {
394-
sort_func(first, last, comp);
395-
return;
396-
}
397-
398-
poolstl::internal::parallel_quicksort(std::forward<ExecPolicy>(policy), first, last, comp, sort_func,
399-
std::partition<RandIt, poolstl::internal::pivot_predicate<Compare,
400-
typename std::iterator_traits<RandIt>::value_type>>,
401-
poolstl::internal::quicksort_pivot<RandIt>);
402-
}
403-
404-
/**
405-
* NOTE: Iterators are expected to be random access.
406-
*
407-
* Like `std::sort`, but allows specifying the sequential sort method, which must have the
408-
* same signature as the comparator version of `std::sort`.
409-
*
410-
* Implemented as a parallel high-level quicksort that delegates to `sort_func` once the range has been
411-
* sufficiently partitioned.
412-
*/
413-
template <class ExecPolicy, class RandIt>
414-
poolstl::internal::enable_if_poolstl_policy<ExecPolicy, void>
415-
pluggable_sort(ExecPolicy &&policy, RandIt first, RandIt last,
416-
void (sort_func)(RandIt, RandIt,
417-
std::less<typename std::iterator_traits<RandIt>::value_type>) = std::sort){
418-
using T = typename std::iterator_traits<RandIt>::value_type;
419-
pluggable_sort(std::forward<ExecPolicy>(policy), first, last, std::less<T>(), sort_func);
420-
}
421-
422452
/**
423453
* NOTE: Iterators are expected to be random access.
424454
*

include/poolstl/internal/ttp_impl.hpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,14 @@ namespace poolstl {
273273
// Target partition size. Range will be recursively partitioned into partitions no bigger than this
274274
// size. Target approximately twice as many partitions as threads to reduce impact of uneven pivot
275275
// selection.
276-
std::ptrdiff_t target_leaf_size = std::max(std::distance(first, last) / (task_pool.get_num_threads() * 2),
276+
auto num_threads = task_pool.get_num_threads();
277+
std::ptrdiff_t target_leaf_size = std::max(std::distance(first, last) / (num_threads * 2),
277278
(std::ptrdiff_t)5);
278279

280+
if (num_threads == 1) {
281+
target_leaf_size = std::distance(first, last);
282+
}
283+
279284
// task_thread_pool does not support creating task DAGs, so organize the code such that
280285
// all parallel tasks are independent. The parallel tasks can spawn additional parallel tasks, and they
281286
// record their "child" task's std::future into a common vector to be waited on by the main thread.
@@ -304,6 +309,39 @@ namespace poolstl {
304309
// Wait on all the parallel tasks.
305310
get_futures(futures);
306311
}
312+
313+
/**
314+
* Partition range according to predicate. Unstable.
315+
*
316+
* This implementation only parallelizes with p=2; will spawn and wait for only one task.
317+
*/
318+
template <class RandIt, class Predicate>
319+
RandIt partition_p2(task_thread_pool::task_thread_pool &task_pool, RandIt first, RandIt last, Predicate pred) {
320+
auto range_size = std::distance(first, last);
321+
if (range_size < 4) {
322+
return std::partition(first, last, pred);
323+
}
324+
325+
// approach should be generalizable to arbitrary p
326+
327+
RandIt mid = std::next(first + range_size / 2);
328+
329+
// partition left and right halves in parallel
330+
auto left_future = task_pool.submit(std::partition<RandIt, Predicate>, first, mid, pred);
331+
RandIt right_mid = std::partition(mid, last, pred);
332+
RandIt left_mid = left_future.get();
333+
334+
// merge the two partitioned halves
335+
auto left_highs_size = std::distance(left_mid, mid);
336+
auto right_lows_size = std::distance(mid, right_mid);
337+
if (left_highs_size <= right_lows_size) {
338+
std::swap_ranges(left_mid, mid, right_mid - left_highs_size);
339+
return right_mid - left_highs_size;
340+
} else {
341+
std::swap_ranges(mid, right_mid, left_mid);
342+
return left_mid + right_lows_size;
343+
}
344+
}
307345
}
308346
}
309347

include/poolstl/seq_fwd.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ namespace std {
9797
POOLSTL_DEFINE_SEQ_FWD(std, for_each_n)
9898
#endif
9999

100+
POOLSTL_DEFINE_SEQ_FWD(std, partition)
100101
POOLSTL_DEFINE_SEQ_FWD(std, transform)
102+
POOLSTL_DEFINE_SEQ_FWD(std, sort)
101103

102104
// <numeric>
103105

include/poolstl/variant_policy.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ namespace std {
105105
POOLSTL_DEFINE_PAR_IF_FWD(std, for_each_n)
106106
#endif
107107

108+
POOLSTL_DEFINE_PAR_IF_FWD(std, partition)
108109
POOLSTL_DEFINE_PAR_IF_FWD(std, transform)
110+
POOLSTL_DEFINE_PAR_IF_FWD(std, sort)
109111

110112
// <numeric>
111113

@@ -121,6 +123,7 @@ namespace poolstl {
121123
// <poolstl/algorithm>
122124

123125
POOLSTL_DEFINE_PAR_IF_FWD_VOID(poolstl, for_each_chunk)
126+
POOLSTL_DEFINE_PAR_IF_FWD_VOID(poolstl, pluggable_sort)
124127
}
125128
#endif
126129

tests/poolstl_test.cpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,16 +321,70 @@ TEST_CASE("inplace_merge", "[alg][algorithm]") {
321321
}
322322
}
323323

324+
TEST_CASE("partition", "[alg][algorithm]") {
325+
for (auto num_threads: test_thread_counts) {
326+
ttp::task_thread_pool pool(num_threads);
327+
328+
for (auto num_iters: test_arr_sizes) {
329+
for (int scramble_type = 0; scramble_type <= 2; ++scramble_type) {
330+
auto source = iota_vector(num_iters);
331+
switch (scramble_type) {
332+
case 0:
333+
std::reverse(source.begin(), source.end());
334+
break;
335+
case 1:
336+
scramble(source);
337+
break;
338+
default:
339+
break;
340+
}
341+
342+
std::vector<int> pivots = {0, -1};
343+
if (source.size() > 1) {
344+
pivots.push_back(*std::prev(source.end()));
345+
pivots.push_back(source[source.size() / 2]);
346+
}
347+
348+
for (auto pivot : pivots) {
349+
auto pred = [pivot] (const int& em) { return em < pivot; };
350+
std::ptrdiff_t expected_left_size;
351+
{
352+
std::vector<int> work(source);
353+
auto ret = std::partition(work.begin(), work.end(), pred);
354+
expected_left_size = std::distance(work.begin(), ret);
355+
}
356+
{
357+
std::vector<int> work(source);
358+
auto mid = std::partition(poolstl::par_if(false), work.begin(), work.end(), pred);
359+
REQUIRE(expected_left_size == std::distance(work.begin(), mid));
360+
REQUIRE(std::is_partitioned(work.begin(), work.end(), pred));
361+
}
362+
{
363+
std::vector<int> work(source);
364+
auto mid = std::partition(poolstl::par.on(pool), work.begin(), work.end(), pred);
365+
REQUIRE(expected_left_size == std::distance(work.begin(), mid));
366+
REQUIRE(std::is_partitioned(work.begin(), work.end(), pred));
367+
}
368+
}
369+
}
370+
}
371+
}
372+
}
373+
324374
TEST_CASE("sort", "[alg][algorithm]") {
325375
for (auto num_threads : test_thread_counts) {
326376
ttp::task_thread_pool pool(num_threads);
327377

328378
for (auto num_iters : test_arr_sizes) {
329-
for (int scramble_type = 0; scramble_type <= 2; ++scramble_type) {
379+
for (int scramble_type = 0; scramble_type <= 3; ++scramble_type) {
330380
auto source = iota_vector(num_iters);
331381
switch (scramble_type) {
332382
case 0: std::reverse(source.begin(), source.end()); break;
333383
case 1: scramble(source); break;
384+
case 2:
385+
if (source.size() > 2) {
386+
std::swap(source[0], source[1]);
387+
}
334388
default: break;
335389
}
336390

0 commit comments

Comments
 (0)