-
Notifications
You must be signed in to change notification settings - Fork 106
Introduce WorkStealingDispatcher. #1364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
4df1469
to
b71978b
Compare
5adab40
to
b81c18d
Compare
?: ArrayList<Runnable>(initialCapacity = INITIAL_QUEUE_CAPACITY) | ||
.also { this.queue = it } | ||
queue += block | ||
println("OMG dispatch: queue size = ${queue.size}") |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
20e7b6b
to
e56709d
Compare
Tests in this PR and the runtime one (#1365) are green, so removed the logs, and this is fully ready to review (and hopefully merge). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but I don't think my review is worth much. Maybe we can get @jingibus to take a look?
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could get some of the folks from kotlinx.coroutines that were engaging in my issue - Kotlin/kotlinx.coroutines#4456 - and on the thread - https://kotlinlang.slack.com/archives/C1CFAFJSK/p1745528199117359 - to review this.
The suggestion was made that the behaviour we wanted was runBlocking
but we did not have the mechanism to control the coroutines except via the dispatcher.
This is that mechanism to do that, but I wonder if they might know of some more gotchas/important test cases from work with runBlocking
.
The advantage we have here is that we carefully control where advanceUntilIdle
is called.
I'm still confused by calling advanceUntilIdle
at the end of resumeWith()
though?
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Outdated
Show resolved
Hide resolved
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Outdated
Show resolved
Hide resolved
* a special thread-local queue. The only way to access this queue is to have the dispatcher | ||
* intercept a continuation and resume the intercepted continuation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you talking about the event loop here? Might as well mention it.
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Show resolved
Hide resolved
}, | ||
onFinishedLocked = { | ||
// If we set this flag above, then clear it now so future dispatch calls schedule normally. | ||
dispatchScheduled = wasDispatchScheduled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why would we ever need to set it back to true? haven't we just drained the queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haven't we just drained the queue?
Yes, and our scheduled drain operation will indeed be a noop if nothing is dispatched in the meantime.
why would we ever need to set it back to true?
Because if something is dispatched between the end of this operation and when our scheduled drain runs, we don't want to schedule another drain.
I think what you're wondering is why we don't cancel the scheduled drain here instead. The reason is: we can't. The dispatcher API only allows you to schedule Runnable
s to run, not cancel them. I think the reason for that is that even a cancelled continuation needs to be resumed when cancelled to run any finally
blocks.
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Outdated
Show resolved
Hide resolved
forEach { | ||
it.run() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think forEach
guarantees ordering, does it? My frame of reference for contrast is the trampolining queue in Compose's AndroidUiDispatcher
. Having the ordering behaviour enforced (FIFO) is helpful because it means that our actions will get queued up before we start handling them.
I think I see how we can get around not locking getting the next item off the queue, because we are using an isolated queue for each draining, and the rest of the tasks build up on a new queue. So that part makes sense.
How do you think this performs compared to using the locked nextTask()
method of Compose's AndroidUiDispatcher
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forEach
guarantees ordering for the "batch" of tasks, but ordering of batches depends on the underlying dispatcher and how advanceUntilIdle
is called. It's not guaranteed if the underlying dispatcher is not single-threaded and/or advanceUntilIdle
is called from multiple threads. E.g. Dispatchers aren't required to guarantee FIFO ordering. Dispatchers.Default
can't, for example, since it has no control over thread scheduling. The current impl of WSD doesn't provide any additional guarantees on top of whatever the underlying dispatcher does. It weakens ordering guarantees if advanceUntilIdle
is called from other threads, but as mentioned above this isn't something we do, and would just add extra overhead for no reason.
Discussed performance considerations above. tl;dr: The trade-off is that when there's contention, we can process a whole batch without locking after every task, but might end up allocating a few extra queue list objects. But AndroidUiDispatcher
locks on every task, so that's probably fine.
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Show resolved
Hide resolved
testDispatcher.scheduler.advanceUntilIdle() | ||
expect(4) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No delay()
tests here. But again, what do we want do with that? if we're going to ignore, then we need to make that clear.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -17,6 +17,8 @@ kotlin { | |||
if (targets == "kmp" || targets == "js") { | |||
js(IR) { browser() } | |||
} | |||
|
|||
compilerOptions.freeCompilerArgs.add("-Xexpect-actual-classes") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this we get an error about the internal expect actual class Lock
. It doesn't affect public API, so we don't need to worry about transitive compatibility.
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Outdated
Show resolved
Hide resolved
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Show resolved
Hide resolved
forEach { | ||
it.run() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forEach
guarantees ordering for the "batch" of tasks, but ordering of batches depends on the underlying dispatcher and how advanceUntilIdle
is called. It's not guaranteed if the underlying dispatcher is not single-threaded and/or advanceUntilIdle
is called from multiple threads. E.g. Dispatchers aren't required to guarantee FIFO ordering. Dispatchers.Default
can't, for example, since it has no control over thread scheduling. The current impl of WSD doesn't provide any additional guarantees on top of whatever the underlying dispatcher does. It weakens ordering guarantees if advanceUntilIdle
is called from other threads, but as mentioned above this isn't something we do, and would just add extra overhead for no reason.
Discussed performance considerations above. tl;dr: The trade-off is that when there's contention, we can process a whole batch without locking after every task, but might end up allocating a few extra queue list objects. But AndroidUiDispatcher
locks on every task, so that's probably fine.
* A reusable continuation that is used to access the coroutine runtime's resumption behavior for | ||
* both confined and unconfined dispatchers. See [resumeOnDelegateDispatcher] for more information | ||
* on how this works. |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Show resolved
Hide resolved
...flow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt
Show resolved
Hide resolved
testDispatcher.scheduler.advanceUntilIdle() | ||
expect(4) | ||
} | ||
|
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
d606d8c
to
483cca7
Compare
7568824
to
c660617
Compare
c660617
to
446b1d8
Compare
No description provided.