Description
I've come across a couple of differences in the subscription behavior of Observable, Single, and Completable that I was hoping to get clarification on or bring up for discussion. The examples specifically revolve around converting an Observable to those types and how its events are propagated.
In a normal Observable, an onCompleted() event automatically unsubscribes:
PublishSubject<String> stringSubject = PublishSubject.create();
Observable<String> observable = stringSubject.asObservable();
Subscription observableSubscription = observable.subscribe();
stringSubject.onCompleted();
assertTrue(observableSubscription.isUnsubscribed());
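To make the expectation concrete, here is a minimal, dependency-free sketch of the "terminal event releases the subscription" contract that the snippet above relies on. It is only a model for discussion, not RxJava's implementation: AutoUnsubscribeModel, ModelSubscription, ModelObserver, and AutoUnsubscribingObserver are hypothetical names invented for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model for discussion; these are NOT RxJava classes.
final class AutoUnsubscribeModel {

    /** Minimal stand-in for a subscription handle. */
    static final class ModelSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Minimal stand-in for an observer with one terminal callback. */
    interface ModelObserver {
        void onCompleted();
    }

    /** Forwards the terminal event downstream, then releases the subscription. */
    static final class AutoUnsubscribingObserver implements ModelObserver {
        private final ModelObserver downstream;
        private final ModelSubscription subscription;

        AutoUnsubscribingObserver(ModelObserver downstream, ModelSubscription subscription) {
            this.downstream = downstream;
            this.subscription = subscription;
        }

        @Override
        public void onCompleted() {
            try {
                downstream.onCompleted();
            } finally {
                // Release even if the downstream callback throws.
                subscription.unsubscribe();
            }
        }
    }

    /** Returns whether the subscription was released by the terminal event. */
    static boolean completesAndUnsubscribes() {
        ModelSubscription subscription = new ModelSubscription();
        ModelObserver observer = new AutoUnsubscribingObserver(() -> { }, subscription);
        observer.onCompleted();
        return subscription.isUnsubscribed();
    }
}
```

The question in this issue is essentially whether the wrappers produced by toSingle() and toCompletable() are expected to provide the same guarantee.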
This still holds true in Single, but with the added caveat that an onCompleted() event actually propagates to onError() if no item has been emitted beforehand to trigger onSuccess(). This would be sort of OK considering the Single contract, but things get a little muddled in the sense that onSuccess() does not actually unsubscribe, despite being considered a terminal event (or so I thought). What this means is that onCompleted() has to be called manually after onSuccess():
PublishSubject<String> stringSubject = PublishSubject.create();
Single<String> single = stringSubject.toSingle();
Subscription singleSubscription = single.subscribe();
stringSubject.onNext("This is necessary");
stringSubject.onCompleted(); // This is necessary too
assertTrue(singleSubscription.isUnsubscribed());
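One reading that would be consistent with the snippet above is that the conversion cannot know the source emitted exactly one item until the source completes, so it holds the item and only decides the outcome on completion. The sketch below models that reading in plain Java. It is an assumption for discussion, not confirmed behavior: SingleConversionModel, RecordingSingleObserver, and ToSingleAdapter are hypothetical names, and the choice of NoSuchElementException for the empty case is also part of the assumption.

```java
import java.util.NoSuchElementException;

// Hypothetical, simplified model for discussion; these are NOT RxJava classes.
final class SingleConversionModel {

    /** Records which terminal callback fired, if any. */
    static final class RecordingSingleObserver<T> {
        T value;
        Throwable error;

        void onSuccess(T v) {
            value = v;
        }

        void onError(Throwable e) {
            error = e;
        }
    }

    /**
     * Holds the most recent item and decides the outcome only on completion.
     * The "more than one item" case is deliberately left out of this sketch.
     */
    static final class ToSingleAdapter<T> {
        private final RecordingSingleObserver<T> downstream;
        private T item;
        private boolean hasItem;
        private boolean done;

        ToSingleAdapter(RecordingSingleObserver<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T t) {
            if (done) {
                return;
            }
            // Assumption: nothing is delivered downstream yet.
            item = t;
            hasItem = true;
        }

        void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            if (hasItem) {
                downstream.onSuccess(item);
            } else {
                // Completing without an item is reported as an error.
                downstream.onError(
                        new NoSuchElementException("source completed without emitting an item"));
            }
        }
    }
}
```

Under this model, onNext() alone never reaches the downstream observer, which would explain why both calls appear to be necessary. If the real operator delivers onSuccess() as soon as the item arrives, the model does not apply.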
Things get more confusing in Completable, which, as far as I can tell, offers no auto-unsubscribe after onComplete() is called:
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();
Subscription completableSubscription = completable.subscribe();
stringSubject.onCompleted();
assertTrue(completableSubscription.isUnsubscribed()); // This fails
This would imply that you always need to save the subscription and manually unsubscribe in onComplete() or doOnComplete():
PublishSubject<String> stringSubject = PublishSubject.create();
Completable completable = stringSubject.toCompletable();
final CompositeSubscription set = new CompositeSubscription();
set.add(completable
        .doOnComplete(new Action0() {
            @Override
            public void call() {
                // Kludge: manually release the subscription on completion
                set.unsubscribe();
            }
        })
        .subscribe());
stringSubject.onCompleted();
assertTrue(set.isUnsubscribed()); // Now it works
I'm not sure whether this behavior still holds when dealing with "pure" Single and Completable subscriptions; I can investigate more if need be.
It does seem inconsistent to me, or at the very least prone to causing passive leaks or unexpected behavior due to subscriptions living on past "terminal" events. Maybe some clarification is needed on what constitutes a "terminal" event in Single and Completable. Would love to get some more insight from the collaborators that have worked on these.