diff --git a/src/main/java/rx/internal/operators/OperatorConcat.java b/src/main/java/rx/internal/operators/OperatorConcat.java index 8e8514b9ef..e91e669bba 100644 --- a/src/main/java/rx/internal/operators/OperatorConcat.java +++ b/src/main/java/rx/internal/operators/OperatorConcat.java @@ -24,6 +24,7 @@ import rx.Producer; import rx.Subscriber; import rx.functions.Action0; +import rx.internal.producers.ProducerArbiter; import rx.observers.SerializedSubscriber; import rx.subscriptions.SerialSubscription; import rx.subscriptions.Subscriptions; @@ -85,17 +86,19 @@ static final class ConcatSubscriber extends Subscriber WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip"); + static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip"); - // accessed by REQUESTED_UPDATER + // accessed by REQUESTED private volatile long requested; @SuppressWarnings("rawtypes") - private static final AtomicLongFieldUpdater REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested"); + private static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested"); + private final ProducerArbiter arbiter; public ConcatSubscriber(Subscriber s, SerialSubscription current) { super(s); this.child = s; this.current = current; + this.arbiter = new ProducerArbiter(); this.queue = new ConcurrentLinkedQueue(); add(Subscriptions.create(new Action0() { @Override @@ -113,32 +116,27 @@ public void onStart() { } private void requestFromChild(long n) { + if (n <=0) return; // we track 'requested' so we know whether we should subscribe the next or not - ConcatInnerSubscriber actualSubscriber = currentSubscriber; - if (n > 0 && BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n) == 0) { - if (actualSubscriber == null && wip > 0) { + long previous = BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + arbiter.request(n); + if (previous == 0) { + if (currentSubscriber == null && wip > 0) { // this means we may be moving from one subscriber to another after having stopped processing // so need to kick off the subscribe via this request notification subscribeNext(); - // return here as we don't want to do the requestMore logic below (which would double request) - return; } } - - if (actualSubscriber != null) { - // otherwise we are just passing it through to the currentSubscriber - actualSubscriber.requestMore(n); - } } private void decrementRequested() { - REQUESTED_UPDATER.decrementAndGet(this); + REQUESTED.decrementAndGet(this); } @Override public void onNext(Observable t) { queue.add(nl.next(t)); - if (WIP_UPDATER.getAndIncrement(this) == 0) { + if (WIP.getAndIncrement(this) == 0) { subscribeNext(); } } @@ -152,14 +150,15 @@ public void onError(Throwable e) { @Override public void onCompleted() { queue.add(nl.completed()); - if (WIP_UPDATER.getAndIncrement(this) == 0) { + if (WIP.getAndIncrement(this) == 0) { subscribeNext(); } } + void completeInner() { currentSubscriber = null; - if (WIP_UPDATER.decrementAndGet(this) > 0) { + if (WIP.decrementAndGet(this) > 0) { subscribeNext(); } request(1); @@ -172,7 +171,7 @@ void subscribeNext() { child.onCompleted(); } else if (o != null) { Observable obs = nl.getValue(o); - currentSubscriber = new ConcatInnerSubscriber(this, child, requested); + currentSubscriber = new ConcatInnerSubscriber(this, child, arbiter); current.set(currentSubscriber); obs.unsafeSubscribe(currentSubscriber); } @@ -193,27 +192,25 @@ static class ConcatInnerSubscriber extends Subscriber { @SuppressWarnings("unused") private volatile int once = 0; @SuppressWarnings("rawtypes") - private final static AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once"); + private final static AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once"); + private final ProducerArbiter arbiter; - public ConcatInnerSubscriber(ConcatSubscriber parent, Subscriber child, long initialRequest) { + public ConcatInnerSubscriber(ConcatSubscriber parent, Subscriber child, ProducerArbiter arbiter) { this.parent = parent; this.child = child; - request(initialRequest); - } - - void requestMore(long n) { - request(n); + this.arbiter = arbiter; } - + @Override public void onNext(T t) { - parent.decrementRequested(); child.onNext(t); + parent.decrementRequested(); + arbiter.produced(1); } @Override public void onError(Throwable e) { - if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { + if (ONCE.compareAndSet(this, 0, 1)) { // terminal error through parent so everything gets cleaned up, including this inner parent.onError(e); } @@ -221,11 +218,16 @@ public void onError(Throwable e) { @Override public void onCompleted() { - if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { + if (ONCE.compareAndSet(this, 0, 1)) { // terminal completion to parent so it continues to the next parent.completeInner(); } } + + @Override + public void setProducer(Producer producer) { + arbiter.setProducer(producer); + } }; } diff --git a/src/main/java/rx/internal/producers/ProducerArbiter.java b/src/main/java/rx/internal/producers/ProducerArbiter.java index d90a575447..b23904103e 100644 --- a/src/main/java/rx/internal/producers/ProducerArbiter.java +++ b/src/main/java/rx/internal/producers/ProducerArbiter.java @@ -95,7 +95,7 @@ public void produced(long n) { if (r != Long.MAX_VALUE) { long u = r - n; if (u < 0) { - throw new IllegalStateException(); + throw new IllegalStateException("more items arrived than were requested"); } requested = u; } diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index 75bfee65f4..c04b6dd910 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -36,9 +36,7 @@ import org.mockito.InOrder; import rx.Observable.OnSubscribe; -import rx.Scheduler.Worker; import rx.*; -import rx.functions.Action0; import rx.functions.Func1; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; @@ -795,4 +793,33 @@ public void onNext(Integer t) { assertTrue(completed.get()); } + @Test//(timeout = 100000) + public void concatMapRangeAsyncLoopIssue2876() { + final long durationSeconds = 2; + final long startTime = System.currentTimeMillis(); + for (int i = 0;; i++) { + //only run this for a max of ten seconds + if (System.currentTimeMillis()-startTime > TimeUnit.SECONDS.toMillis(durationSeconds)) + return; + if (i % 1000 == 0) { + System.out.println("concatMapRangeAsyncLoop > " + i); + } + TestSubscriber ts = new TestSubscriber(); + Observable.range(0, 1000) + .concatMap(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.from(Arrays.asList(t)); + } + }) + .observeOn(Schedulers.computation()).subscribe(ts); + + ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS); + ts.assertTerminalEvent(); + ts.assertNoErrors(); + assertEquals(1000, ts.getOnNextEvents().size()); + assertEquals((Integer)999, ts.getOnNextEvents().get(999)); + } + } + }