Skip to content

Commit e56709d

Browse files
Introduce WorkStealingDispatcher.
1 parent 72b0e22 commit e56709d

File tree

9 files changed

+1053
-0
lines changed

9 files changed

+1053
-0
lines changed

build.gradle.kts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import com.squareup.workflow1.buildsrc.shardConnectedCheckTasks
2+
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
23
import org.jetbrains.dokka.gradle.AbstractDokkaLeafTask
34
import java.net.URL
45

@@ -101,6 +102,15 @@ subprojects {
101102
.configureEach { mustRunAfter(tasks.matching { it is Sign }) }
102103
}
103104

105+
subprojects {
106+
tasks.withType(Test::class.java)
107+
.configureEach {
108+
testLogging {
109+
exceptionFormat = FULL
110+
}
111+
}
112+
}
113+
104114
// This task is invoked by the documentation site generator script in the main workflow project (not
105115
// in this repo), which also expects the generated files to be in a specific location. Both the task
106116
// name and destination directory are defined in this script:

workflow-runtime/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ kotlin {
1717
if (targets == "kmp" || targets == "js") {
1818
js(IR) { browser() }
1919
}
20+
21+
compilerOptions.freeCompilerArgs.add("-Xexpect-actual-classes")
2022
}
2123

2224
dependencies {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.squareup.workflow1.internal
2+
3+
import platform.Foundation.NSLock
4+
5+
internal actual typealias Lock = NSLock
6+
7+
internal actual inline fun <R> Lock.withLock(block: () -> R): R {
8+
lock()
9+
try {
10+
return block()
11+
} finally {
12+
unlock()
13+
}
14+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.squareup.workflow1.internal
2+
3+
internal expect class Lock()
4+
5+
internal expect inline fun <R> Lock.withLock(block: () -> R): R
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
package com.squareup.workflow1.internal
2+
3+
import com.squareup.workflow1.internal.WorkStealingDispatcher.Companion.wrapDispatcherFrom
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Dispatchers
6+
import kotlinx.coroutines.ExperimentalCoroutinesApi
7+
import kotlinx.coroutines.Runnable
8+
import kotlin.concurrent.Volatile
9+
import kotlin.coroutines.Continuation
10+
import kotlin.coroutines.ContinuationInterceptor
11+
import kotlin.coroutines.CoroutineContext
12+
import kotlin.coroutines.resume
13+
14+
private typealias RunQueue = ArrayList<Runnable>
15+
16+
/**
17+
* A [CoroutineDispatcher] that delegates to another dispatcher but allows stealing any work
18+
* scheduled on this dispatcher and performing it synchronously by calling [advanceUntilIdle].
19+
*
20+
* The easiest way to create one is by calling [wrapDispatcherFrom].
21+
*
22+
* E.g.
23+
* ```
24+
* val dispatcher = WorkStealingDispatcher.wrappingDispatcherFrom(scope.coroutineContext)
25+
* scope.launch(dispatcher) {
26+
* while (true) {
27+
* lots()
28+
* of()
29+
* suspending()
30+
* calls()
31+
* }
32+
* }
33+
* …
34+
* dispatcher.advanceUntilIdle()
35+
* ```
36+
*
37+
* @param delegateInterceptor The [CoroutineDispatcher] or other [ContinuationInterceptor] to
38+
* delegate scheduling behavior to. This can either be a confined or unconfined dispatcher, and its
39+
* behavior will be preserved transparently.
40+
*/
41+
internal class WorkStealingDispatcher(
42+
private val delegateInterceptor: ContinuationInterceptor
43+
) : CoroutineDispatcher() {
44+
45+
companion object {
46+
/**
47+
* The initial storage capacity for the task queue. We use a small queue capacity since in most
48+
* cases the queue should be processed very soon after enqueuing.
49+
*/
50+
private const val INITIAL_QUEUE_CAPACITY = 3
51+
52+
/**
53+
* Returns a [WorkStealingDispatcher] that delegates to the [CoroutineDispatcher] from
54+
* [context]. If the context does not specify a dispatcher, [Dispatchers.Default] is used.
55+
*/
56+
fun wrapDispatcherFrom(context: CoroutineContext): WorkStealingDispatcher {
57+
// If there's no dispatcher in the context then the coroutines runtime will fall back to
58+
// Dispatchers.Default anyway.
59+
val baseDispatcher = context[ContinuationInterceptor] ?: Dispatchers.Default
60+
return WorkStealingDispatcher(delegateInterceptor = baseDispatcher)
61+
}
62+
}
63+
64+
/** Used to synchronize access to the mutable properties of this class. */
65+
private val lock = Lock()
66+
67+
// region Access to these properties must always be synchronized with lock.
68+
/**
69+
* The queue of unconsumed work items. When there is no contention on the dispatcher, only one
70+
* queue will ever be allocated. Only when [dispatch] is called while the queue is being processed
71+
* (either by [advanceUntilIdle] or a [DispatchContinuation]) then a new queue will be allocated,
72+
* but when the processing is done the old one will be placed back here to be re-used.
73+
*/
74+
@Volatile
75+
private var queue: RunQueue? = null
76+
77+
@Volatile
78+
private var dispatchScheduled = false
79+
80+
/**
81+
* Cached [DispatchContinuation] used to delegate to the [delegateInterceptor]'s dispatching
82+
* behavior from [dispatch]. This is initialized the first call to [dispatch] that needs dispatch,
83+
* and then never changed.
84+
*/
85+
@Volatile
86+
private var dispatchContinuation: DispatchContinuation? = null
87+
// endregion
88+
89+
/**
90+
* Always returns true since we always need to track what work is waiting so we can advance it.
91+
*/
92+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
93+
94+
override fun dispatch(
95+
context: CoroutineContext,
96+
block: Runnable
97+
) {
98+
var continuation: DispatchContinuation? = null
99+
lock.withLock {
100+
val queue = this.queue ?: RunQueue(INITIAL_QUEUE_CAPACITY).also { this.queue = it }
101+
queue += block
102+
103+
// If no dispatch is currently scheduled, then flag that we're handling it, and schedule one
104+
// outside the critical section.
105+
if (!dispatchScheduled) {
106+
dispatchScheduled = true
107+
continuation = dispatchContinuation ?: DispatchContinuation()
108+
.also { dispatchContinuation = it }
109+
}
110+
}
111+
112+
// Trampoline the dispatch outside the critical section to avoid deadlocks.
113+
// This will either synchronously run block or dispatch it, depending on what resuming a
114+
// continuation on the delegate dispatcher would do.
115+
continuation?.resumeOnDelegateDispatcher()
116+
}
117+
118+
/**
119+
* Throws [UnsupportedOperationException]. We can't allow the default implementation to run in
120+
* case the delegate dispatcher would throw.
121+
*/
122+
@ExperimentalCoroutinesApi
123+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
124+
// We could probably support this by forwarding the call to the delegate then wrapping that
125+
// dispatcher with a WSD that advances when this one does, but we don't need this for our use
126+
// cases and getting all the behavior correct might be hard, so don't bother for now.
127+
throw UnsupportedOperationException(
128+
"limitedParallelism is not supported for WorkStealingDispatcher"
129+
)
130+
}
131+
132+
/**
133+
* "Steals" work that was scheduled on this dispatcher but hasn't had a chance to run yet and runs
134+
* it, until there is no work left to do. If the work schedules more work, that will also be ran
135+
* before the method returns.
136+
*
137+
* Some care must be taken with this method: if it is called from a coroutine that is dispatched
138+
* by this dispatcher, then it may run inside the stack frame of another call and in that case the
139+
* inner call may think the dispatcher is idle when the outer call still has more tasks to run.
140+
*
141+
* It is technically safe to call from multiple threads, even in parallel, although the behavior
142+
* is undefined. E.g. One thread might return from this method before the other has finished
143+
* running all tasks.
144+
*/
145+
// If we need a strong guarantee for reentrant calls we could use a ThreadLocal so the inner call
146+
// could steal work from the outer one.
147+
// If we need a strong guarantee for calling from multiple threads we could just run this method
148+
// with a separate lock so all threads would just wait on the first one to finish running, but
149+
// that could deadlock if any of the dispatched coroutines call this method reentrantly.
150+
fun advanceUntilIdle() {
151+
var wasDispatchScheduled = false
152+
advanceUntilIdle(
153+
onStartLocked = {
154+
// If no dispatch was scheduled, then set the flag so that any new dispatch calls that
155+
// happen while we're draining the queue won't schedule one unnecessarily since we'll just
156+
// handle them.
157+
// Note that we could "cancel" the dispatch if this is true here, since we're stealing all
158+
// its work, but we can't cancel that task so it will just have to noop.
159+
wasDispatchScheduled = dispatchScheduled.also {
160+
if (!it) dispatchScheduled = true
161+
}
162+
},
163+
onFinishedLocked = {
164+
// If we set this flag above, then clear it now so future dispatch calls schedule normally.
165+
dispatchScheduled = wasDispatchScheduled
166+
}
167+
)
168+
}
169+
170+
/**
171+
* Executes queued work items until there are none left, then returns.
172+
*
173+
* @param onStartLocked Called while [lock] is held exactly 1 time before any tasks are executed.
174+
* @param onFinishedLocked Called while [lock] is held exactly 1 time after all tasks are finished
175+
* executing.
176+
*/
177+
private inline fun advanceUntilIdle(
178+
onStartLocked: () -> Unit = {},
179+
onFinishedLocked: () -> Unit
180+
) {
181+
var queueToDrain: RunQueue? = null
182+
do {
183+
lock.withLock {
184+
// Will only be null on first loop, since if it's null after this critical section we'll
185+
// exit the loop.
186+
if (queueToDrain == null) {
187+
onStartLocked()
188+
}
189+
190+
// We're about to overwrite queueToDrain, so put the old one back so future calls to
191+
// dispatch might not need to allocate a new queue.
192+
queueToDrain = consumeQueueLocked(queueToRecycle = queueToDrain).also {
193+
if (it == null) {
194+
onFinishedLocked()
195+
}
196+
}
197+
}
198+
199+
// Drain the queue outside the critical section to ensure we don't deadlock if any of the
200+
// runnables try to dispatch.
201+
queueToDrain?.runAll()
202+
} while (queueToDrain != null)
203+
}
204+
205+
/**
206+
* If there are work items queued up, returns the queue, otherwise returns null. MUST ONLY BE
207+
* CALLED while [lock] is held.
208+
*
209+
* If [queueToRecycle] is non-null then we try to place it back in the [queue] property for the
210+
* next call to [dispatch] (after clearing it) so it won't have to allocate a new one. After this
211+
* method returns [queueToRecycle] is unsafe to use for the calling code since it might be
212+
* modified by another thread.
213+
*/
214+
private fun consumeQueueLocked(queueToRecycle: RunQueue?): RunQueue? {
215+
if (queueToRecycle != null && queueToRecycle === queue) {
216+
throw IllegalArgumentException("Cannot recycle queue with itself")
217+
}
218+
219+
// Note: clear() iterates through the list to null everything out, so avoid calling it unless
220+
// necessary.
221+
val queue = this.queue
222+
return when {
223+
queue == null -> {
224+
// The next dispatch would allocate a new queue, so recycle one if possible.
225+
this.queue = queueToRecycle?.apply { clear() }
226+
null
227+
}
228+
229+
queue.isEmpty() -> {
230+
// There's nothing to process in an empty queue, so don't return it at all. And since the
231+
// next dispatch call already has a queue to use, so just let the recycled one be GC'd and
232+
// don't bother clearing it.
233+
null
234+
}
235+
236+
else -> {
237+
// There are queued tasks, so return the current queue and replace it with the recycled one.
238+
queue.also {
239+
this.queue = queueToRecycle?.apply { clear() }
240+
}
241+
}
242+
}
243+
}
244+
245+
private fun RunQueue.runAll() {
246+
forEach {
247+
it.run()
248+
}
249+
}
250+
251+
/**
252+
* A reusable continuation that is used to access the coroutine runtime's resumption behavior for
253+
* both confined and unconfined dispatchers. See [resumeOnDelegateDispatcher] for more information
254+
* on how this works.
255+
*
256+
* [WorkStealingDispatcher] guarantees that only one instance of this class will be created per
257+
* dispatcher, and that it will never be resumed more than once concurrently, so it's safe to
258+
* reuse.
259+
*/
260+
private inner class DispatchContinuation : Continuation<Unit> {
261+
override val context: CoroutineContext get() = delegateInterceptor
262+
263+
/**
264+
* Cache for intercepted coroutine so we can release it from [resumeWith].
265+
* [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
266+
* is done, so we don't need to guard this property with a lock.
267+
*/
268+
private var intercepted: Continuation<Unit>? = null
269+
270+
/**
271+
* Resumes this continuation on [delegateInterceptor] by intercepting it and resuming the
272+
* intercepted continuation. When a dispatcher returns false from [isDispatchNeeded], then when
273+
* continuations intercepted by it are resumed, they may either be ran in-place or scheduled to
274+
* a special thread-local queue. The only way to access this queue is to have the dispatcher
275+
* intercept a continuation and resume the intercepted continuation.
276+
*/
277+
fun resumeOnDelegateDispatcher() {
278+
val intercepted = delegateInterceptor.interceptContinuation(this).also {
279+
this.intercepted = it
280+
}
281+
282+
// If delegate is a CoroutineDispatcher, intercepted will be a special Continuation that will
283+
// check the delegate's isDispatchNeeded to decide whether to call dispatch() or to enqueue it
284+
// to the thread-local unconfined queue.
285+
intercepted.resume(Unit)
286+
}
287+
288+
/**
289+
* DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
290+
*/
291+
override fun resumeWith(result: Result<Unit>) {
292+
intercepted?.let {
293+
if (it !== this) {
294+
delegateInterceptor.releaseInterceptedContinuation(it)
295+
}
296+
intercepted = null
297+
}
298+
299+
advanceUntilIdle(onFinishedLocked = {
300+
// Set this in the lock when we're about to return so that any dispatch calls waiting
301+
// on the lock will know to schedule a fresh dispatch.
302+
dispatchScheduled = false
303+
})
304+
}
305+
}
306+
}

0 commit comments

Comments
 (0)