From a6f35a58e2feee9d22a525a11c9c009e6ac6ab40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Fri, 12 Feb 2016 22:33:01 +0100 Subject: [PATCH] 1.x: fix mapNotification's last item backpressure handling --- .../operators/OperatorMapNotification.java | 286 ++++++++---------- .../OperatorMapNotificationTest.java | 85 ++++++ 2 files changed, 208 insertions(+), 163 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMapNotification.java b/src/main/java/rx/internal/operators/OperatorMapNotification.java index 8abe7b828e..e7a18cc202 100644 --- a/src/main/java/rx/internal/operators/OperatorMapNotification.java +++ b/src/main/java/rx/internal/operators/OperatorMapNotification.java @@ -15,16 +15,12 @@ */ package rx.internal.operators; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.*; import rx.*; import rx.Observable.Operator; -import rx.exceptions.*; +import rx.exceptions.Exceptions; import rx.functions.*; -import rx.internal.producers.ProducerArbiter; -import rx.internal.util.unsafe.*; /** * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of @@ -45,203 +41,167 @@ public OperatorMapNotification(Func1 onNext, Func1 call(final Subscriber o) { - final ProducerArbiter pa = new ProducerArbiter(); - - MapNotificationSubscriber subscriber = new MapNotificationSubscriber(pa, o); - o.add(subscriber); - subscriber.init(); - return subscriber; + public Subscriber call(final Subscriber child) { + final MapNotificationSubscriber parent = new MapNotificationSubscriber(child, onNext, onError, onCompleted); + child.add(parent); + child.setProducer(new Producer() { + @Override + public void request(long n) { + parent.requestInner(n); + } + }); + return parent; } - final class MapNotificationSubscriber extends Subscriber { - private final Subscriber o; - private final ProducerArbiter pa; - final SingleEmitter emitter; - - MapNotificationSubscriber(ProducerArbiter pa, Subscriber o) { - this.pa = pa; - this.o = o; - this.emitter = new SingleEmitter(o, pa, this); - } + static final class MapNotificationSubscriber extends Subscriber { - void init() { - o.setProducer(emitter); - } + final Subscriber actual; + + final Func1 onNext; + + final Func1 onError; + + final Func0 onCompleted; + + final AtomicLong requested; - @Override - public void setProducer(Producer producer) { - pa.setProducer(producer); + final AtomicLong missedRequested; + + final AtomicReference producer; + + long produced; + + R value; + + static final long COMPLETED_FLAG = Long.MIN_VALUE; + static final long REQUESTED_MASK = Long.MAX_VALUE; + + public MapNotificationSubscriber(Subscriber actual, Func1 onNext, + Func1 onError, Func0 onCompleted) { + this.actual = actual; + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + this.requested = new AtomicLong(); + this.missedRequested = new AtomicLong(); + this.producer = new AtomicReference(); } @Override - public void onCompleted() { + public void onNext(T t) { try { - emitter.offerAndComplete(onCompleted.call()); - } catch (Throwable e) { - Exceptions.throwOrReport(e, o); + produced++; + actual.onNext(onNext.call(t)); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, actual, t); } } - + @Override public void onError(Throwable e) { + accountProduced(); try { - emitter.offerAndComplete(onError.call(e)); - } catch (Throwable e2) { - Exceptions.throwOrReport(e2, o); + value = onError.call(e); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, actual, e); } + tryEmit(); } - + @Override - public void onNext(T t) { + public void onCompleted() { + accountProduced(); try { - emitter.offer(onNext.call(t)); - } catch (Throwable e) { - Exceptions.throwOrReport(e, o, t); + value = onCompleted.call(); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, actual); } + tryEmit(); } - } - static final class SingleEmitter extends AtomicLong implements Producer, Subscription { - /** */ - private static final long serialVersionUID = -249869671366010660L; - final NotificationLite nl; - final Subscriber child; - final Producer producer; - final Subscription cancel; - final Queue queue; - volatile boolean complete; - /** Guarded by this. */ - boolean emitting; - /** Guarded by this. */ - boolean missed; - public SingleEmitter(Subscriber child, Producer producer, Subscription cancel) { - this.child = child; - this.producer = producer; - this.cancel = cancel; - this.queue = UnsafeAccess.isUnsafeAvailable() - ? new SpscArrayQueue(2) - : new ConcurrentLinkedQueue(); - - this.nl = NotificationLite.instance(); + void accountProduced() { + long p = produced; + if (p != 0L && producer.get() != null) { + BackpressureUtils.produced(requested, p); + } } + @Override - public void request(long n) { - for (;;) { - long r = get(); - if (r < 0) { - return; - } - long u = r + n; - if (u < 0) { - u = Long.MAX_VALUE; - } - if (compareAndSet(r, u)) { - producer.request(n); - drain(); - return; + public void setProducer(Producer p) { + if (producer.compareAndSet(null, p)) { + long r = missedRequested.getAndSet(0L); + if (r != 0L) { + p.request(r); } + } else { + throw new IllegalStateException("Producer already set!"); } } - void produced(long n) { + void tryEmit() { for (;;) { - long r = get(); - if (r < 0) { - return; - } - long u = r - n; - if (u < 0) { - throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); + long r = requested.get(); + if ((r & COMPLETED_FLAG) != 0) { + break; } - if (compareAndSet(r, u)) { + if (requested.compareAndSet(r, r | COMPLETED_FLAG)) { + if (r != 0 || producer.get() == null) { + if (!actual.isUnsubscribed()) { + actual.onNext(value); + } + if (!actual.isUnsubscribed()) { + actual.onCompleted(); + } + } return; } } } - public void offer(T value) { - if (!queue.offer(value)) { - child.onError(new MissingBackpressureException()); - unsubscribe(); - } else { - drain(); + void requestInner(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); } - } - public void offerAndComplete(T value) { - if (!this.queue.offer(value)) { - child.onError(new MissingBackpressureException()); - unsubscribe(); - } else { - this.complete = true; - drain(); - } - } - - void drain() { - synchronized (this) { - if (emitting) { - missed = true; - return; - } - emitting = true; - missed = false; + if (n == 0L) { + return; } - boolean skipFinal = false; - try { - for (;;) { - - long r = get(); - boolean c = complete; - boolean empty = queue.isEmpty(); - - if (c && empty) { - child.onCompleted(); - skipFinal = true; - return; - } else - if (r > 0) { - Object v = queue.poll(); - if (v != null) { - child.onNext(nl.getValue(v)); - produced(1); - } else - if (c) { - child.onCompleted(); - skipFinal = true; - return; - } - } - - synchronized (this) { - if (!missed) { - skipFinal = true; - emitting = false; - return; + for (;;) { + long r = requested.get(); + + if ((r & COMPLETED_FLAG) != 0L) { + long v = r & REQUESTED_MASK; + long u = BackpressureUtils.addCap(v, n) | COMPLETED_FLAG; + if (requested.compareAndSet(r, u)) { + if (v == 0L) { + if (!actual.isUnsubscribed()) { + actual.onNext(value); + } + if (!actual.isUnsubscribed()) { + actual.onCompleted(); + } } - missed = false; + return; } - } - } finally { - if (!skipFinal) { - synchronized (this) { - emitting = false; + } else { + long u = BackpressureUtils.addCap(r, n); + if (requested.compareAndSet(r, u)) { + break; } } } - } - - @Override - public boolean isUnsubscribed() { - return get() < 0; - } - @Override - public void unsubscribe() { - long r = get(); - if (r != Long.MIN_VALUE) { - r = getAndSet(Long.MIN_VALUE); - if (r != Long.MIN_VALUE) { - cancel.unsubscribe(); + + AtomicReference localProducer = producer; + Producer actualProducer = localProducer.get(); + if (actualProducer != null) { + actualProducer.request(n); + } else { + BackpressureUtils.getAndAddRequest(missedRequested, n); + actualProducer = localProducer.get(); + if (actualProducer != null) { + long r = missedRequested.getAndSet(0L); + if (r != 0L) { + actualProducer.request(r); + } } } } diff --git a/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java b/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java index 2f1e603337..3e94a20b8b 100644 --- a/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java +++ b/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java @@ -21,6 +21,7 @@ import rx.Observable; import rx.functions.*; import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; public class OperatorMapNotificationTest { @Test @@ -52,4 +53,88 @@ public Observable call() { ts.assertNotCompleted(); ts.assertValue(2); } + + @Test + public void backpressure() { + TestSubscriber ts = TestSubscriber.create(0L); + + Observable.range(1, 3).lift(new OperatorMapNotification( + new Func1() { + @Override + public Integer call(Integer item) { + return item + 1; + } + }, + new Func1() { + @Override + public Integer call(Throwable e) { + return 0; + } + }, + new Func0() { + @Override + public Integer call() { + return 5; + } + } + )).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(3); + + ts.assertValues(2, 3, 4); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValues(2, 3, 4, 5); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void noBackpressure() { + TestSubscriber ts = TestSubscriber.create(0L); + + PublishSubject ps = PublishSubject.create(); + + ps.lift(new OperatorMapNotification( + new Func1() { + @Override + public Integer call(Integer item) { + return item + 1; + } + }, + new Func1() { + @Override + public Integer call(Throwable e) { + return 0; + } + }, + new Func0() { + @Override + public Integer call() { + return 5; + } + } + )).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ps.onNext(1); + ps.onNext(2); + ps.onNext(3); + ps.onCompleted(); + + ts.assertValues(2, 3, 4, 5); + ts.assertNoErrors(); + ts.assertCompleted(); + } + }