From 54a3f8d1aa684c8cf7dfbab7b54c9464ab32c6d6 Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sun, 8 Jan 2017 11:50:47 +0100 Subject: [PATCH] 2.x: Add @CheckReturnValue to create methods of Subjects + Processors --- src/main/java/io/reactivex/processors/AsyncProcessor.java | 2 ++ .../java/io/reactivex/processors/BehaviorProcessor.java | 3 +++ src/main/java/io/reactivex/processors/PublishProcessor.java | 2 ++ src/main/java/io/reactivex/processors/ReplayProcessor.java | 6 ++++++ src/main/java/io/reactivex/processors/UnicastProcessor.java | 4 ++++ src/main/java/io/reactivex/subjects/AsyncSubject.java | 2 ++ src/main/java/io/reactivex/subjects/BehaviorSubject.java | 3 +++ src/main/java/io/reactivex/subjects/PublishSubject.java | 2 ++ src/main/java/io/reactivex/subjects/ReplaySubject.java | 6 ++++++ src/main/java/io/reactivex/subjects/UnicastSubject.java | 4 ++++ 10 files changed, 34 insertions(+) diff --git a/src/main/java/io/reactivex/processors/AsyncProcessor.java b/src/main/java/io/reactivex/processors/AsyncProcessor.java index 3ab4fcf44e..59495fa551 100644 --- a/src/main/java/io/reactivex/processors/AsyncProcessor.java +++ b/src/main/java/io/reactivex/processors/AsyncProcessor.java @@ -12,6 +12,7 @@ */ package io.reactivex.processors; +import io.reactivex.annotations.CheckReturnValue; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; @@ -49,6 +50,7 @@ public final class AsyncProcessor extends FlowableProcessor { * @param the value type to be received and emitted * @return the new AsyncProcessor instance */ + @CheckReturnValue public static AsyncProcessor create() { return new AsyncProcessor(); } diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index 90c3f103fb..11a4ed4ee7 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -13,6 +13,7 @@ package io.reactivex.processors; +import io.reactivex.annotations.CheckReturnValue; import java.lang.reflect.Array; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; @@ -97,6 +98,7 @@ public final class BehaviorProcessor extends FlowableProcessor { * the type of item the Subject will emit * @return the constructed {@link BehaviorProcessor} */ + @CheckReturnValue public static BehaviorProcessor create() { return new BehaviorProcessor(); } @@ -112,6 +114,7 @@ public static BehaviorProcessor create() { * {@link BehaviorProcessor} has not yet observed any items from its source {@code Observable} * @return the constructed {@link BehaviorProcessor} */ + @CheckReturnValue public static BehaviorProcessor createDefault(T defaultValue) { ObjectHelper.requireNonNull(defaultValue, "defaultValue is null"); return new BehaviorProcessor(defaultValue); diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index 6605bf9150..9dad6142c6 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -12,6 +12,7 @@ */ package io.reactivex.processors; +import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -74,6 +75,7 @@ public final class PublishProcessor extends FlowableProcessor { * @param the value type * @return the new PublishProcessor */ + @CheckReturnValue public static PublishProcessor create() { return new PublishProcessor(); } diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 89f6af0014..cbe0b3027b 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -13,6 +13,7 @@ package io.reactivex.processors; +import io.reactivex.annotations.CheckReturnValue; import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.TimeUnit; @@ -89,6 +90,7 @@ public final class ReplayProcessor extends FlowableProcessor { * the type of items observed and emitted by the ReplayProcessor * @return the created ReplayProcessor */ + @CheckReturnValue public static ReplayProcessor create() { return new ReplayProcessor(new UnboundedReplayBuffer(16)); } @@ -108,6 +110,7 @@ public static ReplayProcessor create() { * the initial buffer capacity * @return the created subject */ + @CheckReturnValue public static ReplayProcessor create(int capacityHint) { return new ReplayProcessor(new UnboundedReplayBuffer(capacityHint)); } @@ -132,6 +135,7 @@ public static ReplayProcessor create(int capacityHint) { * the maximum number of buffered items * @return the created subject */ + @CheckReturnValue public static ReplayProcessor createWithSize(int maxSize) { return new ReplayProcessor(new SizeBoundReplayBuffer(maxSize)); } @@ -185,6 +189,7 @@ public static ReplayProcessor createWithSize(int maxSize) { * the {@link Scheduler} that provides the current time * @return the created subject */ + @CheckReturnValue public static ReplayProcessor createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) { return new ReplayProcessor(new SizeAndTimeBoundReplayBuffer(Integer.MAX_VALUE, maxAge, unit, scheduler)); } @@ -223,6 +228,7 @@ public static ReplayProcessor createWithTime(long maxAge, TimeUnit unit, * the {@link Scheduler} that provides the current time * @return the created subject */ + @CheckReturnValue public static ReplayProcessor createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) { return new ReplayProcessor(new SizeAndTimeBoundReplayBuffer(maxSize, maxAge, unit, scheduler)); } diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index 4f0f92192e..97af598de0 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -13,6 +13,7 @@ package io.reactivex.processors; +import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -65,6 +66,7 @@ public final class UnicastProcessor extends FlowableProcessor { * @param the value type * @return an UnicastSubject instance */ + @CheckReturnValue public static UnicastProcessor create() { return new UnicastProcessor(bufferSize()); } @@ -75,6 +77,7 @@ public static UnicastProcessor create() { * @param capacityHint the hint to size the internal unbounded buffer * @return an UnicastProcessor instance */ + @CheckReturnValue public static UnicastProcessor create(int capacityHint) { return new UnicastProcessor(capacityHint); } @@ -91,6 +94,7 @@ public static UnicastProcessor create(int capacityHint) { * @param onCancelled the non null callback * @return an UnicastProcessor instance */ + @CheckReturnValue public static UnicastProcessor create(int capacityHint, Runnable onCancelled) { return new UnicastProcessor(capacityHint, onCancelled); } diff --git a/src/main/java/io/reactivex/subjects/AsyncSubject.java b/src/main/java/io/reactivex/subjects/AsyncSubject.java index 7c013f7b72..b6ab967304 100644 --- a/src/main/java/io/reactivex/subjects/AsyncSubject.java +++ b/src/main/java/io/reactivex/subjects/AsyncSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.CheckReturnValue; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; @@ -51,6 +52,7 @@ public final class AsyncSubject extends Subject { * @param the value type to be received and emitted * @return the new AsyncProcessor instance */ + @CheckReturnValue public static AsyncSubject create() { return new AsyncSubject(); } diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index cd0e2c366a..0396c4168c 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.CheckReturnValue; import java.lang.reflect.Array; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.*; @@ -96,6 +97,7 @@ public final class BehaviorSubject extends Subject { * the type of item the Subject will emit * @return the constructed {@link BehaviorSubject} */ + @CheckReturnValue public static BehaviorSubject create() { return new BehaviorSubject(); } @@ -111,6 +113,7 @@ public static BehaviorSubject create() { * {@link BehaviorSubject} has not yet observed any items from its source {@code Observable} * @return the constructed {@link BehaviorSubject} */ + @CheckReturnValue public static BehaviorSubject createDefault(T defaultValue) { return new BehaviorSubject(defaultValue); } diff --git a/src/main/java/io/reactivex/subjects/PublishSubject.java b/src/main/java/io/reactivex/subjects/PublishSubject.java index a516dcef0e..9955ce3faa 100644 --- a/src/main/java/io/reactivex/subjects/PublishSubject.java +++ b/src/main/java/io/reactivex/subjects/PublishSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; import io.reactivex.Observer; @@ -63,6 +64,7 @@ public final class PublishSubject extends Subject { * @param the value type * @return the new PublishSubject */ + @CheckReturnValue public static PublishSubject create() { return new PublishSubject(); } diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 3965a97af1..ee5a55839d 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -20,6 +20,7 @@ import io.reactivex.Observer; import io.reactivex.Scheduler; +import io.reactivex.annotations.CheckReturnValue; import io.reactivex.disposables.Disposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.util.NotificationLite; @@ -74,6 +75,7 @@ public final class ReplaySubject extends Subject { * the type of items observed and emitted by the Subject * @return the created subject */ + @CheckReturnValue public static ReplaySubject create() { return new ReplaySubject(new UnboundedReplayBuffer(16)); } @@ -93,6 +95,7 @@ public static ReplaySubject create() { * the initial buffer capacity * @return the created subject */ + @CheckReturnValue public static ReplaySubject create(int capacityHint) { return new ReplaySubject(new UnboundedReplayBuffer(capacityHint)); } @@ -117,6 +120,7 @@ public static ReplaySubject create(int capacityHint) { * the maximum number of buffered items * @return the created subject */ + @CheckReturnValue public static ReplaySubject createWithSize(int maxSize) { return new ReplaySubject(new SizeBoundReplayBuffer(maxSize)); } @@ -170,6 +174,7 @@ public static ReplaySubject createWithSize(int maxSize) { * the {@link Scheduler} that provides the current time * @return the created subject */ + @CheckReturnValue public static ReplaySubject createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) { return new ReplaySubject(new SizeAndTimeBoundReplayBuffer(Integer.MAX_VALUE, maxAge, unit, scheduler)); } @@ -208,6 +213,7 @@ public static ReplaySubject createWithTime(long maxAge, TimeUnit unit, Sc * the {@link Scheduler} that provides the current time * @return the created subject */ + @CheckReturnValue public static ReplaySubject createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) { return new ReplaySubject(new SizeAndTimeBoundReplayBuffer(maxSize, maxAge, unit, scheduler)); } diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 521fd49a7a..376b29a0c2 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.*; import io.reactivex.Observer; +import io.reactivex.annotations.CheckReturnValue; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; @@ -73,6 +74,7 @@ public final class UnicastSubject extends Subject { * @param the value type * @return an UnicastSubject instance */ + @CheckReturnValue public static UnicastSubject create() { return new UnicastSubject(bufferSize()); } @@ -83,6 +85,7 @@ public static UnicastSubject create() { * @param capacityHint the hint to size the internal unbounded buffer * @return an UnicastSubject instance */ + @CheckReturnValue public static UnicastSubject create(int capacityHint) { return new UnicastSubject(capacityHint); } @@ -99,6 +102,7 @@ public static UnicastSubject create(int capacityHint) { * @param onCancelled the non null callback * @return an UnicastSubject instance */ + @CheckReturnValue public static UnicastSubject create(int capacityHint, Runnable onCancelled) { return new UnicastSubject(capacityHint, onCancelled); }