diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 3f31491b08..491cc3c495 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -11438,6 +11438,57 @@ public final Observable> toSortedList(Func2(sortFunction, initialCapacity)); } + /** + * Returns an Observable that emits the events emitted by source Observable, in a + * sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all + * other items in the sequence. + * + *

Note that calling {@code sorted} with long, non-terminating or infinite sources + * might cause {@link OutOfMemoryError} + * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the source {@code Observable} in an + * unbounded manner (i.e., without applying backpressure to it).
+ *
Scheduler:
+ *
{@code sorted} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @throws ClassCastException + * if any item emitted by the Observable does not implement {@link Comparable} with respect to + * all other items emitted by the Observable + * @return an Observable that emits the items emitted by the source Observable in sorted order + */ + @Experimental + public final Observable sorted(){ + return toSortedList().flatMapIterable(UtilityFunctions.>identity()); + } + + /** + * Returns an Observable that emits the events emitted by source Observable, in a + * sorted order based on a specified comparison function. + * + *

Note that calling {@code sorted} with long, non-terminating or infinite sources + * might cause {@link OutOfMemoryError} + * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the source {@code Observable} in an + * unbounded manner (i.e., without applying backpressure to it).
+ *
Scheduler:
+ *
{@code sorted} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param sortFunction + * a function that compares two items emitted by the source Observable and returns an Integer + * that indicates their sort order + * @return an Observable that emits the items emitted by the source Observable in sorted order + */ + @Experimental + public final Observable sorted(Func2 sortFunction) { + return toSortedList(sortFunction).flatMapIterable(UtilityFunctions.>identity()); + } + /** * Modifies the source Observable so that subscribers will unsubscribe from it on a specified * {@link Scheduler}. diff --git a/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java b/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java index f89d7d640b..6fa3b40647 100644 --- a/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java +++ b/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java @@ -40,12 +40,11 @@ public void testSortedList() { Observable w = Observable.just(1, 3, 2, 5, 4); Observable> observable = w.toSortedList(); - @SuppressWarnings("unchecked") - Observer> observer = mock(Observer.class); - observable.subscribe(observer); - verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5)); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); + TestSubscriber> testSubscriber = new TestSubscriber>(); + observable.subscribe(testSubscriber); + testSubscriber.assertValue(Arrays.asList(1,2,3,4,5)); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); } @Test @@ -154,12 +153,11 @@ public void testSortedListCapacity() { Observable w = Observable.just(1, 3, 2, 5, 4); Observable> observable = w.toSortedList(4); - @SuppressWarnings("unchecked") - Observer> observer = mock(Observer.class); - observable.subscribe(observer); - verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5)); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); + TestSubscriber> testSubscriber = new TestSubscriber>(); + observable.subscribe(testSubscriber); + testSubscriber.assertValue(Arrays.asList(1,2,3,4,5)); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); } @Test @@ -172,12 +170,11 @@ public Integer call(Integer t1, Integer t2) { } }); - @SuppressWarnings("unchecked") - Observer> observer = mock(Observer.class); - observable.subscribe(observer); - verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1)); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); + TestSubscriber> testSubscriber = new TestSubscriber>(); + observable.subscribe(testSubscriber); + testSubscriber.assertValue(Arrays.asList(5, 4, 3, 2, 1)); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); } @Test @@ -190,11 +187,85 @@ public Integer call(Integer t1, Integer t2) { } }, 4); - @SuppressWarnings("unchecked") - Observer> observer = mock(Observer.class); - observable.subscribe(observer); - verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1)); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); + TestSubscriber> testSubscriber = new TestSubscriber>(); + observable.subscribe(testSubscriber); + testSubscriber.assertValue(Arrays.asList(5, 4, 3, 2, 1)); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); + } + + @Test + public void testSorted() { + Observable w = Observable.just(1, 3, 2, 5, 4); + Observable observable = w.sorted(); + + TestSubscriber testSubscriber = new TestSubscriber(); + observable.subscribe(testSubscriber); + testSubscriber.assertValues(1,2,3,4,5); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); + } + + @Test + public void testSortedWithCustomFunction() { + Observable w = Observable.just(1, 3, 2, 5, 4); + Observable observable = w.sorted(new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t2 - t1; + } + + }); + + TestSubscriber testSubscriber = new TestSubscriber(); + observable.subscribe(testSubscriber); + testSubscriber.assertValues(5,4,3,2,1); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); + } + + @Test + public void testSortedCustomComparator() { + Observable w = Observable.just(1, 3, 2, 5, 4); + Observable observable = w.sorted(new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1.compareTo(t2); + } + + }); + + TestSubscriber testSubscriber = new TestSubscriber(); + observable.subscribe(testSubscriber); + testSubscriber.assertValues(1,2,3,4,5); + testSubscriber.assertNoErrors(); + testSubscriber.assertCompleted(); + } + + @Test + public void testSortedWithNonComparable() { + NonComparable n1 = new NonComparable(1,"a"); + NonComparable n2 = new NonComparable(2,"b"); + NonComparable n3 = new NonComparable(3,"c"); + Observable w = Observable.just(n1,n2,n3); + + Observable observable = w.sorted(); + + TestSubscriber testSubscriber = new TestSubscriber(); + observable.subscribe(testSubscriber); + testSubscriber.assertNoValues(); + testSubscriber.assertError(ClassCastException.class); + testSubscriber.assertNotCompleted(); + } + + private final static class NonComparable{ + public int i; + public String s; + + NonComparable(int i, String s){ + this.i = i; + this.s = s; + } } }