Skip to content

Commit 05bbf63

Browse files
Zac SweersZac Sweers
authored andcommitted
Add takeUntil(Completable) support and standardize tests
1 parent e917c77 commit 05bbf63

File tree

2 files changed

+245
-241
lines changed

2 files changed

+245
-241
lines changed

src/main/java/rx/Single.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,79 @@ public void onError(Throwable error) {
18021802
});
18031803
}
18041804

1805+
/**
1806+
* Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon
1807+
* termination of {@code other}, this will emit a {@link CancellationException} rather than go to
1808+
* {@link SingleSubscriber#onSuccess(Object)}.
1809+
* <p>
1810+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
1811+
* <dl>
1812+
* <dt><b>Scheduler:</b></dt>
1813+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
1814+
* </dl>
1815+
*
1816+
* @param other
1817+
* the Completable whose termination will cause {@code takeUntil} to emit the item from the source
1818+
* Single
1819+
* @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates.
1820+
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
1821+
*/
1822+
public final Single<T> takeUntil(final Completable other) {
1823+
return lift(new Operator<T, T>() {
1824+
@Override
1825+
public Subscriber<? super T> call(Subscriber<? super T> child) {
1826+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
1827+
1828+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
1829+
@Override
1830+
public void onNext(T t) {
1831+
serial.onNext(t);
1832+
}
1833+
@Override
1834+
public void onError(Throwable e) {
1835+
try {
1836+
serial.onError(e);
1837+
} finally {
1838+
serial.unsubscribe();
1839+
}
1840+
}
1841+
@Override
1842+
public void onCompleted() {
1843+
try {
1844+
serial.onCompleted();
1845+
} finally {
1846+
serial.unsubscribe();
1847+
}
1848+
}
1849+
};
1850+
1851+
final Completable.CompletableSubscriber so = new Completable.CompletableSubscriber() {
1852+
@Override
1853+
public void onCompleted() {
1854+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1855+
}
1856+
1857+
@Override
1858+
public void onError(Throwable e) {
1859+
main.onError(e);
1860+
}
1861+
1862+
@Override
1863+
public void onSubscribe(Subscription d) {
1864+
serial.add(d);
1865+
}
1866+
};
1867+
1868+
serial.add(main);
1869+
child.add(serial);
1870+
1871+
other.subscribe(so);
1872+
1873+
return main;
1874+
}
1875+
});
1876+
}
1877+
18051878
/**
18061879
* Returns a Single that emits the item emitted by the source Single until an Observable emits an item. Upon
18071880
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to

0 commit comments

Comments
 (0)