Skip to content

Commit 7ea8800

Browse files
committed
SwiftOCADevice: use AsyncThrowingStream, instead of AsyncThrowingChannel
avoid a suspension point by using an AsyncThrowingStream to process inbound messages in the CF and IORing backends
1 parent 56467a1 commit 7ea8800

File tree

2 files changed

+25
-8
lines changed

2 files changed

+25
-8
lines changed

Sources/SwiftOCADevice/OCP.1/Backend/CF/Ocp1CFController.swift

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible {
8989
_messages.eraseToAnyAsyncSequence()
9090
}
9191

92-
private var _messages = AsyncThrowingChannel<Ocp1MessageList, Error>()
92+
private let _messages: AsyncThrowingStream<Ocp1MessageList, Error>
93+
private let _messagesContinuation: AsyncThrowingStream<Ocp1MessageList, Error>.Continuation
9394
private let socket: _CFSocketWrapper
9495
let notificationSocket: _CFSocketWrapper
9596

@@ -106,6 +107,11 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible {
106107
self.notificationSocket = notificationSocket
107108
peerAddress = socket.peerAddress!
108109

110+
(_messages, _messagesContinuation) = AsyncThrowingStream.makeStream(
111+
of: Ocp1MessageList.self,
112+
throwing: Error.self
113+
)
114+
109115
if peerAddress.family == AF_LOCAL {
110116
connectionPrefix = OcaLocalConnectionPrefix
111117
} else {
@@ -115,12 +121,12 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible {
115121
receiveMessageTask = Task { [self] in
116122
do {
117123
repeat {
118-
try await _messages
119-
.send(OcaDevice.receiveMessages { try await Array(socket.read(count: $0)) })
124+
let messages = try await OcaDevice.receiveMessages { try await Array(socket.read(count: $0)) }
125+
_messagesContinuation.yield(messages)
120126
if Task.isCancelled { break }
121127
} while true
122128
} catch {
123-
_messages.fail(error)
129+
_messagesContinuation.finish(throwing: error)
124130
}
125131
}
126132
}
@@ -133,6 +139,8 @@ actor Ocp1CFStreamController: Ocp1CFControllerPrivate, CustomStringConvertible {
133139

134140
receiveMessageTask?.cancel()
135141
receiveMessageTask = nil
142+
143+
_messagesContinuation.finish()
136144
}
137145

138146
func onConnectionBecomingStale() async throws {

Sources/SwiftOCADevice/OCP.1/Backend/IORing/Ocp1IORingController.swift

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve
8080
_messages.eraseToAnyAsyncSequence()
8181
}
8282

83-
private var _messages = AsyncThrowingChannel<Ocp1MessageList, Error>()
83+
private let _messages: AsyncThrowingStream<Ocp1MessageList, Error>
84+
private let _messagesContinuation: AsyncThrowingStream<Ocp1MessageList, Error>.Continuation
8485
private let socket: Socket
8586
let notificationSocket: Socket
8687

@@ -97,6 +98,11 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve
9798
self.notificationSocket = notificationSocket
9899
self.endpoint = endpoint
99100

101+
(_messages, _messagesContinuation) = AsyncThrowingStream.makeStream(
102+
of: Ocp1MessageList.self,
103+
throwing: Error.self
104+
)
105+
100106
peerAddress = try AnySocketAddress(self.socket.peerAddress)
101107
if peerAddress.family == AF_LOCAL {
102108
connectionPrefix = OcaLocalConnectionPrefix
@@ -107,14 +113,15 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve
107113
receiveMessageTask = Task { [self] in
108114
do {
109115
repeat {
110-
try await _messages.send(OcaDevice.receiveMessages { try await socket.read(
116+
let messages = try await OcaDevice.receiveMessages { try await socket.read(
111117
count: $0,
112118
awaitingAllRead: true
113-
) })
119+
) }
120+
_messagesContinuation.yield(messages)
114121
if Task.isCancelled { break }
115122
} while true
116123
} catch {
117-
_messages.fail(error)
124+
_messagesContinuation.finish(throwing: error)
118125
}
119126
}
120127
}
@@ -127,6 +134,8 @@ actor Ocp1IORingStreamController: Ocp1IORingControllerPrivate, CustomStringConve
127134

128135
receiveMessageTask?.cancel()
129136
receiveMessageTask = nil
137+
138+
_messagesContinuation.finish()
130139
}
131140

132141
func onConnectionBecomingStale() async throws {

0 commit comments

Comments
 (0)