Skip to content

Commit 295f620

Browse files
committed
WIP: use AsyncThrowingStream instead of AsyncThrowingChannel
1 parent 52b14d8 commit 295f620

File tree

2 files changed

+74
-27
lines changed

2 files changed

+74
-27
lines changed

Sources/IORing/IORing.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public final class IORing: CustomStringConvertible {
391391
moreFlags: UInt32 = 0,
392392
bufferIndexOrGroup: UInt16 = 0,
393393
handler: @escaping @Sendable (io_uring_cqe) throws -> T
394-
) throws -> AsyncThrowingChannel<T, Error> {
394+
) throws -> AsyncThrowingStream<T, Error> {
395395
try MultishotSubmission(
396396
ring: self,
397397
opcode,
@@ -616,7 +616,7 @@ private extension IORing {
616616
fd: FileDescriptorRepresentable,
617617
count: Int,
618618
link: Bool = false
619-
) throws -> AsyncThrowingChannel<[UInt8], Error> {
619+
) throws -> AsyncThrowingStream<[UInt8], Error> {
620620
var buffer = [UInt8]._unsafelyInitialized(count: count)
621621
return try prepareAndSubmitMultishot(
622622
.recv,
@@ -654,7 +654,7 @@ private extension IORing {
654654
count: Int,
655655
capacity: Int,
656656
flags: UInt32 = 0
657-
) async throws -> AsyncThrowingChannel<Message, Error> {
657+
) async throws -> AsyncThrowingStream<Message, Error> {
658658
// FIXME: combine message holder buffer registration with multishot registration to avoid extra system call
659659
let holder = try await MessageHolder(ring: self, size: count, count: capacity)
660660
return try await holder.withUnsafeMutablePointer { @IORingActor pointer in
@@ -669,7 +669,7 @@ private extension IORing {
669669
bufferIndexOrGroup: holder.bufferGroup
670670
) { [holder] cqe in
671671
// because the default for multishots is to resubmit when IORING_CQE_F_MORE is unset,
672-
// we don't need to deallocate the buffer here. FIXME: do this when channel closes.
672+
// we don't need to deallocate the buffer here. FIXME: do this when stream finishes.
673673
// we know that handlers are always executed in an @IORingActor actor's execution context
674674
// so it's safe to access the holder's buffer. But we should make this more explicit
675675
// by making the callback take an async function.
@@ -715,7 +715,7 @@ private extension IORing {
715715
func io_uring_op_multishot_accept(
716716
fd: FileDescriptorRepresentable,
717717
flags: UInt32 = 0
718-
) throws -> AsyncThrowingChannel<FileDescriptorRepresentable, Error> {
718+
) throws -> AsyncThrowingStream<FileDescriptorRepresentable, Error> {
719719
try prepareAndSubmitMultishot(
720720
.accept,
721721
fd: fd,

Sources/IORing/Submission.swift

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,22 @@ final class BufferSubmission<U>: Submission<()> {
371371
}
372372

373373
final class MultishotSubmission<T: Sendable>: Submission<T> {
374-
private let channel = AsyncThrowingChannel<T, Error>()
374+
// Shared holder ensures continuation is accessible across resubmissions
375+
private final class StreamHolder: Sendable {
376+
let stream: AsyncThrowingStream<T, Error>
377+
let continuation: AsyncThrowingStream<T, Error>.Continuation
378+
379+
init() {
380+
var cont: AsyncThrowingStream<T, Error>.Continuation!
381+
let str = AsyncThrowingStream<T, Error> { continuation in
382+
cont = continuation
383+
}
384+
stream = str
385+
continuation = cont
386+
}
387+
}
388+
389+
private let holder: StreamHolder
375390

376391
// state for resubmission
377392
private let address: UnsafeRawPointer?
@@ -383,7 +398,8 @@ final class MultishotSubmission<T: Sendable>: Submission<T> {
383398
private let bufferIndexOrGroup: UInt16
384399
private let socketAddress: sockaddr_storage?
385400

386-
override init(
401+
// Private init that allows specifying a holder (for resubmission)
402+
private init(
387403
ring: IORing,
388404
_ opcode: IORingOperation,
389405
fd: FileDescriptorRepresentable,
@@ -395,8 +411,10 @@ final class MultishotSubmission<T: Sendable>: Submission<T> {
395411
moreFlags: UInt32 = 0,
396412
bufferIndexOrGroup: UInt16 = 0,
397413
socketAddress: sockaddr_storage? = nil,
414+
holder: StreamHolder?,
398415
handler: @escaping @Sendable (io_uring_cqe) throws -> T
399416
) throws {
417+
self.holder = holder ?? StreamHolder()
400418
self.address = address
401419
self.length = length
402420
self.offset = offset
@@ -422,44 +440,73 @@ final class MultishotSubmission<T: Sendable>: Submission<T> {
422440
)
423441
}
424442

425-
private convenience init(_ submission: MultishotSubmission) throws {
443+
// Public convenience init (creates new holder/stream)
444+
override convenience init(
445+
ring: IORing,
446+
_ opcode: IORingOperation,
447+
fd: FileDescriptorRepresentable,
448+
address: UnsafeRawPointer? = nil,
449+
length: CUnsignedInt = 0,
450+
offset: IORing.Offset = 0,
451+
flags: IORing.SqeFlags = IORing.SqeFlags(),
452+
ioprio: UInt16 = 0,
453+
moreFlags: UInt32 = 0,
454+
bufferIndexOrGroup: UInt16 = 0,
455+
socketAddress: sockaddr_storage? = nil,
456+
handler: @escaping @Sendable (io_uring_cqe) throws -> T
457+
) throws {
426458
try self.init(
427-
ring: submission.ring,
428-
submission.opcode,
429-
fd: submission.fd,
430-
address: submission.address,
431-
length: submission.length,
432-
offset: submission.offset,
433-
flags: submission.flags,
434-
ioprio: submission.ioprio,
435-
moreFlags: submission.moreFlags,
436-
bufferIndexOrGroup: submission.bufferIndexOrGroup,
437-
socketAddress: submission.socketAddress,
438-
handler: submission.handler
459+
ring: ring,
460+
opcode,
461+
fd: fd,
462+
address: address,
463+
length: length,
464+
offset: offset,
465+
flags: flags,
466+
ioprio: ioprio,
467+
moreFlags: moreFlags,
468+
bufferIndexOrGroup: bufferIndexOrGroup,
469+
socketAddress: socketAddress,
470+
holder: nil,
471+
handler: handler
439472
)
440473
}
441474

442-
func submit() throws -> AsyncThrowingChannel<T, Error> {
475+
func submit() throws -> AsyncThrowingStream<T, Error> {
443476
try ring.submit()
444-
return channel
477+
return holder.stream
445478
}
446479

447480
private func resubmit() {
448481
do {
449-
// this will allocate a new SQE with the same channel, fd, opcode and handler
450-
let resubmission = try MultishotSubmission(self)
482+
// Create new SQE with same holder (shared stream/continuation)
483+
let resubmission = try MultishotSubmission(
484+
ring: ring,
485+
opcode,
486+
fd: fd,
487+
address: address,
488+
length: length,
489+
offset: offset,
490+
flags: flags,
491+
ioprio: ioprio,
492+
moreFlags: moreFlags,
493+
bufferIndexOrGroup: bufferIndexOrGroup,
494+
socketAddress: socketAddress,
495+
holder: holder, // Share the same holder!
496+
handler: handler
497+
)
451498
IORing.shared.logger.debug("resubmitting multishot submission \(resubmission)")
452499
_ = try resubmission.submit()
453500
} catch {
454501
IORing.shared.logger.debug("resubmitting multishot submission failed: \(error)")
455-
channel.fail(error)
502+
holder.continuation.finish(throwing: error)
456503
}
457504
}
458505

459506
override func onCompletion(cqe: io_uring_cqe) async {
460507
do {
461508
let result = try throwingErrno(cqe: cqe, handler)
462-
await channel.send(result)
509+
holder.continuation.yield(result) // No suspension point!
463510
if cqe.flags & IORING_CQE_F_MORE == 0 {
464511
// if IORING_CQE_F_MORE is not set, we need to issue a new request
465512
// try to do this implictily
@@ -469,7 +516,7 @@ final class MultishotSubmission<T: Sendable>: Submission<T> {
469516
try await cancel()
470517
}
471518
} catch {
472-
channel.fail(error)
519+
holder.continuation.finish(throwing: error)
473520
}
474521
}
475522
}

0 commit comments

Comments
 (0)