Skip to content

Commit d606d8c

Browse files
WIP rewrite to support parallel delegates, Delay
1 parent e56709d commit d606d8c

File tree

3 files changed

+104
-155
lines changed

3 files changed

+104
-155
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
@file:Suppress(
2+
"CANNOT_OVERRIDE_INVISIBLE_MEMBER",
3+
"INVISIBLE_MEMBER",
4+
"INVISIBLE_REFERENCE",
5+
)
6+
7+
package com.squareup.workflow1.internal
8+
9+
import kotlinx.coroutines.CoroutineDispatcher
10+
import kotlinx.coroutines.DefaultDelay
11+
import kotlinx.coroutines.Delay
12+
import kotlinx.coroutines.InternalCoroutinesApi
13+
import kotlin.coroutines.ContinuationInterceptor
14+
15+
/**
16+
* Ugly, terrible, no-good hack to support propagating [Delay] implementations to custom
17+
* [CoroutineDispatcher]s without needing them to do all the internal suppressions we need to make
18+
* this work.
19+
*
20+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/4476.
21+
*/
22+
@OptIn(InternalCoroutinesApi::class)
23+
internal abstract class DelayPropagatingCoroutineDispatcher(
24+
private val delegate: ContinuationInterceptor
25+
) : CoroutineDispatcher(), Delay by (delegate as? Delay ?: DefaultDelay)

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt

Lines changed: 33 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import kotlinx.coroutines.CoroutineDispatcher
55
import kotlinx.coroutines.Dispatchers
66
import kotlinx.coroutines.ExperimentalCoroutinesApi
77
import kotlinx.coroutines.Runnable
8-
import kotlin.concurrent.Volatile
98
import kotlin.coroutines.Continuation
109
import kotlin.coroutines.ContinuationInterceptor
1110
import kotlin.coroutines.CoroutineContext
@@ -21,7 +20,7 @@ private typealias RunQueue = ArrayList<Runnable>
2120
*
2221
* E.g.
2322
* ```
24-
* val dispatcher = WorkStealingDispatcher.wrappingDispatcherFrom(scope.coroutineContext)
23+
* val dispatcher = WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
2524
* scope.launch(dispatcher) {
2625
* while (true) {
2726
* lots()
@@ -40,15 +39,9 @@ private typealias RunQueue = ArrayList<Runnable>
4039
*/
4140
internal class WorkStealingDispatcher(
4241
private val delegateInterceptor: ContinuationInterceptor
43-
) : CoroutineDispatcher() {
42+
) : DelayPropagatingCoroutineDispatcher(delegateInterceptor) {
4443

4544
companion object {
46-
/**
47-
* The initial storage capacity for the task queue. We use a small queue capacity since in most
48-
* cases the queue should be processed very soon after enqueuing.
49-
*/
50-
private const val INITIAL_QUEUE_CAPACITY = 3
51-
5245
/**
5346
* Returns a [WorkStealingDispatcher] that delegates to the [CoroutineDispatcher] from
5447
* [context]. If the context does not specify a dispatcher, [Dispatchers.Default] is used.
@@ -65,25 +58,7 @@ internal class WorkStealingDispatcher(
6558
private val lock = Lock()
6659

6760
// region Access to these properties must always be synchronized with lock.
68-
/**
69-
* The queue of unconsumed work items. When there is no contention on the dispatcher, only one
70-
* queue will ever be allocated. Only when [dispatch] is called while the queue is being processed
71-
* (either by [advanceUntilIdle] or a [DispatchContinuation]) then a new queue will be allocated,
72-
* but when the processing is done the old one will be placed back here to be re-used.
73-
*/
74-
@Volatile
75-
private var queue: RunQueue? = null
76-
77-
@Volatile
78-
private var dispatchScheduled = false
79-
80-
/**
81-
* Cached [DispatchContinuation] used to delegate to the [delegateInterceptor]'s dispatching
82-
* behavior from [dispatch]. This is initialized the first call to [dispatch] that needs dispatch,
83-
* and then never changed.
84-
*/
85-
@Volatile
86-
private var dispatchContinuation: DispatchContinuation? = null
61+
private val queue = LinkedHashSet<DelegateDispatchedContinuation>()
8762
// endregion
8863

8964
/**
@@ -95,24 +70,15 @@ internal class WorkStealingDispatcher(
9570
context: CoroutineContext,
9671
block: Runnable
9772
) {
98-
var continuation: DispatchContinuation? = null
73+
val continuation = DelegateDispatchedContinuation(context, block)
9974
lock.withLock {
100-
val queue = this.queue ?: RunQueue(INITIAL_QUEUE_CAPACITY).also { this.queue = it }
101-
queue += block
102-
103-
// If no dispatch is currently scheduled, then flag that we're handling it, and schedule one
104-
// outside the critical section.
105-
if (!dispatchScheduled) {
106-
dispatchScheduled = true
107-
continuation = dispatchContinuation ?: DispatchContinuation()
108-
.also { dispatchContinuation = it }
109-
}
75+
queue += continuation
11076
}
11177

11278
// Trampoline the dispatch outside the critical section to avoid deadlocks.
11379
// This will either synchronously run block or dispatch it, depending on what resuming a
11480
// continuation on the delegate dispatcher would do.
115-
continuation?.resumeOnDelegateDispatcher()
81+
continuation.resumeOnDelegateDispatcher()
11682
}
11783

11884
/**
@@ -148,118 +114,29 @@ internal class WorkStealingDispatcher(
148114
// with a separate lock so all threads would just wait on the first one to finish running, but
149115
// that could deadlock if any of the dispatched coroutines call this method reentrantly.
150116
fun advanceUntilIdle() {
151-
var wasDispatchScheduled = false
152-
advanceUntilIdle(
153-
onStartLocked = {
154-
// If no dispatch was scheduled, then set the flag so that any new dispatch calls that
155-
// happen while we're draining the queue won't schedule one unnecessarily since we'll just
156-
// handle them.
157-
// Note that we could "cancel" the dispatch if this is true here, since we're stealing all
158-
// its work, but we can't cancel that task so it will just have to noop.
159-
wasDispatchScheduled = dispatchScheduled.also {
160-
if (!it) dispatchScheduled = true
161-
}
162-
},
163-
onFinishedLocked = {
164-
// If we set this flag above, then clear it now so future dispatch calls schedule normally.
165-
dispatchScheduled = wasDispatchScheduled
166-
}
167-
)
168-
}
169-
170-
/**
171-
* Executes queued work items until there are none left, then returns.
172-
*
173-
* @param onStartLocked Called while [lock] is held exactly 1 time before any tasks are executed.
174-
* @param onFinishedLocked Called while [lock] is held exactly 1 time after all tasks are finished
175-
* executing.
176-
*/
177-
private inline fun advanceUntilIdle(
178-
onStartLocked: () -> Unit = {},
179-
onFinishedLocked: () -> Unit
180-
) {
181-
var queueToDrain: RunQueue? = null
182117
do {
183-
lock.withLock {
184-
// Will only be null on first loop, since if it's null after this critical section we'll
185-
// exit the loop.
186-
if (queueToDrain == null) {
187-
onStartLocked()
188-
}
189-
190-
// We're about to overwrite queueToDrain, so put the old one back so future calls to
191-
// dispatch might not need to allocate a new queue.
192-
queueToDrain = consumeQueueLocked(queueToRecycle = queueToDrain).also {
193-
if (it == null) {
194-
onFinishedLocked()
195-
}
196-
}
197-
}
198-
199-
// Drain the queue outside the critical section to ensure we don't deadlock if any of the
200-
// runnables try to dispatch.
201-
queueToDrain?.runAll()
202-
} while (queueToDrain != null)
118+
val task = nextTask()
119+
task?.releaseAndRun()
120+
} while (task != null)
203121
}
204122

205-
/**
206-
* If there are work items queued up, returns the queue, otherwise returns null. MUST ONLY BE
207-
* CALLED while [lock] is held.
208-
*
209-
* If [queueToRecycle] is non-null then we try to place it back in the [queue] property for the
210-
* next call to [dispatch] (after clearing it) so it won't have to allocate a new one. After this
211-
* method returns [queueToRecycle] is unsafe to use for the calling code since it might be
212-
* modified by another thread.
213-
*/
214-
private fun consumeQueueLocked(queueToRecycle: RunQueue?): RunQueue? {
215-
if (queueToRecycle != null && queueToRecycle === queue) {
216-
throw IllegalArgumentException("Cannot recycle queue with itself")
217-
}
218-
219-
// Note: clear() iterates through the list to null everything out, so avoid calling it unless
220-
// necessary.
221-
val queue = this.queue
222-
return when {
223-
queue == null -> {
224-
// The next dispatch would allocate a new queue, so recycle one if possible.
225-
this.queue = queueToRecycle?.apply { clear() }
226-
null
227-
}
228-
229-
queue.isEmpty() -> {
230-
// There's nothing to process in an empty queue, so don't return it at all. And since the
231-
// next dispatch call already has a queue to use, so just let the recycled one be GC'd and
232-
// don't bother clearing it.
233-
null
234-
}
235-
236-
else -> {
237-
// There are queued tasks, so return the current queue and replace it with the recycled one.
238-
queue.also {
239-
this.queue = queueToRecycle?.apply { clear() }
240-
}
123+
private fun nextTask(): DelegateDispatchedContinuation? {
124+
lock.withLock {
125+
val iterator = queue.iterator()
126+
if (iterator.hasNext()) {
127+
val task = iterator.next()
128+
iterator.remove()
129+
return task
130+
} else {
131+
return null
241132
}
242133
}
243134
}
244135

245-
private fun RunQueue.runAll() {
246-
forEach {
247-
it.run()
248-
}
249-
}
250-
251-
/**
252-
* A reusable continuation that is used to access the coroutine runtime's resumption behavior for
253-
* both confined and unconfined dispatchers. See [resumeOnDelegateDispatcher] for more information
254-
* on how this works.
255-
*
256-
* [WorkStealingDispatcher] guarantees that only one instance of this class will be created per
257-
* dispatcher, and that it will never be resumed more than once concurrently, so it's safe to
258-
* reuse.
259-
*/
260-
private inner class DispatchContinuation : Continuation<Unit> {
261-
override val context: CoroutineContext get() = delegateInterceptor
262-
136+
private inner class DelegateDispatchedContinuation(
137+
override val context: CoroutineContext,
138+
private val runnable: Runnable
139+
) : Continuation<Unit> {
263140
/**
264141
* Cache for intercepted coroutine so we can release it from [resumeWith].
265142
* [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
@@ -289,18 +166,23 @@ internal class WorkStealingDispatcher(
289166
* DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
290167
*/
291168
override fun resumeWith(result: Result<Unit>) {
169+
val shouldRun = lock.withLock {
170+
queue.remove(this)
171+
}
172+
173+
if (shouldRun) {
174+
releaseAndRun()
175+
}
176+
}
177+
178+
fun releaseAndRun() {
292179
intercepted?.let {
293180
if (it !== this) {
294181
delegateInterceptor.releaseInterceptedContinuation(it)
295182
}
296183
intercepted = null
297184
}
298-
299-
advanceUntilIdle(onFinishedLocked = {
300-
// Set this in the lock when we're about to return so that any dispatch calls waiting
301-
// on the lock will know to schedule a fresh dispatch.
302-
dispatchScheduled = false
303-
})
185+
runnable.run()
304186
}
305187
}
306188
}

workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcherTest.kt

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import kotlinx.coroutines.Dispatchers
55
import kotlinx.coroutines.ExperimentalCoroutinesApi
66
import kotlinx.coroutines.Runnable
77
import kotlinx.coroutines.currentCoroutineContext
8+
import kotlinx.coroutines.delay
89
import kotlinx.coroutines.launch
910
import kotlinx.coroutines.test.StandardTestDispatcher
1011
import kotlinx.coroutines.test.runTest
@@ -352,7 +353,7 @@ class WorkStealingDispatcherTest {
352353
}
353354

354355
@Test fun integration_confined_whenAdvanceUntilIdle() = runTest {
355-
val testDispatcher = StandardTestDispatcher()
356+
val testDispatcher = StandardTestDispatcher(testScheduler)
356357
val dispatcher = WorkStealingDispatcher(testDispatcher)
357358

358359
expect(0)
@@ -366,7 +367,7 @@ class WorkStealingDispatcherTest {
366367
}
367368

368369
@Test fun integration_confined_whenDispatched() = runTest {
369-
val testDispatcher = StandardTestDispatcher()
370+
val testDispatcher = StandardTestDispatcher(testScheduler)
370371
val dispatcher = WorkStealingDispatcher(testDispatcher)
371372

372373
expect(0)
@@ -380,7 +381,7 @@ class WorkStealingDispatcherTest {
380381
}
381382

382383
@Test fun integration_yield_whenAdvanceUntilIdle() = runTest {
383-
val testDispatcher = StandardTestDispatcher()
384+
val testDispatcher = StandardTestDispatcher(testScheduler)
384385
val dispatcher = WorkStealingDispatcher(testDispatcher)
385386

386387
launch(dispatcher) {
@@ -399,7 +400,7 @@ class WorkStealingDispatcherTest {
399400
}
400401

401402
@Test fun integration_yield_whenDispatched() = runTest {
402-
val testDispatcher = StandardTestDispatcher()
403+
val testDispatcher = StandardTestDispatcher(testScheduler)
403404
val dispatcher = WorkStealingDispatcher(testDispatcher)
404405

405406
launch(dispatcher) {
@@ -417,6 +418,47 @@ class WorkStealingDispatcherTest {
417418
expect(4)
418419
}
419420

421+
@Test fun integration_delay_whenAdvanceUntilIdle() = runTest {
422+
val testDispatcher = StandardTestDispatcher(testScheduler)
423+
val dispatcher = WorkStealingDispatcher(testDispatcher)
424+
425+
launch(dispatcher) {
426+
expect(0)
427+
delay(20)
428+
expect(4)
429+
}
430+
launch(dispatcher) {
431+
expect(1)
432+
delay(10)
433+
expect(3)
434+
}
435+
436+
dispatcher.advanceUntilIdle()
437+
expect(2)
438+
439+
testScheduler.advanceUntilIdle()
440+
expect(5)
441+
}
442+
443+
@Test fun integration_delay_whenDispatched() = runTest {
444+
val testDispatcher = StandardTestDispatcher(testScheduler)
445+
val dispatcher = WorkStealingDispatcher(testDispatcher)
446+
447+
launch(dispatcher) {
448+
expect(0)
449+
delay(20)
450+
expect(3)
451+
}
452+
launch(dispatcher) {
453+
expect(1)
454+
delay(10)
455+
expect(2)
456+
}
457+
458+
testDispatcher.scheduler.advanceUntilIdle()
459+
expect(4)
460+
}
461+
420462
private fun CoroutineDispatcher.dispatch(block: () -> Unit) {
421463
dispatch(this, Runnable { block() })
422464
}

0 commit comments

Comments
 (0)