diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 27a97b4b3d..75369bcf08 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5034,6 +5034,44 @@ public final Observable onBackpressureBuffer() { return lift(new OperatorOnBackpressureBuffer()); } + /** + * Instructs an Observable that is emitting items faster than its observer can consume them to buffer + * up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a + * {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all + * undelivered items, and unsubscribing from the source. + *

+ * + *

+ *
Scheduler:
+ *
{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the source Observable modified to buffer items up to the given capacity. + * @see RxJava wiki: Backpressure + */ + public final Observable onBackpressureBuffer(long capacity) { + return lift(new OperatorOnBackpressureBuffer(capacity)); + } + + /** + * Instructs an Observable that is emitting items faster than its observer can consume them to buffer + * up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a + * {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all + * undelivered items, unsubscribing from the source, and notifying the producer with {@code onOverflow}. + *

+ * + *

+ *
Scheduler:
+ *
{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the source Observable modified to buffer items up to the given capacity. + * @see RxJava wiki: Backpressure + */ + public final Observable onBackpressureBuffer(long capacity, Func0 onOverflow) { + return lift(new OperatorOnBackpressureBuffer(capacity, onOverflow)); + } + /** * Instructs an Observable that is emitting items faster than its observer can consume them to discard, * rather than emit, those items that its observer is not prepared to observe. diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java index f288132db9..6cdf00a694 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java @@ -15,23 +15,46 @@ */ package rx.internal.operators; +import java.nio.BufferOverflowException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import rx.Observable.Operator; import rx.Producer; import rx.Subscriber; +import rx.functions.Func0; public class OperatorOnBackpressureBuffer implements Operator { private final NotificationLite on = NotificationLite.instance(); + private final Long capacity; + private final Func0 onOverflow; + + public OperatorOnBackpressureBuffer() { + this.capacity = null; + this.onOverflow = null; + } + + public OperatorOnBackpressureBuffer(long capacity) { + this(capacity, null); + } + + public OperatorOnBackpressureBuffer(long capacity, Func0 onOverflow) { + if (capacity <= 0) { + throw new IllegalArgumentException("Buffer capacity must be > 0"); + } + this.capacity = capacity; + this.onOverflow = onOverflow; + } + @Override public Subscriber call(final Subscriber child) { // TODO get a different queue implementation - // TODO start with size hint final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity); final AtomicLong wip = new AtomicLong(); final AtomicLong requested = new AtomicLong(); @@ -40,7 +63,7 @@ public Subscriber call(final Subscriber child) { @Override public void request(long n) { if (requested.getAndAdd(n) == 0) { - pollQueue(wip, requested, queue, child); + pollQueue(wip, requested, capacity, queue, child); } } @@ -48,6 +71,9 @@ public void request(long n) { // don't pass through subscriber as we are async and doing queue draining // a parent being unsubscribed should not affect the children Subscriber parent = new Subscriber() { + + private AtomicBoolean saturated = new AtomicBoolean(false); + @Override public void onStart() { request(Long.MAX_VALUE); @@ -56,21 +82,47 @@ public void onStart() { @Override public void onCompleted() { queue.offer(on.completed()); - pollQueue(wip, requested, queue, child); + pollQueue(wip, requested, capacity, queue, child); } @Override public void onError(Throwable e) { queue.offer(on.error(e)); - pollQueue(wip, requested, queue, child); + pollQueue(wip, requested, capacity, queue, child); } @Override public void onNext(T t) { + if (!ensureCapacity()) { + return; + } queue.offer(on.next(t)); - pollQueue(wip, requested, queue, child); + pollQueue(wip, requested, capacity, queue, child); } + private boolean ensureCapacity() { + if (capacity == null) { + return true; + } + + long currCapacity; + do { + currCapacity = capacity.get(); + if (currCapacity <= 0) { + if (saturated.compareAndSet(false, true)) { + // ensure single completion contract + child.onError(new BufferOverflowException()); + unsubscribe(); + if (onOverflow != null) { + onOverflow.call(); + } + } + return false; + } + // ensure no other thread stole our slot, or retry + } while (!capacity.compareAndSet(currCapacity, currCapacity - 1)); + return true; + } }; // if child unsubscribes it should unsubscribe the parent, but not the other way around @@ -79,7 +131,7 @@ public void onNext(T t) { return parent; } - private void pollQueue(AtomicLong wip, AtomicLong requested, Queue queue, Subscriber child) { + private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue queue, Subscriber child) { // TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue? if (requested.get() > 0) { // only one draining at a time @@ -96,6 +148,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue queue requested.incrementAndGet(); return; } + if (capacity != null) { // it's bounded + capacity.incrementAndGet(); + } on.accept(child, o); } else { // we hit the end ... so increment back to 0 again diff --git a/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java b/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java index 5ca64a0874..ee057f2e84 100644 --- a/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java @@ -15,8 +15,7 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; - +import java.nio.BufferOverflowException; import java.util.concurrent.CountDownLatch; import org.junit.Test; @@ -25,9 +24,15 @@ import rx.Observable.OnSubscribe; import rx.Observer; import rx.Subscriber; +import rx.Subscription; +import rx.functions.Func0; +import rx.observables.ConnectableObservable; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class OperatorOnBackpressureBufferTest { @Test @@ -81,6 +86,68 @@ public void onNext(Long t) { assertEquals(499, ts.getOnNextEvents().get(499).intValue()); } + @Test(expected = IllegalArgumentException.class) + public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException { + Observable.empty().onBackpressureBuffer(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testFixBackpressureBufferZeroCapacity() throws InterruptedException { + Observable.empty().onBackpressureBuffer(-1); + } + + @Test(timeout = 500) + public void testFixBackpressureBoundedBuffer() throws InterruptedException { + final CountDownLatch l1 = new CountDownLatch(250); + final CountDownLatch l2 = new CountDownLatch(500); + final CountDownLatch l3 = new CountDownLatch(1); + TestSubscriber ts = new TestSubscriber(new Observer() { + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Long t) { + l1.countDown(); + l2.countDown(); + } + + }); + + ts.requestMore(500); + + final ConnectableObservable flood = + infinite.subscribeOn(Schedulers.computation()) + .publish(); + final ConnectableObservable batch = + infinite.subscribeOn(Schedulers.computation()) + .onBackpressureBuffer(100, new Func0() { + @Override + public Void call() { + l3.countDown(); + return null; + } + }).publish(); + Subscription s = batch.subscribe(ts); + batch.connect(); // first controlled batch + + l1.await(); + flood.connect(); // open flood + l2.await(); // ts can only swallow 250 more + l3.await(); // hold until it chokes + + assertEquals(500, ts.getOnNextEvents().size()); + assertEquals(0, ts.getOnNextEvents().get(0).intValue()); + assertTrue(ts.getOnErrorEvents().get(0) instanceof BufferOverflowException); + assertTrue(s.isUnsubscribed()); + + } + static final Observable infinite = Observable.create(new OnSubscribe() { @Override @@ -92,4 +159,5 @@ public void call(Subscriber s) { } }); + }