Skip to content

Fixed schedule race and task retention with ExecutorScheduler. #2907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 55 additions & 71 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
*/
package rx.schedulers;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import rx.Scheduler;
import rx.Subscription;

import rx.*;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.subscriptions.*;

/**
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
Expand Down Expand Up @@ -58,12 +51,12 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
// TODO: use a better performing structure for task tracking
final CompositeSubscription tasks;
// TODO: use MpscLinkedQueue once available
final ConcurrentLinkedQueue<ExecutorAction> queue;
final ConcurrentLinkedQueue<ScheduledAction> queue;
final AtomicInteger wip;

public ExecutorSchedulerWorker(Executor executor) {
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
this.wip = new AtomicInteger();
this.tasks = new CompositeSubscription();
}
Expand All @@ -73,11 +66,15 @@ public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ExecutorAction ea = new ExecutorAction(action, tasks);
ScheduledAction ea = new ScheduledAction(action, tasks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we include a reference to queue in the ScheduledAction so that if it is unsubscribed it can remove itself from the queue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with that is that remove from queue is O(n) and we'd pay it most of the time (once dequeued, remove will traverse the entire queue and not find it) because we can't distinguish between the normal completion calling unsubscribe and external unsubscribe.

tasks.add(ea);
queue.offer(ea);
if (wip.getAndIncrement() == 0) {
try {
// note that since we schedule the emission of potentially multiple tasks
// there is no clear way to cancel this schedule from individual tasks
// so even if executor is an ExecutorService, we can't associate the future
// returned by submit() with any particular ScheduledAction
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is somewhat confusing to me. The "potentially multiple tasks" is just that we're scheduling the "this" reference over and over again isn't it?

We are scheduling a single "this" one at a time so that it then drains from the queue to ensure sequential execution.

Thus, the scheduling on the Executor has little to do with any particular Action0. Is that what you're saying?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the scenario: I submit 2 tasks concurrently. The first one will start the schedule with this. Now let's assume the executor is busy so this is waiting in the pool's queue. If I cancel the 2 tasks, in theory, there is no need to run this anymore but to cancel it, one needs very complicated accounting. So in other terms, the best we can do here is to do the emission loop and check if the particular ScheduledAction is unsubscribed or not. The downside is the long retention of such tasks.

executor.execute(this);
} catch (RejectedExecutionException t) {
// cleanup if rejected
Expand All @@ -96,7 +93,10 @@ public Subscription schedule(Action0 action) {
@Override
public void run() {
do {
queue.poll().run();
ScheduledAction sa = queue.poll();
if (!sa.isUnsubscribed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well that's a sad bug we had sitting in there allowing unsubscribed tasks to still execute!

sa.run();
}
} while (wip.decrementAndGet() > 0);
}

Expand All @@ -115,28 +115,54 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
service = GenericScheduledExecutorService.getInstance();
}

final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
// tasks.add(mas); // Needs a removal without unsubscription
mas.set(first);
tasks.add(mas);
final Subscription removeMas = Subscriptions.create(new Action0() {
@Override
public void call() {
tasks.remove(mas);
}
});

try {
Future<?> f = service.schedule(new Runnable() {
@Override
public void run() {
if (mas.isUnsubscribed()) {
return;
}
mas.set(schedule(action));
// tasks.delete(mas); // Needs a removal without unsubscription
ScheduledAction ea = new ScheduledAction(new Action0() {
@Override
public void call() {
if (mas.isUnsubscribed()) {
return;
}
}, delayTime, unit);
mas.set(Subscriptions.from(f));
// schedule the real action untimed
Subscription s2 = schedule(action);
mas.set(s2);
// unless the worker is unsubscribed, we should get a new ScheduledAction
if (s2.getClass() == ScheduledAction.class) {
// when this ScheduledAction completes, we need to remove the
// MAS referencing the whole setup to avoid leaks
((ScheduledAction)s2).add(removeMas);
}
}
});
// This will make sure if ea.call() gets executed before this line
// we don't override the current task in mas.
first.set(ea);
// we don't need to add ea to tasks because it will be tracked through mas/first


try {
Future<?> f = service.schedule(ea, delayTime, unit);
ea.add(f);
} catch (RejectedExecutionException t) {
// report the rejection to plugins
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
throw t;
}

return mas;
/*
* This allows cancelling either the delayed schedule or the actual schedule referenced
* by mas and makes sure mas is removed from the tasks composite to avoid leaks.
*/
return removeMas;
}

@Override
Expand All @@ -150,46 +176,4 @@ public void unsubscribe() {
}

}

/** Runs the actual action and maintains an unsubscription state. */
static final class ExecutorAction implements Runnable, Subscription {
final Action0 actual;
final CompositeSubscription parent;
volatile int unsubscribed;
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");

public ExecutorAction(Action0 actual, CompositeSubscription parent) {
this.actual = actual;
this.parent = parent;
}

@Override
public void run() {
if (isUnsubscribed()) {
return;
}
try {
actual.call();
} catch (Throwable t) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
} finally {
unsubscribe();
}
}
@Override
public boolean isUnsubscribed() {
return unsubscribed != 0;
}

@Override
public void unsubscribe() {
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
parent.remove(this);
}
}

}
}
172 changes: 168 additions & 4 deletions src/test/java/rx/schedulers/ExecutorSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
*/
package rx.schedulers;

import static org.junit.Assert.*;

import java.lang.management.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import rx.Scheduler;
import rx.internal.util.RxThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import rx.*;
import rx.Scheduler.Worker;
import rx.functions.*;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.util.RxThreadFactory;
import rx.schedulers.ExecutorScheduler.ExecutorSchedulerWorker;

public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {

Expand All @@ -40,4 +48,160 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {
System.out.println("Wait before GC");
Thread.sleep(1000);

System.out.println("GC");
System.gc();

Thread.sleep(1000);


MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

Scheduler.Worker w = Schedulers.io().createWorker();
for (int i = 0; i < 500000; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
}

memHeap = memoryMXBean.getHeapMemoryUsage();
long after = memHeap.getUsed();
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);

w.unsubscribe();

System.out.println("Wait before second GC");
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);

System.out.println("Second GC");
System.gc();

Thread.sleep(1000);

memHeap = memoryMXBean.getHeapMemoryUsage();
long finish = memHeap.getUsed();
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);

if (finish > initial * 5) {
fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
}
}

/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
static final class TestExecutor implements Executor {
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
@Override
public void execute(Runnable command) {
queue.offer(command);
}
public void executeOne() {
Runnable r = queue.poll();
if (r != null) {
r.run();
}
}
public void executeAll() {
Runnable r;
while ((r = queue.poll()) != null) {
r.run();
}
}
}

@Test
public void testCancelledTasksDontRun() {
final AtomicInteger calls = new AtomicInteger();
Action0 task = new Action0() {
@Override
public void call() {
calls.getAndIncrement();
}
};
TestExecutor exec = new TestExecutor();
Scheduler custom = Schedulers.from(exec);
Worker w = custom.createWorker();
try {
Subscription s1 = w.schedule(task);
Subscription s2 = w.schedule(task);
Subscription s3 = w.schedule(task);

s1.unsubscribe();
s2.unsubscribe();
s3.unsubscribe();

exec.executeAll();

assertEquals(0, calls.get());
} finally {
w.unsubscribe();
}
}
@Test
public void testCancelledWorkerDoesntRunTasks() {
final AtomicInteger calls = new AtomicInteger();
Action0 task = new Action0() {
@Override
public void call() {
calls.getAndIncrement();
}
};
TestExecutor exec = new TestExecutor();
Scheduler custom = Schedulers.from(exec);
Worker w = custom.createWorker();
try {
w.schedule(task);
w.schedule(task);
w.schedule(task);
} finally {
w.unsubscribe();
}
exec.executeAll();
assertEquals(0, calls.get());
}
@Test
public void testNoTimedTaskAfterScheduleRetention() throws InterruptedException {
Executor e = new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();

w.schedule(Actions.empty(), 1, TimeUnit.MILLISECONDS);

assertTrue(w.tasks.hasSubscriptions());

Thread.sleep(100);

assertFalse(w.tasks.hasSubscriptions());
}

@Test
public void testNoTimedTaskPartRetention() {
Executor e = new Executor() {
@Override
public void execute(Runnable command) {

}
};
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();

Subscription s = w.schedule(Actions.empty(), 1, TimeUnit.DAYS);

assertTrue(w.tasks.hasSubscriptions());

s.unsubscribe();

assertFalse(w.tasks.hasSubscriptions());
}
}