diff --git a/src/main/java/rx/internal/operators/OperatorMaterialize.java b/src/main/java/rx/internal/operators/OperatorMaterialize.java index bd5771747c..e074cd5816 100644 --- a/src/main/java/rx/internal/operators/OperatorMaterialize.java +++ b/src/main/java/rx/internal/operators/OperatorMaterialize.java @@ -15,8 +15,11 @@ */ package rx.internal.operators; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + import rx.Notification; import rx.Observable.Operator; +import rx.Producer; import rx.Subscriber; import rx.plugins.RxJavaPlugins; @@ -29,41 +32,137 @@ * See here for the Microsoft Rx equivalent. */ public final class OperatorMaterialize implements Operator, T> { + /** Lazy initialization via inner-class holder. */ private static final class Holder { /** A singleton instance. */ static final OperatorMaterialize INSTANCE = new OperatorMaterialize(); } + /** * @return a singleton instance of this stateless operator. */ @SuppressWarnings("unchecked") public static OperatorMaterialize instance() { - return (OperatorMaterialize)Holder.INSTANCE; + return (OperatorMaterialize) Holder.INSTANCE; } - private OperatorMaterialize() { } + + private OperatorMaterialize() { + } + @Override public Subscriber call(final Subscriber> child) { - return new Subscriber(child) { - + final ParentSubscriber parent = new ParentSubscriber(child); + child.add(parent); + child.setProducer(new Producer() { @Override - public void onCompleted() { - child.onNext(Notification. createOnCompleted()); - child.onCompleted(); + public void request(long n) { + if (n > 0) { + parent.requestMore(n); + } } + }); + return parent; + } - @Override - public void onError(Throwable e) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - child.onNext(Notification. createOnError(e)); - child.onCompleted(); - } + private static class ParentSubscriber extends Subscriber { - @Override - public void onNext(T t) { - child.onNext(Notification. createOnNext(t)); + private final Subscriber> child; + + private volatile Notification terminalNotification; + + // guarded by this + private boolean busy = false; + // guarded by this + private boolean missed = false; + + private volatile long requested; + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater + .newUpdater(ParentSubscriber.class, "requested"); + + ParentSubscriber(Subscriber> child) { + this.child = child; + } + + @Override + public void onStart() { + request(0); + } + + void requestMore(long n) { + BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + request(n); + drain(); + } + + @Override + public void onCompleted() { + terminalNotification = Notification.createOnCompleted(); + drain(); + } + + @Override + public void onError(Throwable e) { + terminalNotification = Notification.createOnError(e); + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + drain(); + } + + @Override + public void onNext(T t) { + child.onNext(Notification.createOnNext(t)); + decrementRequested(); + } + + private void decrementRequested() { + // atomically decrement requested + while (true) { + long r = requested; + if (r == Long.MAX_VALUE) { + // don't decrement if unlimited requested + return; + } else if (REQUESTED.compareAndSet(this, r, r - 1)) { + return; + } } + } - }; + private void drain() { + synchronized (this) { + if (busy) { + // set flag to force extra loop if drain loop running + missed = true; + return; + } + } + // drain loop + while (!child.isUnsubscribed()) { + Notification tn; + tn = terminalNotification; + if (tn != null) { + if (requested > 0) { + // allow tn to be GC'd after the onNext call + terminalNotification = null; + // emit the terminal notification + child.onNext(tn); + if (!child.isUnsubscribed()) { + child.onCompleted(); + } + // note that we leave busy=true here + // which will prevent further drains + return; + } + } + // continue looping if drain() was called while in + // this loop + synchronized (this) { + if (!missed) { + busy = false; + return; + } + } + } + } } } diff --git a/src/test/java/rx/internal/operators/OperatorMaterializeTest.java b/src/test/java/rx/internal/operators/OperatorMaterializeTest.java index a900da61d6..511a79ed54 100644 --- a/src/test/java/rx/internal/operators/OperatorMaterializeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMaterializeTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.Arrays; import java.util.List; import java.util.Vector; import java.util.concurrent.ExecutionException; @@ -28,13 +29,18 @@ import rx.Notification; import rx.Observable; import rx.Subscriber; +import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; public class OperatorMaterializeTest { @Test public void testMaterialize1() { - // null will cause onError to be triggered before "three" can be returned - final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three"); + // null will cause onError to be triggered before "three" can be + // returned + final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, + "three"); TestObserver Observer = new TestObserver(); Observable> m = Observable.create(o1).materialize(); @@ -53,7 +59,8 @@ public void testMaterialize1() { assertTrue(Observer.notifications.get(0).isOnNext()); assertEquals("two", Observer.notifications.get(1).getValue()); assertTrue(Observer.notifications.get(1).isOnNext()); - assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable().getClass()); + assertEquals(NullPointerException.class, Observer.notifications.get(2).getThrowable() + .getClass()); assertTrue(Observer.notifications.get(2).isOnError()); } @@ -93,6 +100,107 @@ public void testMultipleSubscribes() throws InterruptedException, ExecutionExcep assertEquals(3, m.toList().toBlocking().toFuture().get().size()); } + @Test + public void testBackpressureOnEmptyStream() { + TestSubscriber> ts = TestSubscriber.create(0); + Observable. empty().materialize().subscribe(ts); + ts.assertNoValues(); + ts.requestMore(1); + ts.assertValueCount(1); + assertTrue(ts.getOnNextEvents().get(0).isOnCompleted()); + ts.assertCompleted(); + } + + @Test + public void testBackpressureNoError() { + TestSubscriber> ts = TestSubscriber.create(0); + Observable.just(1, 2, 3).materialize().subscribe(ts); + ts.assertNoValues(); + ts.requestMore(1); + ts.assertValueCount(1); + ts.requestMore(2); + ts.assertValueCount(3); + ts.requestMore(1); + ts.assertValueCount(4); + ts.assertCompleted(); + } + + @Test + public void testBackpressureNoErrorAsync() throws InterruptedException { + TestSubscriber> ts = TestSubscriber.create(0); + Observable.just(1, 2, 3) + .materialize() + .subscribeOn(Schedulers.computation()) + .subscribe(ts); + Thread.sleep(100); + ts.assertNoValues(); + ts.requestMore(1); + Thread.sleep(100); + ts.assertValueCount(1); + ts.requestMore(2); + Thread.sleep(100); + ts.assertValueCount(3); + ts.requestMore(1); + Thread.sleep(100); + ts.assertValueCount(4); + ts.assertCompleted(); + } + + @Test + public void testBackpressureWithError() { + TestSubscriber> ts = TestSubscriber.create(0); + Observable. error(new IllegalArgumentException()).materialize().subscribe(ts); + ts.assertNoValues(); + ts.requestMore(1); + ts.assertValueCount(1); + ts.assertCompleted(); + } + + @Test + public void testBackpressureWithEmissionThenError() { + TestSubscriber> ts = TestSubscriber.create(0); + IllegalArgumentException ex = new IllegalArgumentException(); + Observable.from(Arrays.asList(1)).concatWith(Observable. error(ex)).materialize() + .subscribe(ts); + ts.assertNoValues(); + ts.requestMore(1); + ts.assertValueCount(1); + assertTrue(ts.getOnNextEvents().get(0).hasValue()); + ts.requestMore(1); + ts.assertValueCount(2); + assertTrue(ts.getOnNextEvents().get(1).isOnError()); + assertTrue(ex == ts.getOnNextEvents().get(1).getThrowable()); + ts.assertCompleted(); + } + + @Test + public void testWithCompletionCausingError() { + TestSubscriber> ts = TestSubscriber.create(); + final RuntimeException ex = new RuntimeException("boo"); + Observable.empty().materialize().doOnNext(new Action1() { + @Override + public void call(Object t) { + throw ex; + } + }).subscribe(ts); + ts.assertError(ex); + ts.assertNoValues(); + ts.assertTerminalEvent(); + } + + @Test + public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving() { + TestSubscriber> ts = TestSubscriber.create(0); + IllegalArgumentException ex = new IllegalArgumentException(); + Observable.empty().materialize() + .subscribe(ts); + ts.assertNoValues(); + ts.unsubscribe(); + ts.requestMore(1); + ts.assertNoValues(); + ts.assertUnsubscribed(); + } + private static class TestObserver extends Subscriber> { boolean onCompleted = false;