diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 56a7058d26..835ccd7695 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -81,6 +81,9 @@ public static OperatorMerge instance(boolean delayErrors) { * @return */ public static OperatorMerge instance(boolean delayErrors, int maxConcurrent) { + if (maxConcurrent <= 0) { + throw new IllegalArgumentException("maxConcurrent > 0 required but it was " + maxConcurrent); + } if (maxConcurrent == Integer.MAX_VALUE) { return instance(delayErrors); } diff --git a/src/main/java/rx/internal/operators/OperatorSkip.java b/src/main/java/rx/internal/operators/OperatorSkip.java index 505c1491e7..bc63fe5e21 100644 --- a/src/main/java/rx/internal/operators/OperatorSkip.java +++ b/src/main/java/rx/internal/operators/OperatorSkip.java @@ -31,6 +31,9 @@ public final class OperatorSkip implements Observable.Operator { final int toSkip; public OperatorSkip(int n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } this.toSkip = n; } diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 78cf229bbc..2c40ac53d3 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -1333,4 +1333,24 @@ public void testConcurrencyLimit() { ts.assertValue(0); ts.assertCompleted(); } + + @Test + public void negativeMaxConcurrent() { + try { + Observable.merge(Arrays.asList(Observable.just(1), Observable.just(2)), -1); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("maxConcurrent > 0 required but it was -1", e.getMessage()); + } + } + + @Test + public void zeroMaxConcurrent() { + try { + Observable.merge(Arrays.asList(Observable.just(1), Observable.just(2)), 0); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("maxConcurrent > 0 required but it was 0", e.getMessage()); + } + } } diff --git a/src/test/java/rx/internal/operators/OperatorSkipTest.java b/src/test/java/rx/internal/operators/OperatorSkipTest.java index 0e9ca9367e..21ad07b251 100644 --- a/src/test/java/rx/internal/operators/OperatorSkipTest.java +++ b/src/test/java/rx/internal/operators/OperatorSkipTest.java @@ -16,6 +16,7 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -37,17 +38,12 @@ public class OperatorSkipTest { @Test public void testSkipNegativeElements() { - - Observable skip = Observable.just("one", "two", "three").lift(new OperatorSkip(-99)); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - skip.subscribe(observer); - verify(observer, times(1)).onNext("one"); - verify(observer, times(1)).onNext("two"); - verify(observer, times(1)).onNext("three"); - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); + try { + Observable.just("one", "two", "three").skip(-99); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("n >= 0 required but it was -99", e.getMessage()); + } } @Test