diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index 328fe5770f..3f91e665b2 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -15,20 +15,20 @@ */ package io.reactivex.internal.schedulers; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; - import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + /** * Holds a fixed pool of worker threads and assigns them * to requested Scheduler.Workers in a round-robin fashion. */ public final class ComputationScheduler extends Scheduler { /** This will indicate no pool is active. */ - static final FixedSchedulerPool NONE = new FixedSchedulerPool(0); + static final FixedSchedulerPool NONE; /** Manages a fixed number of workers. */ private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool"; static final RxThreadFactory THREAD_FACTORY; @@ -42,6 +42,7 @@ public final class ComputationScheduler extends Scheduler { static final PoolWorker SHUTDOWN_WORKER; + final ThreadFactory threadFactory; final AtomicReference pool; /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_COMPUTATION_PRIORITY = "rx2.computation-priority"; @@ -56,6 +57,9 @@ public final class ComputationScheduler extends Scheduler { Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY))); THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); + + NONE = new FixedSchedulerPool(0, THREAD_FACTORY); + NONE.shutdown(); } static int cap(int cpuCount, int paramThreads) { @@ -68,12 +72,12 @@ static final class FixedSchedulerPool { final PoolWorker[] eventLoops; long n; - FixedSchedulerPool(int maxThreads) { + FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) { // initialize event loops this.cores = maxThreads; this.eventLoops = new PoolWorker[maxThreads]; for (int i = 0; i < maxThreads; i++) { - this.eventLoops[i] = new PoolWorker(THREAD_FACTORY); + this.eventLoops[i] = new PoolWorker(threadFactory); } } @@ -98,6 +102,18 @@ public void shutdown() { * count and using least-recent worker selection policy. */ public ComputationScheduler() { + this(THREAD_FACTORY); + } + + /** + * Create a scheduler with pool size equal to the available processor + * count and using least-recent worker selection policy. + * + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + */ + public ComputationScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; this.pool = new AtomicReference(NONE); start(); } @@ -121,7 +137,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo @Override public void start() { - FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS); + FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index 710ab9a04c..8030cd7333 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -16,13 +16,13 @@ package io.reactivex.internal.schedulers; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + /** * Scheduler that creates and caches a set of thread pools and reuses them if possible. */ @@ -37,6 +37,7 @@ public final class IoScheduler extends Scheduler { private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS; static final ThreadWorker SHUTDOWN_THREAD_WORKER; + final ThreadFactory threadFactory; final AtomicReference pool; /** The name of the system property for setting the thread priority for this Scheduler. */ @@ -44,9 +45,6 @@ public final class IoScheduler extends Scheduler { static final CachedWorkerPool NONE; static { - NONE = new CachedWorkerPool(0, null); - NONE.shutdown(); - SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown")); SHUTDOWN_THREAD_WORKER.dispose(); @@ -56,6 +54,9 @@ public final class IoScheduler extends Scheduler { WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); + + NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); + NONE.shutdown(); } static final class CachedWorkerPool implements Runnable { @@ -64,11 +65,13 @@ static final class CachedWorkerPool implements Runnable { final CompositeDisposable allWorkers; private final ScheduledExecutorService evictorService; private final Future evictorTask; + private final ThreadFactory threadFactory; - CachedWorkerPool(long keepAliveTime, TimeUnit unit) { + CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; this.expiringWorkerQueue = new ConcurrentLinkedQueue(); this.allWorkers = new CompositeDisposable(); + this.threadFactory = threadFactory; ScheduledExecutorService evictor = null; Future task = null; @@ -97,7 +100,7 @@ ThreadWorker get() { } // No cached worker found, so create a new one. - ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY); + ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } @@ -143,13 +146,22 @@ void shutdown() { } public IoScheduler() { + this(WORKER_THREAD_FACTORY); + } + + /** + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + */ + public IoScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; this.pool = new AtomicReference(NONE); start(); } @Override public void start() { - CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT); + CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java index 52f2426f4d..e78f897d7e 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java @@ -18,16 +18,18 @@ import io.reactivex.Scheduler; +import java.util.concurrent.ThreadFactory; + /** * Schedules work on a new thread. */ public final class NewThreadScheduler extends Scheduler { + final ThreadFactory threadFactory; + private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler"; private static final RxThreadFactory THREAD_FACTORY; - private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); - /** The name of the system property for setting the thread priority for this Scheduler. */ private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority"; @@ -38,16 +40,16 @@ public final class NewThreadScheduler extends Scheduler { THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); } - public static NewThreadScheduler instance() { - return INSTANCE; + public NewThreadScheduler() { + this(THREAD_FACTORY); } - private NewThreadScheduler() { - + public NewThreadScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; } @Override public Worker createWorker() { - return new NewThreadWorker(THREAD_FACTORY); + return new NewThreadWorker(threadFactory); } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index ce65031280..f082921164 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -12,20 +12,21 @@ */ package io.reactivex.internal.schedulers; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; - import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + /** * A scheduler with a shared, single threaded underlying ScheduledExecutorService. * @since 2.0 */ public final class SingleScheduler extends Scheduler { + final ThreadFactory threadFactory; final AtomicReference executor = new AtomicReference(); /** The name of the system property for setting the thread priority for this Scheduler. */ @@ -47,11 +48,20 @@ public final class SingleScheduler extends Scheduler { } public SingleScheduler() { - executor.lazySet(createExecutor()); + this(SINGLE_THREAD_FACTORY); + } + + /** + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + */ + public SingleScheduler(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + executor.lazySet(createExecutor(threadFactory)); } - static ScheduledExecutorService createExecutor() { - return SchedulerPoolFactory.create(SINGLE_THREAD_FACTORY); + static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) { + return SchedulerPoolFactory.create(threadFactory); } @Override @@ -66,7 +76,7 @@ public void start() { return; } if (next == null) { - next = createExecutor(); + next = createExecutor(threadFactory); } if (executor.compareAndSet(current, next)) { return; diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 52061cd23f..8fe57c8405 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -12,17 +12,19 @@ */ package io.reactivex.plugins; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.Callable; - -import io.reactivex.internal.functions.ObjectHelper; -import org.reactivestreams.Subscriber; - import io.reactivex.*; +import io.reactivex.annotations.Experimental; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; +import io.reactivex.schedulers.Schedulers; +import org.reactivestreams.Subscriber; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.*; /** * Utility class to inject handlers to certain standard RxJava operations. @@ -926,6 +928,58 @@ public static Completable onAssembly(Completable source) { return source; } + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + * @since 2.0.5 - experimental + */ + @Experimental + public static Scheduler createComputationScheduler(ThreadFactory threadFactory) { + return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + * @since 2.0.5 - experimental + */ + @Experimental + public static Scheduler createIoScheduler(ThreadFactory threadFactory) { + return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + * @since 2.0.5 - experimental + */ + @Experimental + public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) { + return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()} + * except using {@code threadFactory} for thread creation. + * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any + * system properties for configuring new thread creation. Cannot be null. + * @return the created Scheduler instance + * @since 2.0.5 - experimental + */ + @Experimental + public static Scheduler createSingleScheduler(ThreadFactory threadFactory) { + return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); + } + /** * Wraps the call to the function in try-catch and propagates thrown * checked exceptions as RuntimeException. diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index fb74da4f3b..c5913b70cf 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -13,13 +13,12 @@ package io.reactivex.schedulers; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; - import io.reactivex.Scheduler; import io.reactivex.internal.schedulers.*; import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.*; + /** * Static factory methods for returning standard Scheduler instances. *

@@ -58,7 +57,7 @@ static final class IoHolder { } static final class NewThreadHolder { - static final Scheduler DEFAULT = NewThreadScheduler.instance(); + static final Scheduler DEFAULT = new NewThreadScheduler(); } static { diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 253dca9995..8d6da37045 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -16,19 +16,6 @@ package io.reactivex.plugins; -import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; - -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.reflect.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.*; -import org.reactivestreams.*; - import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; @@ -47,6 +34,17 @@ import io.reactivex.internal.subscriptions.ScalarSubscription; import io.reactivex.observables.ConnectableObservable; import io.reactivex.schedulers.Schedulers; +import org.junit.*; +import org.reactivestreams.*; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.junit.Assert.*; public class RxJavaPluginsTest { @@ -1890,4 +1888,137 @@ public void accept(final Throwable throwable) throws Exception { RxJavaPlugins.reset(); } } + + private static void verifyThread(Scheduler scheduler, String expectedThreadName) + throws AssertionError { + assertNotNull(scheduler); + Worker w = scheduler.createWorker(); + try { + final AtomicReference value = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + + w.schedule(new Runnable() { + @Override + public void run() { + value.set(Thread.currentThread()); + cdl.countDown(); + } + }); + + cdl.await(); + + Thread t = value.get(); + assertNotNull(t); + assertTrue(expectedThreadName.equals(t.getName())); + } catch (Exception e) { + fail(); + } finally { + w.dispose(); + } + } + + @Test + public void testCreateComputationScheduler() { + final String name = "ComputationSchedulerTest"; + ThreadFactory factory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, name); + } + }; + + final Scheduler customScheduler = RxJavaPlugins.createComputationScheduler(factory); + RxJavaPlugins.setComputationSchedulerHandler(new Function() { + @Override + public Scheduler apply(Scheduler scheduler) throws Exception { + return customScheduler; + } + }); + + try { + verifyThread(Schedulers.computation(), name); + } finally { + customScheduler.shutdown(); + RxJavaPlugins.reset(); + } + } + + @Test + public void testCreateIoScheduler() { + final String name = "IoSchedulerTest"; + ThreadFactory factory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, name); + } + }; + + final Scheduler customScheduler = RxJavaPlugins.createIoScheduler(factory); + RxJavaPlugins.setIoSchedulerHandler(new Function() { + @Override + public Scheduler apply(Scheduler scheduler) throws Exception { + return customScheduler; + } + }); + + try { + verifyThread(Schedulers.io(), name); + } finally { + customScheduler.shutdown(); + RxJavaPlugins.reset(); + } + } + + @Test + public void testCreateNewThreadScheduler() { + final String name = "NewThreadSchedulerTest"; + ThreadFactory factory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, name); + } + }; + + final Scheduler customScheduler = RxJavaPlugins.createNewThreadScheduler(factory); + RxJavaPlugins.setNewThreadSchedulerHandler(new Function() { + @Override + public Scheduler apply(Scheduler scheduler) throws Exception { + return customScheduler; + } + }); + + try { + verifyThread(Schedulers.newThread(), name); + } finally { + customScheduler.shutdown(); + RxJavaPlugins.reset(); + } + } + + @Test + public void testCreateSingleScheduler() { + final String name = "SingleSchedulerTest"; + ThreadFactory factory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, name); + } + }; + + final Scheduler customScheduler = RxJavaPlugins.createSingleScheduler(factory); + + RxJavaPlugins.setSingleSchedulerHandler(new Function() { + @Override + public Scheduler apply(Scheduler scheduler) throws Exception { + return customScheduler; + } + }); + + try { + verifyThread(Schedulers.single(), name); + } finally { + customScheduler.shutdown(); + RxJavaPlugins.reset(); + } + } }