From 7533849d2d5c69ee8f0ba14f8d35c5d974f60383 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sat, 17 Oct 2015 00:08:55 +0200 Subject: [PATCH] Fix for bounded replay() not requesting enough for latecommers --- .../internal/operators/OperatorReplay.java | 23 ++-- .../operators/OperatorReplayTest.java | 118 ++++++++++++++++-- .../schedulers/ExecutorSchedulerTest.java | 2 +- 3 files changed, 128 insertions(+), 15 deletions(-) diff --git a/src/main/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplay.java b/src/main/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplay.java index 3fe10dc..6002b59 100644 --- a/src/main/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplay.java +++ b/src/main/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplay.java @@ -219,6 +219,8 @@ public void subscribe(Subscriber child) { // if it fails, no worries because we will still have its buffer // so it is going to replay it for us r.add(inner); + // trigger the capturing of the current node and total requested + r.buffer.replay(inner); // the producer has been registered with the current subscriber-to-source so // at least it will receive the next terminal event // setting the producer will trigger the first request to be considered by @@ -838,8 +840,10 @@ static final class Node extends AtomicReference { /** */ private static final long serialVersionUID = 245354315435971818L; final Object value; - public Node(Object value) { + final long index; + public Node(Object value, long index) { this.value = value; + this.index = index; } } @@ -856,8 +860,10 @@ static class BoundedReplayBuffer extends AtomicReference implements Rep Node tail; int size; + long index; + public BoundedReplayBuffer() { - Node n = new Node(null); + Node n = new Node(null, 0); tail = n; set(n); } @@ -906,7 +912,7 @@ final void setFirst(Node n) { @Override public final void next(T value) { Object o = enterTransform(NotificationLite.next(value)); - Node n = new Node(o); + Node n = new Node(o, ++index); addLast(n); truncate(); } @@ -914,7 +920,7 @@ public final void next(T value) { @Override public final void error(Throwable e) { Object o = enterTransform(NotificationLite.error(e)); - Node n = new Node(o); + Node n = new Node(o, ++index); addLast(n); truncateFinal(); } @@ -922,7 +928,7 @@ public final void error(Throwable e) { @Override public final void complete() { Object o = enterTransform(NotificationLite.complete()); - Node n = new Node(o); + Node n = new Node(o, ++index); addLast(n); truncateFinal(); } @@ -942,13 +948,15 @@ public final void replay(InnerSubscription output) { } long r = output.get(); - long r0 = r; + boolean unbounded = r == Long.MAX_VALUE; long e = 0L; Node node = output.index(); if (node == null) { node = get(); output.index = node; + + BackpressureHelper.add(output.totalRequested, node.index); } while (r != 0) { @@ -969,6 +977,7 @@ public final void replay(InnerSubscription output) { return; } e++; + r--; node = v; } else { break; @@ -980,7 +989,7 @@ public final void replay(InnerSubscription output) { if (e != 0L) { output.index = node; - if (r0 != Long.MAX_VALUE) { + if (!unbounded) { output.produced(e); } } diff --git a/src/test/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplayTest.java b/src/test/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplayTest.java index 4d117ce..4fd4a26 100644 --- a/src/test/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplayTest.java +++ b/src/test/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplayTest.java @@ -733,11 +733,11 @@ public void dispose() { @Test public void testBoundedReplayBuffer() { BoundedReplayBuffer buf = new BoundedReplayBuffer(); - buf.addLast(new Node(1)); - buf.addLast(new Node(2)); - buf.addLast(new Node(3)); - buf.addLast(new Node(4)); - buf.addLast(new Node(5)); + buf.addLast(new Node(1, 0)); + buf.addLast(new Node(2, 1)); + buf.addLast(new Node(3, 2)); + buf.addLast(new Node(4, 3)); + buf.addLast(new Node(5, 4)); List values = new ArrayList(); buf.collect(values); @@ -752,8 +752,8 @@ public void testBoundedReplayBuffer() { buf.collect(values); Assert.assertTrue(values.isEmpty()); - buf.addLast(new Node(5)); - buf.addLast(new Node(6)); + buf.addLast(new Node(5, 5)); + buf.addLast(new Node(6, 6)); buf.collect(values); Assert.assertEquals(Arrays.asList(5, 6), values); @@ -1131,5 +1131,109 @@ public void accept(long t) { Assert.assertEquals(Arrays.asList(5L, 5L), requests); } + + @Test + public void testSubscribersComeAndGoAtRequestBoundaries() { + ConnectableObservable source = Observable.range(1, 10).replay(1); + source.connect(); + + TestSubscriber ts1 = new TestSubscriber(2L); + + source.subscribe(ts1); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.dispose(); + + TestSubscriber ts2 = new TestSubscriber(2L); + + source.subscribe(ts2); + + ts2.assertValues(2, 3); + ts2.assertNoErrors(); + ts2.dispose(); + + TestSubscriber ts21 = new TestSubscriber(1L); + + source.subscribe(ts21); + + ts21.assertValues(3); + ts21.assertNoErrors(); + ts21.dispose(); + + TestSubscriber ts22 = new TestSubscriber(1L); + + source.subscribe(ts22); + + ts22.assertValues(3); + ts22.assertNoErrors(); + ts22.dispose(); + + + TestSubscriber ts3 = new TestSubscriber(); + + source.subscribe(ts3); + + ts3.assertNoErrors(); + System.out.println(ts3.values()); + ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10); + ts3.assertComplete(); + } + @Test + public void testSubscribersComeAndGoAtRequestBoundaries2() { + ConnectableObservable source = Observable.range(1, 10).replay(2); + source.connect(); + + TestSubscriber ts1 = new TestSubscriber(2L); + + source.subscribe(ts1); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.dispose(); + + TestSubscriber ts11 = new TestSubscriber(2L); + + source.subscribe(ts11); + + ts11.assertValues(1, 2); + ts11.assertNoErrors(); + ts11.dispose(); + + TestSubscriber ts2 = new TestSubscriber(3L); + + source.subscribe(ts2); + + ts2.assertValues(1, 2, 3); + ts2.assertNoErrors(); + ts2.dispose(); + + TestSubscriber ts21 = new TestSubscriber(1L); + + source.subscribe(ts21); + + ts21.assertValues(2); + ts21.assertNoErrors(); + ts21.dispose(); + + TestSubscriber ts22 = new TestSubscriber(1L); + + source.subscribe(ts22); + + ts22.assertValues(2); + ts22.assertNoErrors(); + ts22.dispose(); + + + TestSubscriber ts3 = new TestSubscriber(); + + source.subscribe(ts3); + + ts3.assertNoErrors(); + System.out.println(ts3.values()); + ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10); + ts3.assertComplete(); + } + } \ No newline at end of file diff --git a/src/test/java/hu/akarnokd/rxjava2/schedulers/ExecutorSchedulerTest.java b/src/test/java/hu/akarnokd/rxjava2/schedulers/ExecutorSchedulerTest.java index 6a762cd..3d39261 100644 --- a/src/test/java/hu/akarnokd/rxjava2/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/hu/akarnokd/rxjava2/schedulers/ExecutorSchedulerTest.java @@ -63,7 +63,7 @@ public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - int n = 200 * 1000; + int n = 100 * 1000; if (periodic) { final CountDownLatch cdl = new CountDownLatch(n); final Runnable action = new Runnable() {