Skip to content

Commit 5410d68

Browse files
committed
1.x: Enabled Single onSubscribeStart hook
1 parent 31d0ab4 commit 5410d68

File tree

2 files changed

+8
-11
lines changed

2 files changed

+8
-11
lines changed

src/main/java/rx/Single.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,14 +1569,12 @@ public final void onNext(T args) {
15691569
* @param subscriber
15701570
* the Subscriber that will handle the emission or notification from the Single
15711571
*/
1572-
public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
1572+
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
15731573
try {
15741574
// new Subscriber so onStart it
15751575
subscriber.onStart();
1576-
// TODO add back the hook
1577-
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1578-
onSubscribe.call(subscriber);
1579-
hook.onSubscribeReturn(subscriber);
1576+
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1577+
return hook.onSubscribeReturn(subscriber);
15801578
} catch (Throwable e) {
15811579
// special handling for certain Throwable/Error/Exception types
15821580
Exceptions.throwIfFatal(e);
@@ -1593,6 +1591,7 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
15931591
// TODO why aren't we throwing the hook's return value.
15941592
throw r;
15951593
}
1594+
return Subscriptions.unsubscribed();
15961595
}
15971596
}
15981597

@@ -1660,9 +1659,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
16601659
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
16611660
try {
16621661
// allow the hook to intercept and/or decorate
1663-
// TODO add back the hook
1664-
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1665-
onSubscribe.call(subscriber);
1662+
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
16661663
return hook.onSubscribeReturn(subscriber);
16671664
} catch (Throwable e) {
16681665
// special handling for certain Throwable/Error/Exception types

src/main/java/rx/plugins/RxJavaSingleExecutionHook.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ public <T> Single.OnSubscribe<T> onCreate(Single.OnSubscribe<T> f) {
6161
* logging, metrics and other such things and pass-thru the function.
6262
*
6363
* @param onSubscribe
64-
* original {@link Single.OnSubscribe}<{@code T}> to be executed
65-
* @return {@link Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
64+
* original {@link Observable.OnSubscribe}<{@code T}> to be executed
65+
* @return {@link Observable.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
6666
* returned as a pass-thru
6767
*/
68-
public <T> Single.OnSubscribe<T> onSubscribeStart(Single<? extends T> singleInstance, final Single.OnSubscribe<T> onSubscribe) {
68+
public <T> Observable.OnSubscribe<T> onSubscribeStart(Single<? extends T> singleInstance, final Observable.OnSubscribe<T> onSubscribe) {
6969
// pass-thru by default
7070
return onSubscribe;
7171
}

0 commit comments

Comments
 (0)