From e741bb9dbb29bb29527c0515202cd6d5368f4d7c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 18 Jul 2016 09:33:26 +0200 Subject: [PATCH 1/2] 1.x: merge/flatMap to keep scalar/inner element relative order --- .../rx/internal/operators/OperatorMerge.java | 18 ++++++-- .../internal/operators/OperatorMergeTest.java | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 71d74713bf..1bb18111b9 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -352,9 +352,15 @@ void tryEmit(InnerSubscriber subscriber, T value) { } } if (success) { - emitScalar(subscriber, value, r); + if (subscriber.queue == null || subscriber.queue.isEmpty()) { + emitScalar(subscriber, value, r); + } else { + queueScalar(subscriber, value); + emitLoop(); + } } else { queueScalar(subscriber, value); + emit(); } } @@ -383,7 +389,6 @@ protected void queueScalar(InnerSubscriber subscriber, T value) { } return; } - emit(); } protected void emitScalar(InnerSubscriber subscriber, T value, long r) { @@ -460,9 +465,15 @@ void tryEmit(T value) { } } if (success) { - emitScalar(value, r); + if (queue == null || queue.isEmpty()) { + emitScalar(value, r); + } else { + queueScalar(value); + emitLoop(); + } } else { queueScalar(value); + emit(); } } @@ -495,7 +506,6 @@ protected void queueScalar(T value) { onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value)); return; } - emit(); } protected void emitScalar(T value, long r) { diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 8f6e2a2aa2..76c7b43333 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -34,6 +34,7 @@ import rx.Observer; import rx.Scheduler.Worker; import rx.functions.*; +import rx.internal.operators.OperatorMerge.*; import rx.internal.util.*; import rx.observers.TestSubscriber; import rx.schedulers.*; @@ -1498,4 +1499,44 @@ public void flatMapMaxConcurrentJustRange() { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test + public void noInnerReordering() { + TestSubscriber ts = TestSubscriber.create(0); + MergeSubscriber ms = new MergeSubscriber(ts, false, 128); + ms.producer = new MergeProducer(ms); + ts.setProducer(ms.producer); + + PublishSubject ps = PublishSubject.create(); + + ms.onNext(ps); + + ps.onNext(1); + + BackpressureUtils.getAndAddRequest(ms.producer, 2); + + ps.onNext(2); + + ms.emit(); + + ts.assertValues(1, 2); + } + + @Test + public void noOuterScalarReordering() { + TestSubscriber ts = TestSubscriber.create(0); + MergeSubscriber ms = new MergeSubscriber(ts, false, 128); + ms.producer = new MergeProducer(ms); + ts.setProducer(ms.producer); + + ms.onNext(Observable.just(1)); + + BackpressureUtils.getAndAddRequest(ms.producer, 2); + + ms.onNext(Observable.just(2)); + + ms.emit(); + + ts.assertValues(1, 2); + } } From 7ff13edac5dddbf842fcbf2642608d6dc0f360af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 20 Jul 2016 22:37:27 +0200 Subject: [PATCH 2/2] Read the queue references once --- src/main/java/rx/internal/operators/OperatorMerge.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 1bb18111b9..a5d92a33ae 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -352,7 +352,8 @@ void tryEmit(InnerSubscriber subscriber, T value) { } } if (success) { - if (subscriber.queue == null || subscriber.queue.isEmpty()) { + RxRingBuffer subscriberQueue = subscriber.queue; + if (subscriberQueue == null || subscriberQueue.isEmpty()) { emitScalar(subscriber, value, r); } else { queueScalar(subscriber, value); @@ -465,7 +466,8 @@ void tryEmit(T value) { } } if (success) { - if (queue == null || queue.isEmpty()) { + Queue mainQueue = queue; + if (mainQueue == null || mainQueue.isEmpty()) { emitScalar(value, r); } else { queueScalar(value);