Skip to content

Commit 9f9f17d

Browse files
committed
WIP: submission protocol
1 parent 27f7446 commit 9f9f17d

File tree

1 file changed

+138
-44
lines changed

1 file changed

+138
-44
lines changed

Sources/IORing/Submission.swift

Lines changed: 138 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,53 @@ internal import CIOURing
2121
import Glibc
2222
import SystemPackage
2323

24-
class Submission<T: Sendable>: CustomStringConvertible, @unchecked Sendable {
24+
// MARK: - Submission Protocol
25+
26+
protocol Submission: CustomStringConvertible, Sendable, Hashable {
27+
associatedtype Result: Sendable
28+
29+
var core: SubmissionCore<Result> { get }
30+
31+
func onCompletion(cqe: io_uring_cqe)
32+
func onCancel(cqe: io_uring_cqe)
33+
}
34+
35+
extension Submission {
36+
var ring: IORing { core.ring }
37+
var fd: FileDescriptorRepresentable { core.fd }
38+
var opcode: IORingOperation { core.opcode }
39+
40+
nonisolated var description: String {
41+
"(\(type(of: self)))(fd: \(fd.fileDescriptor), opcode: \(opcode), handler: \(String(describing: core.handler)))"
42+
}
43+
44+
func cancel(ring: isolated IORing) throws {
45+
try core.cancel(ring: ring, onCancel: onCancel)
46+
}
47+
48+
func throwingErrno(
49+
cqe: io_uring_cqe,
50+
_ body: @escaping @Sendable (_: io_uring_cqe) throws -> Result
51+
) throws -> Result {
52+
try core.throwingErrno(cqe: cqe, body)
53+
}
54+
}
55+
56+
// MARK: - Submission Core
57+
58+
final class SubmissionCore<T: Sendable>: @unchecked Sendable {
2559
// reference to owner which owns ring
2660
let ring: IORing
2761
/// user-supplied callback to transform a completion queue entry to a result
28-
fileprivate let handler: @Sendable (io_uring_cqe) throws -> T
62+
let handler: @Sendable (io_uring_cqe) throws -> T
2963
/// file descriptor, stored so that it is not closed before the completion handler is run
30-
fileprivate let fd: FileDescriptorRepresentable
31-
64+
let fd: FileDescriptorRepresentable
3265
/// opcode, useful for debugging
33-
fileprivate let opcode: IORingOperation
66+
let opcode: IORingOperation
3467
/// assigned submission queue entry for this object
3568
private let sqe: UnsafeMutablePointer<io_uring_sqe>
3669
private var cancellationToken: UnsafeMutableRawPointer?
3770

38-
nonisolated var description: String {
39-
"(\(type(of: self)))(fd: \(fd.fileDescriptor), opcode: \(opcode), handler: \(String(describing: handler)))"
40-
}
41-
4271
private func prepare(
4372
_ opcode: IORingOperation,
4473
sqe: UnsafeMutablePointer<io_uring_sqe>,
@@ -86,24 +115,24 @@ class Submission<T: Sendable>: CustomStringConvertible, @unchecked Sendable {
86115
/// because actors are reentrant, `setBlock()` must be called immediately after
87116
/// the io_uring assigned a SQE (or, at least before any suspension point)
88117
/// FIXME: `swift_allocObject()` here appears to be a potential performance issue
89-
private func setBlock() {
118+
private func setBlock(onCompletion: @escaping (io_uring_cqe) -> ()) {
90119
cancellationToken = io_uring_sqe_set_block(sqe) { cqe in
91120
let cqe = cqe.pointee
92-
self.onCompletion(cqe: cqe)
121+
onCompletion(cqe)
93122
}
94123
}
95124

96-
func cancel(ring: isolated IORing) throws {
125+
func cancel(ring: isolated IORing, onCancel: @escaping (io_uring_cqe) -> ()) throws {
97126
do {
98127
precondition(cancellationToken != nil)
99128
let sqe = try ring.getSqe()
100129
io_uring_prep_cancel(sqe, cancellationToken, AsyncCancelFlags.userData.rawValue)
101130
_ = io_uring_sqe_set_block(sqe) { cqe in
102-
self.onCancel(cqe: cqe.pointee)
131+
onCancel(cqe.pointee)
103132
}
104133
try ring.submit()
105134
} catch {
106-
IORing.shared.logger.debug("failed to cancel submission \(self)")
135+
IORing.shared.logger.debug("failed to cancel submission")
107136
throw error
108137
}
109138
}
@@ -120,7 +149,8 @@ class Submission<T: Sendable>: CustomStringConvertible, @unchecked Sendable {
120149
moreFlags: UInt32 = 0,
121150
bufferIndexOrGroup: UInt16 = 0,
122151
socketAddress: sockaddr_storage? = nil,
123-
handler: @escaping @Sendable (io_uring_cqe) throws -> T
152+
handler: @escaping @Sendable (io_uring_cqe) throws -> T,
153+
onCompletion: @escaping (io_uring_cqe) -> ()
124154
) throws {
125155
sqe = try ring.getSqe()
126156
self.ring = ring
@@ -140,12 +170,9 @@ class Submission<T: Sendable>: CustomStringConvertible, @unchecked Sendable {
140170
try setSocketAddress(sqe: sqe, socketAddress: socketAddress)
141171
}
142172
}
143-
setBlock()
173+
setBlock(onCompletion: onCompletion)
144174
}
145175

146-
func onCompletion(cqe: io_uring_cqe) { fatalError("must be implemented by concrete class") }
147-
func onCancel(cqe: io_uring_cqe) {}
148-
149176
func throwingErrno(
150177
cqe: io_uring_cqe,
151178
_ body: @escaping @Sendable (_: io_uring_cqe) throws -> T
@@ -155,7 +182,7 @@ class Submission<T: Sendable>: CustomStringConvertible, @unchecked Sendable {
155182
if error != .brokenPipe {
156183
IORing.shared.logger
157184
.debug(
158-
"\(type(of: self)) completion fileDescriptor: \(fd) opcode: \(opcode) error: \(Errno(rawValue: -cqe.res))"
185+
"completion fileDescriptor: \(fd) opcode: \(opcode) error: \(Errno(rawValue: -cqe.res))"
159186
)
160187
}
161188
throw error
@@ -164,11 +191,19 @@ class Submission<T: Sendable>: CustomStringConvertible, @unchecked Sendable {
164191
}
165192
}
166193

167-
final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendable {
194+
// MARK: - SingleshotSubmission
195+
196+
struct SingleshotSubmission<T: Sendable>: Submission, @unchecked Sendable {
197+
typealias Result = T
198+
199+
let core: SubmissionCore<T>
168200
weak var group: SubmissionGroup<T>?
169201

170-
private typealias Continuation = UnsafeContinuation<T, Error>
171-
private var continuation: Continuation!
202+
private final class ContinuationHolder: @unchecked Sendable {
203+
var continuation: UnsafeContinuation<T, Error>?
204+
}
205+
206+
private let holder: ContinuationHolder
172207

173208
init(
174209
ring: isolated IORing,
@@ -186,7 +221,9 @@ final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendabl
186221
handler: @escaping @Sendable (io_uring_cqe) throws -> T
187222
) async throws {
188223
self.group = group
189-
try super.init(
224+
holder = ContinuationHolder()
225+
226+
core = try SubmissionCore(
190227
ring: ring,
191228
opcode,
192229
fd: fd,
@@ -198,8 +235,17 @@ final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendabl
198235
moreFlags: moreFlags,
199236
bufferIndexOrGroup: bufferIndex,
200237
socketAddress: socketAddress,
201-
handler: handler
238+
handler: handler,
239+
onCompletion: { [holder] cqe in
240+
guard let continuation = holder.continuation else { return }
241+
do {
242+
try continuation.resume(returning: handler(cqe))
243+
} catch {
244+
continuation.resume(throwing: error)
245+
}
246+
}
202247
)
248+
203249
if let group {
204250
await group.enqueue(submission: self)
205251
}
@@ -209,7 +255,7 @@ final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendabl
209255
try await withTaskCancellationHandler(operation: {
210256
try await withUnsafeThrowingContinuation { continuation in
211257
// guaranteed to run immediately
212-
self.continuation = continuation
258+
holder.continuation = continuation
213259
if group != nil {
214260
Task { await ready() }
215261
} else {
@@ -226,13 +272,16 @@ final class SingleshotSubmission<T: Sendable>: Submission<T>, @unchecked Sendabl
226272
try await _submit(ring: ring)
227273
}
228274

229-
override func onCompletion(cqe: io_uring_cqe) {
275+
func onCompletion(cqe: io_uring_cqe) {
276+
guard let continuation = holder.continuation else { return }
230277
do {
231-
try continuation.resume(returning: throwingErrno(cqe: cqe, handler))
278+
try continuation.resume(returning: throwingErrno(cqe: cqe, core.handler))
232279
} catch {
233280
continuation.resume(throwing: error)
234281
}
235282
}
283+
284+
func onCancel(cqe: io_uring_cqe) {}
236285
}
237286

238287
struct BufferCount: FileDescriptorRepresentable {
@@ -243,7 +292,13 @@ struct BufferCount: FileDescriptorRepresentable {
243292
}
244293
}
245294

246-
final class BufferSubmission<U>: Submission<()>, @unchecked Sendable {
295+
// MARK: - BufferSubmission
296+
297+
final class BufferSubmission<U>: Submission, @unchecked Sendable {
298+
typealias Result = ()
299+
300+
let core: SubmissionCore<()>
301+
247302
nonisolated var count: Int {
248303
Int(fd.fileDescriptor)
249304
}
@@ -253,7 +308,8 @@ final class BufferSubmission<U>: Submission<()>, @unchecked Sendable {
253308
let buffer: UnsafeMutablePointer<U>
254309
let deallocate: Bool
255310

256-
override func onCompletion(cqe: io_uring_cqe) {}
311+
func onCompletion(cqe: io_uring_cqe) {}
312+
func onCancel(cqe: io_uring_cqe) {}
257313

258314
private func _submit(ring: isolated IORing) throws {
259315
try ring.submit()
@@ -287,16 +343,18 @@ final class BufferSubmission<U>: Submission<()>, @unchecked Sendable {
287343
self.deallocate = deallocate
288344
self.buffer = buffer
289345

290-
try super.init(
346+
core = try SubmissionCore(
291347
ring: ring,
292348
.provide_buffers,
293349
fd: BufferCount(count: count),
294350
address: buffer,
295351
length: UInt32(size),
296352
offset: offset,
297353
flags: flags,
298-
bufferIndexOrGroup: bufferGroup
299-
) { _ in }
354+
bufferIndexOrGroup: bufferGroup,
355+
handler: { _ in },
356+
onCompletion: { _ in }
357+
)
300358
}
301359

302360
convenience init(
@@ -382,7 +440,13 @@ final class BufferSubmission<U>: Submission<()>, @unchecked Sendable {
382440
}
383441
}
384442

385-
final class MultishotSubmission<T: Sendable>: Submission<T>, @unchecked Sendable {
443+
// MARK: - MultishotSubmission
444+
445+
final class MultishotSubmission<T: Sendable>: Submission, @unchecked Sendable {
446+
typealias Result = T
447+
448+
let core: SubmissionCore<T>
449+
386450
// Shared holder ensures continuation is accessible across resubmissions
387451
private final class _StreamHolder: Sendable {
388452
let stream: AsyncThrowingStream<T, Error>
@@ -434,7 +498,7 @@ final class MultishotSubmission<T: Sendable>: Submission<T>, @unchecked Sendable
434498
self.socketAddress = socketAddress
435499
self.holder = holder
436500

437-
try super.init(
501+
core = try SubmissionCore(
438502
ring: ring,
439503
opcode,
440504
fd: fd,
@@ -446,7 +510,15 @@ final class MultishotSubmission<T: Sendable>: Submission<T>, @unchecked Sendable
446510
moreFlags: moreFlags,
447511
bufferIndexOrGroup: bufferIndexOrGroup,
448512
socketAddress: socketAddress,
449-
handler: handler
513+
handler: handler,
514+
onCompletion: { [holder] cqe in
515+
do {
516+
let result = try handler(cqe)
517+
holder.continuation.yield(result)
518+
} catch {
519+
holder.continuation.finish(throwing: error)
520+
}
521+
}
450522
)
451523
}
452524

@@ -464,11 +536,11 @@ final class MultishotSubmission<T: Sendable>: Submission<T>, @unchecked Sendable
464536
bufferIndexOrGroup: submission.bufferIndexOrGroup,
465537
socketAddress: submission.socketAddress,
466538
holder: submission.holder,
467-
handler: submission.handler
539+
handler: submission.core.handler
468540
)
469541
}
470542

471-
override convenience init(
543+
convenience init(
472544
ring: isolated IORing,
473545
_ opcode: IORingOperation,
474546
fd: FileDescriptorRepresentable,
@@ -522,22 +594,24 @@ final class MultishotSubmission<T: Sendable>: Submission<T>, @unchecked Sendable
522594
}
523595
}
524596

525-
override func onCompletion(cqe: io_uring_cqe) {
597+
func onCompletion(cqe: io_uring_cqe) {
526598
do {
527-
let result = try throwingErrno(cqe: cqe, handler)
599+
let result = try throwingErrno(cqe: cqe, core.handler)
528600
holder.continuation.yield(result) // No suspension point!
529601
if cqe.flags & IORING_CQE_F_MORE == 0 {
530602
// if IORING_CQE_F_MORE is not set, we need to issue a new request
531603
// try to do this implictily
532604
Task { await resubmit(ring: ring) }
533605
}
534606
if Task.isCancelled {
535-
Task { try await cancel(ring: ring) }
607+
Task { try? await cancel(ring: ring) }
536608
}
537609
} catch {
538610
holder.continuation.finish(throwing: error)
539611
}
540612
}
613+
614+
func onCancel(cqe: io_uring_cqe) {}
541615
}
542616

543617
enum IORingOperation: UInt32 {
@@ -609,13 +683,33 @@ struct AsyncCancelFlags: OptionSet {
609683
static let op = AsyncCancelFlags(rawValue: 1 << 5)
610684
}
611685

612-
extension Submission: Equatable {
613-
nonisolated static func == (lhs: Submission, rhs: Submission) -> Bool {
686+
// MARK: - Equatable and Hashable conformances
687+
688+
extension SingleshotSubmission: Equatable {
689+
nonisolated static func == (lhs: SingleshotSubmission, rhs: SingleshotSubmission) -> Bool {
690+
lhs.core === rhs.core
691+
}
692+
693+
nonisolated func hash(into hasher: inout Hasher) {
694+
ObjectIdentifier(core).hash(into: &hasher)
695+
}
696+
}
697+
698+
extension BufferSubmission: Equatable {
699+
nonisolated static func == (lhs: BufferSubmission, rhs: BufferSubmission) -> Bool {
614700
lhs === rhs
615701
}
702+
703+
nonisolated func hash(into hasher: inout Hasher) {
704+
ObjectIdentifier(self).hash(into: &hasher)
705+
}
616706
}
617707

618-
extension Submission: Hashable {
708+
extension MultishotSubmission: Equatable {
709+
nonisolated static func == (lhs: MultishotSubmission, rhs: MultishotSubmission) -> Bool {
710+
lhs === rhs
711+
}
712+
619713
nonisolated func hash(into hasher: inout Hasher) {
620714
ObjectIdentifier(self).hash(into: &hasher)
621715
}

0 commit comments

Comments
 (0)