Skip to content

Commit 05e61d6

Browse files
authored
implement par_iter_many and par_iter_many_unique (#17815)
# Objective

Continuation of #16547. We do not yet have parallel versions of `iter_many` and `iter_many_unique`. It is currently very painful to try to use parallel iteration over entity lists. Even if a list is not long, each operation might still be very expensive, and worth parallelizing. Plus, it has been requested several times!

## Solution

Once again, we implement what we lack! These parallel iterators collect their input entity list into a `Vec`/`UniqueEntityVec`, then chunk that over the available threads, inspired by the original `par_iter`. Since no order guarantee is given to the caller, we could sort the input list according to `EntityLocation`, but that would likely only be worth it for very large entity lists. There is some duplication which could likely be improved, but I'd like to leave that for a follow-up.

## Testing

The doc tests on `for_each_init` of `QueryParManyIter` and `QueryParManyUniqueIter`.
1 parent 96a4028 commit 05e61d6

File tree

4 files changed

+558
-8
lines changed

4 files changed

+558
-8
lines changed

crates/bevy_ecs/src/query/iter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2433,7 +2433,9 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryIterationCursor<'w, 's, D, F> {
24332433
}
24342434

24352435
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
2436-
// QueryIter, QueryIterationCursor, QuerySortedIter, QueryManyIter, QuerySortedManyIter, QueryCombinationIter, QueryState::par_fold_init_unchecked_manual
2436+
// QueryIter, QueryIterationCursor, QuerySortedIter, QueryManyIter, QuerySortedManyIter, QueryCombinationIter,
2437+
// QueryState::par_fold_init_unchecked_manual, QueryState::par_many_fold_init_unchecked_manual,
2438+
// QueryState::par_many_unique_fold_init_unchecked_manual
24372439
/// # Safety
24382440
/// `tables` and `archetypes` must belong to the same world that the [`QueryIterationCursor`]
24392441
/// was initialized for.

crates/bevy_ecs/src/query/par_iter.rs

Lines changed: 337 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
use crate::{
2-
batching::BatchingStrategy, component::Tick, world::unsafe_world_cell::UnsafeWorldCell,
2+
batching::BatchingStrategy,
3+
component::Tick,
4+
entity::{EntityBorrow, TrustedEntityBorrow, UniqueEntityVec},
5+
world::unsafe_world_cell::UnsafeWorldCell,
36
};
47

5-
use super::{QueryData, QueryFilter, QueryItem, QueryState};
8+
use super::{QueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};
9+
10+
use alloc::vec::Vec;
611

712
/// A parallel iterator over query results of a [`Query`](crate::system::Query).
813
///
@@ -54,7 +59,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
5459
/// fn system(query: Query<&T>){
5560
/// let mut queue: Parallel<usize> = Parallel::default();
5661
/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
57-
/// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue,item| {
62+
/// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
5863
/// **local_queue += 1;
5964
/// });
6065
///
@@ -146,3 +151,332 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
146151
.calc_batch_size(max_items, thread_count)
147152
}
148153
}
154+
155+
/// A parallel iterator over the query items generated from an [`Entity`] list.
156+
///
157+
/// This struct is created by the [`Query::par_iter_many`] method.
158+
///
159+
/// [`Entity`]: crate::entity::Entity
160+
/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
161+
pub struct QueryParManyIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityBorrow> {
162+
pub(crate) world: UnsafeWorldCell<'w>,
163+
pub(crate) state: &'s QueryState<D, F>,
164+
pub(crate) entity_list: Vec<E>,
165+
pub(crate) last_run: Tick,
166+
pub(crate) this_run: Tick,
167+
pub(crate) batching_strategy: BatchingStrategy,
168+
}
169+
170+
impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityBorrow + Sync>
171+
QueryParManyIter<'w, 's, D, F, E>
172+
{
173+
/// Changes the batching strategy used when iterating.
174+
///
175+
/// For more information on how this affects the resultant iteration, see
176+
/// [`BatchingStrategy`].
177+
pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
178+
self.batching_strategy = strategy;
179+
self
180+
}
181+
182+
/// Runs `func` on each query result in parallel.
183+
///
184+
/// # Panics
185+
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
186+
/// initialized and run from the ECS scheduler, this should never panic.
187+
///
188+
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
189+
#[inline]
190+
pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
191+
self.for_each_init(|| {}, |_, item| func(item));
192+
}
193+
194+
/// Runs `func` on each query result in parallel on a value returned by `init`.
195+
///
196+
/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
197+
/// Callers should avoid using this function as if it were a parallel version
198+
/// of [`Iterator::fold`].
199+
///
200+
/// # Example
201+
///
202+
/// ```
203+
/// use bevy_utils::Parallel;
204+
/// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
205+
/// # use core::slice;
206+
/// use bevy_platform_support::prelude::Vec;
207+
/// # fn some_expensive_operation(_item: &T) -> usize {
208+
/// # 0
209+
/// # }
210+
///
211+
/// #[derive(Component)]
212+
/// struct T;
213+
///
214+
/// #[derive(Resource)]
215+
/// struct V(Vec<Entity>);
216+
///
217+
/// impl<'a> IntoIterator for &'a V {
218+
/// // ...
219+
/// # type Item = &'a Entity;
220+
/// # type IntoIter = slice::Iter<'a, Entity>;
221+
/// #
222+
/// # fn into_iter(self) -> Self::IntoIter {
223+
/// # self.0.iter()
224+
/// # }
225+
/// }
226+
///
227+
/// fn system(query: Query<&T>, entities: Res<V>){
228+
/// let mut queue: Parallel<usize> = Parallel::default();
229+
/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
230+
/// query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
231+
/// **local_queue += some_expensive_operation(item);
232+
/// });
233+
///
234+
/// // collect value from every thread
235+
/// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
236+
/// }
237+
/// ```
238+
///
239+
/// # Panics
240+
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
241+
/// initialized and run from the ECS scheduler, this should never panic.
242+
///
243+
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
244+
#[inline]
245+
pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
246+
where
247+
FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
248+
INIT: Fn() -> T + Sync + Send + Clone,
249+
{
250+
let func = |mut init, item| {
251+
func(&mut init, item);
252+
init
253+
};
254+
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
255+
{
256+
let init = init();
257+
// SAFETY:
258+
// This method can only be called once per instance of QueryParManyIter,
259+
// which ensures that mutable queries cannot be executed multiple times at once.
260+
// Mutable instances of QueryParManyIter can only be created via an exclusive borrow of a
261+
// Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
262+
// at the same time.
263+
unsafe {
264+
self.state
265+
.iter_many_unchecked_manual(
266+
&self.entity_list,
267+
self.world,
268+
self.last_run,
269+
self.this_run,
270+
)
271+
.fold(init, func);
272+
}
273+
}
274+
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
275+
{
276+
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
277+
if thread_count <= 1 {
278+
let init = init();
279+
// SAFETY: See the safety comment above.
280+
unsafe {
281+
self.state
282+
.iter_many_unchecked_manual(
283+
&self.entity_list,
284+
self.world,
285+
self.last_run,
286+
self.this_run,
287+
)
288+
.fold(init, func);
289+
}
290+
} else {
291+
// Need a batch size of at least 1.
292+
let batch_size = self.get_batch_size(thread_count).max(1);
293+
// SAFETY: See the safety comment above.
294+
unsafe {
295+
self.state.par_many_fold_init_unchecked_manual(
296+
init,
297+
self.world,
298+
&self.entity_list,
299+
batch_size,
300+
func,
301+
self.last_run,
302+
self.this_run,
303+
);
304+
}
305+
}
306+
}
307+
}
308+
309+
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
310+
fn get_batch_size(&self, thread_count: usize) -> usize {
311+
self.batching_strategy
312+
.calc_batch_size(|| self.entity_list.len(), thread_count)
313+
}
314+
}
315+
316+
/// A parallel iterator over the unique query items generated from an [`EntitySet`].
317+
///
318+
/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
319+
///
320+
/// [`EntitySet`]: crate::entity::EntitySet
321+
/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
322+
/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
323+
pub struct QueryParManyUniqueIter<
324+
'w,
325+
's,
326+
D: QueryData,
327+
F: QueryFilter,
328+
E: TrustedEntityBorrow + Sync,
329+
> {
330+
pub(crate) world: UnsafeWorldCell<'w>,
331+
pub(crate) state: &'s QueryState<D, F>,
332+
pub(crate) entity_list: UniqueEntityVec<E>,
333+
pub(crate) last_run: Tick,
334+
pub(crate) this_run: Tick,
335+
pub(crate) batching_strategy: BatchingStrategy,
336+
}
337+
338+
impl<'w, 's, D: QueryData, F: QueryFilter, E: TrustedEntityBorrow + Sync>
339+
QueryParManyUniqueIter<'w, 's, D, F, E>
340+
{
341+
/// Changes the batching strategy used when iterating.
342+
///
343+
/// For more information on how this affects the resultant iteration, see
344+
/// [`BatchingStrategy`].
345+
pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
346+
self.batching_strategy = strategy;
347+
self
348+
}
349+
350+
/// Runs `func` on each query result in parallel.
351+
///
352+
/// # Panics
353+
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
354+
/// initialized and run from the ECS scheduler, this should never panic.
355+
///
356+
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
357+
#[inline]
358+
pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
359+
self.for_each_init(|| {}, |_, item| func(item));
360+
}
361+
362+
/// Runs `func` on each query result in parallel on a value returned by `init`.
363+
///
364+
/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
365+
/// Callers should avoid using this function as if it were a parallel version
366+
/// of [`Iterator::fold`].
367+
///
368+
/// # Example
369+
///
370+
/// ```
371+
/// use bevy_utils::Parallel;
372+
/// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
373+
/// # use core::slice;
374+
/// # use crate::bevy_ecs::entity::UniqueEntityIter;
375+
/// # fn some_expensive_operation(_item: &T) -> usize {
376+
/// # 0
377+
/// # }
378+
///
379+
/// #[derive(Component)]
380+
/// struct T;
381+
///
382+
/// #[derive(Resource)]
383+
/// struct V(UniqueEntityVec<Entity>);
384+
///
385+
/// impl<'a> IntoIterator for &'a V {
386+
/// // ...
387+
/// # type Item = &'a Entity;
388+
/// # type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
389+
/// #
390+
/// # fn into_iter(self) -> Self::IntoIter {
391+
/// # self.0.iter()
392+
/// # }
393+
/// }
394+
///
395+
/// fn system(query: Query<&T>, entities: Res<V>){
396+
/// let mut queue: Parallel<usize> = Parallel::default();
397+
/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
398+
/// query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
399+
/// **local_queue += some_expensive_operation(item);
400+
/// });
401+
///
402+
/// // collect value from every thread
403+
/// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
404+
/// }
405+
/// ```
406+
///
407+
/// # Panics
408+
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
409+
/// initialized and run from the ECS scheduler, this should never panic.
410+
///
411+
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
412+
#[inline]
413+
pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
414+
where
415+
FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
416+
INIT: Fn() -> T + Sync + Send + Clone,
417+
{
418+
let func = |mut init, item| {
419+
func(&mut init, item);
420+
init
421+
};
422+
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
423+
{
424+
let init = init();
425+
// SAFETY:
426+
// This method can only be called once per instance of QueryParManyUniqueIter,
427+
// which ensures that mutable queries cannot be executed multiple times at once.
428+
// Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
429+
// Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
430+
// at the same time.
431+
unsafe {
432+
self.state
433+
.iter_many_unique_unchecked_manual(
434+
self.entity_list,
435+
self.world,
436+
self.last_run,
437+
self.this_run,
438+
)
439+
.fold(init, func);
440+
}
441+
}
442+
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
443+
{
444+
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
445+
if thread_count <= 1 {
446+
let init = init();
447+
// SAFETY: See the safety comment above.
448+
unsafe {
449+
self.state
450+
.iter_many_unique_unchecked_manual(
451+
self.entity_list,
452+
self.world,
453+
self.last_run,
454+
self.this_run,
455+
)
456+
.fold(init, func);
457+
}
458+
} else {
459+
// Need a batch size of at least 1.
460+
let batch_size = self.get_batch_size(thread_count).max(1);
461+
// SAFETY: See the safety comment above.
462+
unsafe {
463+
self.state.par_many_unique_fold_init_unchecked_manual(
464+
init,
465+
self.world,
466+
&self.entity_list,
467+
batch_size,
468+
func,
469+
self.last_run,
470+
self.this_run,
471+
);
472+
}
473+
}
474+
}
475+
}
476+
477+
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
478+
fn get_batch_size(&self, thread_count: usize) -> usize {
479+
self.batching_strategy
480+
.calc_batch_size(|| self.entity_list.len(), thread_count)
481+
}
482+
}

0 commit comments

Comments
 (0)