diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index c021331ff1..078c4fc9b0 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -8578,7 +8578,7 @@ public final BlockingObservable toBlocking() { * you do not have the option to unsubscribe. *
*
Backpressure Support:
- *
This operator does not support backpressure as by intent it is requesting and buffering everything.
+ *
The operator buffers everything from its upstream but it only emits the aggregated list when the downstream requests at least one item.
*
Scheduler:
*
{@code toList} does not operate by default on a particular {@link Scheduler}.
*
@@ -8779,7 +8779,7 @@ public final Observable>> toMultimap(Func1 *
*
Backpressure Support:
- *
This operator does not support backpressure as by intent it is requesting and buffering everything.
+ *
The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.
*
Scheduler:
*
{@code toSortedList} does not operate by default on a particular {@link Scheduler}.
*
@@ -8792,7 +8792,7 @@ public final Observable>> toMultimap(Func1ReactiveX operators documentation: To */ public final Observable> toSortedList() { - return lift(new OperatorToObservableSortedList()); + return lift(new OperatorToObservableSortedList(10)); } /** @@ -8802,7 +8802,7 @@ public final Observable> toSortedList() { * *
*
Backpressure Support:
- *
This operator does not support backpressure as by intent it is requesting and buffering everything.
+ *
The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.
*
Scheduler:
*
{@code toSortedList} does not operate by default on a particular {@link Scheduler}.
*
@@ -8815,7 +8815,60 @@ public final Observable> toSortedList() { * @see ReactiveX operators documentation: To */ public final Observable> toSortedList(Func2 sortFunction) { - return lift(new OperatorToObservableSortedList(sortFunction)); + return lift(new OperatorToObservableSortedList(sortFunction, 10)); + } + + /** + * Returns an Observable that emits a list that contains the items emitted by the source Observable, in a + * sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all + * other items in the sequence. + *

+ * + *

+ *
Backpressure Support:
+ *
The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.
+ *
Scheduler:
+ *
{@code toSortedList} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @throws ClassCastException + * if any item emitted by the Observable does not implement {@link Comparable} with respect to + * all other items emitted by the Observable + * @param initialCapacity + * the initial capacity of the ArrayList used to accumulate items before sorting + * @return an Observable that emits a list that contains the items emitted by the source Observable in + * sorted order + * @see ReactiveX operators documentation: To + */ + @Experimental + public final Observable> toSortedList(int initialCapacity) { + return lift(new OperatorToObservableSortedList(initialCapacity)); + } + + /** + * Returns an Observable that emits a list that contains the items emitted by the source Observable, in a + * sorted order based on a specified comparison function. + *

+ * + *

+ *
Backpressure Support:
+ *
The operator buffers everything from its upstream but it only emits the sorted list when the downstream requests at least one item.
+ *
Scheduler:
+ *
{@code toSortedList} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param sortFunction + * a function that compares two items emitted by the source Observable and returns an Integer + * that indicates their sort order + * @param initialCapacity + * the initial capacity of the ArrayList used to accumulate items before sorting + * @return an Observable that emits a list that contains the items emitted by the source Observable in + * sorted order + * @see ReactiveX operators documentation: To + */ + @Experimental + public final Observable> toSortedList(Func2 sortFunction, int initialCapacity) { + return lift(new OperatorToObservableSortedList(sortFunction, initialCapacity)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorToObservableList.java b/src/main/java/rx/internal/operators/OperatorToObservableList.java index fef13577cc..8d7dff8f96 100644 --- a/src/main/java/rx/internal/operators/OperatorToObservableList.java +++ b/src/main/java/rx/internal/operators/OperatorToObservableList.java @@ -52,10 +52,11 @@ public static OperatorToObservableList instance() { private OperatorToObservableList() { } @Override public Subscriber call(final Subscriber> o) { - return new Subscriber(o) { + final SingleDelayedProducer> producer = new SingleDelayedProducer>(o); + Subscriber result = new Subscriber() { - private boolean completed = false; - final List list = new LinkedList(); + boolean completed = false; + List list = new LinkedList(); @Override public void onStart() { @@ -64,27 +65,32 @@ public void onStart() { @Override public void onCompleted() { - try { + if (!completed) { completed = true; - /* - * Ideally this should just return Collections.unmodifiableList(list) and not copy it, - * but, it ends up being a breaking change if we make that modification. - * - * Here is an example of is being done with these lists that breaks if we make it immutable: - * - * Caused by: java.lang.UnsupportedOperationException - * at java.util.Collections$UnmodifiableList$1.set(Collections.java:1244) - * at java.util.Collections.sort(Collections.java:221) - * ... - * Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: UnmodifiableList.class - * at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98) - * at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56) - * ... 419 more - */ - o.onNext(new ArrayList(list)); - o.onCompleted(); - } catch (Throwable e) { - onError(e); + List result; + try { + /* + * Ideally this should just return Collections.unmodifiableList(list) and not copy it, + * but, it ends up being a breaking change if we make that modification. + * + * Here is an example of is being done with these lists that breaks if we make it immutable: + * + * Caused by: java.lang.UnsupportedOperationException + * at java.util.Collections$UnmodifiableList$1.set(Collections.java:1244) + * at java.util.Collections.sort(Collections.java:221) + * ... + * Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: UnmodifiableList.class + * at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98) + * at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56) + * ... 419 more + */ + result = new ArrayList(list); + } catch (Throwable t) { + onError(t); + return; + } + list = null; + producer.set(result); } } @@ -101,6 +107,9 @@ public void onNext(T value) { } }; + o.add(result); + o.setProducer(producer); + return result; } } diff --git a/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java b/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java index afe9d3ee94..f2d5cb9948 100644 --- a/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java +++ b/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java @@ -15,13 +15,10 @@ */ package rx.internal.operators; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; +import java.util.*; import rx.Observable.Operator; -import rx.Subscriber; +import rx.*; import rx.functions.Func2; /** @@ -35,23 +32,33 @@ * the type of the items emitted by the source and the resulting {@code Observable}s */ public final class OperatorToObservableSortedList implements Operator, T> { - private final Func2 sortFunction; + private final Comparator sortFunction; + private final int initialCapacity; @SuppressWarnings("unchecked") - public OperatorToObservableSortedList() { - this.sortFunction = defaultSortFunction; + public OperatorToObservableSortedList(int initialCapacity) { + this.sortFunction = DEFAULT_SORT_FUNCTION; + this.initialCapacity = initialCapacity; } - public OperatorToObservableSortedList(Func2 sortFunction) { - this.sortFunction = sortFunction; + public OperatorToObservableSortedList(final Func2 sortFunction, int initialCapacity) { + this.initialCapacity = initialCapacity; + this.sortFunction = new Comparator() { + @Override + public int compare(T o1, T o2) { + return sortFunction.call(o1, o2); + } + }; } @Override - public Subscriber call(final Subscriber> o) { - return new Subscriber(o) { - - final List list = new ArrayList(); + public Subscriber call(final Subscriber> child) { + final SingleDelayedProducer> producer = new SingleDelayedProducer>(child); + Subscriber result = new Subscriber() { + List list = new ArrayList(initialCapacity); + boolean completed; + @Override public void onStart() { request(Long.MAX_VALUE); @@ -59,48 +66,48 @@ public void onStart() { @Override public void onCompleted() { - try { - - // sort the list before delivery - Collections.sort(list, new Comparator() { - - @Override - public int compare(T o1, T o2) { - return sortFunction.call(o1, o2); - } - - }); - - o.onNext(Collections.unmodifiableList(list)); - o.onCompleted(); - } catch (Throwable e) { - onError(e); + if (!completed) { + completed = true; + List a = list; + list = null; + try { + // sort the list before delivery + Collections.sort(a, sortFunction); + } catch (Throwable e) { + onError(e); + return; + } + producer.set(a); } } @Override public void onError(Throwable e) { - o.onError(e); + child.onError(e); } @Override public void onNext(T value) { - list.add(value); + if (!completed) { + list.add(value); + } } }; + child.add(result); + child.setProducer(producer); + return result; } - // raw because we want to support Object for this default @SuppressWarnings("rawtypes") - private static Func2 defaultSortFunction = new DefaultComparableFunction(); + private static Comparator DEFAULT_SORT_FUNCTION = new DefaultComparableFunction(); - private static class DefaultComparableFunction implements Func2 { + private static class DefaultComparableFunction implements Comparator { // unchecked because we want to support Object for this default @SuppressWarnings("unchecked") @Override - public Integer call(Object t1, Object t2) { + public int compare(Object t1, Object t2) { Comparable c1 = (Comparable) t1; Comparable c2 = (Comparable) t2; return c1.compareTo(c2); diff --git a/src/main/java/rx/internal/operators/SingleDelayedProducer.java b/src/main/java/rx/internal/operators/SingleDelayedProducer.java new file mode 100644 index 0000000000..9405250ac5 --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleDelayedProducer.java @@ -0,0 +1,87 @@ +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicInteger; + +import rx.*; + +/** + * A producer that holds a single value until it is requested and emits it followed by an onCompleted. + */ +public final class SingleDelayedProducer extends AtomicInteger implements Producer { + /** */ + private static final long serialVersionUID = 4721551710164477552L; + /** The actual child. */ + final Subscriber child; + /** The value to emit, acquired and released by compareAndSet. */ + T value; + /** State flag: request() called with positive value. */ + static final int REQUESTED = 1; + /** State flag: set() called. */ + static final int SET = 2; + /** + * Constructs a SingleDelayedProducer with the given child as output. + * @param child the subscriber to emit the value and completion events + */ + public SingleDelayedProducer(Subscriber child) { + this.child = child; + } + @Override + public void request(long n) { + if (n > 0) { + for (;;) { + int s = get(); + // if already requested + if ((s & REQUESTED) != 0) { + break; + } + int u = s | REQUESTED; + if (compareAndSet(s, u)) { + if ((s & SET) != 0) { + emit(); + } + break; + } + } + } + } + /** + * Sets the value to be emitted and emits it if there was a request. + * Should be called only once and from a single thread + * @param value the value to set and possibly emit + */ + public void set(T value) { + for (;;) { + int s = get(); + // if already set + if ((s & SET) != 0) { + break; + } + int u = s | SET; + this.value = value; + if (compareAndSet(s, u)) { + if ((s & REQUESTED) != 0) { + emit(); + } + break; + } + } + } + /** + * Emits the set value if the child is not unsubscribed and bounces back + * exceptions caught from child.onNext. + */ + void emit() { + try { + T v = value; + value = null; // do not hold onto the value + if (child.isUnsubscribed()) { + return; + } + child.onNext(v); + } catch (Throwable t) { + child.onError(t); + return; + } + child.onCompleted(); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorToObservableListTest.java b/src/test/java/rx/internal/operators/OperatorToObservableListTest.java index 298c6f5f62..c38786b286 100644 --- a/src/test/java/rx/internal/operators/OperatorToObservableListTest.java +++ b/src/test/java/rx/internal/operators/OperatorToObservableListTest.java @@ -15,20 +15,26 @@ */ package rx.internal.operators; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.util.Arrays; -import java.util.List; +import java.util.*; +import java.util.concurrent.*; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import rx.*; import rx.Observable; import rx.Observer; +import rx.functions.Action0; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorToObservableListTest { @@ -101,4 +107,79 @@ public void testListWithBlockingFirst() { List actual = o.toList().toBlocking().first(); Assert.assertEquals(Arrays.asList("one", "two", "three"), actual); } + @Test + public void testBackpressureHonored() { + Observable> w = Observable.just(1, 2, 3, 4, 5).toList(); + TestSubscriber> ts = new TestSubscriber>() { + @Override + public void onStart() { + requestMore(0); + } + }; + + w.subscribe(ts); + + assertTrue(ts.getOnNextEvents().isEmpty()); + assertTrue(ts.getOnErrorEvents().isEmpty()); + assertTrue(ts.getOnCompletedEvents().isEmpty()); + + ts.requestMore(1); + + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1, 2, 3, 4, 5))); + assertTrue(ts.getOnErrorEvents().isEmpty()); + assertEquals(1, ts.getOnCompletedEvents().size()); + + ts.requestMore(1); + + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1, 2, 3, 4, 5))); + assertTrue(ts.getOnErrorEvents().isEmpty()); + assertEquals(1, ts.getOnCompletedEvents().size()); + } + @Test(timeout = 2000) + public void testAsyncRequested() { + Scheduler.Worker w = Schedulers.newThread().createWorker(); + try { + for (int i = 0; i < 1000; i++) { + if (i % 50 == 0) { + System.out.println("testAsyncRequested -> " + i); + } + PublishSubject source = PublishSubject.create(); + Observable> sorted = source.toList(); + + final CyclicBarrier cb = new CyclicBarrier(2); + final TestSubscriber> ts = new TestSubscriber>() { + @Override + public void onStart() { + requestMore(0); + } + }; + sorted.subscribe(ts); + w.schedule(new Action0() { + @Override + public void call() { + await(cb); + ts.requestMore(1); + } + }); + source.onNext(1); + await(cb); + source.onCompleted(); + ts.awaitTerminalEvent(1, TimeUnit.SECONDS); + ts.assertTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1))); + } + } finally { + w.unsubscribe(); + } + } + static void await(CyclicBarrier cb) { + try { + cb.await(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } catch (BrokenBarrierException ex) { + ex.printStackTrace(); + } + } } diff --git a/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java b/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java index d304e9443e..0b1d64bf87 100644 --- a/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java +++ b/src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java @@ -15,29 +15,30 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; -import java.util.Arrays; -import java.util.List; +import java.util.*; +import java.util.concurrent.*; import org.junit.Test; import org.mockito.Mockito; +import rx.*; import rx.Observable; import rx.Observer; -import rx.functions.Func2; -import rx.internal.operators.OperatorToObservableSortedList; +import rx.functions.*; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorToObservableSortedListTest { @Test public void testSortedList() { Observable w = Observable.just(1, 3, 2, 5, 4); - Observable> observable = w.lift(new OperatorToObservableSortedList()); + Observable> observable = w.toSortedList(); @SuppressWarnings("unchecked") Observer> observer = mock(Observer.class); @@ -50,14 +51,14 @@ public void testSortedList() { @Test public void testSortedListWithCustomFunction() { Observable w = Observable.just(1, 3, 2, 5, 4); - Observable> observable = w.lift(new OperatorToObservableSortedList(new Func2() { + Observable> observable = w.toSortedList(new Func2() { @Override public Integer call(Integer t1, Integer t2) { return t2 - t1; } - })); + }); @SuppressWarnings("unchecked") Observer> observer = mock(Observer.class); @@ -72,4 +73,79 @@ public void testWithFollowingFirst() { Observable o = Observable.just(1, 3, 2, 5, 4); assertEquals(Arrays.asList(1, 2, 3, 4, 5), o.toSortedList().toBlocking().first()); } + @Test + public void testBackpressureHonored() { + Observable> w = Observable.just(1, 3, 2, 5, 4).toSortedList(); + TestSubscriber> ts = new TestSubscriber>() { + @Override + public void onStart() { + requestMore(0); + } + }; + + w.subscribe(ts); + + assertTrue(ts.getOnNextEvents().isEmpty()); + assertTrue(ts.getOnErrorEvents().isEmpty()); + assertTrue(ts.getOnCompletedEvents().isEmpty()); + + ts.requestMore(1); + + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1, 2, 3, 4, 5))); + assertTrue(ts.getOnErrorEvents().isEmpty()); + assertEquals(1, ts.getOnCompletedEvents().size()); + + ts.requestMore(1); + + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1, 2, 3, 4, 5))); + assertTrue(ts.getOnErrorEvents().isEmpty()); + assertEquals(1, ts.getOnCompletedEvents().size()); + } + @Test(timeout = 2000) + public void testAsyncRequested() { + Scheduler.Worker w = Schedulers.newThread().createWorker(); + try { + for (int i = 0; i < 1000; i++) { + if (i % 50 == 0) { + System.out.println("testAsyncRequested -> " + i); + } + PublishSubject source = PublishSubject.create(); + Observable> sorted = source.toSortedList(); + + final CyclicBarrier cb = new CyclicBarrier(2); + final TestSubscriber> ts = new TestSubscriber>() { + @Override + public void onStart() { + requestMore(0); + } + }; + sorted.subscribe(ts); + w.schedule(new Action0() { + @Override + public void call() { + await(cb); + ts.requestMore(1); + } + }); + source.onNext(1); + await(cb); + source.onCompleted(); + ts.awaitTerminalEvent(1, TimeUnit.SECONDS); + ts.assertTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.singletonList(Arrays.asList(1))); + } + } finally { + w.unsubscribe(); + } + } + static void await(CyclicBarrier cb) { + try { + cb.await(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } catch (BrokenBarrierException ex) { + ex.printStackTrace(); + } + } }