Skip to content

Commit 446b1d8

Browse files
Introduce WorkStealingDispatcher.
1 parent 72b0e22 commit 446b1d8

File tree

9 files changed

+1367
-0
lines changed

9 files changed

+1367
-0
lines changed

build.gradle.kts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import com.squareup.workflow1.buildsrc.shardConnectedCheckTasks
2+
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
23
import org.jetbrains.dokka.gradle.AbstractDokkaLeafTask
34
import java.net.URL
45

@@ -101,6 +102,17 @@ subprojects {
101102
.configureEach { mustRunAfter(tasks.matching { it is Sign }) }
102103
}
103104

105+
subprojects {
106+
tasks.withType(Test::class.java)
107+
.configureEach {
108+
testLogging {
109+
// This prints exception messages and stack traces to the log when tests fail. Makes it a
110+
// lot easier to see what failed in CI. If this gets too noisy, just remove it.
111+
exceptionFormat = FULL
112+
}
113+
}
114+
}
115+
104116
// This task is invoked by the documentation site generator script in the main workflow project (not
105117
// in this repo), which also expects the generated files to be in a specific location. Both the task
106118
// name and destination directory are defined in this script:

workflow-runtime/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ kotlin {
1717
if (targets == "kmp" || targets == "js") {
1818
js(IR) { browser() }
1919
}
20+
21+
// Needed for expect class Lock, which is not public API, so this doesn't add any binary compat
22+
// risk.
23+
compilerOptions.freeCompilerArgs.add("-Xexpect-actual-classes")
2024
}
2125

2226
dependencies {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.squareup.workflow1.internal
2+
3+
import platform.Foundation.NSLock
4+
5+
internal actual typealias Lock = NSLock
6+
7+
internal actual inline fun <R> Lock.withLock(block: () -> R): R {
8+
lock()
9+
try {
10+
return block()
11+
} finally {
12+
unlock()
13+
}
14+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.squareup.workflow1.internal
2+
3+
internal expect class Lock()
4+
5+
internal expect inline fun <R> Lock.withLock(block: () -> R): R
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package com.squareup.workflow1.internal
2+
3+
import com.squareup.workflow1.internal.WorkStealingDispatcher.Companion.wrapDispatcherFrom
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Delay
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.InternalCoroutinesApi
9+
import kotlinx.coroutines.Runnable
10+
import kotlin.coroutines.Continuation
11+
import kotlin.coroutines.ContinuationInterceptor
12+
import kotlin.coroutines.CoroutineContext
13+
import kotlin.coroutines.resume
14+
15+
/**
16+
* A [CoroutineDispatcher] that delegates to another dispatcher but allows stealing any work
17+
* scheduled on this dispatcher and performing it synchronously by calling [advanceUntilIdle].
18+
*
19+
* The easiest way to create one is by calling [wrapDispatcherFrom].
20+
*
21+
* E.g.
22+
* ```
23+
* val dispatcher = WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
24+
* scope.launch(dispatcher) {
25+
* while (true) {
26+
* lots()
27+
* of()
28+
* suspending()
29+
* calls()
30+
* }
31+
* }
32+
* …
33+
* dispatcher.advanceUntilIdle()
34+
* ```
35+
*
36+
* @param delegateInterceptor The [CoroutineDispatcher] or other [ContinuationInterceptor] to
37+
* delegate scheduling behavior to. This can either be a confined or unconfined dispatcher, and its
38+
* behavior will be preserved transparently.
39+
*/
40+
internal open class WorkStealingDispatcher protected constructor(
41+
private val delegateInterceptor: ContinuationInterceptor,
42+
lock: Lock?,
43+
queue: LinkedHashSet<DelegateDispatchedContinuation>?
44+
) : CoroutineDispatcher() {
45+
companion object {
46+
/**
47+
* Creates a [WorkStealingDispatcher] that supports [Delay] if [delegateInterceptor] does.
48+
*/
49+
operator fun invoke(delegateInterceptor: ContinuationInterceptor): WorkStealingDispatcher =
50+
createMatchingDelayability(
51+
delegateInterceptor = delegateInterceptor,
52+
lock = null,
53+
queue = null
54+
)
55+
56+
/**
57+
* Returns a [WorkStealingDispatcher] that delegates to the [CoroutineDispatcher] from
58+
* [context]. If the context does not specify a dispatcher, [Dispatchers.Default] is used.
59+
*/
60+
fun wrapDispatcherFrom(context: CoroutineContext): WorkStealingDispatcher {
61+
// If there's no dispatcher in the context then the coroutines runtime will fall back to
62+
// Dispatchers.Default anyway.
63+
val baseDispatcher = context[ContinuationInterceptor] ?: Dispatchers.Default
64+
return invoke(delegateInterceptor = baseDispatcher)
65+
}
66+
67+
@OptIn(InternalCoroutinesApi::class)
68+
private fun createMatchingDelayability(
69+
delegateInterceptor: ContinuationInterceptor,
70+
lock: Lock?,
71+
queue: LinkedHashSet<DelegateDispatchedContinuation>?
72+
): WorkStealingDispatcher {
73+
return if (delegateInterceptor is Delay) {
74+
DelayableWorkStealingDispatcher(
75+
delegate = delegateInterceptor,
76+
delay = delegateInterceptor,
77+
lock = lock,
78+
queue = queue
79+
)
80+
} else {
81+
WorkStealingDispatcher(
82+
delegateInterceptor = delegateInterceptor,
83+
lock = lock,
84+
queue = queue
85+
)
86+
}
87+
}
88+
}
89+
90+
/** Used to synchronize access to the mutable properties of this class. */
91+
private val lock = lock ?: Lock()
92+
93+
// region Access to these properties must always be synchronized with lock.
94+
private val queue = queue ?: LinkedHashSet()
95+
// endregion
96+
97+
/**
98+
* Always returns true since we always need to track what work is waiting so we can advance it.
99+
*/
100+
final override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
101+
102+
final override fun dispatch(
103+
context: CoroutineContext,
104+
block: Runnable
105+
) {
106+
val continuation = DelegateDispatchedContinuation(context, block)
107+
lock.withLock {
108+
queue += continuation
109+
}
110+
111+
// Trampoline the dispatch outside the critical section to avoid deadlocks.
112+
// This will either synchronously run block or dispatch it, depending on what resuming a
113+
// continuation on the delegate dispatcher would do.
114+
continuation.resumeOnDelegateDispatcher()
115+
}
116+
117+
/**
118+
* Calls [limitedParallelism] on [delegateInterceptor] and wraps the returned dispatcher with
119+
* a [WorkStealingDispatcher] that this instance will steal from.
120+
*
121+
* This satisfies the limited parallelism requirements because [advanceUntilIdle] always runs
122+
* tasks with a parallelism of 1 (i.e. serially).
123+
*/
124+
@ExperimentalCoroutinesApi
125+
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
126+
if (delegateInterceptor !is CoroutineDispatcher) {
127+
throw UnsupportedOperationException(
128+
"limitedParallelism is not supported for WorkStealingDispatcher with " +
129+
"non-dispatcher delegate"
130+
)
131+
}
132+
133+
val limitedDelegate = delegateInterceptor.limitedParallelism(parallelism)
134+
return createMatchingDelayability(
135+
delegateInterceptor = limitedDelegate,
136+
lock = lock,
137+
queue = queue
138+
)
139+
}
140+
141+
/**
142+
* "Steals" work that was scheduled on this dispatcher but hasn't had a chance to run yet and runs
143+
* it, until there is no work left to do. If the work schedules more work, that will also be ran
144+
* before the method returns.
145+
*
146+
* Some care must be taken with this method: if it is called from a coroutine that is dispatched
147+
* by this dispatcher, then it may run inside the stack frame of another call and in that case the
148+
* inner call may think the dispatcher is idle when the outer call still has more tasks to run.
149+
*
150+
* It is technically safe to call from multiple threads, even in parallel, although the behavior
151+
* is undefined. E.g. One thread might return from this method before the other has finished
152+
* running all tasks.
153+
*/
154+
// If we need a strong guarantee for reentrant calls we could use a ThreadLocal so the inner call
155+
// could steal work from the outer one.
156+
// If we need a strong guarantee for calling from multiple threads we could just run this method
157+
// with a separate lock so all threads would just wait on the first one to finish running, but
158+
// that could deadlock if any of the dispatched coroutines call this method reentrantly.
159+
fun advanceUntilIdle() {
160+
do {
161+
val task = nextTask()
162+
task?.releaseAndRun()
163+
} while (task != null)
164+
}
165+
166+
private fun nextTask(): DelegateDispatchedContinuation? {
167+
lock.withLock {
168+
val iterator = queue.iterator()
169+
if (iterator.hasNext()) {
170+
val task = iterator.next()
171+
iterator.remove()
172+
return task
173+
} else {
174+
return null
175+
}
176+
}
177+
}
178+
179+
protected inner class DelegateDispatchedContinuation(
180+
override val context: CoroutineContext,
181+
private val runnable: Runnable
182+
) : Continuation<Unit> {
183+
/**
184+
* Cache for intercepted coroutine so we can release it from [resumeWith].
185+
* [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
186+
* is done, so we don't need to guard this property with a lock.
187+
*/
188+
private var intercepted: Continuation<Unit>? = null
189+
190+
/**
191+
* Resumes this continuation on [delegateInterceptor] by intercepting it and resuming the
192+
* intercepted continuation. When a dispatcher returns false from [isDispatchNeeded], then when
193+
* continuations intercepted by it are resumed, they may either be ran in-place or scheduled to
194+
* a special thread-local queue. The only way to access this queue is to have the dispatcher
195+
* intercept a continuation and resume the intercepted continuation.
196+
*/
197+
fun resumeOnDelegateDispatcher() {
198+
val intercepted = delegateInterceptor.interceptContinuation(this).also {
199+
this.intercepted = it
200+
}
201+
202+
// If delegate is a CoroutineDispatcher, intercepted will be a special Continuation that will
203+
// check the delegate's isDispatchNeeded to decide whether to call dispatch() or to enqueue it
204+
// to the thread-local unconfined queue.
205+
intercepted.resume(Unit)
206+
}
207+
208+
/**
209+
* DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
210+
*/
211+
override fun resumeWith(result: Result<Unit>) {
212+
val shouldRun = lock.withLock {
213+
queue.remove(this)
214+
}
215+
216+
if (shouldRun) {
217+
releaseAndRun()
218+
}
219+
}
220+
221+
fun releaseAndRun() {
222+
intercepted?.let {
223+
if (it !== this) {
224+
delegateInterceptor.releaseInterceptedContinuation(it)
225+
}
226+
intercepted = null
227+
}
228+
runnable.run()
229+
}
230+
}
231+
}
232+
233+
@OptIn(InternalCoroutinesApi::class)
234+
private class DelayableWorkStealingDispatcher(
235+
delegate: ContinuationInterceptor,
236+
delay: Delay,
237+
lock: Lock?,
238+
queue: LinkedHashSet<DelegateDispatchedContinuation>?
239+
) : WorkStealingDispatcher(delegate, lock, queue), Delay by delay

0 commit comments

Comments
 (0)