Skip to content

Commit 60c8ff9

Browse files
authored
1.x: add multi-other withLatestFrom operators (#3966)
1 parent aa183b0 commit 60c8ff9

File tree

3 files changed

+903
-3
lines changed

3 files changed

+903
-3
lines changed

src/main/java/rx/Observable.java

Lines changed: 326 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10129,7 +10129,332 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
1012910129
public final <U, R> Observable<R> withLatestFrom(Observable<? extends U> other, Func2<? super T, ? super U, ? extends R> resultSelector) {
1013010130
return lift(new OperatorWithLatestFrom<T, U, R>(other, resultSelector));
1013110131
}
10132-
10132+
10133+
/**
10134+
* Combines the value emission from this Observable with the latest emissions from the
10135+
* other Observables via a function to produce the output item.
10136+
*
10137+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10138+
* least one value. The resulting emission only happens when this Observable emits (and
10139+
* not when any of the other sources emit, unlike combineLatest).
10140+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10141+
*
10142+
* <dl>
10143+
* <dt><b>Backpressure Support:</b></dt>
10144+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10145+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10146+
* <dt><b>Scheduler:</b></dt>
10147+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10148+
* </dl>
10149+
*
10150+
* @param <T1> the first other source's value type
10151+
* @param <T2> the second other source's value type
10152+
* @param <R> the result value type
10153+
* @param others the array of other sources
10154+
* @param combiner the function called with an array of values from each participating observable
10155+
* @return the new Observable instance
10156+
* @Experimental The behavior of this can change at any time.
10157+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10158+
*/
10159+
@Experimental
10160+
public final <T1, T2, R> Observable<R> withLatestFrom(Observable<T1> o1, Observable<T2> o2, Func3<? super T, ? super T1, ? super T2, R> combiner) {
10161+
return create(new OperatorWithLatestFromMany<T, R>(this, new Observable<?>[] { o1, o2 }, null, Functions.fromFunc(combiner)));
10162+
}
10163+
10164+
/**
10165+
* Combines the value emission from this Observable with the latest emissions from the
10166+
* other Observables via a function to produce the output item.
10167+
*
10168+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10169+
* least one value. The resulting emission only happens when this Observable emits (and
10170+
* not when any of the other sources emit, unlike combineLatest).
10171+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10172+
*
10173+
* <dl>
10174+
* <dt><b>Backpressure Support:</b></dt>
10175+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10176+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10177+
* <dt><b>Scheduler:</b></dt>
10178+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10179+
* </dl>
10180+
*
10181+
* @param <T1> the first other source's value type
10182+
* @param <T2> the second other source's value type
10183+
* @param <T3> the third other source's value type
10184+
* @param <R> the result value type
10185+
* @param others the array of other sources
10186+
* @param combiner the function called with an array of values from each participating observable
10187+
* @return the new Observable instance
10188+
* @Experimental The behavior of this can change at any time.
10189+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10190+
*/
10191+
@Experimental
10192+
public final <T1, T2, T3, R> Observable<R> withLatestFrom(
10193+
Observable<T1> o1, Observable<T2> o2,
10194+
Observable<T3> o3,
10195+
Func4<? super T, ? super T1, ? super T2, ? super T3, R> combiner) {
10196+
return create(new OperatorWithLatestFromMany<T, R>(this,
10197+
new Observable<?>[] { o1, o2, o3 }, null, Functions.fromFunc(combiner)));
10198+
}
10199+
10200+
/**
10201+
* Combines the value emission from this Observable with the latest emissions from the
10202+
* other Observables via a function to produce the output item.
10203+
*
10204+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10205+
* least one value. The resulting emission only happens when this Observable emits (and
10206+
* not when any of the other sources emit, unlike combineLatest).
10207+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10208+
*
10209+
* <dl>
10210+
* <dt><b>Backpressure Support:</b></dt>
10211+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10212+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10213+
* <dt><b>Scheduler:</b></dt>
10214+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10215+
* </dl>
10216+
*
10217+
* @param <T1> the first other source's value type
10218+
* @param <T2> the second other source's value type
10219+
* @param <T3> the third other source's value type
10220+
* @param <T4> the fourth other source's value type
10221+
* @param <R> the result value type
10222+
* @param others the array of other sources
10223+
* @param combiner the function called with an array of values from each participating observable
10224+
* @return the new Observable instance
10225+
* @Experimental The behavior of this can change at any time.
10226+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10227+
*/
10228+
@Experimental
10229+
public final <T1, T2, T3, T4, R> Observable<R> withLatestFrom(
10230+
Observable<T1> o1, Observable<T2> o2,
10231+
Observable<T3> o3, Observable<T4> o4,
10232+
Func5<? super T, ? super T1, ? super T2, ? super T3, ? super T4, R> combiner) {
10233+
return create(new OperatorWithLatestFromMany<T, R>(this,
10234+
new Observable<?>[] { o1, o2, o3, o4 }, null, Functions.fromFunc(combiner)));
10235+
}
10236+
/**
10237+
* Combines the value emission from this Observable with the latest emissions from the
10238+
* other Observables via a function to produce the output item.
10239+
*
10240+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10241+
* least one value. The resulting emission only happens when this Observable emits (and
10242+
* not when any of the other sources emit, unlike combineLatest).
10243+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10244+
*
10245+
* <dl>
10246+
* <dt><b>Backpressure Support:</b></dt>
10247+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10248+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10249+
* <dt><b>Scheduler:</b></dt>
10250+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10251+
* </dl>
10252+
*
10253+
* @param <T1> the first other source's value type
10254+
* @param <T2> the second other source's value type
10255+
* @param <T3> the third other source's value type
10256+
* @param <T4> the fourth other source's value type
10257+
* @param <T5> the fifth other source's value type
10258+
* @param <R> the result value type
10259+
* @param others the array of other sources
10260+
* @param combiner the function called with an array of values from each participating observable
10261+
* @return the new Observable instance
10262+
* @Experimental The behavior of this can change at any time.
10263+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10264+
*/
10265+
@Experimental
10266+
public final <T1, T2, T3, T4, T5, R> Observable<R> withLatestFrom(
10267+
Observable<T1> o1, Observable<T2> o2,
10268+
Observable<T1> o3, Observable<T2> o4,
10269+
Observable<T1> o5,
10270+
Func6<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, R> combiner) {
10271+
return create(new OperatorWithLatestFromMany<T, R>(this,
10272+
new Observable<?>[] { o1, o2, o3, o4, o5 }, null, Functions.fromFunc(combiner)));
10273+
}
10274+
10275+
/**
10276+
* Combines the value emission from this Observable with the latest emissions from the
10277+
* other Observables via a function to produce the output item.
10278+
*
10279+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10280+
* least one value. The resulting emission only happens when this Observable emits (and
10281+
* not when any of the other sources emit, unlike combineLatest).
10282+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10283+
*
10284+
* <dl>
10285+
* <dt><b>Backpressure Support:</b></dt>
10286+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10287+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10288+
* <dt><b>Scheduler:</b></dt>
10289+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10290+
* </dl>
10291+
*
10292+
* @param <T1> the first other source's value type
10293+
* @param <T2> the second other source's value type
10294+
* @param <T3> the third other source's value type
10295+
* @param <T4> the fourth other source's value type
10296+
* @param <T5> the fifth other source's value type
10297+
* @param <T6> the sixth other source's value type
10298+
* @param <R> the result value type
10299+
* @param others the array of other sources
10300+
* @param combiner the function called with an array of values from each participating observable
10301+
* @return the new Observable instance
10302+
* @Experimental The behavior of this can change at any time.
10303+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10304+
*/
10305+
@Experimental
10306+
public final <T1, T2, T3, T4, T5, T6, R> Observable<R> withLatestFrom(
10307+
Observable<T1> o1, Observable<T2> o2,
10308+
Observable<T1> o3, Observable<T2> o4,
10309+
Observable<T1> o5, Observable<T2> o6,
10310+
Func7<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, R> combiner) {
10311+
return create(new OperatorWithLatestFromMany<T, R>(this,
10312+
new Observable<?>[] { o1, o2, o3, o4, o5, o6 }, null, Functions.fromFunc(combiner)));
10313+
}
10314+
10315+
/**
10316+
* Combines the value emission from this Observable with the latest emissions from the
10317+
* other Observables via a function to produce the output item.
10318+
*
10319+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10320+
* least one value. The resulting emission only happens when this Observable emits (and
10321+
* not when any of the other sources emit, unlike combineLatest).
10322+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10323+
*
10324+
* <dl>
10325+
* <dt><b>Backpressure Support:</b></dt>
10326+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10327+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10328+
* <dt><b>Scheduler:</b></dt>
10329+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10330+
* </dl>
10331+
*
10332+
* @param <T1> the first other source's value type
10333+
* @param <T2> the second other source's value type
10334+
* @param <T3> the third other source's value type
10335+
* @param <T4> the fourth other source's value type
10336+
* @param <T5> the fifth other source's value type
10337+
* @param <T6> the sixth other source's value type
10338+
* @param <T7> the seventh other source's value type
10339+
* @param <R> the result value type
10340+
* @param others the array of other sources
10341+
* @param combiner the function called with an array of values from each participating observable
10342+
* @return the new Observable instance
10343+
* @Experimental The behavior of this can change at any time.
10344+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10345+
*/
10346+
@Experimental
10347+
public final <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> withLatestFrom(
10348+
Observable<T1> o1, Observable<T2> o2,
10349+
Observable<T1> o3, Observable<T2> o4,
10350+
Observable<T1> o5, Observable<T2> o6,
10351+
Observable<T1> o7,
10352+
Func8<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, R> combiner) {
10353+
return create(new OperatorWithLatestFromMany<T, R>(this,
10354+
new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7 }, null, Functions.fromFunc(combiner)));
10355+
}
10356+
10357+
/**
10358+
* Combines the value emission from this Observable with the latest emissions from the
10359+
* other Observables via a function to produce the output item.
10360+
*
10361+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10362+
* least one value. The resulting emission only happens when this Observable emits (and
10363+
* not when any of the other sources emit, unlike combineLatest).
10364+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10365+
*
10366+
* <dl>
10367+
* <dt><b>Backpressure Support:</b></dt>
10368+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10369+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10370+
* <dt><b>Scheduler:</b></dt>
10371+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10372+
* </dl>
10373+
*
10374+
* @param <T1> the first other source's value type
10375+
* @param <T2> the second other source's value type
10376+
* @param <T3> the third other source's value type
10377+
* @param <T4> the fourth other source's value type
10378+
* @param <T5> the fifth other source's value type
10379+
* @param <T6> the sixth other source's value type
10380+
* @param <T7> the seventh other source's value type
10381+
* @param <T8> the eigth other source's value type
10382+
* @param <R> the result value type
10383+
* @param others the array of other sources
10384+
* @param combiner the function called with an array of values from each participating observable
10385+
* @return the new Observable instance
10386+
* @Experimental The behavior of this can change at any time.
10387+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10388+
*/
10389+
@Experimental
10390+
public final <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> withLatestFrom(
10391+
Observable<T1> o1, Observable<T2> o2,
10392+
Observable<T1> o3, Observable<T2> o4,
10393+
Observable<T1> o5, Observable<T2> o6,
10394+
Observable<T1> o7, Observable<T2> o8,
10395+
Func9<? super T, ? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, R> combiner) {
10396+
return create(new OperatorWithLatestFromMany<T, R>(this,
10397+
new Observable<?>[] { o1, o2, o3, o4, o5, o6, o7, o8 }, null, Functions.fromFunc(combiner)));
10398+
}
10399+
10400+
/**
10401+
* Combines the value emission from this Observable with the latest emissions from the
10402+
* other Observables via a function to produce the output item.
10403+
*
10404+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10405+
* least one value. The resulting emission only happens when this Observable emits (and
10406+
* not when any of the other sources emit, unlike combineLatest).
10407+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10408+
*
10409+
* <dl>
10410+
* <dt><b>Backpressure Support:</b></dt>
10411+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10412+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10413+
* <dt><b>Scheduler:</b></dt>
10414+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10415+
* </dl>
10416+
*
10417+
* @param <R> the result value type
10418+
* @param others the array of other sources
10419+
* @param combiner the function called with an array of values from each participating observable
10420+
* @return the new Observable instance
10421+
* @Experimental The behavior of this can change at any time.
10422+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10423+
*/
10424+
@Experimental
10425+
public final <R> Observable<R> withLatestFrom(Observable<?>[] others, FuncN<R> combiner) {
10426+
return create(new OperatorWithLatestFromMany<T, R>(this, others, null, combiner));
10427+
}
10428+
10429+
/**
10430+
* Combines the value emission from this Observable with the latest emissions from the
10431+
* other Observables via a function to produce the output item.
10432+
*
10433+
* <p>Note that this operator doesn't emit anything until all other sources have produced at
10434+
* least one value. The resulting emission only happens when this Observable emits (and
10435+
* not when any of the other sources emit, unlike combineLatest).
10436+
* If a source doesn't produce any value and just completes, the sequence is completed immediately.
10437+
*
10438+
* <dl>
10439+
* <dt><b>Backpressure Support:</b></dt>
10440+
* <dd>This operator is a pass-through for backpressure behavior between this {@code Observable}
10441+
* and the downstream Subscriber. The other {@code Observable}s are consumed in an unbounded manner.</dd>
10442+
* <dt><b>Scheduler:</b></dt>
10443+
* <dd>This operator does not operate by default on a particular {@link Scheduler}.</dd>
10444+
* </dl>
10445+
*
10446+
* @param <R> the result value type
10447+
* @param others the iterable of other sources
10448+
* @param combiner the function called with an array of values from each participating observable
10449+
* @return the new Observable instance
10450+
* @Experimental The behavior of this can change at any time.
10451+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
10452+
*/
10453+
@Experimental
10454+
public final <R> Observable<R> withLatestFrom(Iterable<Observable<?>> others, FuncN<R> combiner) {
10455+
return create(new OperatorWithLatestFromMany<T, R>(this, null, others, combiner));
10456+
}
10457+
1013310458
/**
1013410459
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
1013510460
* Observable emits connected, non-overlapping windows. It emits the current window and opens a new one

0 commit comments

Comments
 (0)