diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 71d74713bf..a5d92a33ae 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -352,9 +352,16 @@ void tryEmit(InnerSubscriber subscriber, T value) { } } if (success) { - emitScalar(subscriber, value, r); + RxRingBuffer subscriberQueue = subscriber.queue; + if (subscriberQueue == null || subscriberQueue.isEmpty()) { + emitScalar(subscriber, value, r); + } else { + queueScalar(subscriber, value); + emitLoop(); + } } else { queueScalar(subscriber, value); + emit(); } } @@ -383,7 +390,6 @@ protected void queueScalar(InnerSubscriber subscriber, T value) { } return; } - emit(); } protected void emitScalar(InnerSubscriber subscriber, T value, long r) { @@ -460,9 +466,16 @@ void tryEmit(T value) { } } if (success) { - emitScalar(value, r); + Queue mainQueue = queue; + if (mainQueue == null || mainQueue.isEmpty()) { + emitScalar(value, r); + } else { + queueScalar(value); + emitLoop(); + } } else { queueScalar(value); + emit(); } } @@ -495,7 +508,6 @@ protected void queueScalar(T value) { onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value)); return; } - emit(); } protected void emitScalar(T value, long r) { diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 8f6e2a2aa2..76c7b43333 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -34,6 +34,7 @@ import rx.Observer; import rx.Scheduler.Worker; import rx.functions.*; +import rx.internal.operators.OperatorMerge.*; import rx.internal.util.*; import rx.observers.TestSubscriber; import rx.schedulers.*; @@ -1498,4 +1499,44 @@ public void flatMapMaxConcurrentJustRange() { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test + public void noInnerReordering() { + TestSubscriber ts = TestSubscriber.create(0); + MergeSubscriber ms = new MergeSubscriber(ts, false, 128); + ms.producer = new MergeProducer(ms); + ts.setProducer(ms.producer); + + PublishSubject ps = PublishSubject.create(); + + ms.onNext(ps); + + ps.onNext(1); + + BackpressureUtils.getAndAddRequest(ms.producer, 2); + + ps.onNext(2); + + ms.emit(); + + ts.assertValues(1, 2); + } + + @Test + public void noOuterScalarReordering() { + TestSubscriber ts = TestSubscriber.create(0); + MergeSubscriber ms = new MergeSubscriber(ts, false, 128); + ms.producer = new MergeProducer(ms); + ts.setProducer(ms.producer); + + ms.onNext(Observable.just(1)); + + BackpressureUtils.getAndAddRequest(ms.producer, 2); + + ms.onNext(Observable.just(2)); + + ms.emit(); + + ts.assertValues(1, 2); + } }