Skip to content

Commit d4367d4

Browse files
committed
1.x: concat reduce overhead when streaming a source
1 parent 6aa760e commit d4367d4

File tree

3 files changed

+126
-9
lines changed

3 files changed

+126
-9
lines changed

src/main/java/rx/internal/operators/BackpressureUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,27 @@ public static long addCap(long a, long b) {
103103
return u;
104104
}
105105

106+
/**
107+
* Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
108+
* @param requested the requested amount holder
109+
* @param n the value to subtract from the requested amount, has to be positive (not verified)
110+
* @return the new requested amount
111+
* @throws IllegalStateException if n is greater than the current requested amount, which
112+
* indicates a bug in the request accounting logic
113+
*/
114+
public static long produced(AtomicLong requested, long n) {
115+
for (;;) {
116+
long current = requested.get();
117+
if (current == Long.MAX_VALUE) {
118+
return Long.MAX_VALUE;
119+
}
120+
long next = current + n;
121+
if (next < 0L) {
122+
throw new IllegalStateException("More produced than requested: " + next);
123+
}
124+
if (requested.compareAndSet(current, next)) {
125+
return next;
126+
}
127+
}
128+
}
106129
}

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,19 @@ public void onStart() {
112112
}
113113

114114
private void requestFromChild(long n) {
115-
if (n <=0) return;
115+
if (n <= 0) return;
116116
// we track 'requested' so we know whether we should subscribe the next or not
117-
long previous = BackpressureUtils.getAndAddRequest(requested, n);
117+
118+
final AtomicLong requestedField = requested;
119+
120+
long previous;
121+
122+
if (requestedField.get() != Long.MAX_VALUE) {
123+
previous = BackpressureUtils.getAndAddRequest(requestedField, n);
124+
} else {
125+
previous = Long.MAX_VALUE;
126+
}
127+
118128
arbiter.request(n);
119129
if (previous == 0) {
120130
if (currentSubscriber == null && wip.get() > 0) {
@@ -125,10 +135,6 @@ private void requestFromChild(long n) {
125135
}
126136
}
127137

128-
private void decrementRequested() {
129-
requested.decrementAndGet();
130-
}
131-
132138
@Override
133139
public void onNext(Observable<? extends T> t) {
134140
queue.add(nl.next(t));
@@ -167,8 +173,10 @@ void subscribeNext() {
167173
child.onCompleted();
168174
} else if (o != null) {
169175
Observable<? extends T> obs = nl.getValue(o);
176+
170177
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, arbiter);
171178
current.set(currentSubscriber);
179+
172180
obs.unsafeSubscribe(currentSubscriber);
173181
}
174182
} else {
@@ -179,6 +187,11 @@ void subscribeNext() {
179187
}
180188
}
181189
}
190+
191+
void produced(long c) {
192+
arbiter.produced(c);
193+
BackpressureUtils.produced(requested, c);
194+
}
182195
}
183196

184197
static class ConcatInnerSubscriber<T> extends Subscriber<T> {
@@ -187,6 +200,8 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {
187200
private final ConcatSubscriber<T> parent;
188201
private final AtomicInteger once = new AtomicInteger();
189202
private final ProducerArbiter arbiter;
203+
204+
long produced;
190205

191206
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, ProducerArbiter arbiter) {
192207
this.parent = parent;
@@ -196,9 +211,9 @@ public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, Pr
196211

197212
@Override
198213
public void onNext(T t) {
214+
produced++;
215+
199216
child.onNext(t);
200-
parent.decrementRequested();
201-
arbiter.produced(1);
202217
}
203218

204219
@Override
@@ -212,6 +227,11 @@ public void onError(Throwable e) {
212227
@Override
213228
public void onCompleted() {
214229
if (once.compareAndSet(0, 1)) {
230+
// signal the production count at once instead of one by one
231+
long c = produced;
232+
if (c != 0L) {
233+
parent.produced(c);
234+
}
215235
// terminal completion to parent so it continues to the next
216236
parent.completeInner();
217237
}
@@ -221,6 +241,5 @@ public void onCompleted() {
221241
public void setProducer(Producer producer) {
222242
arbiter.setProducer(producer);
223243
}
224-
225244
}
226245
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.annotations.BenchmarkMode;
23+
import org.openjdk.jmh.annotations.Mode;
24+
import org.openjdk.jmh.annotations.OutputTimeUnit;
25+
import org.openjdk.jmh.annotations.Param;
26+
import org.openjdk.jmh.annotations.Scope;
27+
import org.openjdk.jmh.annotations.Setup;
28+
import org.openjdk.jmh.annotations.State;
29+
import org.openjdk.jmh.infra.Blackhole;
30+
31+
import rx.Observable;
32+
import rx.jmh.LatchedObserver;
33+
34+
/**
35+
* Benchmark typical atomic operations on volatile fields and AtomicXYZ classes.
36+
* <p>
37+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*ConcatPerf.*"
38+
* <p>
39+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ConcatPerf.*"
40+
*/
41+
@BenchmarkMode(Mode.Throughput)
42+
@OutputTimeUnit(TimeUnit.SECONDS)
43+
@State(Scope.Thread)
44+
public class ConcatPerf {
45+
46+
Observable<Integer> source;
47+
48+
Observable<Integer> baseline;
49+
50+
@Param({"1", "1000", "1000000"})
51+
int count;
52+
53+
@Setup
54+
public void setup() {
55+
Integer[] array = new Integer[count];
56+
57+
for (int i = 0; i < count; i++) {
58+
array[i] = 777;
59+
}
60+
61+
baseline = Observable.from(array);
62+
63+
source = Observable.concat(baseline, Observable.<Integer>empty());
64+
}
65+
66+
@Benchmark
67+
public void normal(Blackhole bh) {
68+
source.subscribe(new LatchedObserver<Integer>(bh));
69+
}
70+
71+
@Benchmark
72+
public void baseline(Blackhole bh) {
73+
baseline.subscribe(new LatchedObserver<Integer>(bh));
74+
}
75+
}

0 commit comments

Comments
 (0)