Skip to content

Commit e6f20b2

Browse files
committed
fix(realtime): losing postgres_changes on resubscribe
1 parent aa5ba6e commit e6f20b2

File tree

6 files changed

+71
-109
lines changed

6 files changed

+71
-109
lines changed

Examples/SlackClone/AppView.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ final class AppViewModel {
1515
var session: Session?
1616
var selectedChannel: Channel?
1717

18-
var realtimeConnectionStatus: RealtimeClientV2.Status?
18+
var realtimeConnectionStatus: RealtimeClientStatus?
1919

2020
init() {
2121
Task {
2222
for await (event, session) in supabase.auth.authStateChanges {
2323
Logger.main.debug("AuthStateChange: \(event.rawValue)")
24-
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return }
24+
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else {
25+
return
26+
}
2527
self.session = session
2628

2729
if session == nil {

Examples/SlackClone/Supabase.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ let decoder: JSONDecoder = {
2121
}()
2222

2323
let supabase = SupabaseClient(
24-
supabaseURL: URL(string: "https://rkehabxkxxpcbpzsammm.supabase.red")!,
25-
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InJrZWhhYnhreHhwY2JwenNhbW1tIiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mjk3NTgzODgsImV4cCI6MjA0NTMzNDM4OH0.rTpPEGk9fMjHXXR49drfyF6IkrNYeL_-yGGDa1JaXTY",
24+
supabaseURL: URL(string: "http://127.0.0.1:54321")!,
25+
supabaseKey:
26+
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
2627
options: SupabaseClientOptions(
2728
db: .init(encoder: encoder, decoder: decoder),
2829
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")),

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
import ConcurrencyExtras
99
import Foundation
10-
import Helpers
1110
import HTTPTypes
11+
import Helpers
1212

1313
#if canImport(FoundationNetworking)
1414
import FoundationNetworking
@@ -59,7 +59,9 @@ extension Socket {
5959
addChannel: { [weak client] in client?.addChannel($0) },
6060
removeChannel: { [weak client] in await client?.removeChannel($0) },
6161
push: { [weak client] in await client?.push($0) },
62-
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
62+
httpSend: { [weak client] in
63+
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
64+
}
6365
)
6466
}
6567
}
@@ -128,11 +130,6 @@ public final class RealtimeChannelV2: Sendable {
128130
await socket.connect()
129131
}
130132

131-
guard status != .subscribed else {
132-
logger?.warning("Channel \(topic) is already subscribed")
133-
return
134-
}
135-
136133
socket.addChannel(self)
137134

138135
status = .subscribing
@@ -185,7 +182,8 @@ public final class RealtimeChannelV2: Sendable {
185182
@available(
186183
*,
187184
deprecated,
188-
message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
185+
message:
186+
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
189187
)
190188
public func updateAuth(jwt: String?) async {
191189
logger?.debug("Updating auth token for channel \(topic)")
@@ -238,8 +236,8 @@ public final class RealtimeChannelV2: Sendable {
238236
event: event,
239237
payload: message,
240238
private: config.isPrivate
241-
),
242-
],
239+
)
240+
]
243241
]
244242
)
245243
)

Sources/Realtime/V2/RealtimeClientV2.swift

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import ConcurrencyExtras
99
import Foundation
1010
import Helpers
11+
import Network
1112

1213
#if canImport(FoundationNetworking)
1314
import FoundationNetworking
@@ -345,15 +346,18 @@ public final class RealtimeClientV2: Sendable {
345346
}
346347

347348
/// Disconnects client.
348-
public func disconnect() {
349+
/// - Parameters:
350+
/// - code: A numeric status code to send on disconnect.
351+
/// - reason: A custom reason for the disconnect.
352+
public func disconnect(code: Int? = nil, reason: String? = nil) {
349353
options.logger?.debug("Closing WebSocket connection")
350354
mutableState.withValue {
351355
$0.ref = 0
352356
$0.messageTask?.cancel()
353357
$0.heartbeatTask?.cancel()
354358
$0.connectionTask?.cancel()
355359
}
356-
ws.disconnect()
360+
ws.disconnect(code: code, reason: reason)
357361
status = .disconnected
358362
}
359363

@@ -490,8 +494,6 @@ public final class RealtimeClientV2: Sendable {
490494
}
491495
}
492496

493-
import Network
494-
495497
final class NetworkMonitor: @unchecked Sendable {
496498
static let shared = NetworkMonitor()
497499

Sources/Realtime/V2/WebSocketClient.swift

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import Helpers
1313
import FoundationNetworking
1414
#endif
1515

16+
enum WebSocketClientError: Error {
17+
case unsupportedData
18+
}
19+
1620
enum ConnectionStatus {
1721
case connected
1822
case disconnected(reason: String, code: URLSessionWebSocketTask.CloseCode)
@@ -23,7 +27,7 @@ protocol WebSocketClient: Sendable {
2327
func send(_ message: RealtimeMessageV2) async throws
2428
func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error>
2529
func connect() -> AsyncStream<ConnectionStatus>
26-
func disconnect()
30+
func disconnect(code: Int?, reason: String?)
2731
}
2832

2933
final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable {
@@ -33,7 +37,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
3337

3438
struct MutableState {
3539
var continuation: AsyncStream<ConnectionStatus>.Continuation?
36-
var connection: WebSocketConnection<RealtimeMessageV2, RealtimeMessageV2>?
40+
var task: URLSessionWebSocketTask?
3741
}
3842

3943
private let mutableState = LockIsolated(MutableState())
@@ -47,11 +51,15 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
4751
logger = options.logger
4852
}
4953

54+
deinit {
55+
mutableState.task?.cancel(with: .goingAway, reason: nil)
56+
}
57+
5058
func connect() -> AsyncStream<ConnectionStatus> {
5159
mutableState.withValue { state in
5260
let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
5361
let task = session.webSocketTask(with: realtimeURL)
54-
state.connection = WebSocketConnection(task: task)
62+
state.task = task
5563
task.resume()
5664

5765
let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
@@ -60,27 +68,55 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
6068
}
6169
}
6270

63-
func disconnect() {
71+
func disconnect(code: Int?, reason: String?) {
6472
mutableState.withValue { state in
65-
state.connection?.close()
73+
if let code {
74+
state.task?.cancel(
75+
with: URLSessionWebSocketTask.CloseCode(rawValue: code) ?? .invalid,
76+
reason: reason?.data(using: .utf8))
77+
} else {
78+
state.task?.cancel()
79+
}
6680
}
6781
}
6882

6983
func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error> {
70-
guard let connection = mutableState.connection else {
71-
return .finished(
72-
throwing: RealtimeError(
73-
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
74-
)
75-
)
84+
AsyncThrowingStream { [weak self] in
85+
guard let self else { return nil }
86+
87+
let task = mutableState.task
88+
89+
guard
90+
let message = try await task?.receive(),
91+
!Task.isCancelled
92+
else { return nil }
93+
94+
switch message {
95+
case .data(let data):
96+
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
97+
return message
98+
99+
case .string(let string):
100+
guard let data = string.data(using: .utf8) else {
101+
throw WebSocketClientError.unsupportedData
102+
}
103+
104+
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
105+
return message
106+
107+
@unknown default:
108+
assertionFailure("Unsupported message type.")
109+
task?.cancel(with: .unsupportedData, reason: nil)
110+
throw WebSocketClientError.unsupportedData
111+
}
76112
}
77-
78-
return connection.receive()
79113
}
80114

81115
func send(_ message: RealtimeMessageV2) async throws {
82116
logger?.verbose("Sending message: \(message)")
83-
try await mutableState.connection?.send(message)
117+
118+
let data = try JSONEncoder().encode(message)
119+
try await mutableState.task?.send(.data(data))
84120
}
85121

86122
// MARK: - URLSessionWebSocketDelegate

Sources/Realtime/V2/WebSocketConnection.swift

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)