Skip to content

Commit c6affdf

Browse files
committed
avoid actor hopping on SubmissionGroup
1 parent 27f7446 commit c6affdf

File tree

3 files changed

+14
-20
lines changed

3 files changed

+14
-20
lines changed

Sources/IORing/IORing.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,9 @@ public actor IORing: CustomStringConvertible {
336336
func withSubmissionGroup<T: Sendable>(_ body: (
337337
SubmissionGroup<T>
338338
) async throws -> ()) async throws -> [T] {
339-
let submissionGroup = try await SubmissionGroup<T>(ring: self)
339+
let submissionGroup = try SubmissionGroup<T>(ring: self)
340340
try await body(submissionGroup)
341-
return try await submissionGroup.finish()
341+
return try await submissionGroup.finish(ring: self)
342342
}
343343

344344
fileprivate func prepareAndSubmit<T: Sendable>(

Sources/IORing/Submission.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendabl
201201
handler: handler
202202
)
203203
if let group {
204-
await group.enqueue(submission: self)
204+
group.enqueue(submission: self, ring: ring)
205205
}
206206
}
207207

@@ -211,7 +211,7 @@ final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendabl
211211
// guaranteed to run immediately
212212
self.continuation = continuation
213213
if group != nil {
214-
Task { await ready() }
214+
ready()
215215
} else {
216216
_ = try? ring.submit()
217217
}

Sources/IORing/SubmissionGroup.swift

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import AsyncQueue
1919
import Glibc
2020

2121
extension SingleshotSubmission {
22-
func enqueue() {
22+
func enqueue(ring: isolated IORing) {
2323
guard let group else { return }
2424
Task {
2525
do {
@@ -31,34 +31,30 @@ extension SingleshotSubmission {
3131
}
3232
}
3333

34-
func ready() async {
35-
await group?.readinessChannel.send(())
34+
func ready() {
35+
Task { await group?.readinessChannel.send(()) }
3636
}
3737
}
3838

39-
actor SubmissionGroup<T: Sendable> {
39+
final class SubmissionGroup<T: Sendable>: Sendable {
4040
private let ring: IORing
41-
private let queue = ActorQueue<SubmissionGroup>()
42-
private var submissions = [SingleshotSubmission<T>]()
41+
private nonisolated(unsafe) var submissions = [SingleshotSubmission<T>]()
4342

4443
fileprivate let readinessChannel = AsyncChannel<()>()
4544
fileprivate let resultChannel = AsyncThrowingChannel<T, Error>()
4645

47-
init(ring: IORing) async throws {
46+
init(ring: isolated IORing) throws {
4847
self.ring = ring
49-
queue.adoptExecutionContext(of: self)
5048
}
5149

5250
///
5351
/// Asynchronously enqueues an submission. Submission must call `ready()` when
5452
/// its continuation is registered in the SQE `user_data` otherwise the group
5553
/// will never be submitted.
5654
///
57-
func enqueue(submission: SingleshotSubmission<T>) {
55+
func enqueue(submission: SingleshotSubmission<T>, ring: isolated IORing) {
5856
submissions.append(submission)
59-
Task(on: queue) { _ in
60-
submission.enqueue()
61-
}
57+
submission.enqueue(ring: ring)
6258
}
6359

6460
private func allReady() async {
@@ -75,15 +71,13 @@ actor SubmissionGroup<T: Sendable> {
7571
///
7672
/// Completing the submission group involves the following:
7773
///
78-
/// - Await all submissions to be scheduled on queue
7974
/// - Wait for all submissions to have continuations registered
8075
/// - Submit SQEs to I/O ring
8176
/// - Collect results from results channel
8277
///
83-
func finish() async throws -> [T] {
84-
await Task(on: queue) { _ in }.value
78+
func finish(ring: isolated IORing) async throws -> [T] {
8579
await allReady()
86-
try await ring.submit()
80+
try ring.submit()
8781
readinessChannel.finish()
8882
return try await allComplete()
8983
}

0 commit comments

Comments
 (0)