Skip to content

Commit b71978b

Browse files
Introduce WorkStealingDispatcher.
1 parent 72b0e22 commit b71978b

File tree

6 files changed

+862
-0
lines changed

6 files changed

+862
-0
lines changed

workflow-runtime/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@ kotlin {
1717
if (targets == "kmp" || targets == "js") {
1818
js(IR) { browser() }
1919
}
20+
21+
compilerOptions.freeCompilerArgs.add("-Xexpect-actual-classes")
2022
}
2123

2224
dependencies {
2325
commonMainApi(project(":workflow-core"))
2426
commonMainApi(libs.kotlinx.coroutines.core)
27+
commonMainImplementation("androidx.collection:collection:1.5.0")
2528

2629
commonTestImplementation(libs.kotlinx.coroutines.test.common)
2730
commonTestImplementation(libs.kotlin.test.core)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.squareup.workflow1.internal
2+
3+
internal expect class Lock()
4+
5+
internal expect inline fun <R> Lock.withLock(block: () -> R): R
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
package com.squareup.workflow1.internal
2+
3+
import androidx.collection.MutableObjectList
4+
import com.squareup.workflow1.internal.WorkStealingDispatcher.Companion.wrapDispatcherFrom
5+
import kotlinx.coroutines.CoroutineDispatcher
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.Runnable
9+
import kotlin.concurrent.Volatile
10+
import kotlin.coroutines.Continuation
11+
import kotlin.coroutines.ContinuationInterceptor
12+
import kotlin.coroutines.CoroutineContext
13+
import kotlin.coroutines.resume
14+
15+
/**
16+
* A [CoroutineDispatcher] that delegates to another dispatcher but allows stealing any work
17+
* scheduled on this dispatcher and performing it synchronously by calling [advanceUntilIdle].
18+
*
19+
* The easiest way to create one is by calling [wrapDispatcherFrom].
20+
*
21+
* E.g.
22+
* ```
23+
* val dispatcher = WorkStealingDispatcher.wrappingDispatcherFrom(scope.coroutineContext)
24+
* scope.launch(dispatcher) {
25+
* while (true) {
26+
* lots()
27+
* of()
28+
* suspending()
29+
* calls()
30+
* }
31+
* }
32+
* …
33+
* dispatcher.advanceUntilIdle()
34+
* ```
35+
*
36+
* @param delegateInterceptor The [CoroutineDispatcher] or other [ContinuationInterceptor] to
37+
* delegate scheduling behavior to. This can either be a confined or unconfined dispatcher, and its
38+
* behavior will be preserved transparently.
39+
*/
40+
internal class WorkStealingDispatcher(
41+
private val delegateInterceptor: ContinuationInterceptor
42+
) : CoroutineDispatcher() {
43+
44+
companion object {
45+
/**
46+
* The initial storage capacity for the task queue. We use a small queue capacity since in most
47+
* cases the queue should be processed very soon after enqueuing.
48+
*/
49+
private const val INITIAL_QUEUE_CAPACITY = 3
50+
51+
/**
52+
* Returns a [WorkStealingDispatcher] that delegates to the [CoroutineDispatcher] from
53+
* [context]. If the context does not specify a dispatcher, [Dispatchers.Default] is used.
54+
*/
55+
fun wrapDispatcherFrom(context: CoroutineContext): WorkStealingDispatcher {
56+
// If there's no dispatcher in the context then the coroutines runtime will fall back to
57+
// Dispatchers.Default anyway.
58+
val baseDispatcher = context[ContinuationInterceptor] ?: Dispatchers.Default
59+
return WorkStealingDispatcher(delegateInterceptor = baseDispatcher)
60+
}
61+
}
62+
63+
/** Used to synchronize access to the mutable properties of this class. */
64+
private val lock = Lock()
65+
66+
// region Access to these properties must always be synchronized with lock.
67+
/**
68+
* The queue of unconsumed work items. When there is no contention on the dispatcher, only one
69+
* queue will ever be allocated. Only when [dispatch] is called while the queue is being processed
70+
* (either by [advanceUntilIdle] or a [DispatchContinuation]) then a new queue will be allocated,
71+
* but when the processing is done the old one will be placed back here to be re-used.
72+
*/
73+
@Volatile
74+
private var queue: MutableObjectList<Runnable>? = null
75+
76+
@Volatile
77+
private var dispatchScheduled = false
78+
79+
/**
80+
* Cached [DispatchContinuation] used to delegate to the [delegateInterceptor]'s dispatching
81+
* behavior from [dispatch]. This is initialized the first call to [dispatch] that needs dispatch,
82+
* and then never changed.
83+
*/
84+
@Volatile
85+
private var dispatchContinuation: DispatchContinuation? = null
86+
// endregion
87+
88+
/**
89+
* Always returns true since we always need to track what work is waiting so we can advance it.
90+
*/
91+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
92+
93+
override fun dispatch(
94+
context: CoroutineContext,
95+
block: Runnable
96+
) {
97+
var continuation: DispatchContinuation? = null
98+
lock.withLock {
99+
val queue = this.queue
100+
?: MutableObjectList<Runnable>(initialCapacity = INITIAL_QUEUE_CAPACITY)
101+
.also { this.queue = it }
102+
queue += block
103+
println("OMG dispatch: queue size = ${queue.size}")
104+
105+
// If no dispatch is currently scheduled, then flag that we're handling it, and schedule one
106+
// outside the critical section.
107+
if (!dispatchScheduled) {
108+
println("OMG scheduling dispatch")
109+
dispatchScheduled = true
110+
continuation = dispatchContinuation ?: DispatchContinuation()
111+
.also { dispatchContinuation = it }
112+
}
113+
}
114+
115+
// Trampoline the dispatch outside the critical section to avoid deadlocks.
116+
// This will either synchronously run block or dispatch it, depending on what resuming a
117+
// continuation on the delegate dispatcher would do.
118+
continuation?.resumeOnDelegateDispatcher()
119+
}
120+
121+
/**
122+
* Throws [UnsupportedOperationException]. We can't allow the default implementation to run in
123+
* case the delegate dispatcher would throw.
124+
*/
125+
@ExperimentalCoroutinesApi
126+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
127+
// We could probably support this by forwarding the call to the delegate then wrapping that
128+
// dispatcher with a WSD that advances when this one does, but we don't need this for our use
129+
// cases and getting all the behavior correct might be hard, so don't bother for now.
130+
throw UnsupportedOperationException(
131+
"limitedParallelism is not supported for WorkStealingDispatcher"
132+
)
133+
}
134+
135+
/**
136+
* "Steals" work that was scheduled on this dispatcher but hasn't had a chance to run yet and runs
137+
* it, until there is no work left to do. If the work schedules more work, that will also be ran
138+
* before the method returns.
139+
*
140+
* Some care must be taken with this method: if it is called from a coroutine that is dispatched
141+
* by this dispatcher, then it may run inside the stack frame of another call and in that case the
142+
* inner call may think the dispatcher is idle when the outer call still has more tasks to run.
143+
*
144+
* It is technically safe to call from multiple threads, even in parallel, although the behavior
145+
* is undefined. E.g. One thread might return from this method before the other has finished
146+
* running all tasks.
147+
*/
148+
// If we need a strong guarantee for reentrant calls we could use a ThreadLocal so the inner call
149+
// could steal work from the outer one.
150+
// If we need a strong guarantee for calling from multiple threads we could just run this method
151+
// with a separate lock so all threads would just wait on the first one to finish running, but
152+
// that could deadlock if any of the dispatched coroutines call this method reentrantly.
153+
fun advanceUntilIdle() {
154+
var wasDispatchScheduled = false
155+
advanceUntilIdle(
156+
onStartLocked = {
157+
// If no dispatch was scheduled, then set the flag so that any new dispatch calls that
158+
// happen while we're draining the queue won't schedule one unnecessarily since we'll just
159+
// handle them.
160+
// Note that we could "cancel" the dispatch if this is true here, since we're stealing all
161+
// its work, but we can't cancel that task so it will just have to noop.
162+
wasDispatchScheduled = dispatchScheduled.also {
163+
if (!it) dispatchScheduled = true
164+
}
165+
},
166+
onFinishedLocked = {
167+
// If we set this flag above, then clear it now so future dispatch calls schedule normally.
168+
dispatchScheduled = wasDispatchScheduled
169+
}
170+
)
171+
}
172+
173+
/**
174+
* Executes queued work items until there are none left, then returns.
175+
*
176+
* @param onStartLocked Called while [lock] is held exactly 1 time before any tasks are executed.
177+
* @param onFinishedLocked Called while [lock] is held exactly 1 time after all tasks are finished
178+
* executing.
179+
*/
180+
private inline fun advanceUntilIdle(
181+
onStartLocked: () -> Unit = {},
182+
onFinishedLocked: () -> Unit
183+
) {
184+
var queueToDrain: MutableObjectList<Runnable>? = null
185+
do {
186+
lock.withLock {
187+
// Will only be null on first loop, since if it's null after this critical section we'll
188+
// exit the loop.
189+
if (queueToDrain == null) {
190+
onStartLocked()
191+
}
192+
193+
// We're about to overwrite queueToDrain, so put the old one back so future calls to
194+
// dispatch might not need to allocate a new queue.
195+
queueToDrain = consumeQueueLocked(queueToRecycle = queueToDrain).also {
196+
if (it == null) {
197+
onFinishedLocked()
198+
}
199+
}
200+
}
201+
202+
// Drain the queue outside the critical section to ensure we don't deadlock if any of the
203+
// runnables try to dispatch.
204+
println("OMG draining queue of size ${queueToDrain?.size}")
205+
queueToDrain?.runAll()
206+
} while (queueToDrain != null)
207+
}
208+
209+
/**
210+
* If there are work items queued up, returns the queue, otherwise returns null. MUST ONLY BE
211+
* CALLED while [lock] is held.
212+
*
213+
* If [queueToRecycle] is non-null then we try to place it back in the [queue] property for the
214+
* next call to [dispatch] (after clearing it) so it won't have to allocate a new one. After this
215+
* method returns [queueToRecycle] is unsafe to use for the calling code since it might be
216+
* modified by another thread.
217+
*/
218+
private fun consumeQueueLocked(
219+
queueToRecycle: MutableObjectList<Runnable>?
220+
): MutableObjectList<Runnable>? {
221+
if (queueToRecycle != null && queueToRecycle === queue) {
222+
throw IllegalArgumentException("Cannot recycle queue with itself")
223+
}
224+
225+
// Note: clear() iterates through the list to null everything out, so avoid calling it unless
226+
// necessary.
227+
val queue = this.queue
228+
return when {
229+
queue == null -> {
230+
// The next dispatch would allocate a new queue, so recycle one if possible.
231+
println("OMG recycling queue to null")
232+
this.queue = queueToRecycle?.apply { clear() }
233+
null
234+
}
235+
236+
queue.isEmpty() -> {
237+
// There's nothing to process in an empty queue, so don't return it at all. And since the
238+
// next dispatch call already has a queue to use, so just let the recycled one be GC'd and
239+
// don't bother clearing it.
240+
println("OMG queue is empty, not recycling")
241+
null
242+
}
243+
244+
else -> {
245+
// There are queued tasks, so return the current queue and replace it with the recycled one.
246+
println("OMG recycling queue to non-null")
247+
queue.also {
248+
this.queue = queueToRecycle?.apply { clear() }
249+
}
250+
}
251+
}
252+
}
253+
254+
private fun MutableObjectList<Runnable>.runAll() {
255+
forEach {
256+
it.run()
257+
}
258+
}
259+
260+
/**
261+
* A reusable continuation that is used to access the coroutine runtime's resumption behavior for
262+
* both confined and unconfined dispatchers. See [resumeOnDelegateDispatcher] for more information
263+
* on how this works.
264+
*
265+
* [WorkStealingDispatcher] guarantees that only one instance of this class will be created per
266+
* dispatcher, and that it will never be resumed more than once concurrently, so it's safe to
267+
* reuse.
268+
*/
269+
private inner class DispatchContinuation : Continuation<Unit> {
270+
override val context: CoroutineContext get() = delegateInterceptor
271+
272+
/**
273+
* Cache for intercepted coroutine so we can release it from [resumeWith].
274+
* [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
275+
* is done, so we don't need to guard this property with a lock.
276+
*/
277+
private var intercepted: Continuation<Unit>? = null
278+
279+
/**
280+
* Resumes this continuation on [delegateInterceptor] by intercepting it and resuming the
281+
* intercepted continuation. When a dispatcher returns false from [isDispatchNeeded], then when
282+
* continuations intercepted by it are resumed, they may either be ran in-place or scheduled to
283+
* a special thread-local queue. The only way to access this queue is to have the dispatcher
284+
* intercept a continuation and resume the intercepted continuation.
285+
*/
286+
fun resumeOnDelegateDispatcher() {
287+
val intercepted = delegateInterceptor.interceptContinuation(this).also {
288+
this.intercepted = it
289+
}
290+
291+
// If delegate is a CoroutineDispatcher, intercepted will be a special Continuation that will
292+
// check the delegate's isDispatchNeeded to decide whether to call dispatch() or to enqueue it
293+
// to the thread-local unconfined queue.
294+
intercepted.resume(Unit)
295+
}
296+
297+
/**
298+
* DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
299+
*/
300+
override fun resumeWith(result: Result<Unit>) {
301+
println("OMG DispatchContinuation resuming")
302+
intercepted?.let {
303+
if (it !== this) {
304+
delegateInterceptor.releaseInterceptedContinuation(it)
305+
}
306+
intercepted = null
307+
}
308+
309+
println("OMG DispatchContinuation draining queue")
310+
advanceUntilIdle(onFinishedLocked = {
311+
// Set this in the lock when we're about to return so that any dispatch calls waiting
312+
// on the lock will know to schedule a fresh dispatch.
313+
dispatchScheduled = false
314+
})
315+
println("OMG DispatchContinuation done draining queue")
316+
}
317+
}
318+
}

0 commit comments

Comments
 (0)