Skip to content

Commit 0a53a04

Browse files
author
Nitesh Kant
committed
Renamed operators to mergeEmptyWith and concatEmptyWith.
Based on discussions with @abersnaze and @benjchristensen these names better suit the intent of these operators. Also, removed the `swithMap()` + `mergeWith`/`concatWith` implementations with custom operators that are much lighter weight for this case (source never emits an item) vis-a-vis `concat` and `merge`.
1 parent e403b73 commit 0a53a04

File tree

5 files changed

+311
-33
lines changed

5 files changed

+311
-33
lines changed

src/main/java/rx/Observable.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9468,7 +9468,7 @@ public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2
94689468
}
94699469

94709470
/**
9471-
* Returns an Observable that upon completion of the source Observable subscribes to the passed {@code continueWith}
9471+
* Returns an Observable that upon completion of the source Observable subscribes to the passed {@code other}
94729472
* Observable and then emits all items emitted by that Observable. This function does not expect the source
94739473
* Observable to emit any item, in case, the source Observable, emits any item, an {@link IllegalStateException}
94749474
* is raised.
@@ -9482,23 +9482,23 @@ public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2
94829482
*
94839483
* <dl>
94849484
* <dt><b>Scheduler:</b></dt>
9485-
* <dd>{@code continueWith} does not operate by default on a particular {@link Scheduler}.</dd>
9485+
* <dd>{@code concatEmptyWith} does not operate by default on a particular {@link Scheduler}.</dd>
94869486
* </dl>
94879487
*
9488-
* @return an Observable that upon completion of the source, starts emitting items from the {@code continueWith}
9488+
* <dt><b>Backpressure:</b></dt>
9489+
* <dd>{@code concatEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable}
9490+
* as it never expects the source to ever emit an item. All demands are sent to the {@code other}
9491+
* {@code Observable}.</dd>
9492+
*
9493+
* @return an Observable that upon completion of the source, starts emitting items from the {@code other}
94899494
* Observable.
94909495
* @throws IllegalStateException If the source emits any item.
94919496
*
9492-
* @see #mergeError(Observable)
9497+
* @see #mergeEmptyWith(Observable)
94939498
*/
94949499
@Experimental
9495-
public final <R> Observable<R> continueWith(Observable<R> continueWith) {
9496-
return switchMap(new Func1<T, Observable<R>>() {
9497-
@Override
9498-
public Observable<R> call(T t) {
9499-
return Observable.error(new IllegalStateException());
9500-
}
9501-
}).concatWith(continueWith);
9500+
public final <R> Observable<R> concatEmptyWith(Observable<R> other) {
9501+
return lift(new OperatorConcatEmptyWith<T, R>(other));
95029502
}
95039503

95049504
/**
@@ -9514,24 +9514,26 @@ public Observable<R> call(T t) {
95149514
*
95159515
* <dl>
95169516
* <dt><b>Scheduler:</b></dt>
9517-
* <dd>{@code continueWith} does not operate by default on a particular {@link Scheduler}.</dd>
9517+
* <dd>{@code mergeEmptyWith} does not operate by default on a particular {@link Scheduler}.</dd>
9518+
* </dl>
9519+
*
9520+
* <dl>
9521+
* <dt><b>Backpressure:</b></dt>
9522+
* <dd>{@code mergeEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable}
9523+
* as it never expects the source to ever emit an item. All demands are sent to the {@code other}
9524+
* {@code Observable}.</dd>
95189525
* </dl>
95199526
*
95209527
* @return an Observable that only listens for errors from the source and starts emitting items from the
95219528
* {@code other} Observable on subscription.
95229529
* Observable.
95239530
* @throws IllegalStateException If the source emits any item.
95249531
*
9525-
* @see #continueWith(Observable)
9532+
* @see #concatEmptyWith(Observable)
95269533
*/
95279534
@Experimental
9528-
public final <R> Observable<R> mergeError(Observable<R> other) {
9529-
return switchMap(new Func1<T, Observable<R>>() {
9530-
@Override
9531-
public Observable<R> call(T t) {
9532-
return Observable.error(new IllegalStateException());
9533-
}
9534-
}).mergeWith(other);
9535+
public final <R> Observable<R> mergeEmptyWith(Observable<R> other) {
9536+
return lift(new OperatorMergeEmptyWith<T, R>(other));
95359537
}
95369538

95379539
/**
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Producer;
21+
import rx.Subscriber;
22+
import rx.functions.Action0;
23+
import rx.internal.producers.ProducerArbiter;
24+
import rx.observers.SerializedSubscriber;
25+
import rx.subscriptions.SerialSubscription;
26+
import rx.subscriptions.Subscriptions;
27+
28+
import java.util.concurrent.ConcurrentLinkedQueue;
29+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
30+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
31+
32+
/**
33+
* Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied
34+
* alternate {@code Observable} after the source completes.
35+
*
36+
* @param <T> the source value type
37+
* @param <R> the result value type
38+
*/
39+
public final class OperatorConcatEmptyWith<T, R> implements Operator<R, T> {
40+
41+
private final Observable<? extends R> alternate;
42+
43+
public OperatorConcatEmptyWith(Observable<? extends R> alternate) {
44+
this.alternate = alternate;
45+
}
46+
47+
@Override
48+
public Subscriber<? super T> call(Subscriber<? super R> child) {
49+
final SerialSubscription ssub = new SerialSubscription();
50+
final ParentSubscriber parent = new ParentSubscriber(child, ssub, alternate);
51+
ssub.set(parent);
52+
child.add(ssub);
53+
child.setProducer(parent.emptyProducer);
54+
return parent;
55+
}
56+
57+
private final class ParentSubscriber extends Subscriber<T> {
58+
59+
private final Subscriber<? super R> child;
60+
private final SerialSubscription ssub;
61+
private final EmptyProducer emptyProducer;
62+
private final Observable<? extends R> alternate;
63+
64+
ParentSubscriber(Subscriber<? super R> child, final SerialSubscription ssub, Observable<? extends R> alternate) {
65+
this.child = child;
66+
this.ssub = ssub;
67+
this.emptyProducer = new EmptyProducer();
68+
this.alternate = alternate;
69+
}
70+
71+
@Override
72+
public void setProducer(final Producer producer) {
73+
/*Nothing from this producer will ever be requested as we never expect any items to be emitted from the
74+
parent. The only thing w.r.t backpressure that is required is to store the requested count from the
75+
child till the time the alternate observable is subscribed. So, this producer is just ignored and the
76+
configured EmptyProducer on the child stores any buffered requested items till subscription to alternate.*/
77+
}
78+
79+
@Override
80+
public void onCompleted() {
81+
if (!child.isUnsubscribed()) {
82+
AlternateSubscriber as = new AlternateSubscriber(child, emptyProducer);
83+
ssub.set(as);
84+
alternate.unsafeSubscribe(as);
85+
}
86+
}
87+
88+
@Override
89+
public void onError(Throwable e) {
90+
child.onError(e);
91+
}
92+
93+
@Override
94+
public void onNext(T t) {
95+
onError(new IllegalStateException("Concat empty with source emitted an item: " + t));
96+
}
97+
}
98+
99+
private final class AlternateSubscriber extends Subscriber<R> {
100+
101+
private final EmptyProducer emptyProducer;
102+
private final Subscriber<? super R> child;
103+
104+
AlternateSubscriber(Subscriber<? super R> child, EmptyProducer emptyProducer) {
105+
this.child = child;
106+
this.emptyProducer = emptyProducer;
107+
}
108+
109+
@Override
110+
public void setProducer(final Producer producer) {
111+
emptyProducer.setAltProducer(producer);
112+
}
113+
114+
@Override
115+
public void onCompleted() {
116+
child.onCompleted();
117+
}
118+
119+
@Override
120+
public void onError(Throwable e) {
121+
child.onError(e);
122+
}
123+
124+
@Override
125+
public void onNext(R r) {
126+
child.onNext(r);
127+
}
128+
}
129+
130+
/**
131+
* This is a producer implementation that only does the following:
132+
* <ul>
133+
<li>If the alternate producer has not yet arrived, store the total requested count from downstream.</li>
134+
<li>If the alternate producer has arrived, then relay the request demand to it.</li>
135+
</ul>
136+
*
137+
* Since, this is only applicable to this operator, it does not check for emissions from the source, as the source
138+
* is never expected to emit any item. Thus it is "lighter" weight than {@link ProducerArbiter}
139+
*/
140+
private static final class EmptyProducer implements Producer {
141+
142+
/*Total requested items till the time the alternate producer arrives.*/
143+
private long requested; /*Guarded by this*/
144+
/*Producer from the alternate Observable for this operator*/
145+
private Producer altProducer; /*Guarded by this*/
146+
147+
@Override
148+
public void request(long requested) {
149+
if (requested < 0) {
150+
throw new IllegalArgumentException("Requested items can not be negative.");
151+
}
152+
if (requested == 0) {
153+
return;
154+
}
155+
156+
boolean requestToAlternate = false;
157+
158+
synchronized (this) {
159+
if (null == altProducer) {
160+
/*Accumulate requested till the time an alternate producer arrives.*/
161+
this.requested += requested;
162+
} else {
163+
/*If the alternate producer exists, then relay.*/
164+
requestToAlternate = true;
165+
}
166+
}
167+
168+
if (requestToAlternate) {
169+
altProducer.request(requested);
170+
}
171+
}
172+
173+
private void setAltProducer(Producer altProducer) {
174+
if (null == altProducer) {
175+
throw new IllegalArgumentException("Producer can not be null.");
176+
}
177+
178+
long requestToAlternate = 0;
179+
180+
synchronized (this) {
181+
if (0 != requested) {
182+
/*Something was requested from the source Observable, relay that to the new producer*/
183+
requestToAlternate = requested;
184+
}
185+
this.altProducer = altProducer;
186+
}
187+
188+
if (0 != requestToAlternate) {
189+
this.altProducer.request(requestToAlternate);
190+
}
191+
}
192+
}
193+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Producer;
21+
import rx.Subscriber;
22+
23+
/**
24+
* Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied
25+
* alternate {@code Observable}. The errors from source are propagated as-is.
26+
*
27+
* @param <T> the source value type
28+
* @param <R> the result value type
29+
*/
30+
public final class OperatorMergeEmptyWith<T, R> implements Operator<R, T> {
31+
32+
private final Observable<? extends R> alternate;
33+
34+
public OperatorMergeEmptyWith(Observable<? extends R> alternate) {
35+
this.alternate = alternate;
36+
}
37+
38+
@Override
39+
public Subscriber<? super T> call(Subscriber<? super R> child) {
40+
final ParentSubscriber parent = new ParentSubscriber(child);
41+
child.add(parent);
42+
alternate.unsafeSubscribe(child);
43+
return parent;
44+
}
45+
46+
private final class ParentSubscriber extends Subscriber<T> {
47+
48+
private final Subscriber<? super R> child;
49+
50+
ParentSubscriber(Subscriber<? super R> child) {
51+
this.child = child;
52+
}
53+
54+
@Override
55+
public void setProducer(final Producer producer) {
56+
/*Nothing from this producer will ever be requested as we never expect any items to be emitted from the
57+
parent.*/
58+
}
59+
60+
@Override
61+
public void onCompleted() {
62+
// Nothing to do as the child completes with the alternate observable.
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
child.onError(e);
68+
}
69+
70+
@Override
71+
public void onNext(T t) {
72+
onError(new IllegalStateException("Merge empty with source emitted an item: " + t));
73+
}
74+
}
75+
}

src/test/java/rx/internal/operators/OperatorContinueWithTest.java renamed to src/test/java/rx/internal/operators/OperatorConcatEmptyWithTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import rx.schedulers.Schedulers;
2222
import rx.schedulers.TestScheduler;
2323

24-
public class OperatorContinueWithTest {
24+
public class OperatorConcatEmptyWithTest {
2525

2626
@Test(timeout = 60000)
2727
public void testWithVoid() {
2828
final String soleValue = "Hello";
2929
Observable<String> source = Observable.<Void>empty()
30-
.continueWith(Observable.just(soleValue));
30+
.concatEmptyWith(Observable.just(soleValue));
3131

3232
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
3333
source.subscribe(testSubscriber);
@@ -40,7 +40,7 @@ public void testWithVoid() {
4040
@Test(timeout = 60000)
4141
public void testErrorOnSourceEmitItem() {
4242
Observable<String> source = Observable.just(1)
43-
.continueWith(Observable.just("Hello"));
43+
.concatEmptyWith(Observable.just("Hello"));
4444

4545
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
4646
source.subscribe(testSubscriber);
@@ -53,7 +53,7 @@ public void testErrorOnSourceEmitItem() {
5353
@Test(timeout = 60000)
5454
public void testSourceError() throws Exception {
5555
Observable<String> source = Observable.<Void>error(new IllegalStateException())
56-
.continueWith(Observable.just("Hello"));
56+
.concatEmptyWith(Observable.just("Hello"));
5757

5858
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
5959
source.subscribe(testSubscriber);
@@ -71,7 +71,7 @@ public void testNoSubscribeBeforeSourceCompletion() {
7171
/*Delaying on complete event so to check that the subscription does not happen before completion*/
7272
Observable<String> source = Observable.<Void>empty()
7373
.observeOn(testScheduler)
74-
.continueWith(Observable.just(soleValue));
74+
.concatEmptyWith(Observable.just(soleValue));
7575

7676
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
7777
source.subscribe(testSubscriber);

0 commit comments

Comments
 (0)