Skip to content
This repository was archived by the owner on Mar 21, 2018. It is now read-only.

Fix for bounded replay() not requesting enough for latecommers #1

Merged
merged 1 commit into from
Oct 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public void subscribe(Subscriber<? super T> child) {
// if it fails, no worries because we will still have its buffer
// so it is going to replay it for us
r.add(inner);
// trigger the capturing of the current node and total requested
r.buffer.replay(inner);
// the producer has been registered with the current subscriber-to-source so
// at least it will receive the next terminal event
// setting the producer will trigger the first request to be considered by
Expand Down Expand Up @@ -838,8 +840,10 @@ static final class Node extends AtomicReference<Node> {
/** */
private static final long serialVersionUID = 245354315435971818L;
final Object value;
public Node(Object value) {
final long index;
public Node(Object value, long index) {
this.value = value;
this.index = index;
}
}

Expand All @@ -856,8 +860,10 @@ static class BoundedReplayBuffer<T> extends AtomicReference<Node> implements Rep
Node tail;
int size;

long index;

public BoundedReplayBuffer() {
Node n = new Node(null);
Node n = new Node(null, 0);
tail = n;
set(n);
}
Expand Down Expand Up @@ -906,23 +912,23 @@ final void setFirst(Node n) {
@Override
public final void next(T value) {
Object o = enterTransform(NotificationLite.next(value));
Node n = new Node(o);
Node n = new Node(o, ++index);
addLast(n);
truncate();
}

@Override
public final void error(Throwable e) {
Object o = enterTransform(NotificationLite.error(e));
Node n = new Node(o);
Node n = new Node(o, ++index);
addLast(n);
truncateFinal();
}

@Override
public final void complete() {
Object o = enterTransform(NotificationLite.complete());
Node n = new Node(o);
Node n = new Node(o, ++index);
addLast(n);
truncateFinal();
}
Expand All @@ -942,13 +948,15 @@ public final void replay(InnerSubscription<T> output) {
}

long r = output.get();
long r0 = r;
boolean unbounded = r == Long.MAX_VALUE;
long e = 0L;

Node node = output.index();
if (node == null) {
node = get();
output.index = node;

BackpressureHelper.add(output.totalRequested, node.index);
}

while (r != 0) {
Expand All @@ -969,6 +977,7 @@ public final void replay(InnerSubscription<T> output) {
return;
}
e++;
r--;
node = v;
} else {
break;
Expand All @@ -980,7 +989,7 @@ public final void replay(InnerSubscription<T> output) {

if (e != 0L) {
output.index = node;
if (r0 != Long.MAX_VALUE) {
if (!unbounded) {
output.produced(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,11 +733,11 @@ public void dispose() {
@Test
public void testBoundedReplayBuffer() {
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>();
buf.addLast(new Node(1));
buf.addLast(new Node(2));
buf.addLast(new Node(3));
buf.addLast(new Node(4));
buf.addLast(new Node(5));
buf.addLast(new Node(1, 0));
buf.addLast(new Node(2, 1));
buf.addLast(new Node(3, 2));
buf.addLast(new Node(4, 3));
buf.addLast(new Node(5, 4));

List<Integer> values = new ArrayList<Integer>();
buf.collect(values);
Expand All @@ -752,8 +752,8 @@ public void testBoundedReplayBuffer() {
buf.collect(values);
Assert.assertTrue(values.isEmpty());

buf.addLast(new Node(5));
buf.addLast(new Node(6));
buf.addLast(new Node(5, 5));
buf.addLast(new Node(6, 6));
buf.collect(values);

Assert.assertEquals(Arrays.asList(5, 6), values);
Expand Down Expand Up @@ -1131,5 +1131,109 @@ public void accept(long t) {

Assert.assertEquals(Arrays.asList(5L, 5L), requests);
}

@Test
public void testSubscribersComeAndGoAtRequestBoundaries() {
ConnectableObservable<Integer> source = Observable.range(1, 10).replay(1);
source.connect();

TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>(2L);

source.subscribe(ts1);

ts1.assertValues(1, 2);
ts1.assertNoErrors();
ts1.dispose();

TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(2L);

source.subscribe(ts2);

ts2.assertValues(2, 3);
ts2.assertNoErrors();
ts2.dispose();

TestSubscriber<Integer> ts21 = new TestSubscriber<Integer>(1L);

source.subscribe(ts21);

ts21.assertValues(3);
ts21.assertNoErrors();
ts21.dispose();

TestSubscriber<Integer> ts22 = new TestSubscriber<Integer>(1L);

source.subscribe(ts22);

ts22.assertValues(3);
ts22.assertNoErrors();
ts22.dispose();


TestSubscriber<Integer> ts3 = new TestSubscriber<Integer>();

source.subscribe(ts3);

ts3.assertNoErrors();
System.out.println(ts3.values());
ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertComplete();
}

@Test
public void testSubscribersComeAndGoAtRequestBoundaries2() {
ConnectableObservable<Integer> source = Observable.range(1, 10).replay(2);
source.connect();

TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>(2L);

source.subscribe(ts1);

ts1.assertValues(1, 2);
ts1.assertNoErrors();
ts1.dispose();

TestSubscriber<Integer> ts11 = new TestSubscriber<Integer>(2L);

source.subscribe(ts11);

ts11.assertValues(1, 2);
ts11.assertNoErrors();
ts11.dispose();

TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(3L);

source.subscribe(ts2);

ts2.assertValues(1, 2, 3);
ts2.assertNoErrors();
ts2.dispose();

TestSubscriber<Integer> ts21 = new TestSubscriber<Integer>(1L);

source.subscribe(ts21);

ts21.assertValues(2);
ts21.assertNoErrors();
ts21.dispose();

TestSubscriber<Integer> ts22 = new TestSubscriber<Integer>(1L);

source.subscribe(ts22);

ts22.assertValues(2);
ts22.assertNoErrors();
ts22.dispose();


TestSubscriber<Integer> ts3 = new TestSubscriber<Integer>();

source.subscribe(ts3);

ts3.assertNoErrors();
System.out.println(ts3.values());
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertComplete();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void testCancelledRetention(Scheduler.Worker w, boolean periodic)

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

int n = 200 * 1000;
int n = 100 * 1000;
if (periodic) {
final CountDownLatch cdl = new CountDownLatch(n);
final Runnable action = new Runnable() {
Expand Down