Skip to content

Commit 7638790

Browse files
committed
WIP: connection broker
1 parent 44f23f0 commit 7638790

File tree

12 files changed

+850
-126
lines changed

12 files changed

+850
-126
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//
2+
// Copyright (c) 2025 PADL Software Pty Ltd
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the License);
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an 'AS IS' BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
import SwiftOCA
18+
19+
private let connectionOptions = Ocp1ConnectionOptions(flags: [
20+
.automaticReconnect,
21+
// .enableTracing,
22+
.refreshSubscriptionsOnReconnection,
23+
.retainObjectCacheAfterDisconnect,
24+
])
25+
26+
@main
27+
public enum BrokerTest {
28+
public static func main() async throws {
29+
let broker = await OcaConnectionBroker(connectionOptions: connectionOptions)
30+
print("waiting for events from broker...")
31+
for try await event in await broker.events {
32+
print("\(event)")
33+
switch event.eventType {
34+
case .deviceAdded:
35+
Task { try await broker.connect(device: event.deviceIdentifier) }
36+
default:
37+
break
38+
}
39+
}
40+
print("done!")
41+
}
42+
}

Package.swift

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,19 @@ let CommonTargets: [Target] = [
187187
.unsafeFlags(ASANSwiftFlags),
188188
],
189189
linkerSettings: [] + ASANLinkerSettings
190-
191190
),
191+
.executableTarget(
192+
name: "OCABrokerTest",
193+
dependencies: [
194+
"SwiftOCA",
195+
],
196+
path: "Examples/OCABrokerTest",
197+
swiftSettings: [
198+
.unsafeFlags(ASANSwiftFlags),
199+
],
200+
linkerSettings: [] + ASANLinkerSettings
192201

202+
),
193203
.testTarget(
194204
name: "SwiftOCATests",
195205
dependencies: [

Sources/SwiftOCA/OCA/Browser.swift

Lines changed: 94 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,19 @@ import Foundation
2121

2222
extension NetService: @unchecked Sendable {}
2323

24-
public class OcaBrowser: NSObject, NetServiceBrowserDelegate, @unchecked
25-
Sendable {
24+
public final class OcaBrowser: NSObject, NetServiceBrowserDelegate {
2625
public enum Result: Sendable {
2726
case didNotSearch(Error)
2827
case didFind(NetService)
2928
case didRemove(NetService)
29+
30+
var result: Swift.Result<NetService, Error> {
31+
switch self {
32+
case let .didFind(service): .success(service)
33+
case let .didRemove(service): .success(service)
34+
case let .didNotSearch(error): .failure(error)
35+
}
36+
}
3037
}
3138

3239
private let browser: NetServiceBrowser
@@ -37,54 +44,46 @@ Sendable {
3744
channel = AsyncChannel<Result>()
3845
super.init()
3946
browser.delegate = self
40-
41-
Task {
42-
self.browser.searchForServices(ofType: serviceType.rawValue, inDomain: "local.")
43-
}
47+
browser.schedule(in: .main, forMode: .default)
48+
browser.searchForServices(ofType: serviceType.rawValue, inDomain: "local.")
4449
}
4550

4651
deinit {
4752
browser.stop()
53+
browser.remove(from: .main, forMode: .default)
4854
}
4955

5056
public func netServiceBrowserWillSearch(_ browser: NetServiceBrowser) {}
5157

5258
private func netServiceBrowser(_ browser: NetServiceBrowser, didNotSearch error: Error) {
53-
Task {
54-
await channel.send(Result.didNotSearch(error))
55-
}
59+
Task { await channel.send(Result.didNotSearch(error)) }
5660
}
5761

5862
public func netServiceBrowser(
5963
_ browser: NetServiceBrowser,
6064
didFind service: NetService,
6165
moreComing: Bool
6266
) {
63-
Task {
64-
await channel.send(Result.didFind(service))
65-
}
67+
Task { await channel.send(Result.didFind(service)) }
6668
}
6769

6870
public func netServiceBrowser(
6971
_ browser: NetServiceBrowser,
7072
didRemove service: NetService,
7173
moreComing: Bool
7274
) {
73-
Task {
74-
await channel.send(Result.didRemove(service))
75-
}
75+
Task { await channel.send(Result.didRemove(service)) }
7676
}
7777

7878
public func netServiceBrowserDidStopSearch(_ browser: NetServiceBrowser) {
7979
channel.finish()
8080
}
8181
}
8282

83-
fileprivate final class OcaResolverDelegate: NSObject, NetServiceDelegate, Sendable {
84-
typealias ResolutionResult = Result<Data, Error>
85-
let channel: AsyncChannel<ResolutionResult>
83+
final class OcaResolverDelegate: NSObject, NetServiceDelegate, Sendable {
84+
let channel: AsyncThrowingChannel<Data, Error>
8685

87-
init(_ channel: AsyncChannel<ResolutionResult>) {
86+
init(_ channel: AsyncThrowingChannel<Data, Error>) {
8887
self.channel = channel
8988
}
9089

@@ -93,14 +92,14 @@ fileprivate final class OcaResolverDelegate: NSObject, NetServiceDelegate, Senda
9392
}
9493

9594
func netServiceDidResolveAddress(_ sender: NetService) {
96-
Task {
97-
guard let addresses = sender.addresses, !addresses.isEmpty else {
98-
await channel.send(.failure(Ocp1Error.serviceResolutionFailed))
99-
return
100-
}
95+
guard let addresses = sender.addresses, !addresses.isEmpty else {
96+
channel.fail(Ocp1Error.serviceResolutionFailed)
97+
return
98+
}
10199

100+
Task {
102101
for address in addresses {
103-
await channel.send(ResolutionResult.success(address))
102+
await channel.send(address)
104103
}
105104
}
106105
}
@@ -109,9 +108,7 @@ fileprivate final class OcaResolverDelegate: NSObject, NetServiceDelegate, Senda
109108
let errorCode = errorDict[NetService.errorCode]!.intValue
110109
let errorDomain = errorDict[NetService.errorDomain]!.stringValue
111110

112-
Task {
113-
await channel.send(.failure(NSError(domain: errorDomain, code: errorCode)))
114-
}
111+
channel.fail(NSError(domain: errorDomain, code: errorCode))
115112
}
116113
}
117114

@@ -132,54 +129,86 @@ extension Ocp1Connection: Ocp1ConnectionFactory {
132129
throw Ocp1Error.unknownServiceType
133130
}
134131

135-
let channel = AsyncChannel<OcaResolverDelegate.ResolutionResult>()
132+
let channel = AsyncThrowingChannel<Data, Error>()
136133
let delegate = OcaResolverDelegate(channel)
137134
netService.delegate = delegate
138135
netService.schedule(in: RunLoop.main, forMode: .default)
136+
defer { netService.remove(from: RunLoop.main, forMode: .default) }
139137
netService.resolve(withTimeout: 5)
140138

141-
for await result in channel {
142-
switch result {
143-
case let .success(address):
144-
// FIXME: support IPv6
145-
guard address.withUnsafeBytes({ unbound -> Bool in
146-
unbound.withMemoryRebound(to: sockaddr.self) { cSockAddr -> Bool in
147-
cSockAddr.baseAddress!.pointee.sa_family == AF_INET
148-
}
149-
}) == true else {
150-
continue
151-
}
152-
channel.finish()
153-
154-
switch serviceType {
155-
case .tcp:
156-
try await self
157-
.init(
158-
reassigningSelfTo: Ocp1TCPConnection(
159-
deviceAddress: address,
160-
options: options
161-
) as! Self
162-
)
163-
return
164-
case .udp:
165-
try await self
166-
.init(
167-
reassigningSelfTo: Ocp1UDPConnection(
168-
deviceAddress: address,
169-
options: options
170-
) as! Self
171-
)
172-
return
173-
default:
174-
throw Ocp1Error.unknownServiceType
139+
for try await address in channel {
140+
// FIXME: support IPv6
141+
guard address.withUnsafeBytes({ unbound -> Bool in
142+
unbound.withMemoryRebound(to: sockaddr.self) { cSockAddr -> Bool in
143+
cSockAddr.baseAddress!.pointee.sa_family == AF_INET
175144
}
176-
case let .failure(error):
177-
throw error
145+
}) == true else {
146+
continue
147+
}
148+
channel.finish()
149+
150+
switch serviceType {
151+
case .tcp:
152+
try await self
153+
.init(
154+
reassigningSelfTo: Ocp1TCPConnection(
155+
deviceAddress: address,
156+
options: options
157+
) as! Self
158+
)
159+
return
160+
case .udp:
161+
try await self
162+
.init(
163+
reassigningSelfTo: Ocp1UDPConnection(
164+
deviceAddress: address,
165+
options: options
166+
) as! Self
167+
)
168+
return
169+
default:
170+
throw Ocp1Error.unknownServiceType
178171
}
179172
}
180173

181174
throw Ocp1Error.serviceResolutionFailed
182175
}
183176
}
184177

178+
extension NetService {
179+
/// Decode the TXT record as a string dictionary, or [:] if the data is malformed
180+
static func dictionary(fromTXTRecord txtData: Data) -> [String: String] {
181+
// https://stackoverflow.com/questions/40193911/nsnetservice-dictionaryfromtxtrecord-fails-an-assertion-on-invalid-input
182+
var result = [String: String]()
183+
var data = txtData
184+
185+
while !data.isEmpty {
186+
// The first byte of each record is its length, so prefix that much data
187+
let recordLength = Int(data.removeFirst())
188+
guard data.count >= recordLength else { return [:] }
189+
let recordData = data[..<(data.startIndex + recordLength)]
190+
data = data.dropFirst(recordLength)
191+
192+
guard let record = String(bytes: recordData, encoding: .utf8) else { return [:] }
193+
// The format of the entry is "key=value"
194+
// (According to the reference implementation, = is optional if there is no value,
195+
// and any equals signs after the first are part of the value.)
196+
// `ommittingEmptySubsequences` is necessary otherwise an empty string will crash the next
197+
// line
198+
let keyValue = record.split(separator: "=", maxSplits: 1, omittingEmptySubsequences: false)
199+
let key = String(keyValue[0])
200+
// If there's no value, make the value the empty string
201+
switch keyValue.count {
202+
case 1:
203+
result[key] = ""
204+
case 2:
205+
result[key] = String(keyValue[1])
206+
default:
207+
fatalError()
208+
}
209+
}
210+
211+
return result
212+
}
213+
}
185214
#endif

0 commit comments

Comments
 (0)