Skip to content

Commit cece520

Browse files
committed
ensure StringObservable.join does not stall, see #23
1 parent 8ad398b commit cece520

File tree

3 files changed

+83
-31
lines changed

3 files changed

+83
-31
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ apply plugin: 'rxjava-project'
77
apply plugin: 'java'
88

99
dependencies {
10-
compile 'io.reactivex:rxjava:1.0.+'
10+
compile 'io.reactivex:rxjava:1.0.12'
1111
testCompile 'junit:junit-dep:4.10'
1212
testCompile 'org.mockito:mockito-core:1.8.5'
1313
}

src/main/java/rx/observables/StringObservable.java

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.Observable;
1919
import rx.Observable.Operator;
20+
import rx.Producer;
2021
import rx.Subscriber;
2122
import rx.functions.Action1;
2223
import rx.functions.Func0;
@@ -460,40 +461,69 @@ private void output(String part) {
460461
public static Observable<String> join(final Observable<String> source, final CharSequence separator) {
461462
return source.lift(new Operator<String, String>() {
462463
@Override
463-
public Subscriber<String> call(final Subscriber<? super String> o) {
464-
return new Subscriber<String>(o) {
465-
boolean mayAddSeparator;
466-
StringBuilder b = new StringBuilder();
467-
468-
@Override
469-
public void onCompleted() {
470-
String str = b.toString();
471-
b = null;
472-
if (!o.isUnsubscribed())
473-
o.onNext(str);
474-
if (!o.isUnsubscribed())
475-
o.onCompleted();
476-
}
477-
478-
@Override
479-
public void onError(Throwable e) {
480-
b = null;
481-
if (!o.isUnsubscribed())
482-
o.onError(e);
483-
}
484-
464+
public Subscriber<String> call(final Subscriber<? super String> child) {
465+
final JoinParentSubscriber parent = new JoinParentSubscriber(child, separator);
466+
child.add(parent);
467+
child.setProducer(new Producer() {
485468
@Override
486-
public void onNext(String t) {
487-
if (mayAddSeparator) {
488-
b.append(separator);
489-
}
490-
mayAddSeparator = true;
491-
b.append(t);
492-
}
493-
};
469+
public void request(long n) {
470+
if (n > 0) {
471+
parent.requestAll();
472+
}
473+
}});
474+
return parent;
494475
}
495476
});
496477
}
478+
479+
private static final class JoinParentSubscriber extends Subscriber<String> {
480+
481+
private final Subscriber<? super String> child;
482+
private final CharSequence separator;
483+
private boolean mayAddSeparator;
484+
private StringBuilder b = new StringBuilder();
485+
486+
JoinParentSubscriber(Subscriber<? super String> child, CharSequence separator) {
487+
this.child = child;
488+
this.separator = separator;
489+
}
490+
491+
void requestAll() {
492+
request(Long.MAX_VALUE);
493+
}
494+
495+
@Override
496+
public void onStart() {
497+
request(0);
498+
}
499+
500+
@Override
501+
public void onCompleted() {
502+
String str = b.toString();
503+
b = null;
504+
if (!child.isUnsubscribed())
505+
child.onNext(str);
506+
if (!child.isUnsubscribed())
507+
child.onCompleted();
508+
}
509+
510+
@Override
511+
public void onError(Throwable e) {
512+
b = null;
513+
if (!child.isUnsubscribed())
514+
child.onError(e);
515+
}
516+
517+
@Override
518+
public void onNext(String t) {
519+
if (mayAddSeparator) {
520+
b.append(separator);
521+
}
522+
mayAddSeparator = true;
523+
b.append(t);
524+
}
525+
526+
}
497527

498528
/**
499529
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)

src/test/java/rx/observables/StringObservableTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.Mockito.spy;
2727
import static org.mockito.Mockito.times;
2828
import static org.mockito.Mockito.verify;
29+
import static rx.Observable.just;
2930
import static rx.observables.StringObservable.byCharacter;
3031
import static rx.observables.StringObservable.byLine;
3132
import static rx.observables.StringObservable.decode;
@@ -456,4 +457,25 @@ public Observable<String> call(Reader reader) {
456457

457458
verify(reader, times(1)).close();
458459
}
460+
461+
@Test(timeout=5000)
462+
public void testJoinDoesNotStallIssue23() {
463+
String s = StringObservable
464+
.join(just("a","b","c"),",")
465+
.toBlocking().single();
466+
assertEquals("a,b,c", s);
467+
}
468+
469+
@Test
470+
public void testJoinBackpressure() {
471+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
472+
StringObservable
473+
.join(just("a","b","c"),",")
474+
.subscribe(ts);
475+
ts.assertNoValues();
476+
ts.assertNoTerminalEvent();
477+
ts.requestMore(1);
478+
ts.assertValues("a,b,c");
479+
ts.assertCompleted();
480+
}
459481
}

0 commit comments

Comments
 (0)