diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index ead34cba61..b74b27c683 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -4941,7 +4941,7 @@ public final Observable groupJoin(Observable right, Func1 * @see ReactiveX operators documentation: IgnoreElements */ public final Observable ignoreElements() { - return filter(UtilityFunctions.alwaysFalse()); + return lift(OperatorIgnoreElements. instance()); } /** diff --git a/src/main/java/rx/internal/operators/OperatorIgnoreElements.java b/src/main/java/rx/internal/operators/OperatorIgnoreElements.java new file mode 100644 index 0000000000..3f38d8e585 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorIgnoreElements.java @@ -0,0 +1,60 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.Observable.Operator; +import rx.Subscriber; + +public class OperatorIgnoreElements implements Operator { + + private static class Holder { + static final OperatorIgnoreElements INSTANCE = new OperatorIgnoreElements(); + } + + @SuppressWarnings("unchecked") + public static OperatorIgnoreElements instance() { + return (OperatorIgnoreElements) Holder.INSTANCE; + } + + private OperatorIgnoreElements() { + + } + + @Override + public Subscriber call(final Subscriber child) { + Subscriber parent = new Subscriber() { + + @Override + public void onCompleted() { + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + // ignore element + } + + }; + child.add(parent); + return parent; + } + +} diff --git a/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java b/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java new file mode 100644 index 0000000000..818f228ba8 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java @@ -0,0 +1,130 @@ +package rx.internal.operators; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.observers.TestSubscriber; + +public class OperatorIgnoreElementsTest { + + @Test + public void testWithEmpty() { + assertTrue(Observable.empty().ignoreElements().isEmpty().toBlocking().single()); + } + + @Test + public void testWithNonEmpty() { + assertTrue(Observable.just(1, 2, 3).ignoreElements().isEmpty().toBlocking().single()); + } + + @Test + public void testUpstreamIsProcessedButIgnored() { + final int num = 10; + final AtomicInteger upstreamCount = new AtomicInteger(); + int count = Observable.range(1, num) + .doOnNext(new Action1() { + @Override + public void call(Integer t) { + upstreamCount.incrementAndGet(); + } + }) + .ignoreElements() + .count().toBlocking().single(); + assertEquals(num, upstreamCount.get()); + assertEquals(0, count); + } + + @Test + public void testCompletedOk() { + TestSubscriber ts = new TestSubscriber(); + Observable.range(1, 10).ignoreElements().subscribe(ts); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList()); + ts.assertTerminalEvent(); + ts.assertUnsubscribed(); + } + + @Test + public void testErrorReceived() { + TestSubscriber ts = new TestSubscriber(); + RuntimeException ex = new RuntimeException("boo"); + Observable.error(ex).ignoreElements().subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList()); + ts.assertTerminalEvent(); + ts.assertUnsubscribed(); + assertEquals(1, ts.getOnErrorEvents().size()); + assertEquals("boo", ts.getOnErrorEvents().get(0).getMessage()); + } + + @Test + public void testUnsubscribesFromUpstream() { + final AtomicBoolean unsub = new AtomicBoolean(); + Observable.range(1, 10).doOnUnsubscribe(new Action0() { + @Override + public void call() { + unsub.set(true); + }}) + .subscribe(); + assertTrue(unsub.get()); + } + + @Test(timeout = 10000) + public void testDoesNotHangAndProcessesAllUsingBackpressure() { + final AtomicInteger upstreamCount = new AtomicInteger(); + final AtomicInteger count = new AtomicInteger(0); + int num = 10; + Observable.range(1, num) + // + .doOnNext(new Action1() { + @Override + public void call(Integer t) { + upstreamCount.incrementAndGet(); + } + }) + // + .ignoreElements() + // + .doOnNext(new Action1() { + + @Override + public void call(Integer t) { + upstreamCount.incrementAndGet(); + } + }) + // + .subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer t) { + count.incrementAndGet(); + } + }); + assertEquals(num, upstreamCount.get()); + assertEquals(0, count.get()); + } + +} diff --git a/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java b/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java index 3ca921daf0..2dcae73fc0 100644 --- a/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java +++ b/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java @@ -14,7 +14,6 @@ import rx.Observable; import rx.Subscriber; -import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; import rx.observers.TestSubscriber;