diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 948499da08..c5d262b93b 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -29,6 +29,7 @@ import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscribers.*; import io.reactivex.internal.util.*; +import io.reactivex.parallel.ParallelFlowable; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -10363,6 +10364,100 @@ public final Flowable onTerminateDetach() { return RxJavaPlugins.onAssembly(new FlowableDetach(this)); } + /** + * Parallelizes the flow by creating multiple 'rails' (equal to the number of CPUs) + * and dispatches the upstream items to them in a round-robin fashion. + *

+ * Note that the rails don't execute in parallel on their own and one needs to + * apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where + * each rail will execute. + *

+ * To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}. + *

+ * + *

+ *
Backpressure:
+ *
The operator requires the upstream to honor backpressure and each 'rail' honors backpressure + * as well.
+ *
Scheduler:
+ *
{@code parallel} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new ParallelFlowable instance + * @since 2.0.5 - experimental + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + @Experimental + public final ParallelFlowable parallel() { + return ParallelFlowable.from(this); + } + + /** + * Parallelizes the flow by creating the specified number of 'rails' + * and dispatches the upstream items to them in a round-robin fashion. + *

+ * Note that the rails don't execute in parallel on their own and one needs to + * apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where + * each rail will execute. + *

+ * To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}. + *

+ * + *

+ *
Backpressure:
+ *
The operator requires the upstream to honor backpressure and each 'rail' honors backpressure + * as well.
+ *
Scheduler:
+ *
{@code parallel} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param parallelism the number of 'rails' to use + * @return the new ParallelFlowable instance + * @since 2.0.5 - experimental + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + @Experimental + public final ParallelFlowable parallel(int parallelism) { + ObjectHelper.verifyPositive(parallelism, "parallelism"); + return ParallelFlowable.from(this, parallelism); + } + + /** + * Parallelizes the flow by creating the specified number of 'rails' + * and dispatches the upstream items to them in a round-robin fashion and + * uses the defined per-'rail' prefetch amount. + *

+ * Note that the rails don't execute in parallel on their own and one needs to + * apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where + * each rail will execute. + *

+ * To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}. + *

+ * + *

+ *
Backpressure:
+ *
The operator requires the upstream to honor backpressure and each 'rail' honors backpressure + * as well.
+ *
Scheduler:
+ *
{@code parallel} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param parallelism the number of 'rails' to use + * @param prefetch the number of items each 'rail' should prefetch + * @return the new ParallelFlowable instance + * @since 2.0.5 - experimental + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + @Experimental + public final ParallelFlowable parallel(int parallelism, int prefetch) { + ObjectHelper.verifyPositive(parallelism, "parallelism"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return ParallelFlowable.from(this, parallelism, prefetch); + } + /** * Returns a {@link ConnectableFlowable}, which is a variety of Publisher that waits until its * {@link ConnectableFlowable#connect connect} method is called before it begins emitting items to those diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java new file mode 100644 index 0000000000..7635fb57cd --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelCollect.java @@ -0,0 +1,165 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.concurrent.Callable; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiConsumer; +import io.reactivex.internal.subscribers.DeferredScalarSubscriber; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Reduce the sequence of values in each 'rail' to a single value. + * + * @param the input value type + * @param the collection type + */ +public final class ParallelCollect extends ParallelFlowable { + + final ParallelFlowable source; + + final Callable initialCollection; + + final BiConsumer collector; + + public ParallelCollect(ParallelFlowable source, + Callable initialCollection, BiConsumer collector) { + this.source = source; + this.initialCollection = initialCollection; + this.collector = collector; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + + C initialValue; + + try { + initialValue = initialCollection.call(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + reportError(subscribers, ex); + return; + } + + if (initialValue == null) { + reportError(subscribers, new NullPointerException("The initialSupplier returned a null value")); + return; + } + + parents[i] = new ParallelCollectSubscriber(subscribers[i], initialValue, collector); + } + + source.subscribe(parents); + } + + void reportError(Subscriber[] subscribers, Throwable ex) { + for (Subscriber s : subscribers) { + EmptySubscription.error(ex, s); + } + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelCollectSubscriber extends DeferredScalarSubscriber { + + + private static final long serialVersionUID = -4767392946044436228L; + + final BiConsumer collector; + + C collection; + + boolean done; + + ParallelCollectSubscriber(Subscriber subscriber, + C initialValue, BiConsumer collector) { + super(subscriber); + this.collection = initialValue; + this.collector = collector; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + + try { + collector.accept(collection, t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + collection = null; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + C c = collection; + collection = null; + complete(c); + } + + @Override + public void cancel() { + super.cancel(); + s.cancel(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java new file mode 100644 index 0000000000..4f552ff3ab --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelConcatMap.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.operators.flowable.FlowableConcatMap; +import io.reactivex.internal.util.ErrorMode; +import io.reactivex.parallel.ParallelFlowable; + +/** + * Concatenates the generated Publishers on each rail. + * + * @param the input value type + * @param the output value type + */ +public final class ParallelConcatMap extends ParallelFlowable { + + final ParallelFlowable source; + + final Function> mapper; + + final int prefetch; + + final ErrorMode errorMode; + + public ParallelConcatMap( + ParallelFlowable source, + Function> mapper, + int prefetch, ErrorMode errorMode) { + this.source = source; + this.mapper = ObjectHelper.requireNonNull(mapper, "mapper"); + this.prefetch = prefetch; + this.errorMode = ObjectHelper.requireNonNull(errorMode, "errorMode"); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + + @SuppressWarnings("unchecked") + final Subscriber[] parents = new Subscriber[n]; + + // FIXME cheat until we have support from RxJava2 internals + Publisher p = new Publisher() { + int i; + + @SuppressWarnings("unchecked") + @Override + public void subscribe(Subscriber s) { + parents[i++] = (Subscriber)s; + } + }; + + FlowableConcatMap op = new FlowableConcatMap(p, mapper, prefetch, errorMode); + + for (int i = 0; i < n; i++) { + + op.subscribe(subscribers[i]); +// FIXME needs a FlatMap subscriber +// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode); + } + + source.subscribe(parents); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java new file mode 100644 index 0000000000..6393deaeba --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java @@ -0,0 +1,139 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Predicate; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Filters each 'rail' of the source ParallelFlowable with a predicate function. + * + * @param the input value type + */ +public final class ParallelFilter extends ParallelFlowable { + + final ParallelFlowable source; + + final Predicate predicate; + + public ParallelFilter(ParallelFlowable source, Predicate predicate) { + this.source = source; + this.predicate = predicate; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + parents[i] = new ParallelFilterSubscriber(subscribers[i], predicate); + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelFilterSubscriber implements Subscriber, Subscription { + + final Subscriber actual; + + final Predicate predicate; + + Subscription s; + + boolean done; + + ParallelFilterSubscriber(Subscriber actual, Predicate predicate) { + this.actual = actual; + this.predicate = predicate; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + boolean b; + + try { + b = predicate.test(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return; + } + + if (b) { + actual.onNext(t); + } else { + s.request(1); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java new file mode 100644 index 0000000000..e11789fcab --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFlatMap.java @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import io.reactivex.functions.Function; + +import org.reactivestreams.*; + +import io.reactivex.internal.operators.flowable.FlowableFlatMap; +import io.reactivex.parallel.ParallelFlowable; + +/** + * Flattens the generated Publishers on each rail. + * + * @param the input value type + * @param the output value type + */ +public final class ParallelFlatMap extends ParallelFlowable { + + final ParallelFlowable source; + + final Function> mapper; + + final boolean delayError; + + final int maxConcurrency; + + final int prefetch; + + public ParallelFlatMap( + ParallelFlowable source, + Function> mapper, + boolean delayError, + int maxConcurrency, + int prefetch) { + this.source = source; + this.mapper = mapper; + this.delayError = delayError; + this.maxConcurrency = maxConcurrency; + this.prefetch = prefetch; + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + + @SuppressWarnings("unchecked") + final Subscriber[] parents = new Subscriber[n]; + + // FIXME cheat until we have support from RxJava2 internals + Publisher p = new Publisher() { + int i; + + @SuppressWarnings("unchecked") + @Override + public void subscribe(Subscriber s) { + parents[i++] = (Subscriber)s; + } + }; + + FlowableFlatMap op = new FlowableFlatMap(p, mapper, delayError, maxConcurrency, prefetch); + + for (int i = 0; i < n; i++) { + + op.subscribe(subscribers[i]); +// FIXME needs a FlatMap subscriber +// parents[i] = FlowableFlatMap.createSubscriber(s, mapper, delayError, maxConcurrency, prefetch); + } + + source.subscribe(parents); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromArray.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromArray.java new file mode 100644 index 0000000000..a41939b92a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromArray.java @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import io.reactivex.parallel.ParallelFlowable; + +/** + * Wraps multiple Publishers into a ParallelFlowable which runs them + * in parallel. + * + * @param the value type + */ +public final class ParallelFromArray extends ParallelFlowable { + final Publisher[] sources; + + public ParallelFromArray(Publisher[] sources) { + this.sources = sources; + } + + @Override + public int parallelism() { + return sources.length; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + + for (int i = 0; i < n; i++) { + sources[i].subscribe(subscribers[i]); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java new file mode 100644 index 0000000000..855bf4ac53 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java @@ -0,0 +1,435 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.parallel.ParallelFlowable; + +/** + * Dispatches the values from upstream in a round robin fashion to subscribers which are + * ready to consume elements. A value from upstream is sent to only one of the subscribers. + * + * @param the value type + */ +public final class ParallelFromPublisher extends ParallelFlowable { + final Publisher source; + + final int parallelism; + + final int prefetch; + + public ParallelFromPublisher(Publisher source, int parallelism, int prefetch) { + this.source = source; + this.parallelism = parallelism; + this.prefetch = prefetch; + } + + @Override + public int parallelism() { + return parallelism; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + source.subscribe(new ParallelDispatcher(subscribers, prefetch)); + } + + static final class ParallelDispatcher + extends AtomicInteger + implements Subscriber { + + + private static final long serialVersionUID = -4470634016609963609L; + + final Subscriber[] subscribers; + + final AtomicLongArray requests; + + final long[] emissions; + + final int prefetch; + + final int limit; + + Subscription s; + + SimpleQueue queue; + + Throwable error; + + volatile boolean done; + + int index; + + volatile boolean cancelled; + + /** + * Counts how many subscribers were setup to delay triggering the + * drain of upstream until all of them have been setup. + */ + final AtomicInteger subscriberCount = new AtomicInteger(); + + int produced; + + int sourceMode; + + ParallelDispatcher(Subscriber[] subscribers, int prefetch) { + this.subscribers = subscribers; + this.prefetch = prefetch; + this.limit = prefetch - (prefetch >> 2); + this.requests = new AtomicLongArray(subscribers.length); + this.emissions = new long[subscribers.length]; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription) s; + + int m = qs.requestFusion(QueueSubscription.ANY); + + if (m == QueueSubscription.SYNC) { + sourceMode = m; + queue = qs; + done = true; + setupSubscribers(); + drain(); + return; + } else + if (m == QueueSubscription.ASYNC) { + sourceMode = m; + queue = qs; + + setupSubscribers(); + + s.request(prefetch); + + return; + } + } + + queue = new SpscArrayQueue(prefetch); + + setupSubscribers(); + + s.request(prefetch); + } + } + + void setupSubscribers() { + final int m = subscribers.length; + + for (int i = 0; i < m; i++) { + if (cancelled) { + return; + } + final int j = i; + + subscriberCount.lazySet(i + 1); + + subscribers[i].onSubscribe(new Subscription() { + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + AtomicLongArray ra = requests; + for (;;) { + long r = ra.get(j); + if (r == Long.MAX_VALUE) { + return; + } + long u = BackpressureHelper.addCap(r, n); + if (ra.compareAndSet(j, r, u)) { + break; + } + } + if (subscriberCount.get() == m) { + drain(); + } + } + } + + @Override + public void cancel() { + ParallelDispatcher.this.cancel(); + } + }); + } + } + + @Override + public void onNext(T t) { + if (sourceMode == QueueSubscription.NONE) { + if (!queue.offer(t)) { + cancel(); + onError(new IllegalStateException("Queue is full?")); + return; + } + } + drain(); + } + + @Override + public void onError(Throwable t) { + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + void cancel() { + if (!cancelled) { + cancelled = true; + this.s.cancel(); + + if (getAndIncrement() == 0) { + queue.clear(); + } + } + } + + void drainAsync() { + int missed = 1; + + SimpleQueue q = queue; + Subscriber[] a = this.subscribers; + AtomicLongArray r = this.requests; + long[] e = this.emissions; + int n = e.length; + int idx = index; + int consumed = produced; + + for (;;) { + + int notReady = 0; + + for (;;) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + if (d) { + Throwable ex = error; + if (ex != null) { + q.clear(); + for (Subscriber s : a) { + s.onError(ex); + } + return; + } + } + + boolean empty = q.isEmpty(); + + if (d && empty) { + for (Subscriber s : a) { + s.onComplete(); + } + return; + } + + if (empty) { + break; + } + + long ridx = r.get(idx); + long eidx = e[idx]; + if (ridx != eidx) { + + T v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + for (Subscriber s : a) { + s.onError(ex); + } + return; + } + + if (v == null) { + break; + } + + a[idx].onNext(v); + + e[idx] = eidx + 1; + + int c = ++consumed; + if (c == limit) { + consumed = 0; + s.request(c); + } + notReady = 0; + } else { + notReady++; + } + + idx++; + if (idx == n) { + idx = 0; + } + + if (notReady == n) { + break; + } + } + + int w = get(); + if (w == missed) { + index = idx; + produced = consumed; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + + void drainSync() { + int missed = 1; + + SimpleQueue q = queue; + Subscriber[] a = this.subscribers; + AtomicLongArray r = this.requests; + long[] e = this.emissions; + int n = e.length; + int idx = index; + + for (;;) { + + int notReady = 0; + + for (;;) { + if (cancelled) { + q.clear(); + return; + } + + boolean empty; + + try { + empty = q.isEmpty(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + for (Subscriber s : a) { + s.onError(ex); + } + return; + } + + if (empty) { + for (Subscriber s : a) { + s.onComplete(); + } + return; + } + + long ridx = r.get(idx); + long eidx = e[idx]; + if (ridx != eidx) { + + T v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + for (Subscriber s : a) { + s.onError(ex); + } + return; + } + + if (v == null) { + for (Subscriber s : a) { + s.onComplete(); + } + return; + } + + a[idx].onNext(v); + + e[idx] = eidx + 1; + + notReady = 0; + } else { + notReady++; + } + + idx++; + if (idx == n) { + idx = 0; + } + + if (notReady == n) { + break; + } + } + + int w = get(); + if (w == missed) { + index = idx; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + if (sourceMode == QueueSubscription.SYNC) { + drainSync(); + } else { + drainAsync(); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java new file mode 100644 index 0000000000..29c5e6fd73 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java @@ -0,0 +1,368 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Merges the individual 'rails' of the source ParallelFlowable, unordered, + * into a single regular Publisher sequence (exposed as Px). + * + * @param the value type + */ +public final class ParallelJoin extends Flowable { + + final ParallelFlowable source; + + final int prefetch; + + public ParallelJoin(ParallelFlowable source, int prefetch) { + this.source = source; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Subscriber s) { + JoinSubscription parent = new JoinSubscription(s, source.parallelism(), prefetch); + s.onSubscribe(parent); + source.subscribe(parent.subscribers); + } + + static final class JoinSubscription + extends AtomicInteger + implements Subscription { + + private static final long serialVersionUID = 3100232009247827843L; + + final Subscriber actual; + + final JoinInnerSubscriber[] subscribers; + + final AtomicReference error = new AtomicReference(); + + final AtomicLong requested = new AtomicLong(); + + volatile boolean cancelled; + + final AtomicInteger done = new AtomicInteger(); + + JoinSubscription(Subscriber actual, int n, int prefetch) { + this.actual = actual; + @SuppressWarnings("unchecked") + JoinInnerSubscriber[] a = new JoinInnerSubscriber[n]; + + for (int i = 0; i < n; i++) { + a[i] = new JoinInnerSubscriber(this, prefetch); + } + + this.subscribers = a; + done.lazySet(n); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + + cancelAll(); + + if (getAndIncrement() == 0) { + cleanup(); + } + } + } + + void cancelAll() { + for (JoinInnerSubscriber s : subscribers) { + s.cancel(); + } + } + + void cleanup() { + for (JoinInnerSubscriber s : subscribers) { + s.queue = null; + } + } + + void onNext(JoinInnerSubscriber inner, T value) { + if (get() == 0 && compareAndSet(0, 1)) { + if (requested.get() != 0) { + actual.onNext(value); + if (requested.get() != Long.MAX_VALUE) { + requested.decrementAndGet(); + } + inner.request(1); + } else { + SimpleQueue q = inner.getQueue(); + + if (!q.offer(value)) { + cancelAll(); + Throwable mbe = new MissingBackpressureException("Queue full?!"); + if (error.compareAndSet(null, mbe)) { + actual.onError(mbe); + } else { + RxJavaPlugins.onError(mbe); + } + return; + } + } + if (decrementAndGet() == 0) { + return; + } + } else { + SimpleQueue q = inner.getQueue(); + + // FIXME overflow handling + q.offer(value); + + if (getAndIncrement() != 0) { + return; + } + } + + drainLoop(); + } + + void onError(Throwable e) { + if (error.compareAndSet(null, e)) { + cancelAll(); + drain(); + } else { + if (e != error.get()) { + RxJavaPlugins.onError(e); + } + } + } + + void onComplete() { + done.decrementAndGet(); + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + drainLoop(); + } + + void drainLoop() { + int missed = 1; + + JoinInnerSubscriber[] s = this.subscribers; + int n = s.length; + Subscriber a = this.actual; + + for (;;) { + + long r = requested.get(); + long e = 0; + + middle: + while (e != r) { + if (cancelled) { + cleanup(); + return; + } + + Throwable ex = error.get(); + if (ex != null) { + cleanup(); + a.onError(ex); + return; + } + + boolean d = done.get() == 0; + + boolean empty = true; + + for (int i = 0; i < n; i++) { + JoinInnerSubscriber inner = s[i]; + + SimplePlainQueue q = inner.queue; + if (q != null) { + T v = q.poll(); + + if (v != null) { + empty = false; + a.onNext(v); + inner.requestOne(); + if (++e == r) { + break middle; + } + } + } + } + + if (d && empty) { + a.onComplete(); + return; + } + + if (empty) { + break; + } + } + + if (e == r) { + if (cancelled) { + cleanup(); + return; + } + + Throwable ex = error.get(); + if (ex != null) { + cleanup(); + a.onError(ex); + return; + } + + boolean d = done.get() == 0; + + boolean empty = true; + + for (int i = 0; i < n; i++) { + JoinInnerSubscriber inner = s[i]; + + SimpleQueue q = inner.queue; + if (q != null && !q.isEmpty()) { + empty = false; + break; + } + } + + if (d && empty) { + a.onComplete(); + return; + } + } + + if (e != 0 && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } + + int w = get(); + if (w == missed) { + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + } + + static final class JoinInnerSubscriber + extends AtomicReference + implements Subscriber { + + + private static final long serialVersionUID = 8410034718427740355L; + + final JoinSubscription parent; + + final int prefetch; + + final int limit; + + long produced; + + volatile SimplePlainQueue queue; + + volatile boolean done; + + JoinInnerSubscriber(JoinSubscription parent, int prefetch) { + this.parent = parent; + this.prefetch = prefetch ; + this.limit = prefetch - (prefetch >> 2); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(prefetch); + } + } + + @Override + public void onNext(T t) { + parent.onNext(this, t); + } + + @Override + public void onError(Throwable t) { + parent.onError(t); + } + + @Override + public void onComplete() { + parent.onComplete(); + } + + public void requestOne() { + long p = produced + 1; + if (p == limit) { + produced = 0; + get().request(p); + } else { + produced = p; + } + } + + public void request(long n) { + long p = produced + n; + if (p >= limit) { + produced = 0; + get().request(p); + } else { + produced = p; + } + } + + public void cancel() { + SubscriptionHelper.cancel(this); + } + + SimplePlainQueue getQueue() { + SimplePlainQueue q = queue; + if (q == null) { + q = new SpscArrayQueue(prefetch); + this.queue = q; + } + return q; + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java new file mode 100644 index 0000000000..4d35001c52 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelMap.java @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps each 'rail' of the source ParallelFlowable with a mapper function. + * + * @param the input value type + * @param the output value type + */ +public final class ParallelMap extends ParallelFlowable { + + final ParallelFlowable source; + + final Function mapper; + + public ParallelMap(ParallelFlowable source, Function mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + parents[i] = new ParallelMapSubscriber(subscribers[i], mapper); + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelMapSubscriber implements Subscriber, Subscription { + + final Subscriber actual; + + final Function mapper; + + Subscription s; + + boolean done; + + ParallelMapSubscriber(Subscriber actual, Function mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + R v; + + try { + v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return; + } + + actual.onNext(v); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java new file mode 100644 index 0000000000..c1e8b881c7 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelPeek.java @@ -0,0 +1,216 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Execute a Consumer in each 'rail' for the current element passing through. + * + * @param the value type + */ +public final class ParallelPeek extends ParallelFlowable { + + final ParallelFlowable source; + + final Consumer onNext; + final Consumer onAfterNext; + final Consumer onError; + final Action onComplete; + final Action onAfterTerminated; + final Consumer onSubscribe; + final LongConsumer onRequest; + final Action onCancel; + + public ParallelPeek(ParallelFlowable source, + Consumer onNext, + Consumer onAfterNext, + Consumer onError, + Action onComplete, + Action onAfterTerminated, + Consumer onSubscribe, + LongConsumer onRequest, + Action onCancel + ) { + this.source = source; + + this.onNext = ObjectHelper.requireNonNull(onNext, "onNext"); + this.onAfterNext = ObjectHelper.requireNonNull(onAfterNext, "onAfterNext"); + this.onError = ObjectHelper.requireNonNull(onError, "onError"); + this.onComplete = ObjectHelper.requireNonNull(onComplete, "onComplete"); + this.onAfterTerminated = ObjectHelper.requireNonNull(onAfterTerminated, "onAfterTerminated"); + this.onSubscribe = ObjectHelper.requireNonNull(onSubscribe, "onSubscribe"); + this.onRequest = ObjectHelper.requireNonNull(onRequest, "onRequest"); + this.onCancel = ObjectHelper.requireNonNull(onCancel, "onCancel"); + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + parents[i] = new ParallelPeekSubscriber(subscribers[i], this); + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelPeekSubscriber implements Subscriber, Subscription { + + final Subscriber actual; + + final ParallelPeek parent; + + Subscription s; + + boolean done; + + ParallelPeekSubscriber(Subscriber actual, ParallelPeek parent) { + this.actual = actual; + this.parent = parent; + } + + @Override + public void request(long n) { + try { + parent.onRequest.accept(n); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + s.request(n); + } + + @Override + public void cancel() { + try { + parent.onCancel.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + try { + parent.onSubscribe.accept(s); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + actual.onSubscribe(EmptySubscription.INSTANCE); + onError(ex); + return; + } + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + + try { + parent.onNext.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + + actual.onNext(t); + + try { + parent.onAfterNext.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + + try { + parent.onError.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + t = new CompositeException(t, ex); + } + actual.onError(t); + + try { + parent.onAfterTerminated.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + try { + parent.onComplete.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + actual.onComplete(); + + try { + parent.onAfterTerminated.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java new file mode 100644 index 0000000000..3b3e32234e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduce.java @@ -0,0 +1,174 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.concurrent.Callable; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.subscribers.DeferredScalarSubscriber; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Reduce the sequence of values in each 'rail' to a single value. + * + * @param the input value type + * @param the result value type + */ +public final class ParallelReduce extends ParallelFlowable { + + final ParallelFlowable source; + + final Callable initialSupplier; + + final BiFunction reducer; + + public ParallelReduce(ParallelFlowable source, Callable initialSupplier, BiFunction reducer) { + this.source = source; + this.initialSupplier = initialSupplier; + this.reducer = reducer; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + + R initialValue; + + try { + initialValue = initialSupplier.call(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + reportError(subscribers, ex); + return; + } + + if (initialValue == null) { + reportError(subscribers, new NullPointerException("The initialSupplier returned a null value")); + return; + } + + parents[i] = new ParallelReduceSubscriber(subscribers[i], initialValue, reducer); + } + + source.subscribe(parents); + } + + void reportError(Subscriber[] subscribers, Throwable ex) { + for (Subscriber s : subscribers) { + EmptySubscription.error(ex, s); + } + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelReduceSubscriber extends DeferredScalarSubscriber { + + + private static final long serialVersionUID = 8200530050639449080L; + + final BiFunction reducer; + + R accumulator; + + boolean done; + + ParallelReduceSubscriber(Subscriber subscriber, R initialValue, BiFunction reducer) { + super(subscriber); + this.accumulator = initialValue; + this.reducer = reducer; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + + R v; + + try { + v = reducer.apply(accumulator, t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cancel(); + onError(ex); + return; + } + + if (v == null) { + cancel(); + onError(new NullPointerException("The reducer returned a null value")); + return; + } + + accumulator = v; + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + accumulator = null; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + + R a = accumulator; + accumulator = null; + complete(a); + } + + @Override + public void cancel() { + super.cancel(); + s.cancel(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java new file mode 100644 index 0000000000..020b62d7be --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelReduceFull.java @@ -0,0 +1,272 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Reduces all 'rails' into a single value which then gets reduced into a single + * Publisher sequence. + * + * @param the value type + */ +public final class ParallelReduceFull extends Flowable { + + final ParallelFlowable source; + + final BiFunction reducer; + + public ParallelReduceFull(ParallelFlowable source, BiFunction reducer) { + this.source = source; + this.reducer = reducer; + } + + @Override + protected void subscribeActual(Subscriber s) { + ParallelReduceFullMainSubscriber parent = new ParallelReduceFullMainSubscriber(s, source.parallelism(), reducer); + s.onSubscribe(parent); + + source.subscribe(parent.subscribers); + } + + static final class ParallelReduceFullMainSubscriber extends DeferredScalarSubscription { + + + private static final long serialVersionUID = -5370107872170712765L; + + final ParallelReduceFullInnerSubscriber[] subscribers; + + final BiFunction reducer; + + final AtomicReference> current = new AtomicReference>(); + + final AtomicInteger remaining = new AtomicInteger(); + + final AtomicBoolean once = new AtomicBoolean(); + + ParallelReduceFullMainSubscriber(Subscriber subscriber, int n, BiFunction reducer) { + super(subscriber); + @SuppressWarnings("unchecked") + ParallelReduceFullInnerSubscriber[] a = new ParallelReduceFullInnerSubscriber[n]; + for (int i = 0; i < n; i++) { + a[i] = new ParallelReduceFullInnerSubscriber(this, reducer); + } + this.subscribers = a; + this.reducer = reducer; + remaining.lazySet(n); + } + + SlotPair addValue(T value) { + for (;;) { + SlotPair curr = current.get(); + + if (curr == null) { + curr = new SlotPair(); + if (!current.compareAndSet(null, curr)) { + continue; + } + } + + int c = curr.tryAcquireSlot(); + if (c < 0) { + current.compareAndSet(curr, null); + continue; + } + if (c == 0) { + curr.first = value; + } else { + curr.second = value; + } + + if (curr.releaseSlot()) { + current.compareAndSet(curr, null); + return curr; + } + return null; + } + } + + @Override + public void cancel() { + for (ParallelReduceFullInnerSubscriber inner : subscribers) { + inner.cancel(); + } + } + + void innerError(Throwable ex) { + if (once.compareAndSet(false, true)) { + cancel(); + actual.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + } + + void innerComplete(T value) { + if (value != null) { + for (;;) { + SlotPair sp = addValue(value); + + if (sp != null) { + + try { + value = reducer.apply(sp.first, sp.second); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + innerError(ex); + return; + } + + if (value == null) { + innerError(new NullPointerException("The reducer returned a null value")); + return; + } + } else { + break; + } + } + } + + if (remaining.decrementAndGet() == 0) { + SlotPair sp = current.get(); + current.lazySet(null); + + if (sp != null) { + complete(sp.first); + } else { + actual.onComplete(); + } + } + } + } + + static final class ParallelReduceFullInnerSubscriber + extends AtomicReference + implements Subscriber { + + private static final long serialVersionUID = -7954444275102466525L; + + final ParallelReduceFullMainSubscriber parent; + + final BiFunction reducer; + + T value; + + boolean done; + + ParallelReduceFullInnerSubscriber(ParallelReduceFullMainSubscriber parent, BiFunction reducer) { + this.parent = parent; + this.reducer = reducer; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + T v = value; + + if (v == null) { + value = t; + } else { + + try { + v = ObjectHelper.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + get().cancel(); + onError(ex); + return; + } + + value = v; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + parent.innerError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + parent.innerComplete(value); + } + + void cancel() { + SubscriptionHelper.cancel(this); + } + } + + static final class SlotPair { + + T first; + + T second; + + volatile int acquireIndex; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ACQ = + AtomicIntegerFieldUpdater.newUpdater(SlotPair.class, "acquireIndex"); + + + volatile int releaseIndex; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater REL = + AtomicIntegerFieldUpdater.newUpdater(SlotPair.class, "releaseIndex"); + + int tryAcquireSlot() { + for (;;) { + int acquired = acquireIndex; + if (acquired >= 2) { + return -1; + } + + if (ACQ.compareAndSet(this, acquired, acquired + 1)) { + return acquired; + } + } + } + + boolean releaseSlot() { + return REL.incrementAndGet(this) == 2; + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java new file mode 100644 index 0000000000..4582ac50e4 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java @@ -0,0 +1,287 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.Scheduler; +import io.reactivex.Scheduler.Worker; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Ensures each 'rail' from upstream runs on a Worker from a Scheduler. + * + * @param the value type + */ +public final class ParallelRunOn extends ParallelFlowable { + final ParallelFlowable source; + + final Scheduler scheduler; + + final int prefetch; + + public ParallelRunOn(ParallelFlowable parent, + Scheduler scheduler, int prefetch) { + this.source = parent; + this.scheduler = scheduler; + this.prefetch = prefetch; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + int prefetch = this.prefetch; + + for (int i = 0; i < n; i++) { + Subscriber a = subscribers[i]; + + Worker w = scheduler.createWorker(); + SpscArrayQueue q = new SpscArrayQueue(prefetch); + + RunOnSubscriber parent = new RunOnSubscriber(a, prefetch, q, w); + parents[i] = parent; + } + + source.subscribe(parents); + } + + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class RunOnSubscriber + extends AtomicInteger + implements Subscriber, Subscription, Runnable { + + + private static final long serialVersionUID = 1075119423897941642L; + + final Subscriber actual; + + final int prefetch; + + final int limit; + + final SpscArrayQueue queue; + + final Worker worker; + + Subscription s; + + volatile boolean done; + + Throwable error; + + final AtomicLong requested = new AtomicLong(); + + volatile boolean cancelled; + + int consumed; + + RunOnSubscriber(Subscriber actual, int prefetch, SpscArrayQueue queue, Worker worker) { + this.actual = actual; + this.prefetch = prefetch; + this.queue = queue; + this.limit = prefetch - (prefetch >> 2); + this.worker = worker; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(prefetch); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + if (!queue.offer(t)) { + onError(new IllegalStateException("Queue is full?!")); + return; + } + schedule(); + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + error = t; + done = true; + schedule(); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + schedule(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + schedule(); + } + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + s.cancel(); + worker.dispose(); + + if (getAndIncrement() == 0) { + queue.clear(); + } + } + } + + void schedule() { + if (getAndIncrement() == 0) { + worker.schedule(this); + } + } + + @Override + public void run() { + int missed = 1; + int c = consumed; + SpscArrayQueue q = queue; + Subscriber a = actual; + int lim = limit; + + for (;;) { + + long r = requested.get(); + long e = 0L; + + while (e != r) { + if (cancelled) { + q.clear(); + return; + } + + boolean d = done; + + if (d) { + Throwable ex = error; + if (ex != null) { + q.clear(); + + a.onError(ex); + + worker.dispose(); + return; + } + } + + T v = q.poll(); + + boolean empty = v == null; + + if (d && empty) { + a.onComplete(); + + worker.dispose(); + return; + } + + if (empty) { + break; + } + + a.onNext(v); + + e++; + + int p = ++c; + if (p == lim) { + c = 0; + s.request(p); + } + } + + if (e == r) { + if (cancelled) { + q.clear(); + return; + } + + if (done) { + Throwable ex = error; + if (ex != null) { + q.clear(); + + a.onError(ex); + + worker.dispose(); + return; + } + if (q.isEmpty()) { + a.onComplete(); + + worker.dispose(); + return; + } + } + } + + if (e != 0L && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } + + int w = get(); + if (w == missed) { + consumed = c; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java new file mode 100644 index 0000000000..e0b55be161 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelSortedJoin.java @@ -0,0 +1,289 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.Flowable; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Given sorted rail sequences (according to the provided comparator) as List + * emit the smallest item from these parallel Lists to the Subscriber. + *

+ * It expects the source to emit exactly one list (which could be empty). + * + * @param the value type + */ +public final class ParallelSortedJoin extends Flowable { + + final ParallelFlowable> source; + + final Comparator comparator; + + public ParallelSortedJoin(ParallelFlowable> source, Comparator comparator) { + this.source = source; + this.comparator = comparator; + } + + @Override + protected void subscribeActual(Subscriber s) { + SortedJoinSubscription parent = new SortedJoinSubscription(s, source.parallelism(), comparator); + s.onSubscribe(parent); + + source.subscribe(parent.subscribers); + } + + static final class SortedJoinSubscription + extends AtomicInteger + implements Subscription { + + private static final long serialVersionUID = 3481980673745556697L; + + final Subscriber actual; + + final SortedJoinInnerSubscriber[] subscribers; + + final List[] lists; + + final int[] indexes; + + final Comparator comparator; + + final AtomicLong requested = new AtomicLong(); + + volatile boolean cancelled; + + final AtomicInteger remaining = new AtomicInteger(); + + final AtomicThrowable error = new AtomicThrowable(); + + @SuppressWarnings("unchecked") + SortedJoinSubscription(Subscriber actual, int n, Comparator comparator) { + this.actual = actual; + this.comparator = comparator; + + SortedJoinInnerSubscriber[] s = new SortedJoinInnerSubscriber[n]; + + for (int i = 0; i < n; i++) { + s[i] = new SortedJoinInnerSubscriber(this, i); + } + this.subscribers = s; + this.lists = new List[n]; + this.indexes = new int[n]; + remaining.lazySet(n); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + if (remaining.get() == 0) { + drain(); + } + } + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + cancelAll(); + if (getAndIncrement() == 0) { + Arrays.fill(lists, null); + } + } + } + + void cancelAll() { + for (SortedJoinInnerSubscriber s : subscribers) { + s.cancel(); + } + } + + void innerNext(List value, int index) { + lists[index] = value; + if (remaining.decrementAndGet() == 0) { + drain(); + } + } + + void innerError(Throwable e) { + if (error.addThrowable(e)) { + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber a = actual; + List[] lists = this.lists; + int[] indexes = this.indexes; + int n = indexes.length; + + for (;;) { + + long r = requested.get(); + long e = 0L; + + while (e != r) { + if (cancelled) { + Arrays.fill(lists, null); + return; + } + + Throwable ex = error.get(); + if (ex != null) { + cancelAll(); + Arrays.fill(lists, null); + a.onError(error.terminate()); + return; + } + + T min = null; + int minIndex = -1; + + for (int i = 0; i < n; i++) { + List list = lists[i]; + int index = indexes[i]; + + if (list.size() != index) { + if (min == null) { + min = list.get(index); + minIndex = i; + } else { + T b = list.get(index); + if (comparator.compare(min, b) > 0) { + min = b; + minIndex = i; + } + } + } + } + + if (min == null) { + Arrays.fill(lists, null); + a.onComplete(); + return; + } + + a.onNext(min); + + indexes[minIndex]++; + + e++; + } + + if (e == r) { + if (cancelled) { + Arrays.fill(lists, null); + return; + } + + Throwable ex = error.get(); + if (ex != null) { + cancelAll(); + Arrays.fill(lists, null); + a.onError(error.terminate()); + return; + } + + boolean empty = true; + + for (int i = 0; i < n; i++) { + if (indexes[i] != lists[i].size()) { + empty = false; + break; + } + } + + if (empty) { + Arrays.fill(lists, null); + a.onComplete(); + return; + } + } + + if (e != 0 && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } + + int w = get(); + if (w == missed) { + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + } + + static final class SortedJoinInnerSubscriber + extends AtomicReference + implements Subscriber> { + + + private static final long serialVersionUID = 6751017204873808094L; + + final SortedJoinSubscription parent; + + final int index; + + SortedJoinInnerSubscriber(SortedJoinSubscription parent, int index) { + this.parent = parent; + this.index = index; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(List t) { + parent.innerNext(t, index); + } + + @Override + public void onError(Throwable t) { + parent.innerError(t); + } + + @Override + public void onComplete() { + // ignored + } + + void cancel() { + SubscriptionHelper.cancel(this); + } + } +} diff --git a/src/main/java/io/reactivex/internal/util/ListAddBiConsumer.java b/src/main/java/io/reactivex/internal/util/ListAddBiConsumer.java new file mode 100644 index 0000000000..787c5efbe2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/ListAddBiConsumer.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.List; + +import io.reactivex.functions.*; + +@SuppressWarnings("rawtypes") +public enum ListAddBiConsumer implements BiFunction { + INSTANCE; + + @SuppressWarnings("unchecked") + public static BiFunction, T, List> instance() { + return (BiFunction)INSTANCE; + } + + @SuppressWarnings("unchecked") + @Override + public List apply(List t1, Object t2) throws Exception { + t1.add(t2); + return t1; + } +} diff --git a/src/main/java/io/reactivex/internal/util/MergerBiFunction.java b/src/main/java/io/reactivex/internal/util/MergerBiFunction.java new file mode 100644 index 0000000000..f4fc6a7119 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/MergerBiFunction.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.*; + +import io.reactivex.functions.BiFunction; + +/** + * A BiFunction that merges two Lists into a new list. + * @param the value type + */ +public final class MergerBiFunction implements BiFunction, List, List> { + + Comparator comparator; + + public MergerBiFunction(Comparator comparator) { + this.comparator = comparator; + } + + @Override + public List apply(List a, List b) throws Exception { + int n = a.size() + b.size(); + if (n == 0) { + return new ArrayList(); + } + List both = new ArrayList(n); + + Iterator at = a.iterator(); + Iterator bt = b.iterator(); + + T s1 = at.hasNext() ? at.next() : null; + T s2 = bt.hasNext() ? bt.next() : null; + + while (s1 != null && s2 != null) { + if (comparator.compare(s1, s2) < 0) { // s1 comes before s2 + both.add(s1); + s1 = at.hasNext() ? at.next() : null; + } else { + both.add(s2); + s2 = bt.hasNext() ? bt.next() : null; + } + } + + if (s1 != null) { + both.add(s1); + while (at.hasNext()) { + both.add(at.next()); + } + } else + if (s2 != null) { + both.add(s2); + while (bt.hasNext()) { + both.add(bt.next()); + } + } + + return both; + } +} diff --git a/src/main/java/io/reactivex/internal/util/SorterFunction.java b/src/main/java/io/reactivex/internal/util/SorterFunction.java new file mode 100644 index 0000000000..da8457909a --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/SorterFunction.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.*; + +import io.reactivex.functions.Function; + +public final class SorterFunction implements Function, List> { + + Comparator comparator; + + public SorterFunction(Comparator comparator) { + this.comparator = comparator; + } + + @Override + public List apply(List t) throws Exception { + Collections.sort(t, comparator); + return t; + } +} diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java new file mode 100644 index 0000000000..f5ae690975 --- /dev/null +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -0,0 +1,701 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import java.util.*; +import java.util.concurrent.Callable; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.annotations.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.*; +import io.reactivex.internal.operators.parallel.*; +import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Abstract base class for Parallel publishers that take an array of Subscribers. + *

+ * Use {@code from()} to start processing a regular Publisher in 'rails'. + * Use {@code runOn()} to introduce where each 'rail' shoud run on thread-vise. + * Use {@code sequential()} to merge the sources back into a single Flowable. + * + * @param the value type + * @since 2.0.5 - experimental + */ +@Experimental +public abstract class ParallelFlowable { + + /** + * Subscribes an array of Subscribers to this ParallelFlowable and triggers + * the execution chain for all 'rails'. + * + * @param subscribers the subscribers array to run in parallel, the number + * of items must be equal to the parallelism level of this ParallelFlowable + * @see #parallelism() + */ + public abstract void subscribe(Subscriber[] subscribers); + + /** + * Returns the number of expected parallel Subscribers. + * @return the number of expected parallel Subscribers + */ + public abstract int parallelism(); + + /** + * Validates the number of subscribers and returns true if their number + * matches the parallelism level of this ParallelFlowable. + * + * @param subscribers the array of Subscribers + * @return true if the number of subscribers equals to the parallelism level + */ + protected final boolean validate(Subscriber[] subscribers) { + int p = parallelism(); + if (subscribers.length != p) { + for (Subscriber s : subscribers) { + EmptySubscription.error(new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length), s); + } + return false; + } + return true; + } + + /** + * Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) + * in a round-robin fashion. + * @param the value type + * @param source the source Publisher + * @return the ParallelFlowable instance + */ + @CheckReturnValue + public static ParallelFlowable from(Publisher source) { + return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize()); + } + + /** + * Take a Publisher and prepare to consume it on parallallism number of 'rails' in a round-robin fashion. + * @param the value type + * @param source the source Publisher + * @param parallelism the number of parallel rails + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public static ParallelFlowable from(Publisher source, int parallelism) { + return from(source, parallelism, Flowable.bufferSize()); + } + + /** + * Take a Publisher and prepare to consume it on parallallism number of 'rails' , + * possibly ordered and round-robin fashion and use custom prefetch amount and queue + * for dealing with the source Publisher's values. + * @param the value type + * @param source the source Publisher + * @param parallelism the number of parallel rails + * @param prefetch the number of values to prefetch from the source + * the source until there is a rail ready to process it. + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public static ParallelFlowable from(Publisher source, + int parallelism, int prefetch) { + ObjectHelper.requireNonNull(source, "source"); + ObjectHelper.verifyPositive(parallelism, "parallelism"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + + return new ParallelFromPublisher(source, parallelism, prefetch); + } + + /** + * Maps the source values on each 'rail' to another value. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + * @param the output value type + * @param mapper the mapper function turning Ts into Us. + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable map(Function mapper) { + ObjectHelper.requireNonNull(mapper, "mapper"); + return new ParallelMap(this, mapper); + } + + /** + * Filters the source values on each 'rail'. + *

+ * Note that the same predicate may be called from multiple threads concurrently. + * @param predicate the function returning true to keep a value or false to drop a value + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable filter(Predicate predicate) { + ObjectHelper.requireNonNull(predicate, "predicate"); + return new ParallelFilter(this, predicate); + } + + /** + * Specifies where each 'rail' will observe its incoming values with + * no work-stealing and default prefetch amount. + *

+ * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}. + *

+ * The operator will call {@code Scheduler.createWorker()} as many + * times as this ParallelFlowable's parallelism level is. + *

+ * No assumptions are made about the Scheduler's parallelism level, + * if the Scheduler's parallelism level is lower than the ParallelFlowable's, + * some rails may end up on the same thread/worker. + *

+ * This operator doesn't require the Scheduler to be trampolining as it + * does its own built-in trampolining logic. + * + * @param scheduler the scheduler to use + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable runOn(Scheduler scheduler) { + return runOn(scheduler, Flowable.bufferSize()); + } + + /** + * Specifies where each 'rail' will observe its incoming values with + * possibly work-stealing and a given prefetch amount. + *

+ * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}. + *

+ * The operator will call {@code Scheduler.createWorker()} as many + * times as this ParallelFlowable's parallelism level is. + *

+ * No assumptions are made about the Scheduler's parallelism level, + * if the Scheduler's parallelism level is lower than the ParallelFlowable's, + * some rails may end up on the same thread/worker. + *

+ * This operator doesn't require the Scheduler to be trampolining as it + * does its own built-in trampolining logic. + * + * @param scheduler the scheduler to use + * that rail's worker has run out of work. + * @param prefetch the number of values to request on each 'rail' from the source + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable runOn(Scheduler scheduler, int prefetch) { + ObjectHelper.requireNonNull(scheduler, "scheduler"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return new ParallelRunOn(this, scheduler, prefetch); + } + + /** + * Reduces all values within a 'rail' and across 'rails' with a reducer function into a single + * sequential value. + *

+ * Note that the same reducer function may be called from multiple threads concurrently. + * @param reducer the function to reduce two values into one. + * @return the new Flowable instance emitting the reduced value or empty if the ParallelFlowable was empty + */ + @CheckReturnValue + public final Flowable reduce(BiFunction reducer) { + ObjectHelper.requireNonNull(reducer, "reducer"); + return RxJavaPlugins.onAssembly(new ParallelReduceFull(this, reducer)); + } + + /** + * Reduces all values within a 'rail' to a single value (with a possibly different type) via + * a reducer function that is initialized on each rail from an initialSupplier value. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + * @param the reduced output type + * @param initialSupplier the supplier for the initial value + * @param reducer the function to reduce a previous output of reduce (or the initial value supplied) + * with a current source value. + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable reduce(Callable initialSupplier, BiFunction reducer) { + ObjectHelper.requireNonNull(initialSupplier, "initialSupplier"); + ObjectHelper.requireNonNull(reducer, "reducer"); + return new ParallelReduce(this, initialSupplier, reducer); + } + + /** + * Merges the values from each 'rail' in a round-robin or same-order fashion and + * exposes it as a regular Publisher sequence, running with a default prefetch value + * for the rails. + *

+ * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}. + * + *

+ *
Backpressure:
+ *
The operator honors backpressure.
+ *
Scheduler:
+ *
{@code sequential} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Flowable instance + * @see ParallelFlowable#sequential(int) + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + public final Flowable sequential() { + return sequential(Flowable.bufferSize()); + } + + /** + * Merges the values from each 'rail' in a round-robin or same-order fashion and + * exposes it as a regular Publisher sequence, running with a give prefetch value + * for the rails. + * + *
+ *
Backpressure:
+ *
The operator honors backpressure.
+ *
Scheduler:
+ *
{@code sequential} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param prefetch the prefetch amount to use for each rail + * @return the new Flowable instance + * @see ParallelFlowable#sequential() + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + public final Flowable sequential(int prefetch) { + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch)); + } + + /** + * Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially + * picks the smallest next value from the rails. + *

+ * This operator requires a finite source ParallelFlowable. + * + * @param comparator the comparator to use + * @return the new Flowable instance + */ + @CheckReturnValue + public final Flowable sorted(Comparator comparator) { + return sorted(comparator, 16); + } + + /** + * Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially + * picks the smallest next value from the rails. + *

+ * This operator requires a finite source ParallelFlowable. + * + * @param comparator the comparator to use + * @param capacityHint the expected number of total elements + * @return the new Flowable instance + */ + @CheckReturnValue + public final Flowable sorted(Comparator comparator, int capacityHint) { + int ch = capacityHint / parallelism() + 1; + ParallelFlowable> railReduced = reduce(Functions.createArrayList(ch), ListAddBiConsumer.instance()); + ParallelFlowable> railSorted = railReduced.map(new SorterFunction(comparator)); + + return RxJavaPlugins.onAssembly(new ParallelSortedJoin(railSorted, comparator)); + } + + /** + * Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. + *

+ * This operator requires a finite source ParallelFlowable. + * + * @param comparator the comparator to compare elements + * @return the new Px instannce + */ + @CheckReturnValue + public final Flowable> toSortedList(Comparator comparator) { + return toSortedList(comparator, 16); + } + /** + * Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher. + *

+ * This operator requires a finite source ParallelFlowable. + * + * @param comparator the comparator to compare elements + * @param capacityHint the expected number of total elements + * @return the new Px instannce + */ + @CheckReturnValue + public final Flowable> toSortedList(Comparator comparator, int capacityHint) { + int ch = capacityHint / parallelism() + 1; + ParallelFlowable> railReduced = reduce(Functions.createArrayList(ch), ListAddBiConsumer.instance()); + ParallelFlowable> railSorted = railReduced.map(new SorterFunction(comparator)); + + Flowable> merged = railSorted.reduce(new MergerBiFunction(comparator)); + + return RxJavaPlugins.onAssembly(merged); + } + + /** + * Call the specified consumer with the current element passing through any 'rail'. + * + * @param onNext the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doOnNext(Consumer onNext) { + return new ParallelPeek(this, + onNext, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, + Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + Functions.EMPTY_ACTION + ); + } + + /** + * Call the specified consumer with the current element passing through any 'rail' + * after it has been delivered to downstream within the rail. + * + * @param onAfterNext the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doAfterNext(Consumer onAfterNext) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + onAfterNext, + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, + Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + Functions.EMPTY_ACTION + ); + } + + /** + * Call the specified consumer with the exception passing through any 'rail'. + * + * @param onError the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doOnError(Consumer onError) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + onError, + Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, + Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + Functions.EMPTY_ACTION + ); + } + + /** + * Run the specified Action when a 'rail' completes. + * + * @param onComplete the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doOnComplete(Action onComplete) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.emptyConsumer(), + onComplete, + Functions.EMPTY_ACTION, + Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + Functions.EMPTY_ACTION + ); + } + + /** + * Run the specified Action when a 'rail' completes or signals an error. + * + * @param onAfterTerminate the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doAfterTerminated(Action onAfterTerminate) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + onAfterTerminate, + Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + Functions.EMPTY_ACTION + ); + } + + /** + * Call the specified callback when a 'rail' receives a Subscription from its upstream. + * + * @param onSubscribe the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doOnSubscribe(Consumer onSubscribe) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, + onSubscribe, + Functions.EMPTY_LONG_CONSUMER, + Functions.EMPTY_ACTION + ); + } + + /** + * Call the specified consumer with the request amount if any rail receives a request. + * + * @param onRequest the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doOnRequest(LongConsumer onRequest) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, + Functions.emptyConsumer(), + onRequest, + Functions.EMPTY_ACTION + ); + } + + /** + * Run the specified Action when a 'rail' receives a cancellation. + * + * @param onCancel the callback + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable doOnCancel(Action onCancel) { + return new ParallelPeek(this, + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, + Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + onCancel + ); + } + + /** + * Collect the elements in each rail into a collection supplied via a collectionSupplier + * and collected into with a collector action, emitting the collection at the end. + * + * @param the collection type + * @param collectionSupplier the supplier of the collection in each rail + * @param collector the collector, taking the per-rali collection and the current item + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable collect(Callable collectionSupplier, BiConsumer collector) { + return new ParallelCollect(this, collectionSupplier, collector); + } + + /** + * Wraps multiple Publishers into a ParallelFlowable which runs them + * in parallel and unordered. + * + * @param the value type + * @param publishers the array of publishers + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public static ParallelFlowable fromArray(Publisher... publishers) { + if (publishers.length == 0) { + throw new IllegalArgumentException("Zero publishers not supported"); + } + return new ParallelFromArray(publishers); + } + + /** + * Perform a fluent transformation to a value via a converter function which + * receives this ParallelFlowable. + * + * @param the output value type + * @param converter the converter function from ParallelFlowable to some type + * @return the value returned by the converter function + */ + @CheckReturnValue + public final U to(Function, U> converter) { + try { + return converter.apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + /** + * Allows composing operators, in assembly time, on top of this ParallelFlowable + * and returns another ParallelFlowable with composed features. + * + * @param the output value type + * @param composer the composer function from ParallelFlowable (this) to another ParallelFlowable + * @return the ParallelFlowable returned by the function + */ + @CheckReturnValue + public final ParallelFlowable compose(Function, ParallelFlowable> composer) { + return to(composer); + } + + /** + * Generates and flattens Publishers on each 'rail'. + *

+ * Errors are not delayed and uses unbounded concurrency along with default inner prefetch. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable flatMap(Function> mapper) { + return flatMap(mapper, false, Integer.MAX_VALUE, Flowable.bufferSize()); + } + + /** + * Generates and flattens Publishers on each 'rail', optionally delaying errors. + *

+ * It uses unbounded concurrency along with default inner prefetch. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @param delayError should the errors from the main and the inner sources delayed till everybody terminates? + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable flatMap( + Function> mapper, boolean delayError) { + return flatMap(mapper, delayError, Integer.MAX_VALUE, Flowable.bufferSize()); + } + + /** + * Generates and flattens Publishers on each 'rail', optionally delaying errors + * and having a total number of simultaneous subscriptions to the inner Publishers. + *

+ * It uses a default inner prefetch. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @param delayError should the errors from the main and the inner sources delayed till everybody terminates? + * @param maxConcurrency the maximum number of simultaneous subscriptions to the generated inner Publishers + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable flatMap( + Function> mapper, boolean delayError, int maxConcurrency) { + return flatMap(mapper, delayError, maxConcurrency, Flowable.bufferSize()); + } + + /** + * Generates and flattens Publishers on each 'rail', optionally delaying errors, + * having a total number of simultaneous subscriptions to the inner Publishers + * and using the given prefetch amount for the inner Publishers. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @param delayError should the errors from the main and the inner sources delayed till everybody terminates? + * @param maxConcurrency the maximum number of simultaneous subscriptions to the generated inner Publishers + * @param prefetch the number of items to prefetch from each inner Publisher + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable flatMap( + Function> mapper, + boolean delayError, int maxConcurrency, int prefetch) { + return new ParallelFlatMap(this, mapper, delayError, maxConcurrency, prefetch); + } + + /** + * Generates and concatenates Publishers on each 'rail', signalling errors immediately + * and generating 2 publishers upfront. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * source and the inner Publishers (immediate, boundary, end) + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable concatMap( + Function> mapper) { + return concatMap(mapper, 2); + } + + /** + * Generates and concatenates Publishers on each 'rail', signalling errors immediately + * and using the given prefetch amount for generating Publishers upfront. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @param prefetch the number of items to prefetch from each inner Publisher + * source and the inner Publishers (immediate, boundary, end) + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable concatMap( + Function> mapper, + int prefetch) { + return new ParallelConcatMap(this, mapper, prefetch, ErrorMode.IMMEDIATE); + } + + /** + * Generates and concatenates Publishers on each 'rail', optionally delaying errors + * and generating 2 publishers upfront. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @param tillTheEnd if true all errors from the upstream and inner Publishers are delayed + * till all of them terminate, if false, the error is emitted when an inner Publisher terminates. + * source and the inner Publishers (immediate, boundary, end) + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable concatMapDelayError( + Function> mapper, + boolean tillTheEnd) { + return concatMapDelayError(mapper, 2, tillTheEnd); + } + + /** + * Generates and concatenates Publishers on each 'rail', optionally delaying errors + * and using the given prefetch amount for generating Publishers upfront. + * + * @param the result type + * @param mapper the function to map each rail's value into a Publisher + * @param prefetch the number of items to prefetch from each inner Publisher + * @param tillTheEnd if true all errors from the upstream and inner Publishers are delayed + * till all of them terminate, if false, the error is emitted when an inner Publisher terminates. + * @return the new ParallelFlowable instance + */ + @CheckReturnValue + public final ParallelFlowable concatMapDelayError( + Function> mapper, + int prefetch, boolean tillTheEnd) { + return new ParallelConcatMap(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY); + } +} diff --git a/src/main/java/io/reactivex/parallel/package-info.java b/src/main/java/io/reactivex/parallel/package-info.java new file mode 100644 index 0000000000..3e4105ea51 --- /dev/null +++ b/src/main/java/io/reactivex/parallel/package-info.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Base type for the parallel type offering a sub-DSL for working with Flowable items + * in parallel. + */ +package io.reactivex.parallel; \ No newline at end of file diff --git a/src/perf/java/io/reactivex/parallel/ParallelPerf.java b/src/perf/java/io/reactivex/parallel/ParallelPerf.java new file mode 100644 index 0000000000..1630e82eef --- /dev/null +++ b/src/perf/java/io/reactivex/parallel/ParallelPerf.java @@ -0,0 +1,112 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.flowables.GroupedFlowable; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1,jvmArgsAppend = { "-XX:MaxInlineLevel=20" }) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class ParallelPerf implements Function { + + @Param({"10000"}) + public int count; + + @Param({"1", "10", "100", "1000", "10000"}) + public int compute; + + @Param({"1", "2", "3", "4"}) + public int parallelism; + + Flowable flatMap; + + Flowable groupBy; + + Flowable parallel; + + @Override + public Integer apply(Integer t) throws Exception { + Blackhole.consumeCPU(compute); + return t; + } + + @Setup + public void setup() { + + final int cpu = parallelism; + + Integer[] ints = new Integer[count]; + Arrays.fill(ints, 777); + + Flowable source = Flowable.fromArray(ints); + + flatMap = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.just(v).subscribeOn(Schedulers.computation()) + .map(ParallelPerf.this); + } + }, cpu); + + groupBy = source.groupBy(new Function() { + int i; + @Override + public Integer apply(Integer v) throws Exception { + return (i++) % cpu; + } + }) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Exception { + return g.observeOn(Schedulers.computation()).map(ParallelPerf.this); + } + }); + + parallel = source.parallel(cpu).runOn(Schedulers.computation()).map(this).sequential(); + } + + void subscribe(Flowable f, Blackhole bh) { + PerfAsyncConsumer consumer = new PerfAsyncConsumer(bh); + f.subscribe(consumer); + consumer.await(count); + } + + @Benchmark + public void flatMap(Blackhole bh) { + subscribe(flatMap, bh); + } + + @Benchmark + public void groupBy(Blackhole bh) { + subscribe(groupBy, bh); + } + + @Benchmark + public void parallel(Blackhole bh) { + subscribe(parallel, bh); + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/TransformerTest.java b/src/test/java/io/reactivex/TransformerTest.java index d554dfc056..a3c7fc8d2c 100644 --- a/src/test/java/io/reactivex/TransformerTest.java +++ b/src/test/java/io/reactivex/TransformerTest.java @@ -126,8 +126,8 @@ public void flowableGenericsSignatureTest() { Flowable.just(a).compose(TransformerTest.testFlowableTransformerCreator()); } - interface A {} - interface B {} + interface A { } + interface B { } private static ObservableTransformer, B> testObservableTransformerCreator() { return new ObservableTransformer, B>() { diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java new file mode 100644 index 0000000000..7fe6266206 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -0,0 +1,1290 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; + +import org.junit.*; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.UnicastProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelFlowableTest { + + @Test + public void sequentialMode() { + Flowable source = Flowable.range(1, 1000000).hide(); + for (int i = 1; i < 33; i++) { + Flowable result = ParallelFlowable.from(source, i) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }) + .sequential() + ; + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + ts + .assertSubscribed() + .assertValueCount(1000000) + .assertComplete() + .assertNoErrors() + ; + } + + } + + @Test + public void sequentialModeFused() { + Flowable source = Flowable.range(1, 1000000); + for (int i = 1; i < 33; i++) { + Flowable result = ParallelFlowable.from(source, i) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }) + .sequential() + ; + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + ts + .assertSubscribed() + .assertValueCount(1000000) + .assertComplete() + .assertNoErrors() + ; + } + + } + + @Test + public void parallelMode() { + Flowable source = Flowable.range(1, 1000000).hide(); + int ncpu = Math.max(8, Runtime.getRuntime().availableProcessors()); + for (int i = 1; i < ncpu + 1; i++) { + + ExecutorService exec = Executors.newFixedThreadPool(i); + + Scheduler scheduler = Schedulers.from(exec); + + try { + Flowable result = ParallelFlowable.from(source, i) + .runOn(scheduler) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }) + .sequential() + ; + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + ts.awaitDone(10, TimeUnit.SECONDS); + + ts + .assertSubscribed() + .assertValueCount(1000000) + .assertComplete() + .assertNoErrors() + ; + } finally { + exec.shutdown(); + } + } + + } + + @Test + public void parallelModeFused() { + Flowable source = Flowable.range(1, 1000000); + int ncpu = Math.max(8, Runtime.getRuntime().availableProcessors()); + for (int i = 1; i < ncpu + 1; i++) { + + ExecutorService exec = Executors.newFixedThreadPool(i); + + Scheduler scheduler = Schedulers.from(exec); + + try { + Flowable result = ParallelFlowable.from(source, i) + .runOn(scheduler) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }) + .sequential() + ; + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + ts.awaitDone(10, TimeUnit.SECONDS); + + ts + .assertSubscribed() + .assertValueCount(1000000) + .assertComplete() + .assertNoErrors() + ; + } finally { + exec.shutdown(); + } + } + + } + + @Test + public void reduceFull() { + for (int i = 1; i <= Runtime.getRuntime().availableProcessors() * 2; i++) { + TestSubscriber ts = new TestSubscriber(); + + Flowable.range(1, 10) + .parallel(i) + .reduce(new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .subscribe(ts); + + ts.assertResult(55); + } + } + + @Test + public void parallelReduceFull() { + int m = 100000; + for (int n = 1; n <= m; n *= 10) { +// System.out.println(n); + for (int i = 1; i <= Runtime.getRuntime().availableProcessors(); i++) { +// System.out.println(" " + i); + + ExecutorService exec = Executors.newFixedThreadPool(i); + + Scheduler scheduler = Schedulers.from(exec); + + try { + TestSubscriber ts = new TestSubscriber(); + + Flowable.range(1, n) + .map(new Function() { + @Override + public Long apply(Integer v) throws Exception { + return (long)v; + } + }) + .parallel(i) + .runOn(scheduler) + .reduce(new BiFunction() { + @Override + public Long apply(Long a, Long b) throws Exception { + return a + b; + } + }) + .subscribe(ts); + + ts.awaitDone(500, TimeUnit.SECONDS); + + long e = ((long)n) * (1 + n) / 2; + + ts.assertResult(e); + } finally { + exec.shutdown(); + } + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void toSortedList() { + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.fromArray(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + .parallel() + .toSortedList(Functions.naturalComparator()) + .subscribe(ts); + + ts.assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + } + + @Test + public void sorted() { + TestSubscriber ts = new TestSubscriber(0); + + Flowable.fromArray(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + .parallel() + .sorted(Functions.naturalComparator()) + .subscribe(ts); + + ts.assertNoValues(); + + ts.request(2); + + ts.assertValues(1, 2); + + ts.request(5); + + ts.assertValues(1, 2, 3, 4, 5, 6, 7); + + ts.request(3); + + ts.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void collect() { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + + TestSubscriber ts = new TestSubscriber(); + Flowable.range(1, 10) + .parallel() + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .sequential() + .flatMapIterable(new Function, Iterable>() { + @Override + public Iterable apply(List v) throws Exception { + return v; + } + }) + .subscribe(ts); + + ts.assertValueSet(new HashSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))) + .assertNoErrors() + .assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void from() { + TestSubscriber ts = new TestSubscriber(); + + ParallelFlowable.fromArray(Flowable.range(1, 5), Flowable.range(6, 5)) + .sequential() + .subscribe(ts); + + ts.assertValueSet(new HashSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void concatMapUnordered() { + TestSubscriber ts = new TestSubscriber(); + + Flowable.range(1, 5) + .parallel() + .concatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.range(v * 10 + 1, 3); + } + }) + .sequential() + .subscribe(ts); + + ts.assertValueSet(new HashSet(Arrays.asList(11, 12, 13, 21, 22, 23, 31, 32, 33, 41, 42, 43, 51, 52, 53))) + .assertNoErrors() + .assertComplete(); + + } + + @Test + public void flatMapUnordered() { + TestSubscriber ts = new TestSubscriber(); + + Flowable.range(1, 5) + .parallel() + .flatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.range(v * 10 + 1, 3); + } + }) + .sequential() + .subscribe(ts); + + ts.assertValueSet(new HashSet(Arrays.asList(11, 12, 13, 21, 22, 23, 31, 32, 33, 41, 42, 43, 51, 52, 53))) + .assertNoErrors() + .assertComplete(); + + } + + @Test + public void collectAsyncFused() { + ExecutorService exec = Executors.newFixedThreadPool(3); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.range(1, 100000) + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(100000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + @Test + public void collectAsync() { + ExecutorService exec = Executors.newFixedThreadPool(3); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.range(1, 100000).hide() + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(100000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + + @Test + public void collectAsync2() { + ExecutorService exec = Executors.newFixedThreadPool(3); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.range(1, 100000).hide() + .observeOn(s) + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(100000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + @Test + public void collectAsync3() { + ExecutorService exec = Executors.newFixedThreadPool(3); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.range(1, 100000).hide() + .observeOn(s) + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(100000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + + @Test + public void collectAsync3Fused() { + ExecutorService exec = Executors.newFixedThreadPool(3); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.range(1, 100000) + .observeOn(s) + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(100000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + @Test + public void collectAsync3Take() { + ExecutorService exec = Executors.newFixedThreadPool(4); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + Flowable.range(1, 100000) + .take(1000) + .observeOn(s) + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(1000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + @Test + public void collectAsync4Take() { + ExecutorService exec = Executors.newFixedThreadPool(3); + + Scheduler s = Schedulers.from(exec); + + try { + Callable> as = new Callable>() { + @Override + public List call() throws Exception { + return new ArrayList(); + } + }; + TestSubscriber> ts = new TestSubscriber>(); + + UnicastProcessor up = UnicastProcessor.create(); + + for (int i = 0; i < 1000; i++) { + up.onNext(i); + } + + up + .take(1000) + .observeOn(s) + .parallel(3) + .runOn(s) + .collect(as, new BiConsumer, Integer>() { + @Override + public void accept(List a, Integer b) throws Exception { + a.add(b); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) throws Exception { + System.out.println(v.size()); + } + }) + .sequential() + .subscribe(ts); + + ts.awaitDone(5, TimeUnit.SECONDS); + ts.assertValueCount(3) + .assertNoErrors() + .assertComplete() + ; + + List> list = ts.values(); + + Assert.assertEquals(1000, list.get(0).size() + list.get(1).size() + list.get(2).size()); + } finally { + exec.shutdown(); + } + } + + @Test + public void emptySourceZeroRequest() { + TestSubscriber ts = new TestSubscriber(0); + + Flowable.range(1, 3).parallel(3).sequential().subscribe(ts); + + ts.request(1); + + ts.assertValue(1); + } + + @Test + public void parallelismAndPrefetch() { + for (int parallelism = 1; parallelism <= 8; parallelism++) { + for (int prefetch = 1; prefetch <= 1024; prefetch *= 2) { + Flowable.range(1, 1024 * 1024) + .parallel(parallelism, prefetch) + .map(Functions.identity()) + .sequential() + .test() + .assertSubscribed() + .assertValueCount(1024 * 1024) + .assertNoErrors() + .assertComplete(); + } + } + } + + @Test + public void parallelismAndPrefetchAsync() { + for (int parallelism = 1; parallelism <= 8; parallelism *= 2) { + for (int prefetch = 1; prefetch <= 1024; prefetch *= 2) { + System.out.println("parallelismAndPrefetchAsync >> " + parallelism + ", " + prefetch); + + Flowable.range(1, 1024 * 1024) + .parallel(parallelism, prefetch) + .runOn(Schedulers.computation()) + .map(Functions.identity()) + .sequential(prefetch) + .test() + .awaitDone(10, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1024 * 1024) + .assertNoErrors() + .assertComplete(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void badParallelismStage() { + TestSubscriber ts = new TestSubscriber(); + + Flowable.range(1, 10) + .parallel(2) + .subscribe(new Subscriber[] { ts }); + + ts.assertFailure(IllegalArgumentException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void badParallelismStage2() { + TestSubscriber ts1 = new TestSubscriber(); + TestSubscriber ts2 = new TestSubscriber(); + TestSubscriber ts3 = new TestSubscriber(); + + Flowable.range(1, 10) + .parallel(2) + .subscribe(new Subscriber[] { ts1, ts2, ts3 }); + + ts1.assertFailure(IllegalArgumentException.class); + ts2.assertFailure(IllegalArgumentException.class); + ts3.assertFailure(IllegalArgumentException.class); + } + + @Test + public void filter() { + Flowable.range(1, 20) + .parallel() + .runOn(Schedulers.computation()) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueSet(Arrays.asList(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void filterThrows() throws Exception { + final boolean[] cancelled = { false }; + Flowable.range(1, 20) + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + cancelled[0] = true; + } + }) + .parallel() + .runOn(Schedulers.computation()) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + if (v == 10) { + throw new TestException(); + } + return v % 2 == 0; + } + }) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertError(TestException.class) + .assertNotComplete(); + + Thread.sleep(100); + + assertTrue(cancelled[0]); + } + + @Test + public void doAfterNext() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel() + .doAfterNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + count[0]++; + } + }) + .sequential() + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void doOnNextThrows() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel() + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (v == 3) { + throw new TestException(); + } else { + count[0]++; + } + } + }) + .sequential() + .test() + .assertError(TestException.class) + .assertNotComplete(); + + assertTrue("" + count[0], count[0] < 5); + } + + @Test + public void doAfterNextThrows() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel() + .doAfterNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (v == 3) { + throw new TestException(); + } else { + count[0]++; + } + } + }) + .sequential() + .test() + .assertError(TestException.class) + .assertNotComplete(); + + assertTrue("" + count[0], count[0] < 5); + } + + @Test + public void errorNotRepeating() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.error(new TestException()) + .parallel() + .runOn(Schedulers.computation()) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class) + ; + + Thread.sleep(300); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doOnError() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + if (v == 3) { + throw new TestException(); + } + return v; + } + }) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + count[0]++; + } + } + }) + .sequential() + .test() + .assertError(TestException.class) + .assertNotComplete(); + + assertEquals(1, count[0]); + } + + @Test + public void doOnErrorThrows() { + TestSubscriber ts = Flowable.range(1, 5) + .parallel(2) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + if (v == 3) { + throw new TestException(); + } + return v; + } + }) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + throw new IOException(); + } + } + }) + .sequential() + .test() + .assertError(CompositeException.class) + .assertNotComplete(); + + List errors = TestHelper.errorList(ts); + TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, IOException.class); + } + + @Test + public void doOnComplete() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .doOnComplete(new Action() { + @Override + public void run() throws Exception { + count[0]++; + } + }) + .sequential() + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(2, count[0]); + } + + @Test + public void doAfterTerminate() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .doAfterTerminated(new Action() { + @Override + public void run() throws Exception { + count[0]++; + } + }) + .sequential() + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(2, count[0]); + } + + @Test + public void doOnSubscribe() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + count[0]++; + } + }) + .sequential() + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(2, count[0]); + } + + @Test + public void doOnRequest() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .doOnRequest(new LongConsumer() { + @Override + public void accept(long s) throws Exception { + count[0]++; + } + }) + .sequential() + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(2, count[0]); + } + + @Test + public void doOnCancel() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + count[0]++; + } + }) + .sequential() + .take(2) + .test() + .assertResult(1, 2); + + assertEquals(2, count[0]); + } + + @SuppressWarnings("unchecked") + @Test(expected = IllegalArgumentException.class) + public void fromPublishers() { + ParallelFlowable.fromArray(new Publisher[0]); + } + + @Test + public void to() { + Flowable.range(1, 5) + .parallel() + .to(new Function, Flowable>() { + @Override + public Flowable apply(ParallelFlowable pf) throws Exception { + return pf.sequential(); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test(expected = TestException.class) + public void toThrows() { + Flowable.range(1, 5) + .parallel() + .to(new Function, Flowable>() { + @Override + public Flowable apply(ParallelFlowable pf) throws Exception { + throw new TestException(); + } + }); + } + + @Test + public void compose() { + Flowable.range(1, 5) + .parallel() + .compose(new Function, ParallelFlowable>() { + @Override + public ParallelFlowable apply(ParallelFlowable pf) throws Exception { + return pf.map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }); + } + }) + .sequential() + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void flatMapDelayError() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .flatMap(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + if (v == 3) { + return Flowable.error(new TestException()); + } + return Flowable.just(v); + } + }, true) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + count[0]++; + } + } + }) + .sequential() + .test() + .assertValues(1, 2, 4, 5) + .assertError(TestException.class) + .assertNotComplete(); + + assertEquals(1, count[0]); + } + + @Test + public void flatMapDelayErrorMaxConcurrency() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .flatMap(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + if (v == 3) { + return Flowable.error(new TestException()); + } + return Flowable.just(v); + } + }, true, 1) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + count[0]++; + } + } + }) + .sequential() + .test() + .assertValues(1, 2, 4, 5) + .assertError(TestException.class) + .assertNotComplete(); + + assertEquals(1, count[0]); + } + + @Test + public void concatMapDelayError() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + if (v == 3) { + return Flowable.error(new TestException()); + } + return Flowable.just(v); + } + }, true) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + count[0]++; + } + } + }) + .sequential() + .test() + .assertValues(1, 2, 4, 5) + .assertError(TestException.class) + .assertNotComplete(); + + assertEquals(1, count[0]); + } + + @Test + public void concatMapDelayErrorPrefetch() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + if (v == 3) { + return Flowable.error(new TestException()); + } + return Flowable.just(v); + } + }, 1, true) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + count[0]++; + } + } + }) + .sequential() + .test() + .assertValues(1, 2, 4, 5) + .assertError(TestException.class) + .assertNotComplete(); + + assertEquals(1, count[0]); + } + + @Test + public void concatMapDelayErrorBoundary() { + final int[] count = { 0 }; + + Flowable.range(1, 5) + .parallel(2) + .concatMapDelayError(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + if (v == 3) { + return Flowable.error(new TestException()); + } + return Flowable.just(v); + } + }, false) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + if (e instanceof TestException) { + count[0]++; + } + } + }) + .sequential() + .test() + .assertValues(1, 2) + .assertError(TestException.class) + .assertNotComplete(); + + assertEquals(1, count[0]); + } + +} diff --git a/src/test/java/io/reactivex/tck/FlowableTck.java b/src/test/java/io/reactivex/tck/FlowableTck.java index 90482d8244..d3ec12533e 100644 --- a/src/test/java/io/reactivex/tck/FlowableTck.java +++ b/src/test/java/io/reactivex/tck/FlowableTck.java @@ -16,6 +16,11 @@ import io.reactivex.Flowable; public final class FlowableTck { + /** Utility class (remnant).*/ + private FlowableTck() { + throw new IllegalStateException("No instances!"); + } + /** * Enable strict mode. * @param the value type