diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java index 9242d6586d..c5fec0a13d 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithObservable.java @@ -119,7 +119,7 @@ public void onNext(T t) { do { drain(localQueue); if (once) { - once = true; + once = false; emitValue(t); } diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java index 8d6b9bb6a5..fd8a20fed3 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.verify; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.junit.Test; @@ -29,6 +30,8 @@ import rx.Observable; import rx.Observer; import rx.exceptions.TestException; +import rx.functions.Func0; +import rx.observers.TestSubscriber; import rx.subjects.PublishSubject; public class OperatorWindowWithObservableTest { @@ -252,4 +255,39 @@ public void onCompleted() { verify(o, never()).onCompleted(); verify(o).onError(any(TestException.class)); } + + @Test + public void testWindowNoDuplication() { + final PublishSubject source = PublishSubject.create(); + final TestSubscriber tsw = new TestSubscriber() { + boolean once; + @Override + public void onNext(Integer t) { + if (!once) { + once = true; + source.onNext(2); + } + super.onNext(t); + } + }; + TestSubscriber> ts = new TestSubscriber>() { + @Override + public void onNext(Observable t) { + t.subscribe(tsw); + super.onNext(t); + } + }; + source.window(new Func0>() { + @Override + public Observable call() { + return Observable.never(); + } + }).subscribe(ts); + + source.onNext(1); + source.onCompleted(); + + assertEquals(1, ts.getOnNextEvents().size()); + assertEquals(Arrays.asList(1, 2), tsw.getOnNextEvents()); + } } \ No newline at end of file