Skip to content

Commit 0b8e4e0

Browse files
author
Aaron Tull
committed
Merge pull request #3447 from akarnokd/OnSubscribeDelaySubscriptionOther
1.x: DelaySubscription with a plain other Observable.
2 parents 7ba9067 + e41b215 commit 0b8e4e0

File tree

3 files changed

+351
-0
lines changed

3 files changed

+351
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4188,6 +4188,32 @@ public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>>
41884188
return create(new OnSubscribeDelaySubscriptionWithSelector<T, U>(this, subscriptionDelay));
41894189
}
41904190

4191+
/**
4192+
* Returns an Observable that delays the subscription to this Observable
4193+
* until the other Observable emits an element or completes normally.
4194+
* <p>
4195+
* <dl>
4196+
* <dt><b>Backpressure:</b></dt>
4197+
* <dd>The operator forwards the backpressure requests to this Observable once
4198+
* the subscription happens and requests Long.MAX_VALUE from the other Observable</dd>
4199+
* <dt><b>Scheduler:</b></dt>
4200+
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
4201+
* </dl>
4202+
*
4203+
* @param <U> the value type of the other Observable, irrelevant
4204+
* @param other the other Observable that should trigger the subscription
4205+
* to this Observable.
4206+
* @return an Observable that delays the subscription to this Observable
4207+
* until the other Observable emits an element or completes normally.
4208+
*/
4209+
@Experimental
4210+
public final <U> Observable<T> delaySubscription(Observable<U> other) {
4211+
if (other == null) {
4212+
throw new NullPointerException();
4213+
}
4214+
return create(new OnSubscribeDelaySubscriptionOther<T, U>(this, other));
4215+
}
4216+
41914217
/**
41924218
* Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the
41934219
* {@link Notification} objects emitted by the source Observable into the items or notifications they
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.*;
20+
import rx.Observable.OnSubscribe;
21+
import rx.observers.Subscribers;
22+
import rx.plugins.*;
23+
import rx.subscriptions.SerialSubscription;
24+
25+
/**
26+
* Delays the subscription to the main source until the other
27+
* observable fires an event or completes.
28+
* @param <T> the main type
29+
* @param <U> the other value type, ignored
30+
*/
31+
public final class OnSubscribeDelaySubscriptionOther<T, U> implements OnSubscribe<T> {
32+
final Observable<? extends T> main;
33+
final Observable<U> other;
34+
35+
public OnSubscribeDelaySubscriptionOther(Observable<? extends T> main, Observable<U> other) {
36+
this.main = main;
37+
this.other = other;
38+
}
39+
40+
@Override
41+
public void call(Subscriber<? super T> t) {
42+
final Subscriber<T> child = Subscribers.wrap(t);
43+
44+
final SerialSubscription serial = new SerialSubscription();
45+
46+
Subscriber<U> otherSubscriber = new Subscriber<U>() {
47+
boolean done;
48+
@Override
49+
public void onNext(U t) {
50+
onCompleted();
51+
}
52+
53+
@Override
54+
public void onError(Throwable e) {
55+
if (done) {
56+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
57+
return;
58+
}
59+
done = true;
60+
child.onError(e);
61+
}
62+
63+
@Override
64+
public void onCompleted() {
65+
if (done) {
66+
return;
67+
}
68+
done = true;
69+
serial.set(child);
70+
71+
main.unsafeSubscribe(child);
72+
}
73+
};
74+
75+
serial.set(otherSubscriber);
76+
77+
other.unsafeSubscribe(otherSubscriber);
78+
}
79+
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import org.junit.*;
22+
23+
import rx.Observable;
24+
import rx.exceptions.TestException;
25+
import rx.functions.Action0;
26+
import rx.observers.TestSubscriber;
27+
import rx.subjects.PublishSubject;
28+
29+
public class OnSubscribeDelaySubscriptionOtherTest {
30+
@Test
31+
public void testNoPrematureSubscription() {
32+
PublishSubject<Object> other = PublishSubject.create();
33+
34+
TestSubscriber<Integer> ts = TestSubscriber.create();
35+
36+
final AtomicInteger subscribed = new AtomicInteger();
37+
38+
Observable.just(1)
39+
.doOnSubscribe(new Action0() {
40+
@Override
41+
public void call() {
42+
subscribed.getAndIncrement();
43+
}
44+
})
45+
.delaySubscription(other)
46+
.subscribe(ts);
47+
48+
ts.assertNotCompleted();
49+
ts.assertNoErrors();
50+
ts.assertNoValues();
51+
52+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
53+
54+
other.onNext(1);
55+
56+
Assert.assertEquals("No subscription", 1, subscribed.get());
57+
58+
ts.assertValue(1);
59+
ts.assertNoErrors();
60+
ts.assertCompleted();
61+
}
62+
63+
@Test
64+
public void testNoMultipleSubscriptions() {
65+
PublishSubject<Object> other = PublishSubject.create();
66+
67+
TestSubscriber<Integer> ts = TestSubscriber.create();
68+
69+
final AtomicInteger subscribed = new AtomicInteger();
70+
71+
Observable.just(1)
72+
.doOnSubscribe(new Action0() {
73+
@Override
74+
public void call() {
75+
subscribed.getAndIncrement();
76+
}
77+
})
78+
.delaySubscription(other)
79+
.subscribe(ts);
80+
81+
ts.assertNotCompleted();
82+
ts.assertNoErrors();
83+
ts.assertNoValues();
84+
85+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
86+
87+
other.onNext(1);
88+
other.onNext(2);
89+
90+
Assert.assertEquals("No subscription", 1, subscribed.get());
91+
92+
ts.assertValue(1);
93+
ts.assertNoErrors();
94+
ts.assertCompleted();
95+
}
96+
97+
@Test
98+
public void testCompleteTriggersSubscription() {
99+
PublishSubject<Object> other = PublishSubject.create();
100+
101+
TestSubscriber<Integer> ts = TestSubscriber.create();
102+
103+
final AtomicInteger subscribed = new AtomicInteger();
104+
105+
Observable.just(1)
106+
.doOnSubscribe(new Action0() {
107+
@Override
108+
public void call() {
109+
subscribed.getAndIncrement();
110+
}
111+
})
112+
.delaySubscription(other)
113+
.subscribe(ts);
114+
115+
ts.assertNotCompleted();
116+
ts.assertNoErrors();
117+
ts.assertNoValues();
118+
119+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
120+
121+
other.onCompleted();
122+
123+
Assert.assertEquals("No subscription", 1, subscribed.get());
124+
125+
ts.assertValue(1);
126+
ts.assertNoErrors();
127+
ts.assertCompleted();
128+
}
129+
130+
@Test
131+
public void testNoPrematureSubscriptionToError() {
132+
PublishSubject<Object> other = PublishSubject.create();
133+
134+
TestSubscriber<Integer> ts = TestSubscriber.create();
135+
136+
final AtomicInteger subscribed = new AtomicInteger();
137+
138+
Observable.<Integer>error(new TestException())
139+
.doOnSubscribe(new Action0() {
140+
@Override
141+
public void call() {
142+
subscribed.getAndIncrement();
143+
}
144+
})
145+
.delaySubscription(other)
146+
.subscribe(ts);
147+
148+
ts.assertNotCompleted();
149+
ts.assertNoErrors();
150+
ts.assertNoValues();
151+
152+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
153+
154+
other.onCompleted();
155+
156+
Assert.assertEquals("No subscription", 1, subscribed.get());
157+
158+
ts.assertNoValues();
159+
ts.assertNotCompleted();
160+
ts.assertError(TestException.class);
161+
}
162+
163+
@Test
164+
public void testNoSubscriptionIfOtherErrors() {
165+
PublishSubject<Object> other = PublishSubject.create();
166+
167+
TestSubscriber<Integer> ts = TestSubscriber.create();
168+
169+
final AtomicInteger subscribed = new AtomicInteger();
170+
171+
Observable.<Integer>error(new TestException())
172+
.doOnSubscribe(new Action0() {
173+
@Override
174+
public void call() {
175+
subscribed.getAndIncrement();
176+
}
177+
})
178+
.delaySubscription(other)
179+
.subscribe(ts);
180+
181+
ts.assertNotCompleted();
182+
ts.assertNoErrors();
183+
ts.assertNoValues();
184+
185+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
186+
187+
other.onError(new TestException());
188+
189+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
190+
191+
ts.assertNoValues();
192+
ts.assertNotCompleted();
193+
ts.assertError(TestException.class);
194+
}
195+
196+
@Test
197+
public void testBackpressurePassesThrough() {
198+
199+
PublishSubject<Object> other = PublishSubject.create();
200+
201+
TestSubscriber<Integer> ts = TestSubscriber.create(0L);
202+
203+
final AtomicInteger subscribed = new AtomicInteger();
204+
205+
Observable.just(1, 2, 3, 4, 5)
206+
.doOnSubscribe(new Action0() {
207+
@Override
208+
public void call() {
209+
subscribed.getAndIncrement();
210+
}
211+
})
212+
.delaySubscription(other)
213+
.subscribe(ts);
214+
215+
ts.assertNotCompleted();
216+
ts.assertNoErrors();
217+
ts.assertNoValues();
218+
219+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
220+
221+
other.onNext(1);
222+
223+
Assert.assertEquals("No subscription", 1, subscribed.get());
224+
225+
Assert.assertFalse("Not unsubscribed from other", other.hasObservers());
226+
227+
ts.assertNotCompleted();
228+
ts.assertNoErrors();
229+
ts.assertNoValues();
230+
231+
ts.requestMore(1);
232+
ts.assertValue(1);
233+
ts.assertNoErrors();
234+
ts.assertNotCompleted();
235+
236+
ts.requestMore(2);
237+
ts.assertValues(1, 2, 3);
238+
ts.assertNoErrors();
239+
ts.assertNotCompleted();
240+
241+
ts.requestMore(10);
242+
ts.assertValues(1, 2, 3, 4, 5);
243+
ts.assertNoErrors();
244+
ts.assertCompleted();
245+
}
246+
}

0 commit comments

Comments
 (0)