diff --git a/src/main/java/rx/internal/operators/BackpressureUtils.java b/src/main/java/rx/internal/operators/BackpressureUtils.java index 937f186535..0d4adef0a8 100644 --- a/src/main/java/rx/internal/operators/BackpressureUtils.java +++ b/src/main/java/rx/internal/operators/BackpressureUtils.java @@ -103,4 +103,27 @@ public static long addCap(long a, long b) { return u; } + /** + * Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE. + * @param requested the requested amount holder + * @param n the value to subtract from the requested amount, has to be positive (not verified) + * @return the new requested amount + * @throws IllegalStateException if n is greater than the current requested amount, which + * indicates a bug in the request accounting logic + */ + public static long produced(AtomicLong requested, long n) { + for (;;) { + long current = requested.get(); + if (current == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long next = current - n; + if (next < 0L) { + throw new IllegalStateException("More produced than requested: " + next); + } + if (requested.compareAndSet(current, next)) { + return next; + } + } + } } diff --git a/src/main/java/rx/internal/operators/OperatorConcat.java b/src/main/java/rx/internal/operators/OperatorConcat.java index 8455cc55b3..e251841f18 100644 --- a/src/main/java/rx/internal/operators/OperatorConcat.java +++ b/src/main/java/rx/internal/operators/OperatorConcat.java @@ -16,18 +16,14 @@ package rx.internal.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.*; -import rx.Observable; +import rx.*; import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; import rx.functions.Action0; import rx.internal.producers.ProducerArbiter; import rx.observers.SerializedSubscriber; -import rx.subscriptions.SerialSubscription; -import rx.subscriptions.Subscriptions; +import rx.subscriptions.*; /** * Returns an Observable that emits the items emitted by two or more Observables, one after the other. @@ -112,9 +108,19 @@ public void onStart() { } private void requestFromChild(long n) { - if (n <=0) return; + if (n <= 0) return; // we track 'requested' so we know whether we should subscribe the next or not - long previous = BackpressureUtils.getAndAddRequest(requested, n); + + final AtomicLong requestedField = requested; + + long previous; + + if (requestedField.get() != Long.MAX_VALUE) { + previous = BackpressureUtils.getAndAddRequest(requestedField, n); + } else { + previous = Long.MAX_VALUE; + } + arbiter.request(n); if (previous == 0) { if (currentSubscriber == null && wip.get() > 0) { @@ -125,10 +131,6 @@ private void requestFromChild(long n) { } } - private void decrementRequested() { - requested.decrementAndGet(); - } - @Override public void onNext(Observable t) { queue.add(nl.next(t)); @@ -167,8 +169,10 @@ void subscribeNext() { child.onCompleted(); } else if (o != null) { Observable obs = nl.getValue(o); + currentSubscriber = new ConcatInnerSubscriber(this, child, arbiter); current.set(currentSubscriber); + obs.unsafeSubscribe(currentSubscriber); } } else { @@ -179,14 +183,23 @@ void subscribeNext() { } } } + + void produced(long c) { + if (c != 0L) { + arbiter.produced(c); + BackpressureUtils.produced(requested, c); + } + } } static class ConcatInnerSubscriber extends Subscriber { private final Subscriber child; private final ConcatSubscriber parent; - private final AtomicInteger once = new AtomicInteger(); + private final AtomicBoolean once = new AtomicBoolean(); private final ProducerArbiter arbiter; + + long produced; public ConcatInnerSubscriber(ConcatSubscriber parent, Subscriber child, ProducerArbiter arbiter) { this.parent = parent; @@ -196,14 +209,14 @@ public ConcatInnerSubscriber(ConcatSubscriber parent, Subscriber child, Pr @Override public void onNext(T t) { + produced++; + child.onNext(t); - parent.decrementRequested(); - arbiter.produced(1); } @Override public void onError(Throwable e) { - if (once.compareAndSet(0, 1)) { + if (once.compareAndSet(false, true)) { // terminal error through parent so everything gets cleaned up, including this inner parent.onError(e); } @@ -211,9 +224,12 @@ public void onError(Throwable e) { @Override public void onCompleted() { - if (once.compareAndSet(0, 1)) { + if (once.compareAndSet(false, true)) { + ConcatSubscriber p = parent; + // signal the production count at once instead of one by one + p.produced(produced); // terminal completion to parent so it continues to the next - parent.completeInner(); + p.completeInner(); } } @@ -221,6 +237,5 @@ public void onCompleted() { public void setProducer(Producer producer) { arbiter.setProducer(producer); } - } } diff --git a/src/perf/java/rx/operators/ConcatPerf.java b/src/perf/java/rx/operators/ConcatPerf.java new file mode 100644 index 0000000000..c9c5e8e18f --- /dev/null +++ b/src/perf/java/rx/operators/ConcatPerf.java @@ -0,0 +1,75 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import rx.Observable; +import rx.jmh.LatchedObserver; + +/** + * Benchmark typical atomic operations on volatile fields and AtomicXYZ classes. + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*ConcatPerf.*" + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ConcatPerf.*" + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class ConcatPerf { + + Observable source; + + Observable baseline; + + @Param({"1", "1000", "1000000"}) + int count; + + @Setup + public void setup() { + Integer[] array = new Integer[count]; + + for (int i = 0; i < count; i++) { + array[i] = 777; + } + + baseline = Observable.from(array); + + source = Observable.concat(baseline, Observable.empty()); + } + + @Benchmark + public void normal(Blackhole bh) { + source.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void baseline(Blackhole bh) { + baseline.subscribe(new LatchedObserver(bh)); + } +}