Skip to content

Commit 3e6affd

Browse files
committed
Merge pull request #3712 from hzsweers/z/single_takeuntil
Add takeUntil support in Single
2 parents 3af1afe + 05bbf63 commit 3e6affd

File tree

2 files changed

+523
-8
lines changed

2 files changed

+523
-8
lines changed

src/main/java/rx/Single.java

Lines changed: 227 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
*/
1313
package rx;
1414

15+
import java.util.Collection;
16+
import java.util.concurrent.*;
17+
1518
import rx.Observable.Operator;
1619
import rx.annotations.Beta;
1720
import rx.annotations.Experimental;
@@ -23,15 +26,13 @@
2326
import rx.internal.util.ScalarSynchronousSingle;
2427
import rx.internal.util.UtilityFunctions;
2528
import rx.observers.SafeSubscriber;
29+
import rx.observers.SerializedSubscriber;
2630
import rx.plugins.RxJavaObservableExecutionHook;
2731
import rx.plugins.RxJavaPlugins;
2832
import rx.schedulers.Schedulers;
2933
import rx.singles.BlockingSingle;
3034
import rx.subscriptions.Subscriptions;
3135

32-
import java.util.Collection;
33-
import java.util.concurrent.*;
34-
3536
/**
3637
* The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the
3738
* implementation of the Reactive Pattern for a stream or vector of values.
@@ -1800,6 +1801,229 @@ public void onError(Throwable error) {
18001801
}
18011802
});
18021803
}
1804+
1805+
/**
1806+
* Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon
1807+
* termination of {@code other}, this will emit a {@link CancellationException} rather than go to
1808+
* {@link SingleSubscriber#onSuccess(Object)}.
1809+
* <p>
1810+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
1811+
* <dl>
1812+
* <dt><b>Scheduler:</b></dt>
1813+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
1814+
* </dl>
1815+
*
1816+
* @param other
1817+
* the Completable whose termination will cause {@code takeUntil} to emit the item from the source
1818+
* Single
1819+
* @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates.
1820+
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
1821+
*/
1822+
public final Single<T> takeUntil(final Completable other) {
1823+
return lift(new Operator<T, T>() {
1824+
@Override
1825+
public Subscriber<? super T> call(Subscriber<? super T> child) {
1826+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
1827+
1828+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
1829+
@Override
1830+
public void onNext(T t) {
1831+
serial.onNext(t);
1832+
}
1833+
@Override
1834+
public void onError(Throwable e) {
1835+
try {
1836+
serial.onError(e);
1837+
} finally {
1838+
serial.unsubscribe();
1839+
}
1840+
}
1841+
@Override
1842+
public void onCompleted() {
1843+
try {
1844+
serial.onCompleted();
1845+
} finally {
1846+
serial.unsubscribe();
1847+
}
1848+
}
1849+
};
1850+
1851+
final Completable.CompletableSubscriber so = new Completable.CompletableSubscriber() {
1852+
@Override
1853+
public void onCompleted() {
1854+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1855+
}
1856+
1857+
@Override
1858+
public void onError(Throwable e) {
1859+
main.onError(e);
1860+
}
1861+
1862+
@Override
1863+
public void onSubscribe(Subscription d) {
1864+
serial.add(d);
1865+
}
1866+
};
1867+
1868+
serial.add(main);
1869+
child.add(serial);
1870+
1871+
other.subscribe(so);
1872+
1873+
return main;
1874+
}
1875+
});
1876+
}
1877+
1878+
/**
1879+
* Returns a Single that emits the item emitted by the source Single until an Observable emits an item. Upon
1880+
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
1881+
* {@link SingleSubscriber#onSuccess(Object)}.
1882+
* <p>
1883+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
1884+
* <dl>
1885+
* <dt><b>Scheduler:</b></dt>
1886+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
1887+
* </dl>
1888+
*
1889+
* @param other
1890+
* the Observable whose first emitted item will cause {@code takeUntil} to emit the item from the source
1891+
* Single
1892+
* @param <E>
1893+
* the type of items emitted by {@code other}
1894+
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits
1895+
* its first item
1896+
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
1897+
*/
1898+
public final <E> Single<T> takeUntil(final Observable<? extends E> other) {
1899+
return lift(new Operator<T, T>() {
1900+
@Override
1901+
public Subscriber<? super T> call(Subscriber<? super T> child) {
1902+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
1903+
1904+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
1905+
@Override
1906+
public void onNext(T t) {
1907+
serial.onNext(t);
1908+
}
1909+
@Override
1910+
public void onError(Throwable e) {
1911+
try {
1912+
serial.onError(e);
1913+
} finally {
1914+
serial.unsubscribe();
1915+
}
1916+
}
1917+
@Override
1918+
public void onCompleted() {
1919+
try {
1920+
serial.onCompleted();
1921+
} finally {
1922+
serial.unsubscribe();
1923+
}
1924+
}
1925+
};
1926+
1927+
final Subscriber<E> so = new Subscriber<E>() {
1928+
1929+
@Override
1930+
public void onCompleted() {
1931+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1932+
}
1933+
1934+
@Override
1935+
public void onError(Throwable e) {
1936+
main.onError(e);
1937+
}
1938+
1939+
@Override
1940+
public void onNext(E e) {
1941+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
1942+
}
1943+
};
1944+
1945+
serial.add(main);
1946+
serial.add(so);
1947+
1948+
child.add(serial);
1949+
1950+
other.unsafeSubscribe(so);
1951+
1952+
return main;
1953+
}
1954+
});
1955+
}
1956+
1957+
/**
1958+
* Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon
1959+
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
1960+
* {@link SingleSubscriber#onSuccess(Object)}.
1961+
* <p>
1962+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
1963+
* <dl>
1964+
* <dt><b>Scheduler:</b></dt>
1965+
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
1966+
* </dl>
1967+
*
1968+
* @param other
1969+
* the Single whose emitted item will cause {@code takeUntil} to emit the item from the source Single
1970+
* @param <E>
1971+
* the type of item emitted by {@code other}
1972+
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item
1973+
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
1974+
*/
1975+
public final <E> Single<T> takeUntil(final Single<? extends E> other) {
1976+
return lift(new Operator<T, T>() {
1977+
@Override
1978+
public Subscriber<? super T> call(Subscriber<? super T> child) {
1979+
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);
1980+
1981+
final Subscriber<T> main = new Subscriber<T>(serial, false) {
1982+
@Override
1983+
public void onNext(T t) {
1984+
serial.onNext(t);
1985+
}
1986+
@Override
1987+
public void onError(Throwable e) {
1988+
try {
1989+
serial.onError(e);
1990+
} finally {
1991+
serial.unsubscribe();
1992+
}
1993+
}
1994+
@Override
1995+
public void onCompleted() {
1996+
try {
1997+
serial.onCompleted();
1998+
} finally {
1999+
serial.unsubscribe();
2000+
}
2001+
}
2002+
};
2003+
2004+
final SingleSubscriber<E> so = new SingleSubscriber<E>() {
2005+
@Override
2006+
public void onSuccess(E value) {
2007+
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
2008+
}
2009+
2010+
@Override
2011+
public void onError(Throwable e) {
2012+
main.onError(e);
2013+
}
2014+
};
2015+
2016+
serial.add(main);
2017+
serial.add(so);
2018+
2019+
child.add(serial);
2020+
2021+
other.subscribe(so);
2022+
2023+
return main;
2024+
}
2025+
});
2026+
}
18032027

18042028
/**
18052029
* Converts this Single into an {@link Observable}.

0 commit comments

Comments
 (0)