Skip to content
This repository was archived by the owner on Mar 21, 2018. It is now read-only.

Commit 10926d4

Browse files
committed
Merge pull request #1 from akarnokd/ReplayRequestCoordinationFix2x
Fix for bounded replay() not requesting enough for latecommers
2 parents da1d710 + 7533849 commit 10926d4

File tree

3 files changed

+128
-15
lines changed

3 files changed

+128
-15
lines changed

src/main/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplay.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ public void subscribe(Subscriber<? super T> child) {
219219
// if it fails, no worries because we will still have its buffer
220220
// so it is going to replay it for us
221221
r.add(inner);
222+
// trigger the capturing of the current node and total requested
223+
r.buffer.replay(inner);
222224
// the producer has been registered with the current subscriber-to-source so
223225
// at least it will receive the next terminal event
224226
// setting the producer will trigger the first request to be considered by
@@ -838,8 +840,10 @@ static final class Node extends AtomicReference<Node> {
838840
/** */
839841
private static final long serialVersionUID = 245354315435971818L;
840842
final Object value;
841-
public Node(Object value) {
843+
final long index;
844+
public Node(Object value, long index) {
842845
this.value = value;
846+
this.index = index;
843847
}
844848
}
845849

@@ -856,8 +860,10 @@ static class BoundedReplayBuffer<T> extends AtomicReference<Node> implements Rep
856860
Node tail;
857861
int size;
858862

863+
long index;
864+
859865
public BoundedReplayBuffer() {
860-
Node n = new Node(null);
866+
Node n = new Node(null, 0);
861867
tail = n;
862868
set(n);
863869
}
@@ -906,23 +912,23 @@ final void setFirst(Node n) {
906912
@Override
907913
public final void next(T value) {
908914
Object o = enterTransform(NotificationLite.next(value));
909-
Node n = new Node(o);
915+
Node n = new Node(o, ++index);
910916
addLast(n);
911917
truncate();
912918
}
913919

914920
@Override
915921
public final void error(Throwable e) {
916922
Object o = enterTransform(NotificationLite.error(e));
917-
Node n = new Node(o);
923+
Node n = new Node(o, ++index);
918924
addLast(n);
919925
truncateFinal();
920926
}
921927

922928
@Override
923929
public final void complete() {
924930
Object o = enterTransform(NotificationLite.complete());
925-
Node n = new Node(o);
931+
Node n = new Node(o, ++index);
926932
addLast(n);
927933
truncateFinal();
928934
}
@@ -942,13 +948,15 @@ public final void replay(InnerSubscription<T> output) {
942948
}
943949

944950
long r = output.get();
945-
long r0 = r;
951+
boolean unbounded = r == Long.MAX_VALUE;
946952
long e = 0L;
947953

948954
Node node = output.index();
949955
if (node == null) {
950956
node = get();
951957
output.index = node;
958+
959+
BackpressureHelper.add(output.totalRequested, node.index);
952960
}
953961

954962
while (r != 0) {
@@ -969,6 +977,7 @@ public final void replay(InnerSubscription<T> output) {
969977
return;
970978
}
971979
e++;
980+
r--;
972981
node = v;
973982
} else {
974983
break;
@@ -980,7 +989,7 @@ public final void replay(InnerSubscription<T> output) {
980989

981990
if (e != 0L) {
982991
output.index = node;
983-
if (r0 != Long.MAX_VALUE) {
992+
if (!unbounded) {
984993
output.produced(e);
985994
}
986995
}

src/test/java/hu/akarnokd/rxjava2/internal/operators/OperatorReplayTest.java

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -733,11 +733,11 @@ public void dispose() {
733733
@Test
734734
public void testBoundedReplayBuffer() {
735735
BoundedReplayBuffer<Integer> buf = new BoundedReplayBuffer<Integer>();
736-
buf.addLast(new Node(1));
737-
buf.addLast(new Node(2));
738-
buf.addLast(new Node(3));
739-
buf.addLast(new Node(4));
740-
buf.addLast(new Node(5));
736+
buf.addLast(new Node(1, 0));
737+
buf.addLast(new Node(2, 1));
738+
buf.addLast(new Node(3, 2));
739+
buf.addLast(new Node(4, 3));
740+
buf.addLast(new Node(5, 4));
741741

742742
List<Integer> values = new ArrayList<Integer>();
743743
buf.collect(values);
@@ -752,8 +752,8 @@ public void testBoundedReplayBuffer() {
752752
buf.collect(values);
753753
Assert.assertTrue(values.isEmpty());
754754

755-
buf.addLast(new Node(5));
756-
buf.addLast(new Node(6));
755+
buf.addLast(new Node(5, 5));
756+
buf.addLast(new Node(6, 6));
757757
buf.collect(values);
758758

759759
Assert.assertEquals(Arrays.asList(5, 6), values);
@@ -1131,5 +1131,109 @@ public void accept(long t) {
11311131

11321132
Assert.assertEquals(Arrays.asList(5L, 5L), requests);
11331133
}
1134+
1135+
@Test
1136+
public void testSubscribersComeAndGoAtRequestBoundaries() {
1137+
ConnectableObservable<Integer> source = Observable.range(1, 10).replay(1);
1138+
source.connect();
1139+
1140+
TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>(2L);
1141+
1142+
source.subscribe(ts1);
1143+
1144+
ts1.assertValues(1, 2);
1145+
ts1.assertNoErrors();
1146+
ts1.dispose();
1147+
1148+
TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(2L);
1149+
1150+
source.subscribe(ts2);
1151+
1152+
ts2.assertValues(2, 3);
1153+
ts2.assertNoErrors();
1154+
ts2.dispose();
1155+
1156+
TestSubscriber<Integer> ts21 = new TestSubscriber<Integer>(1L);
1157+
1158+
source.subscribe(ts21);
1159+
1160+
ts21.assertValues(3);
1161+
ts21.assertNoErrors();
1162+
ts21.dispose();
1163+
1164+
TestSubscriber<Integer> ts22 = new TestSubscriber<Integer>(1L);
1165+
1166+
source.subscribe(ts22);
1167+
1168+
ts22.assertValues(3);
1169+
ts22.assertNoErrors();
1170+
ts22.dispose();
1171+
1172+
1173+
TestSubscriber<Integer> ts3 = new TestSubscriber<Integer>();
1174+
1175+
source.subscribe(ts3);
1176+
1177+
ts3.assertNoErrors();
1178+
System.out.println(ts3.values());
1179+
ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
1180+
ts3.assertComplete();
1181+
}
11341182

1183+
@Test
1184+
public void testSubscribersComeAndGoAtRequestBoundaries2() {
1185+
ConnectableObservable<Integer> source = Observable.range(1, 10).replay(2);
1186+
source.connect();
1187+
1188+
TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>(2L);
1189+
1190+
source.subscribe(ts1);
1191+
1192+
ts1.assertValues(1, 2);
1193+
ts1.assertNoErrors();
1194+
ts1.dispose();
1195+
1196+
TestSubscriber<Integer> ts11 = new TestSubscriber<Integer>(2L);
1197+
1198+
source.subscribe(ts11);
1199+
1200+
ts11.assertValues(1, 2);
1201+
ts11.assertNoErrors();
1202+
ts11.dispose();
1203+
1204+
TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(3L);
1205+
1206+
source.subscribe(ts2);
1207+
1208+
ts2.assertValues(1, 2, 3);
1209+
ts2.assertNoErrors();
1210+
ts2.dispose();
1211+
1212+
TestSubscriber<Integer> ts21 = new TestSubscriber<Integer>(1L);
1213+
1214+
source.subscribe(ts21);
1215+
1216+
ts21.assertValues(2);
1217+
ts21.assertNoErrors();
1218+
ts21.dispose();
1219+
1220+
TestSubscriber<Integer> ts22 = new TestSubscriber<Integer>(1L);
1221+
1222+
source.subscribe(ts22);
1223+
1224+
ts22.assertValues(2);
1225+
ts22.assertNoErrors();
1226+
ts22.dispose();
1227+
1228+
1229+
TestSubscriber<Integer> ts3 = new TestSubscriber<Integer>();
1230+
1231+
source.subscribe(ts3);
1232+
1233+
ts3.assertNoErrors();
1234+
System.out.println(ts3.values());
1235+
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
1236+
ts3.assertComplete();
1237+
}
1238+
11351239
}

src/test/java/hu/akarnokd/rxjava2/schedulers/ExecutorSchedulerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static void testCancelledRetention(Scheduler.Worker w, boolean periodic)
6363

6464
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
6565

66-
int n = 200 * 1000;
66+
int n = 100 * 1000;
6767
if (periodic) {
6868
final CountDownLatch cdl = new CountDownLatch(n);
6969
final Runnable action = new Runnable() {

0 commit comments

Comments
 (0)