Skip to content

Commit 4b61060

Browse files
akarnokdzsxwing
authored andcommitted
1.x: add Completable.safeSubscribe option + RxJavaPlugins hook support (#3942)
Add option to safely subscribe a CompletableSubscriber / regular Subscriber and handle onXXX failures.
1 parent 44947d9 commit 4b61060

18 files changed

+685
-109
lines changed

src/main/java/rx/Completable.java

Lines changed: 120 additions & 60 deletions
Large diffs are not rendered by default.

src/main/java/rx/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2024,7 +2024,7 @@ public void onSubscribe(Subscription d) {
20242024
serial.add(main);
20252025
child.add(serial);
20262026

2027-
other.subscribe(so);
2027+
other.unsafeSubscribe(so);
20282028

20292029
return main;
20302030
}

src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ void next() {
130130
return;
131131
}
132132

133-
c.subscribe(inner);
133+
c.unsafeSubscribe(inner);
134134
}
135135

136136
final class ConcatInnerSubscriber implements CompletableSubscriber {

src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ void next() {
8989
return;
9090
}
9191

92-
a[idx].subscribe(this);
92+
a[idx].unsafeSubscribe(this);
9393
} while (decrementAndGet() != 0);
9494
}
9595
}

src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void next() {
128128
return;
129129
}
130130

131-
c.subscribe(this);
131+
c.unsafeSubscribe(this);
132132
} while (decrementAndGet() != 0);
133133
}
134134
}

src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onNext(Completable t) {
101101

102102
wip.getAndIncrement();
103103

104-
t.subscribe(new CompletableSubscriber() {
104+
t.unsafeSubscribe(new CompletableSubscriber() {
105105
Subscription d;
106106
boolean innerDone;
107107
@Override

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void call(final CompletableSubscriber s) {
5454
}
5555
}
5656

57-
c.subscribe(new CompletableSubscriber() {
57+
c.unsafeSubscribe(new CompletableSubscriber() {
5858
@Override
5959
public void onSubscribe(Subscription d) {
6060
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void call(final CompletableSubscriber s) {
5151
continue;
5252
}
5353

54-
c.subscribe(new CompletableSubscriber() {
54+
c.unsafeSubscribe(new CompletableSubscriber() {
5555
@Override
5656
public void onSubscribe(Subscription d) {
5757
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void call(final CompletableSubscriber s) {
117117

118118
wip.getAndIncrement();
119119

120-
c.subscribe(new CompletableSubscriber() {
120+
c.unsafeSubscribe(new CompletableSubscriber() {
121121
@Override
122122
public void onSubscribe(Subscription d) {
123123
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void call(final CompletableSubscriber s) {
110110

111111
wip.getAndIncrement();
112112

113-
c.subscribe(new CompletableSubscriber() {
113+
c.unsafeSubscribe(new CompletableSubscriber() {
114114
@Override
115115
public void onSubscribe(Subscription d) {
116116
set.add(d);

0 commit comments

Comments
 (0)