diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index b9ae791382..0cef17b1d3 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -19,13 +19,17 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import rx.concurrency.Schedulers;
+import rx.joins.Pattern2;
+import rx.joins.Plan0;
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
@@ -51,6 +55,8 @@
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
+import rx.operators.OperationJoin;
+import rx.operators.OperationJoinPatterns;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
@@ -64,6 +70,7 @@
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallel;
+import rx.operators.OperationParallelMerge;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
@@ -82,6 +89,8 @@
import rx.operators.OperationTimeInterval;
import rx.operators.OperationTimeout;
import rx.operators.OperationTimestamp;
+import rx.operators.OperationToMap;
+import rx.operators.OperationToMultimap;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
@@ -129,7 +138,7 @@
* The documentation for this interface makes use of marble diagrams. The
* following legend explains these diagrams:
*
- *
+ *
*
* For more information see the
* RxJava Wiki
@@ -302,6 +311,33 @@ private Subscription protectivelyWrapAndSubscribe(Observer super T> o) {
return subscription.wrap(subscribe(new SafeObserver(subscription, o)));
}
+ /**
+ * Subscribe and ignore all events.
+ *
+ * @return
+ */
+ public Subscription subscribe() {
+ return protectivelyWrapAndSubscribe(new Observer() {
+
+ @Override
+ public void onCompleted() {
+ // do nothing
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ handleError(e);
+ throw new OnErrorNotImplementedException(e);
+ }
+
+ @Override
+ public void onNext(T args) {
+ // do nothing
+ }
+
+ });
+ }
+
/**
* An {@link Observer} must call an Observable's {@code subscribe} method
* in order to receive items and notifications from the Observable.
@@ -477,6 +513,7 @@ public Subscription subscribe(final Action1 super T> onNext, final Action1RxJava Wiki: Observable.publish() and Observable.multicast()
*/
public ConnectableObservable multicast(Subject super T, ? extends R> subject) {
return OperationMulticast.multicast(this, subject);
@@ -546,7 +583,7 @@ public Subscription onSubscribe(Observer super T> observer) {
* Creates an Observable that will execute the given function when an
* {@link Observer} subscribes to it.
*
- *
+ *
*
* Write the function you pass to create
so that it behaves as
* an Observable: It should invoke the Observer's
@@ -567,6 +604,7 @@ public Subscription onSubscribe(Observer super T> observer) {
* allow the Observer to cancel the subscription
* @return an Observable that, when an {@link Observer} subscribes to it,
* will execute the given function
+ * @see RxJava Wiki: create()
*/
public static Observable create(OnSubscribeFunc func) {
return new Observable(func);
@@ -582,6 +620,7 @@ public static Observable create(OnSubscribeFunc func) {
* @return an Observable that returns no data to the {@link Observer} and
* immediately invokes the {@link Observer}'s
* {@link Observer#onCompleted() onCompleted} method
+ * @see RxJava Wiki: empty()
* @see MSDN: Observable.Empty Method
*/
public static Observable empty() {
@@ -602,6 +641,7 @@ public static Observable empty() {
* immediately invokes the {@link Observer}'s
* {@link Observer#onCompleted() onCompleted} method with the
* specified scheduler
+ * @see RxJava Wiki: empty()
* @see MSDN: Observable.Empty Method (IScheduler)
*/
public static Observable empty(Scheduler scheduler) {
@@ -620,6 +660,7 @@ public static Observable empty(Scheduler scheduler) {
* @return an Observable that invokes the {@link Observer}'s
* {@link Observer#onError onError} method when the Observer
* subscribes to it
+ * @see RxJava Wiki: error()
* @see MSDN: Observable.Throw Method
*/
public static Observable error(Throwable exception) {
@@ -639,6 +680,7 @@ public static Observable error(Throwable exception) {
* @return an Observable that invokes the {@link Observer}'s
* {@link Observer#onError onError} method with the specified
* scheduler
+ * @see RxJava Wiki: error()
* @see MSDN: Observable.Throw Method
*/
public static Observable error(Throwable exception, Scheduler scheduler) {
@@ -648,7 +690,7 @@ public static Observable error(Throwable exception, Scheduler scheduler)
/**
* Converts an {@link Iterable} sequence into an Observable.
*
- *
+ *
*
* Note: the entire iterable sequence is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
@@ -660,15 +702,34 @@ public static Observable error(Throwable exception, Scheduler scheduler)
* type of items to be emitted by the resulting Observable
* @return an Observable that emits each item in the source {@link Iterable}
* sequence
+ * @see RxJava Wiki: from()
*/
public static Observable from(Iterable extends T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
}
+ /**
+ * Converts an {@link Iterable} sequence into an Observable with the specified scheduler.
+ *
+ *
+ *
+ * @param iterable the source {@link Iterable} sequence
+ * @param scheduler the scheduler to emit the items of the iterable
+ * @param the type of items in the {@link Iterable} sequence and the
+ * type of items to be emitted by the resulting Observable
+ * @return an Observable that emits each item in the source {@link Iterable}
+ * sequence with the specified scheduler
+ * @see RxJava Wiki: from()
+ * @see MSDN: Observable.ToObservable
+ */
+ public static Observable from(Iterable extends T> iterable, Scheduler scheduler) {
+ return from(iterable).observeOn(scheduler);
+ }
+
/**
* Converts an Array into an Observable.
*
- *
+ *
*
* Note: the entire array is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
@@ -679,6 +740,7 @@ public static Observable from(Iterable extends T> iterable) {
* @param the type of items in the Array and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
+ * @see RxJava Wiki: from()
*/
public static Observable from(T[] items) {
return create(OperationToObservableIterable.toObservableIterable(Arrays.asList(items)));
@@ -687,7 +749,7 @@ public static Observable from(T[] items) {
/**
* Converts an item into an Observable that emits that item.
*
- *
+ *
*
* Note: the item is immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -698,6 +760,7 @@ public static Observable from(T[] items) {
* @param the type of the item, and the type of the item to be
* emitted by the resulting Observable
* @return an Observable that emits the item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -708,7 +771,7 @@ public static Observable from(T t1) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -720,6 +783,7 @@ public static Observable from(T t1) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -730,7 +794,7 @@ public static Observable from(T t1, T t2) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -743,6 +807,7 @@ public static Observable from(T t1, T t2) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -753,7 +818,7 @@ public static Observable from(T t1, T t2, T t3) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -767,6 +832,7 @@ public static Observable from(T t1, T t2, T t3) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -777,7 +843,7 @@ public static Observable from(T t1, T t2, T t3, T t4) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -792,6 +858,7 @@ public static Observable from(T t1, T t2, T t3, T t4) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -802,7 +869,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -818,6 +885,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -828,7 +896,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -845,6 +913,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -855,7 +924,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -873,6 +942,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -883,7 +953,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -902,6 +972,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -912,7 +983,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
/**
* Converts a series of items into an Observable.
*
- *
+ *
*
* Note: the items will be immediately emitted each time an {@link Observer}
* subscribes. Since this occurs before the {@link Subscription} is
@@ -932,6 +1003,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* @param the type of items, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item
+ * @see RxJava Wiki: from()
*/
@SuppressWarnings("unchecked")
// suppress unchecked because we are using varargs inside the method
@@ -943,7 +1015,7 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* Generates an Observable that emits a sequence of integers within a
* specified range.
*
- *
+ *
*
* Note: the entire range is immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the
@@ -953,18 +1025,35 @@ public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
* @param start the value of the first integer in the sequence
* @param count the number of sequential integers to generate
* @return an Observable that emits a range of sequential integers
+ * @see RxJava Wiki: range()
* @see Observable.Range Method (Int32, Int32)
*/
public static Observable range(int start, int count) {
return from(Range.createWithCount(start, count));
}
+ /**
+ * Generates an Observable that emits a sequence of integers within a
+ * specified range with the specified scheduler.
+ *
+ *
+ * @param start the value of the first integer in the sequence
+ * @param count the number of sequential integers to generate
+ * @param scheduler the scheduler to run the generator loop on
+ * @return an Observable that emits a range of sequential integers
+ * @see RxJava Wiki: range()
+ * @see Observable.Range Method (Int32, Int32, IScheduler)
+ */
+ public static Observable range(int start, int count, Scheduler scheduler) {
+ return range(start, count).observeOn(scheduler);
+ }
+
/**
* Returns an Observable that calls an Observable factory to create its
* Observable for each new Observer that subscribes. That is, for each
* subscriber, the actuall Observable is determined by the factory function.
*
- *
+ *
*
* The defer operator allows you to defer or delay emitting items from an
* Observable until such time as an Observer subscribes to the Observable.
@@ -977,6 +1066,7 @@ public static Observable range(int start, int count) {
* @param the type of the items emitted by the Observable
* @return an Observable whose {@link Observer}s trigger an invocation of
* the given Observable factory function
+ * @see RxJava Wiki: defer()
*/
public static Observable defer(Func0 extends Observable extends T>> observableFactory) {
return create(OperationDefer.defer(observableFactory));
@@ -1000,6 +1090,7 @@ public static Observable defer(Func0 extends Observable extends T>> o
* {@link Observer#onNext onNext} method
* @param the type of that item
* @return an Observable that emits a single item and then completes
+ * @see RxJava Wiki: just()
*/
public static Observable just(T value) {
List list = new ArrayList();
@@ -1020,6 +1111,7 @@ public static Observable just(T value) {
* @param scheduler the scheduler to send the single element on
* @return an Observable that emits a single item and then completes on a
* specified scheduler
+ * @see RxJava Wiki: just()
*/
public static Observable just(T value, Scheduler scheduler) {
return just(value).observeOn(scheduler);
@@ -1038,6 +1130,7 @@ public static Observable just(T value, Scheduler scheduler) {
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables emitted by the
* {@code source} Observable
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
public static Observable merge(Observable extends Observable extends T>> source) {
@@ -1057,6 +1150,7 @@ public static Observable merge(Observable extends Observable extends
* @param t2 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1079,6 +1173,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t3 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1102,6 +1197,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t4 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1126,6 +1222,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t5 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1151,6 +1248,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t6 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1177,6 +1275,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t7 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1204,6 +1303,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t8 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1232,6 +1332,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @param t9 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: merge()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1250,6 +1351,7 @@ public static Observable merge(Observable extends T> t1, Observable e
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
public static Observable concat(Observable extends Observable extends T>> observables) {
@@ -1267,6 +1369,7 @@ public static Observable concat(Observable extends Observable extends
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1288,6 +1391,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1309,6 +1413,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1331,6 +1436,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1354,6 +1460,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1378,6 +1485,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1403,6 +1511,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1429,6 +1538,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
+ * @see RxJava Wiki: concat()
* @see MSDN: Observable.Concat Method
*/
@SuppressWarnings("unchecked")
@@ -1458,6 +1568,7 @@ public static Observable concat(Observable extends T> t1, Observable
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables emitted by the
* {@code source} Observable
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(Observable extends Observable extends T>> source) {
@@ -1485,6 +1596,7 @@ public static Observable mergeDelayError(Observable extends Observable<
* @param t2 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1515,6 +1627,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t3 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1547,6 +1660,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t4 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1579,6 +1693,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t5 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1612,6 +1727,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t6 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1646,6 +1762,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t7 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1681,6 +1798,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t8 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1717,6 +1835,7 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* @param t9 an Observable to be merged
* @return an Observable that emits items that are the result of flattening
* the items emitted by the {@code source} Observables
+ * @see RxJava Wiki: mergeDelayError()
* @see MSDN: Observable.Merge Method
*/
@SuppressWarnings("unchecked")
@@ -1729,13 +1848,14 @@ public static Observable mergeDelayError(Observable extends T> t1, Obse
* Returns an Observable that never sends any items or notifications to an
* {@link Observer}.
*
- *
+ *
*
* This Observable is useful primarily for testing purposes.
*
* @param the type of items (not) emitted by the Observable
* @return an Observable that never sends any items or notifications to an
* {@link Observer}
+ * @see RxJava Wiki: never()
*/
public static Observable never() {
return new NeverObservable();
@@ -1746,11 +1866,12 @@ public static Observable never() {
* that emits the items emitted by the most recently published of those
* Observables.
*
- *
+ *
*
* @param sequenceOfSequences the source Observable that emits Observables
* @return an Observable that emits only the items emitted by the most
* recently published Observable
+ * @see RxJava Wiki: switchOnNext()
* @deprecated use {@link #switchOnNext}
*/
@Deprecated
@@ -1763,11 +1884,12 @@ public static Observable switchDo(Observable extends Observable exten
* that emits the items emitted by the most recently published of those
* Observables.
*
- *
+ *
*
* @param sequenceOfSequences the source Observable that emits Observables
* @return an Observable that emits only the items emitted by the most
* recently published Observable
+ * @see RxJava Wiki: switchOnNext()
*/
public static Observable switchOnNext(Observable extends Observable extends T>> sequenceOfSequences) {
return create(OperationSwitch.switchDo(sequenceOfSequences));
@@ -1777,7 +1899,7 @@ public static Observable switchOnNext(Observable extends Observable e
* Accepts an Observable and wraps it in another Observable that ensures
* that the resulting Observable is chronologically well-behaved.
*
- *
+ *
*
* A well-behaved Observable does not interleave its invocations of the
* {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted},
@@ -1791,6 +1913,7 @@ public static Observable switchOnNext(Observable extends Observable e
* @return an Observable that is a chronologically well-behaved version of
* the source Observable, and that synchronously notifies its
* {@link Observer}s
+ * @see RxJava Wiki: synchronize()
*/
public Observable synchronize() {
return create(OperationSynchronize.synchronize(this));
@@ -1802,7 +1925,7 @@ public Observable synchronize() {
* accomplished by acquiring a mutual-exclusion lock for the object
* provided as the lock parameter.
*
- *
+ *
*
* A well-behaved Observable does not interleave its invocations of the
* {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted},
@@ -1817,6 +1940,7 @@ public Observable synchronize() {
* @return an Observable that is a chronologically well-behaved version of
* the source Observable, and that synchronously notifies its
* {@link Observer}s
+ * @see RxJava Wiki: synchronize()
*/
public Observable synchronize(Object lock) {
return create(OperationSynchronize.synchronize(this, lock));
@@ -1833,12 +1957,13 @@ public static Observable synchronize(Observable source) {
/**
* Emits an item each time interval (containing a sequential number).
*
- *
+ *
*
* @param interval interval size in time units (see below)
* @param unit time units to use for the interval size
* @return an Observable that emits an item each time interval
- * @see MSDN: Observable.Interval
+ * @see RxJava Wiki: interval()
+ * @see MSDN: Observable.Interval
*/
public static Observable interval(long interval, TimeUnit unit) {
return create(OperationInterval.interval(interval, unit));
@@ -1847,13 +1972,14 @@ public static Observable interval(long interval, TimeUnit unit) {
/**
* Emits an item each time interval (containing a sequential number).
*
- *
+ *
*
* @param interval interval size in time units (see below)
* @param unit time units to use for the interval size
* @param scheduler the scheduler to use for scheduling the items
* @return an Observable that emits an item each time interval
- * @see MSDN: Observable.Interval
+ * @see RxJava Wiki: interval()
+ * @see MSDN: Observable.Interval
*/
public static Observable interval(long interval, TimeUnit unit, Scheduler scheduler) {
return create(OperationInterval.interval(interval, unit, scheduler));
@@ -1866,7 +1992,7 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler
* Note: If events keep firing faster than the timeout then no data will be
* emitted.
*
- *
+ *
*
* Information on debounce vs throttle:
*
@@ -1881,6 +2007,7 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler
* @param unit the {@link TimeUnit} for the timeout
* @return an {@link Observable} that filters out items that are too
* quickly followed by newer items
+ * @see RxJava Wiki: debounce()
* @see #throttleWithTimeout(long, TimeUnit)
*/
public Observable debounce(long timeout, TimeUnit unit) {
@@ -1894,7 +2021,7 @@ public Observable debounce(long timeout, TimeUnit unit) {
* Note: If events keep firing faster than the timeout then no data will be
* emitted.
*
- *
+ *
*
* Information on debounce vs throttle:
*
@@ -1911,6 +2038,7 @@ public Observable debounce(long timeout, TimeUnit unit) {
* timers that handle the timeout for each event
* @return an {@link Observable} that filters out items that are too
* quickly followed by newer items
+ * @see RxJava Wiki: debounce()
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
@@ -1924,7 +2052,7 @@ public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler)
* Note: If events keep firing faster than the timeout then no data will be
* emitted.
*
- *
+ *
*
* Information on debounce vs throttle:
*
@@ -1939,6 +2067,7 @@ public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler)
* @param unit the {@link TimeUnit} for the timeout
* @return an {@link Observable} that filters out items that are too
* quickly followed by newer items
+ * @see RxJava Wiki: throttleWithTimeout()
* @see #debounce(long, TimeUnit)
*/
public Observable throttleWithTimeout(long timeout, TimeUnit unit) {
@@ -1952,7 +2081,7 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit) {
* Note: If events keep firing faster than the timeout then no data will be
* emitted.
*
- *
+ *
*
* Information on debounce vs throttle:
*
@@ -1969,6 +2098,7 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit) {
* timers that handle the timeout for each event
* @return an {@link Observable} that filters out items that are too
* quickly followed by newer items
+ * @see RxJava Wiki: throttleWithTimeout()
* @see #debounce(long, TimeUnit, Scheduler)
*/
public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
@@ -1982,12 +2112,13 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
* This differs from {@link #throttleLast} in that this only tracks passage
* of time whereas {@link #throttleLast} ticks at scheduled intervals.
*
- *
+ *
*
* @param windowDuration time to wait before sending another item after
* emitting the last item
* @param unit the unit of time for the specified timeout
* @return an Observable that performs the throttle operation
+ * @see RxJava Wiki: throttleFirst()
*/
public Observable throttleFirst(long windowDuration, TimeUnit unit) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
@@ -2000,7 +2131,7 @@ public Observable throttleFirst(long windowDuration, TimeUnit unit) {
* This differs from {@link #throttleLast} in that this only tracks passage
* of time whereas {@link #throttleLast} ticks at scheduled intervals.
*
- *
+ *
*
* @param skipDuration time to wait before sending another item after
* emitting the last item
@@ -2008,6 +2139,7 @@ public Observable throttleFirst(long windowDuration, TimeUnit unit) {
* @param scheduler the {@link Scheduler} to use internally to manage the
* timers that handle timeout for each event
* @return an Observable that performs the throttle operation
+ * @see RxJava Wiki: throttleFirst()
*/
public Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
@@ -2021,12 +2153,13 @@ public Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler s
* scheduled interval whereas {@link #throttleFirst} does not tick, it just
* tracks passage of time.
*
- *
+ *
*
* @param intervalDuration duration of windows within which the last item
* will be emitted
* @param unit the unit of time for the specified interval
* @return an Observable that performs the throttle operation
+ * @see RxJava Wiki: throttleLast()
* @see #sample(long, TimeUnit)
*/
public Observable throttleLast(long intervalDuration, TimeUnit unit) {
@@ -2041,12 +2174,13 @@ public Observable throttleLast(long intervalDuration, TimeUnit unit) {
* scheduled interval whereas {@link #throttleFirst} does not tick, it just
* tracks passage of time.
*
- *
+ *
*
* @param intervalDuration duration of windows within which the last item
* will be emitted
* @param unit the unit of time for the specified interval
* @return an Observable that performs the throttle operation
+ * @see RxJava Wiki: throttleLast()
* @see #sample(long, TimeUnit, Scheduler)
*/
public Observable throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
@@ -2057,10 +2191,11 @@ public Observable throttleLast(long intervalDuration, TimeUnit unit, Schedule
* Wraps each item emitted by a source Observable in a {@link Timestamped}
* object.
*
- *