@@ -16,6 +16,7 @@ import Foundation
16
16
public typealias JSONObject = _Helpers . JSONObject
17
17
18
18
public actor RealtimeClientV2 {
19
+ @available ( * , deprecated, renamed: " RealtimeClientOptions " )
19
20
public struct Configuration : Sendable {
20
21
var url : URL
21
22
var apiKey : String
@@ -64,10 +65,12 @@ public actor RealtimeClientV2 {
64
65
}
65
66
}
66
67
67
- let config : Configuration
68
+ let url : URL
69
+ let options : RealtimeClientOptions
68
70
let ws : any WebSocketClient
69
71
70
72
var accessToken : String ?
73
+ let apikey : String ?
71
74
var ref = 0
72
75
var pendingHeartbeatRef : Int ?
73
76
@@ -79,34 +82,66 @@ public actor RealtimeClientV2 {
79
82
80
83
private let statusEventEmitter = EventEmitter < Status > ( initialEvent: . disconnected)
81
84
85
+ /// AsyncStream that emits when connection status change.
86
+ ///
87
+ /// You can also use ``onStatusChange(_:)`` for a closure based method.
82
88
public var statusChange : AsyncStream < Status > {
83
89
statusEventEmitter. stream ( )
84
90
}
85
91
92
+ /// The current connection status.
86
93
public private( set) var status : Status {
87
94
get { statusEventEmitter. lastEvent. value }
88
95
set { statusEventEmitter. emit ( newValue) }
89
96
}
90
97
98
+ /// Listen for connection status changes.
99
+ /// - Parameter listener: Closure that will be called when connection status changes.
100
+ /// - Returns: An observation handle that can be used to stop listening.
101
+ ///
102
+ /// - Note: Use ``statusChange`` if you prefer to use Async/Await.
91
103
public func onStatusChange(
92
104
_ listener: @escaping @Sendable ( Status ) -> Void
93
105
) -> ObservationToken {
94
106
statusEventEmitter. attach ( listener)
95
107
}
96
108
109
+ @available ( * , deprecated, renamed: " RealtimeClientV2.init(url:options:) " )
97
110
public init ( config: Configuration ) {
98
- self . init ( config: config, ws: WebSocket ( config: config) )
111
+ self . init (
112
+ url: config. url,
113
+ options: RealtimeClientOptions (
114
+ headers: config. headers,
115
+ heartbeatInterval: config. heartbeatInterval,
116
+ reconnectDelay: config. reconnectDelay,
117
+ timeoutInterval: config. timeoutInterval,
118
+ disconnectOnSessionLoss: config. disconnectOnSessionLoss,
119
+ connectOnSubscribe: config. connectOnSubscribe,
120
+ logger: config. logger
121
+ )
122
+ )
99
123
}
100
124
101
- init ( config: Configuration , ws: any WebSocketClient ) {
102
- self . config = config
103
- self . ws = ws
125
+ public init ( url: URL , options: RealtimeClientOptions ) {
126
+ self . init (
127
+ url: url,
128
+ options: options,
129
+ ws: WebSocket (
130
+ realtimeURL: Self . realtimeWebSocketURL (
131
+ baseURL: Self . realtimeBaseURL ( url: url) ,
132
+ apikey: options. apikey
133
+ ) ,
134
+ options: options
135
+ )
136
+ )
137
+ }
104
138
105
- if let customJWT = config. headers [ " Authorization " ] ? . split ( separator: " " ) . last {
106
- accessToken = String ( customJWT)
107
- } else {
108
- accessToken = config. apiKey
109
- }
139
+ init ( url: URL , options: RealtimeClientOptions , ws: any WebSocketClient ) {
140
+ self . url = url
141
+ self . options = options
142
+ self . ws = ws
143
+ accessToken = options. accessToken ?? options. apikey
144
+ apikey = options. apikey
110
145
}
111
146
112
147
deinit {
@@ -126,16 +161,16 @@ public actor RealtimeClientV2 {
126
161
if status == . disconnected {
127
162
connectionTask = Task {
128
163
if reconnect {
129
- try ? await Task . sleep ( nanoseconds: NSEC_PER_SEC * UInt64( config . reconnectDelay) )
164
+ try ? await Task . sleep ( nanoseconds: NSEC_PER_SEC * UInt64( options . reconnectDelay) )
130
165
131
166
if Task . isCancelled {
132
- config . logger? . debug ( " Reconnect cancelled, returning " )
167
+ options . logger? . debug ( " Reconnect cancelled, returning " )
133
168
return
134
169
}
135
170
}
136
171
137
172
if status == . connected {
138
- config . logger? . debug ( " WebsSocket already connected " )
173
+ options . logger? . debug ( " WebsSocket already connected " )
139
174
return
140
175
}
141
176
@@ -165,7 +200,7 @@ public actor RealtimeClientV2 {
165
200
166
201
private func onConnected( reconnect: Bool ) async {
167
202
status = . connected
168
- config . logger? . debug ( " Connected to realtime WebSocket " )
203
+ options . logger? . debug ( " Connected to realtime WebSocket " )
169
204
listenForMessages ( )
170
205
startHeartbeating ( )
171
206
if reconnect {
@@ -174,17 +209,17 @@ public actor RealtimeClientV2 {
174
209
}
175
210
176
211
private func onDisconnected( ) async {
177
- config . logger?
212
+ options . logger?
178
213
. debug (
179
- " WebSocket disconnected. Trying again in \( config . reconnectDelay) "
214
+ " WebSocket disconnected. Trying again in \( options . reconnectDelay) "
180
215
)
181
216
await reconnect ( )
182
217
}
183
218
184
219
private func onError( _ error: ( any Error ) ? ) async {
185
- config . logger?
220
+ options . logger?
186
221
. debug (
187
- " WebSocket error \( error? . localizedDescription ?? " <none> " ) . Trying again in \( config . reconnectDelay) "
222
+ " WebSocket error \( error? . localizedDescription ?? " <none> " ) . Trying again in \( options . reconnectDelay) "
188
223
)
189
224
await reconnect ( )
190
225
}
@@ -208,7 +243,7 @@ public actor RealtimeClientV2 {
208
243
topic: " realtime: \( topic) " ,
209
244
config: config,
210
245
socket: self ,
211
- logger: self . config . logger
246
+ logger: self . options . logger
212
247
)
213
248
}
214
249
@@ -224,7 +259,7 @@ public actor RealtimeClientV2 {
224
259
subscriptions[ channel. topic] = nil
225
260
226
261
if subscriptions. isEmpty {
227
- config . logger? . debug ( " No more subscribed channel in socket " )
262
+ options . logger? . debug ( " No more subscribed channel in socket " )
228
263
disconnect ( )
229
264
}
230
265
}
@@ -254,18 +289,18 @@ public actor RealtimeClientV2 {
254
289
await onMessage ( message)
255
290
}
256
291
} catch {
257
- config . logger? . debug (
258
- " Error while listening for messages. Trying again in \( config . reconnectDelay) \( error) "
292
+ options . logger? . debug (
293
+ " Error while listening for messages. Trying again in \( options . reconnectDelay) \( error) "
259
294
)
260
295
await reconnect ( )
261
296
}
262
297
}
263
298
}
264
299
265
300
private func startHeartbeating( ) {
266
- heartbeatTask = Task { [ weak self, config ] in
301
+ heartbeatTask = Task { [ weak self, options ] in
267
302
while !Task. isCancelled {
268
- try ? await Task . sleep ( nanoseconds: NSEC_PER_SEC * UInt64( config . heartbeatInterval) )
303
+ try ? await Task . sleep ( nanoseconds: NSEC_PER_SEC * UInt64( options . heartbeatInterval) )
269
304
if Task . isCancelled {
270
305
break
271
306
}
@@ -277,7 +312,7 @@ public actor RealtimeClientV2 {
277
312
private func sendHeartbeat( ) async {
278
313
if pendingHeartbeatRef != nil {
279
314
pendingHeartbeatRef = nil
280
- config . logger? . debug ( " Heartbeat timeout " )
315
+ options . logger? . debug ( " Heartbeat timeout " )
281
316
282
317
await reconnect ( )
283
318
return
@@ -297,7 +332,7 @@ public actor RealtimeClientV2 {
297
332
}
298
333
299
334
public func disconnect( ) {
300
- config . logger? . debug ( " Closing WebSocket connection " )
335
+ options . logger? . debug ( " Closing WebSocket connection " )
301
336
ref = 0
302
337
messageTask? . cancel ( )
303
338
heartbeatTask? . cancel ( )
@@ -323,9 +358,9 @@ public actor RealtimeClientV2 {
323
358
324
359
if let ref = message. ref, Int ( ref) == pendingHeartbeatRef {
325
360
pendingHeartbeatRef = nil
326
- config . logger? . debug ( " heartbeat received " )
361
+ options . logger? . debug ( " heartbeat received " )
327
362
} else {
328
- config . logger?
363
+ options . logger?
329
364
. debug ( " Received event \( message. event) for channel \( channel? . topic ?? " null " ) " )
330
365
await channel? . onMessage ( message)
331
366
}
@@ -335,14 +370,14 @@ public actor RealtimeClientV2 {
335
370
/// - Parameter message: The message to push through the socket.
336
371
public func push( _ message: RealtimeMessageV2 ) async {
337
372
guard status == . connected else {
338
- config . logger? . warning ( " Trying to push a message while socket is not connected. This is not supported yet. " )
373
+ options . logger? . warning ( " Trying to push a message while socket is not connected. This is not supported yet. " )
339
374
return
340
375
}
341
376
342
377
do {
343
378
try await ws. send ( message)
344
379
} catch {
345
- config . logger? . debug ( """
380
+ options . logger? . debug ( """
346
381
Failed to send message:
347
382
\( message)
348
383
@@ -356,10 +391,8 @@ public actor RealtimeClientV2 {
356
391
ref += 1
357
392
return ref
358
393
}
359
- }
360
394
361
- extension RealtimeClientV2 . Configuration {
362
- var realtimeBaseURL : URL {
395
+ static func realtimeBaseURL( url: URL ) -> URL {
363
396
guard var components = URLComponents ( url: url, resolvingAgainstBaseURL: false ) else {
364
397
return url
365
398
}
@@ -377,21 +410,23 @@ extension RealtimeClientV2.Configuration {
377
410
return url
378
411
}
379
412
380
- var realtimeWebSocketURL : URL {
381
- guard var components = URLComponents ( url: realtimeBaseURL , resolvingAgainstBaseURL: false )
413
+ static func realtimeWebSocketURL( baseURL : URL , apikey : String ? ) -> URL {
414
+ guard var components = URLComponents ( url: baseURL , resolvingAgainstBaseURL: false )
382
415
else {
383
- return realtimeBaseURL
416
+ return baseURL
384
417
}
385
418
386
419
components. queryItems = components. queryItems ?? [ ]
387
- components. queryItems!. append ( URLQueryItem ( name: " apikey " , value: apiKey) )
420
+ if let apikey {
421
+ components. queryItems!. append ( URLQueryItem ( name: " apikey " , value: apikey) )
422
+ }
388
423
components. queryItems!. append ( URLQueryItem ( name: " vsn " , value: " 1.0.0 " ) )
389
424
390
425
components. path. append ( " /websocket " )
391
426
components. path = components. path. replacingOccurrences ( of: " // " , with: " / " )
392
427
393
428
guard let url = components. url else {
394
- return realtimeBaseURL
429
+ return baseURL
395
430
}
396
431
397
432
return url
0 commit comments