Skip to content

Commit de33be1

Browse files
committed
refactor reconnection logic, split out into new file
1 parent f0fae9e commit de33be1

File tree

12 files changed

+367
-158
lines changed

12 files changed

+367
-158
lines changed

Sources/SwiftOCA/OCF/Errors.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public enum Ocp1Error: Error, Equatable {
3434
case invalidProtocolVersion
3535
case invalidProxyMethodResponse
3636
case invalidSyncValue
37+
case missingKeepalive
3738
case noConnectionDelegate
3839
case noInitialValue
3940
case noMatchingTypeForClass

Sources/SwiftOCA/OCP.1/Backend/Ocp1CFSocketConnection.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,14 +401,14 @@ public class Ocp1CFSocketConnection: Ocp1Connection {
401401
deviceAddress.family
402402
}
403403

404-
override func connectDevice() async throws {
404+
override public func connectDevice() async throws {
405405
socket = try await _CFSocketWrapper(address: deviceAddress, type: type)
406406
try await super.connectDevice()
407407
}
408408

409-
override public func disconnectDevice(clearObjectCache: Bool) async throws {
409+
override public func disconnectDevice() async throws {
410410
socket = nil
411-
try await super.disconnectDevice(clearObjectCache: clearObjectCache)
411+
try await super.disconnectDevice()
412412
}
413413
}
414414

Sources/SwiftOCA/OCP.1/Backend/Ocp1FlyingSocksConnection.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public class Ocp1FlyingSocksConnection: Ocp1Connection {
123123
try? asyncSocket?.close()
124124
}
125125

126-
override func connectDevice() async throws {
126+
override public func connectDevice() async throws {
127127
let socket = try Socket(domain: Int32(deviceAddress.family), type: socketType)
128128
try? setSocketOptions(socket)
129129
// also connect UDP sockets to ensure we do not receive unsolicited replies
@@ -135,13 +135,13 @@ public class Ocp1FlyingSocksConnection: Ocp1Connection {
135135
try await super.connectDevice()
136136
}
137137

138-
override public func disconnectDevice(clearObjectCache: Bool) async throws {
138+
override public func disconnectDevice() async throws {
139139
await AsyncSocketPoolMonitor.shared.stop()
140140
if let asyncSocket {
141141
try asyncSocket.close()
142142
self.asyncSocket = nil
143143
}
144-
try await super.disconnectDevice(clearObjectCache: clearObjectCache)
144+
try await super.disconnectDevice()
145145
}
146146

147147
public convenience init(

Sources/SwiftOCA/OCP.1/Backend/Ocp1IORingConnection.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class Ocp1IORingConnection: Ocp1Connection {
7979
)
8080
}
8181

82-
override func connectDevice() async throws {
82+
override public func connectDevice() async throws {
8383
let socket = try Socket(
8484
ring: IORing.shared,
8585
domain: deviceAddress.family,
@@ -94,9 +94,9 @@ public class Ocp1IORingConnection: Ocp1Connection {
9494
try await super.connectDevice()
9595
}
9696

97-
override public func disconnectDevice(clearObjectCache: Bool) async throws {
97+
override public func disconnectDevice() async throws {
9898
socket = nil
99-
try await super.disconnectDevice(clearObjectCache: clearObjectCache)
99+
try await super.disconnectDevice()
100100
}
101101

102102
fileprivate func withMappedError<T: Sendable>(
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
//
2+
// Copyright (c) 2023-2024 PADL Software Pty Ltd
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the License);
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an 'AS IS' BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
import AsyncAlgorithms
18+
import AsyncExtensions
19+
@preconcurrency
20+
import Foundation
21+
import Logging
22+
#if canImport(Combine)
23+
import Combine
24+
#elseif canImport(OpenCombine)
25+
import OpenCombine
26+
#endif
27+
28+
private extension Ocp1Error {
29+
var connectionState: Ocp1ConnectionState? {
30+
switch self {
31+
case .notConnected:
32+
.notConnected
33+
case .connectionTimeout:
34+
.connectionTimedOut
35+
default:
36+
nil
37+
}
38+
}
39+
}
40+
41+
private extension Error {
42+
var ocp1ConnectionState: Ocp1ConnectionState {
43+
(self as? Ocp1Error)?.connectionState ?? .connectionFailed
44+
}
45+
}
46+
47+
private extension Ocp1ConnectionState {
48+
var error: Ocp1Error? {
49+
switch self {
50+
case .notConnected:
51+
fallthrough
52+
case .connectionFailed:
53+
return .notConnected
54+
case .connectionTimedOut:
55+
return .connectionTimeout
56+
default:
57+
return nil
58+
}
59+
}
60+
}
61+
62+
extension Ocp1Connection {
63+
enum ReconnectionPolicy {
64+
/// do not try to automatically reconnect on connection failure
65+
case noReconnect
66+
/// try to reconnect in the keepAlive monitor task
67+
case reconnectInMonitor
68+
/// try to reconnect before sending the next message
69+
case reconnectOnSend
70+
}
71+
72+
/// start receiveMessages/keepAlive monitor task
73+
private func _startMonitor() {
74+
let monitor = Monitor(self)
75+
monitorTask = Task {
76+
try await monitor.run()
77+
}
78+
self.monitor = monitor
79+
}
80+
81+
/// stop receiveMessages/keepAlive monitor task
82+
private func _stopMonitor() {
83+
if let monitor {
84+
monitor.stop()
85+
self.monitor = nil
86+
}
87+
if let monitorTask {
88+
monitorTask.cancel()
89+
self.monitorTask = nil
90+
}
91+
}
92+
93+
private func _didConnectDevice() async throws {
94+
_updateConnectionState(.connected)
95+
96+
_startMonitor()
97+
98+
if heartbeatTime > .zero {
99+
// send keepalive to open UDP connection
100+
try await sendKeepAlive()
101+
}
102+
103+
#if canImport(Combine) || canImport(OpenCombine)
104+
objectWillChange.send()
105+
#endif
106+
107+
await refreshSubscriptions()
108+
await refreshCachedObjectProperties()
109+
await _refreshDeviceTreeWithPolicy()
110+
111+
logger.info("connected to \(self)")
112+
}
113+
114+
private func _didDisconnectDevice(clearObjectCache: Bool) async {
115+
if clearObjectCache {
116+
await self.clearObjectCache()
117+
}
118+
119+
#if canImport(Combine) || canImport(OpenCombine)
120+
objectWillChange.send()
121+
#endif
122+
123+
logger.info("disconnected from \(self)")
124+
}
125+
126+
/// disconnect from the OCA device, retaining the object cache
127+
private func _disconnectDeviceAfterConnectionFailure() async throws {
128+
try await disconnectDevice()
129+
await _didDisconnectDevice(clearObjectCache: false)
130+
}
131+
132+
///
133+
/// Re-connection logic is as follows:
134+
///
135+
/// * If the connection has a heartbeat, then automatic reconnection is only
136+
/// managed in the / heartbeat task
137+
///
138+
/// * If the connection does not have a heartbeat, than automatic
139+
/// reconnection is managed when / sending a PDU
140+
///
141+
private var _reconnectionPolicy: ReconnectionPolicy {
142+
if !options.flags.contains(.automaticReconnect) {
143+
.noReconnect
144+
} else if heartbeatTime == .zero {
145+
.reconnectOnSend
146+
} else {
147+
.reconnectInMonitor
148+
}
149+
}
150+
151+
/// functions with `_` prefix (with the exception of `_updateConnectionState()`) expect the
152+
/// caller to update the connection state
153+
154+
private func _updateConnectionState(_ connectionState: Ocp1ConnectionState) {
155+
_connectionState.send(connectionState)
156+
}
157+
158+
/// connect to the OCA device, throwing `Ocp1Error.connectionTimeout` if it times out
159+
private func _connectDeviceWithTimeout() async throws {
160+
do {
161+
try await withThrowingTimeout(of: options.connectionTimeout) {
162+
try await self.connectDevice()
163+
}
164+
} catch Ocp1Error.responseTimeout {
165+
throw Ocp1Error.connectionTimeout
166+
} catch {
167+
throw error
168+
}
169+
}
170+
171+
private func _refreshDeviceTreeWithPolicy() async {
172+
if options.flags.contains(.refreshDeviceTreeOnConnection) {
173+
logger.trace("refreshing device tree")
174+
try? await refreshDeviceTree()
175+
}
176+
}
177+
178+
/// reconnect to the OCA device with exponential backoff, updating
179+
/// connectionState
180+
func reconnectDeviceWithBackoff() async throws {
181+
var lastError: Error?
182+
var backoff: Duration = options.reconnectPauseInterval
183+
184+
_updateConnectionState(.reconnecting)
185+
186+
for i in 0..<options.reconnectMaxTries {
187+
do {
188+
logger.trace("reconnection attempt \(i + 1)")
189+
try await _connectDeviceWithTimeout()
190+
try await _didConnectDevice()
191+
break
192+
} catch {
193+
lastError = error
194+
if options.reconnectExponentialBackoffThreshold.contains(i) {
195+
backoff *= 2
196+
}
197+
logger.trace("reconnection failed with \(error), sleeping for \(backoff)")
198+
try await Task.sleep(for: backoff)
199+
}
200+
}
201+
202+
if let lastError {
203+
logger.trace("gave up attempting to reconnect: \(lastError)")
204+
_updateConnectionState(lastError.ocp1ConnectionState)
205+
throw lastError
206+
} else if !isConnected {
207+
logger.trace("gave up attempting to reconnect")
208+
_updateConnectionState(.notConnected)
209+
throw Ocp1Error.notConnected
210+
}
211+
}
212+
213+
private var _needsReconnectOnSend: Bool {
214+
guard _reconnectionPolicy == .reconnectOnSend else { return false }
215+
216+
switch _connectionState.value {
217+
case .notConnected:
218+
fallthrough
219+
case .connectionTimedOut:
220+
fallthrough
221+
case .connectionFailed:
222+
return true
223+
default:
224+
return false
225+
}
226+
}
227+
228+
func willSendMessage() async throws {
229+
guard _needsReconnectOnSend else { return }
230+
231+
logger.trace("reconnecting before sending message")
232+
try await reconnectDeviceWithBackoff()
233+
}
234+
235+
func didSendMessage(error: Ocp1Error? = nil) async throws {
236+
if error == nil {
237+
lastMessageSentTime = Monitor.now
238+
}
239+
240+
if _reconnectionPolicy != .reconnectInMonitor, let error,
241+
let connectionState = error.connectionState
242+
{
243+
logger
244+
.trace(
245+
"failed to send message: error \(error), new connection state \(connectionState); disconnecting"
246+
)
247+
if isConnected {
248+
_updateConnectionState(connectionState)
249+
try await _disconnectDeviceAfterConnectionFailure()
250+
}
251+
}
252+
}
253+
254+
func didMissKeepAlive() async throws {
255+
guard _connectionState.value != .reconnecting else { return }
256+
257+
logger.trace("missed keepalive, timing out connection with policy \(_reconnectionPolicy)")
258+
259+
_updateConnectionState(.connectionTimedOut)
260+
261+
if _reconnectionPolicy == .reconnectInMonitor {
262+
try await _disconnectDeviceAfterConnectionFailure()
263+
Task { try await reconnectDeviceWithBackoff() }
264+
}
265+
266+
throw Ocp1Error.missingKeepalive
267+
}
268+
269+
public var isConnected: Bool {
270+
_connectionState.value == .connected
271+
}
272+
}
273+
274+
/// Public API
275+
public extension Ocp1Connection {
276+
func connect() async throws {
277+
logger.trace("connecting...")
278+
279+
_updateConnectionState(.connecting)
280+
281+
do {
282+
try await _connectDeviceWithTimeout()
283+
} catch {
284+
logger.trace("connection failed: \(error)")
285+
_updateConnectionState(error.ocp1ConnectionState)
286+
throw error
287+
}
288+
289+
let connectionState = _connectionState.value
290+
if connectionState == .connecting {
291+
try await _didConnectDevice()
292+
} else if connectionState != .connected {
293+
logger.trace("connection failed whilst attempting to connect: \(connectionState)")
294+
throw connectionState.error ?? .notConnected
295+
}
296+
}
297+
298+
func disconnect() async throws {
299+
await removeSubscriptions()
300+
301+
_updateConnectionState(.notConnected)
302+
303+
let clearObjectCache = !options.flags.contains(.retainObjectCacheAfterDisconnect)
304+
try await disconnectDevice()
305+
await _didDisconnectDevice(clearObjectCache: clearObjectCache)
306+
}
307+
}

0 commit comments

Comments
 (0)