diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 2da1844ca9..a2bc46ff8d 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -213,7 +213,7 @@ private void handleNewSource(Observable t) { InnerSubscriber i = new InnerSubscriber(this, producerIfNeeded); i.sindex = childrenSubscribers.add(i); t.unsafeSubscribe(i); - if (!isUnsubscribed()) { + if ((producerIfNeeded == null || producerIfNeeded.requested > 0) && !isUnsubscribed()) { request(1); } } @@ -523,6 +523,9 @@ private void drainAndComplete() { } } + public void requestMore(long n) { + request(n); + } } private static final class MergeProducer implements Producer { @@ -545,16 +548,23 @@ public void request(long n) { if (n == Long.MAX_VALUE) { requested = Long.MAX_VALUE; } else { - BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + final long count = BackpressureUtils.getAndAddRequest(REQUESTED, this, n); if (ms.drainQueuesIfNeeded()) { boolean sendComplete = false; + boolean requestMore = false; synchronized (ms) { if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) { sendComplete = true; } + + if (count > 0 && ms.wip == 0 && ms.scalarValueQueue == null) { + requestMore = true; + } } if (sendComplete) { ms.drainAndComplete(); + } else if (requestMore) { + ms.requestMore(n); } } } diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 7d785b4088..8a23fb5701 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -21,9 +21,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; @@ -667,6 +665,60 @@ public void onNext(Integer t) { assertTrue(generated1.get() >= RxRingBuffer.SIZE * 2 && generated1.get() <= RxRingBuffer.SIZE * 3); } + @Test + public void testWhenRequestCalledAndNotingPendingThenEmitsMore() throws InterruptedException { + final AtomicInteger generated1 = new AtomicInteger(); + final TestScheduler scheduler = new TestScheduler(); + TestSubscriber nonScalartestSubscriber = spy(new TestSubscriber() { + @Override + public void onStart() { + request(10); + } + }); + Observable o1 = createInfiniteObservable(generated1) + .flatMap(new Func1>() { + @Override + public Observable call(final Integer integer) { + return Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + subscriber.onNext(-integer); + subscriber.onCompleted(); + } + }); + } + }) + .subscribeOn(scheduler); + o1.subscribe(nonScalartestSubscriber); + + TestSubscriber scalartestSubscriber = spy(new TestSubscriber() { + @Override + public void onStart() { + request(10); + } + }); + Observable o2 = createInfiniteObservable(generated1) + .flatMap(new Func1>() { + @Override + public Observable call(final Integer integer) { + return Observable.just(-integer); + } + }) + .subscribeOn(scheduler); + o2.subscribe(scalartestSubscriber); + + scheduler.triggerActions(); + verify(nonScalartestSubscriber, times(10)).onNext(anyInt()); + verify(scalartestSubscriber, times(10)).onNext(anyInt()); + + nonScalartestSubscriber.requestMore(100); + scalartestSubscriber.requestMore(100); + scheduler.triggerActions(); + + verify(nonScalartestSubscriber, times(110)).onNext(anyInt()); + verify(scalartestSubscriber, times(110)).onNext(anyInt()); + } + /** * This is the same as the upstreams ones, but now adds the downstream as well by using observeOn. *