Skip to content

Commit b6a1b0b

Browse files
authored
fix: realtime reconnection (#261)
* wip * refactor: EventEmitter * fix: client reconnect * chore: add existential any to missing places * chore: remove unused SharedStream type * Fix tests * Add Sendable requirement to Event generic type * chore: remove TestLogger * test: use withMainSerialExecutor * chore(deps): update all deps * chore: fix build on Windows * fix: do try reconnecting when connection error * chore(examples): track presence on channel subscription * test: add tests * Fix build for Windows and Linux * Add Sendable to SupabaseLogMessage
1 parent fffd5c6 commit b6a1b0b

24 files changed

+597
-465
lines changed

Examples/SlackClone/AppView.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ final class AppViewModel {
1414
var session: Session?
1515
var selectedChannel: Channel?
1616

17+
var realtimeConnectionStatus: RealtimeClientV2.Status?
18+
1719
init() {
1820
Task { [weak self] in
1921
for await (event, session) in await supabase.auth.authStateChanges {
@@ -27,23 +29,46 @@ final class AppViewModel {
2729
}
2830
}
2931
}
32+
33+
Task {
34+
for await status in await supabase.realtimeV2.statusChange {
35+
realtimeConnectionStatus = status
36+
}
37+
}
3038
}
3139
}
3240

3341
@MainActor
3442
struct AppView: View {
3543
@Bindable var model: AppViewModel
44+
let log = LogStore.shared
45+
46+
@State var logPresented = false
3647

3748
@ViewBuilder
3849
var body: some View {
3950
if model.session != nil {
4051
NavigationSplitView {
4152
ChannelListView(channel: $model.selectedChannel)
53+
.toolbar {
54+
ToolbarItem {
55+
Button("Log") {
56+
logPresented = true
57+
}
58+
}
59+
}
4260
} detail: {
4361
if let channel = model.selectedChannel {
4462
MessagesView(channel: channel).id(channel.id)
4563
}
4664
}
65+
.sheet(isPresented: $logPresented) {
66+
List {
67+
ForEach(0 ..< log.messages.count, id: \.self) { i in
68+
Text(log.messages[i].description)
69+
}
70+
}
71+
}
4772
} else {
4873
AuthView()
4974
}

Examples/SlackClone/Logger.swift

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,21 @@ extension Logger {
1313
static let main = Self(subsystem: "com.supabase.SlackClone", category: "app")
1414
}
1515

16-
final class SupabaseLoggerImpl: SupabaseLogger, @unchecked Sendable {
16+
@Observable
17+
final class LogStore: SupabaseLogger {
1718
private let lock = NSLock()
1819
private var loggers: [String: Logger] = [:]
1920

21+
static let shared = LogStore()
22+
23+
@MainActor
24+
var messages: [SupabaseLogMessage] = []
25+
2026
func log(message: SupabaseLogMessage) {
27+
Task {
28+
await add(message: message)
29+
}
30+
2131
lock.withLock {
2232
if loggers[message.system] == nil {
2333
loggers[message.system] = Logger(
@@ -29,11 +39,16 @@ final class SupabaseLoggerImpl: SupabaseLogger, @unchecked Sendable {
2939
let logger = loggers[message.system]!
3040

3141
switch message.level {
32-
case .debug: logger.debug("\(message)")
33-
case .error: logger.error("\(message)")
34-
case .verbose: logger.info("\(message)")
35-
case .warning: logger.notice("\(message)")
42+
case .debug: logger.debug("\(message, privacy: .public)")
43+
case .error: logger.error("\(message, privacy: .public)")
44+
case .verbose: logger.info("\(message, privacy: .public)")
45+
case .warning: logger.notice("\(message, privacy: .public)")
3646
}
3747
}
3848
}
49+
50+
@MainActor
51+
private func add(message: SupabaseLogMessage) {
52+
messages.insert(message, at: 0)
53+
}
3954
}

Examples/SlackClone/Supabase.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ let decoder: JSONDecoder = {
2121
}()
2222

2323
let supabase = SupabaseClient(
24-
supabaseURL: URL(string: "http://127.0.0.1:54321")!,
24+
supabaseURL: URL(string: "http://localhost:54321")!,
2525
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
2626
options: SupabaseClientOptions(
2727
db: .init(encoder: encoder, decoder: decoder),
28-
global: SupabaseClientOptions.GlobalOptions(logger: SupabaseLoggerImpl())
28+
global: SupabaseClientOptions.GlobalOptions(logger: LogStore.shared)
2929
)
3030
)

Examples/SlackClone/UserStore.swift

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ final class UserStore {
2222
let channel = await supabase.realtimeV2.channel("public:users")
2323
let changes = await channel.postgresChange(AnyAction.self, table: "users")
2424

25-
let prenseces = await channel.presenceChange()
25+
let presences = await channel.presenceChange()
2626

2727
await channel.subscribe()
2828

29-
let userId = try await supabase.auth.session.user.id
30-
try await channel.track(UserPresence(userId: userId, onlineAt: Date()))
29+
Task {
30+
let statusChange = await channel.statusChange
31+
for await _ in statusChange.filter({ $0 == .subscribed }) {
32+
let userId = try await supabase.auth.session.user.id
33+
try await channel.track(UserPresence(userId: userId, onlineAt: Date()))
34+
}
35+
}
3136

3237
Task {
3338
for await change in changes {
@@ -36,7 +41,7 @@ final class UserStore {
3641
}
3742

3843
Task {
39-
for await presence in prenseces {
44+
for await presence in presences {
4045
let joins = try presence.decodeJoins(as: UserPresence.self)
4146
let leaves = try presence.decodeLeaves(as: UserPresence.self)
4247

Package.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ let package = Package(
7979
dependencies: [
8080
"Auth",
8181
"_Helpers",
82+
"TestHelpers",
8283
.product(name: "SnapshotTesting", package: "swift-snapshot-testing"),
8384
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
8485
],
@@ -115,6 +116,7 @@ let package = Package(
115116
name: "RealtimeTests",
116117
dependencies: [
117118
"Realtime",
119+
"TestHelpers",
118120
.product(name: "CustomDump", package: "swift-custom-dump"),
119121
]
120122
),
@@ -132,6 +134,7 @@ let package = Package(
132134
]
133135
),
134136
.testTarget(name: "SupabaseTests", dependencies: ["Supabase"]),
137+
.target(name: "TestHelpers"),
135138
]
136139
)
137140

Sources/Auth/AuthClient.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,9 @@ public actor AuthClient {
199199
public func onAuthStateChange(
200200
_ listener: @escaping AuthStateChangeListener
201201
) async -> some AuthStateChangeListenerRegistration {
202-
let handle = eventEmitter.attachListener(listener)
203-
await emitInitialSession(forHandle: handle)
204-
return handle
202+
let token = eventEmitter.attachListener(listener)
203+
await emitInitialSession(forToken: token)
204+
return token
205205
}
206206

207207
/// Listen for auth state changes.
@@ -908,9 +908,9 @@ public actor AuthClient {
908908
return session
909909
}
910910

911-
private func emitInitialSession(forHandle handle: AuthStateChangeListenerHandle) async {
911+
private func emitInitialSession(forToken token: ObservationToken) async {
912912
let session = try? await session
913-
eventEmitter.emit(.initialSession, session: session, handle: handle)
913+
eventEmitter.emit(.initialSession, session: session, token: token)
914914
}
915915

916916
private func prepareForPKCE() -> (codeChallenge: String?, codeChallengeMethod: String?) {

Sources/Auth/AuthStateChangeListener.swift

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,19 @@
55
// Created by Guilherme Souza on 17/02/24.
66
//
77

8+
import _Helpers
89
import ConcurrencyExtras
910
import Foundation
1011

1112
/// A listener that can be removed by calling ``AuthStateChangeListenerRegistration/remove()``.
1213
///
1314
/// - Note: Listener is automatically removed on deinit.
14-
public protocol AuthStateChangeListenerRegistration: Sendable, AnyObject {
15+
public protocol AuthStateChangeListenerRegistration: Sendable {
1516
/// Removes the listener. After the initial call, subsequent calls have no effect.
1617
func remove()
1718
}
1819

19-
final class AuthStateChangeListenerHandle: AuthStateChangeListenerRegistration {
20-
let _onRemove = LockIsolated((@Sendable () -> Void)?.none)
21-
22-
public func remove() {
23-
_onRemove.withValue {
24-
if $0 == nil {
25-
return
26-
}
27-
28-
$0?()
29-
$0 = nil
30-
}
31-
}
32-
33-
deinit {
34-
remove()
35-
}
36-
}
20+
extension ObservationToken: AuthStateChangeListenerRegistration {}
3721

3822
public typealias AuthStateChangeListener = @Sendable (
3923
_ event: AuthChangeEvent,
Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
import ConcurrencyExtras
22
import Foundation
3+
@_spi(Internal) import _Helpers
34

45
protocol EventEmitter: Sendable {
56
func attachListener(
67
_ listener: @escaping AuthStateChangeListener
7-
) -> AuthStateChangeListenerHandle
8+
) -> ObservationToken
89

910
func emit(
1011
_ event: AuthChangeEvent,
1112
session: Session?,
12-
handle: AuthStateChangeListenerHandle?
13+
token: ObservationToken?
1314
)
1415
}
1516

@@ -18,7 +19,7 @@ extension EventEmitter {
1819
_ event: AuthChangeEvent,
1920
session: Session?
2021
) {
21-
emit(event, session: session, handle: nil)
22+
emit(event, session: session, token: nil)
2223
}
2324
}
2425

@@ -27,31 +28,24 @@ final class DefaultEventEmitter: EventEmitter {
2728

2829
private init() {}
2930

30-
let listeners = LockIsolated<[ObjectIdentifier: AuthStateChangeListener]>([:])
31+
let emitter = _Helpers.EventEmitter<(AuthChangeEvent, Session?)?>(
32+
initialEvent: nil,
33+
emitsLastEventWhenAttaching: false
34+
)
3135

3236
func attachListener(
3337
_ listener: @escaping AuthStateChangeListener
34-
) -> AuthStateChangeListenerHandle {
35-
let handle = AuthStateChangeListenerHandle()
36-
let key = ObjectIdentifier(handle)
37-
38-
handle._onRemove.setValue { [weak self] in
39-
self?.listeners.withValue {
40-
$0[key] = nil
41-
}
42-
}
43-
44-
listeners.withValue {
45-
$0[key] = listener
38+
) -> ObservationToken {
39+
emitter.attach { event in
40+
guard let event else { return }
41+
listener(event.0, event.1)
4642
}
47-
48-
return handle
4943
}
5044

5145
func emit(
5246
_ event: AuthChangeEvent,
5347
session: Session?,
54-
handle: AuthStateChangeListenerHandle? = nil
48+
token: ObservationToken? = nil
5549
) {
5650
NotificationCenter.default.post(
5751
name: AuthClient.didChangeAuthStateNotification,
@@ -62,14 +56,6 @@ final class DefaultEventEmitter: EventEmitter {
6256
]
6357
)
6458

65-
let listeners = listeners.value
66-
67-
if let handle {
68-
listeners[ObjectIdentifier(handle)]?(event, session)
69-
} else {
70-
for listener in listeners.values {
71-
listener(event, session)
72-
}
73-
}
59+
emitter.emit((event, session), to: token)
7460
}
7561
}

Sources/Realtime/V2/PushV2.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ actor PushV2 {
2121

2222
func send() async -> PushStatus {
2323
do {
24-
try await channel?.socket?.ws?.send(message)
24+
try await channel?.socket?.ws.send(message)
2525

2626
if channel?.config.broadcast.acknowledgeBroadcasts == true {
2727
return await withCheckedContinuation {

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,20 @@ public actor RealtimeChannelV2 {
3333
let logger: (any SupabaseLogger)?
3434

3535
private let callbackManager = CallbackManager()
36-
private let statusStream = SharedStream<Status>(initialElement: .unsubscribed)
36+
37+
private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)
3738

3839
private var clientChanges: [PostgresJoinConfig] = []
3940
private var joinRef: String?
4041
private var pushes: [String: PushV2] = [:]
4142

4243
public private(set) var status: Status {
43-
get { statusStream.lastElement }
44-
set { statusStream.yield(newValue) }
44+
get { statusEventEmitter.lastEvent.value }
45+
set { statusEventEmitter.emit(newValue) }
4546
}
4647

4748
public var statusChange: AsyncStream<Status> {
48-
statusStream.makeStream()
49+
statusEventEmitter.stream()
4950
}
5051

5152
init(

0 commit comments

Comments
 (0)