diff --git a/build.gradle b/build.gradle index b9129c3..45ba40e 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ apply plugin: 'rxjava-project' apply plugin: 'java' dependencies { - compile 'io.reactivex:rxjava:1.0.+' + compile 'io.reactivex:rxjava:1.0.12' testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index 8beecce..202f008 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -17,6 +17,7 @@ import rx.Observable; import rx.Observable.Operator; +import rx.Producer; import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func0; @@ -460,40 +461,69 @@ private void output(String part) { public static Observable join(final Observable source, final CharSequence separator) { return source.lift(new Operator() { @Override - public Subscriber call(final Subscriber o) { - return new Subscriber(o) { - boolean mayAddSeparator; - StringBuilder b = new StringBuilder(); - - @Override - public void onCompleted() { - String str = b.toString(); - b = null; - if (!o.isUnsubscribed()) - o.onNext(str); - if (!o.isUnsubscribed()) - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - b = null; - if (!o.isUnsubscribed()) - o.onError(e); - } - + public Subscriber call(final Subscriber child) { + final JoinParentSubscriber parent = new JoinParentSubscriber(child, separator); + child.add(parent); + child.setProducer(new Producer() { @Override - public void onNext(String t) { - if (mayAddSeparator) { - b.append(separator); - } - mayAddSeparator = true; - b.append(t); - } - }; + public void request(long n) { + if (n > 0) { + parent.requestAll(); + } + }}); + return parent; } }); } + + private static final class JoinParentSubscriber extends Subscriber { + + private final Subscriber child; + private final CharSequence separator; + private boolean mayAddSeparator; + private StringBuilder b = new StringBuilder(); + + JoinParentSubscriber(Subscriber child, CharSequence separator) { + this.child = child; + this.separator = separator; + } + + void requestAll() { + request(Long.MAX_VALUE); + } + + @Override + public void onStart() { + request(0); + } + + @Override + public void onCompleted() { + String str = b.toString(); + b = null; + if (!child.isUnsubscribed()) + child.onNext(str); + if (!child.isUnsubscribed()) + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + b = null; + if (!child.isUnsubscribed()) + child.onError(e); + } + + @Override + public void onNext(String t) { + if (mayAddSeparator) { + b.append(separator); + } + mayAddSeparator = true; + b.append(t); + } + + } /** * Splits the {@link Observable} of Strings by lines and numbers them (zero based index) diff --git a/src/test/java/rx/observables/StringObservableTest.java b/src/test/java/rx/observables/StringObservableTest.java index dca6bf6..9be8106 100644 --- a/src/test/java/rx/observables/StringObservableTest.java +++ b/src/test/java/rx/observables/StringObservableTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static rx.Observable.just; import static rx.observables.StringObservable.byCharacter; import static rx.observables.StringObservable.byLine; import static rx.observables.StringObservable.decode; @@ -456,4 +457,25 @@ public Observable call(Reader reader) { verify(reader, times(1)).close(); } + + @Test(timeout=5000) + public void testJoinDoesNotStallIssue23() { + String s = StringObservable + .join(just("a","b","c"),",") + .toBlocking().single(); + assertEquals("a,b,c", s); + } + + @Test + public void testJoinBackpressure() { + TestSubscriber ts = new TestSubscriber(0); + StringObservable + .join(just("a","b","c"),",") + .subscribe(ts); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + ts.requestMore(1); + ts.assertValues("a,b,c"); + ts.assertCompleted(); + } }