diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 98cb548391..d2f52cb204 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -315,7 +315,8 @@ void tryEmit(InnerSubscriber subscriber, T value) { if (r != 0L) { synchronized (this) { // if nobody is emitting and child has available requests - if (!emitting) { + r = producer.get(); + if (!emitting && r != 0L) { emitting = true; success = true; } @@ -422,7 +423,8 @@ void tryEmit(T value) { if (r != 0L) { synchronized (this) { // if nobody is emitting and child has available requests - if (!emitting) { + r = producer.get(); + if (!emitting && r != 0L) { emitting = true; success = true; } diff --git a/src/test/java/rx/BackpressureTests.java b/src/test/java/rx/BackpressureTests.java index ffa2e01129..439b18a08f 100644 --- a/src/test/java/rx/BackpressureTests.java +++ b/src/test/java/rx/BackpressureTests.java @@ -123,6 +123,30 @@ public void testMergeAsync() { assertTrue(c2.get() < RxRingBuffer.SIZE * 5); } + @Test + public void testMergeAsyncThenObserveOnLoop() { + for (int i = 0; i < 500; i++) { + if (i % 10 == 0) { + System.out.println("testMergeAsyncThenObserveOnLoop >> " + i); + } + // Verify there is no MissingBackpressureException + int NUM = (int) (RxRingBuffer.SIZE * 4.1); + AtomicInteger c1 = new AtomicInteger(); + AtomicInteger c2 = new AtomicInteger(); + + TestSubscriber ts = new TestSubscriber(); + Observable merged = Observable.merge( + incrementingIntegers(c1).subscribeOn(Schedulers.computation()), + incrementingIntegers(c2).subscribeOn(Schedulers.computation())); + + merged.observeOn(Schedulers.io()).take(NUM).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get()); + assertEquals(NUM, ts.getOnNextEvents().size()); + } + } + @Test public void testMergeAsyncThenObserveOn() { int NUM = (int) (RxRingBuffer.SIZE * 4.1);