diff --git a/Sources/Realtime/PushV2.swift b/Sources/Realtime/PushV2.swift index 884fc981..81829860 100644 --- a/Sources/Realtime/PushV2.swift +++ b/Sources/Realtime/PushV2.swift @@ -15,7 +15,8 @@ public enum PushStatus: String, Sendable { case timeout } -actor PushV2 { +@MainActor +final class PushV2 { private weak var channel: RealtimeChannelV2? let message: RealtimeMessageV2 diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 71412241..384d84af 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -32,13 +32,15 @@ public final class RealtimeChannelV2: Sendable { var pushes: [String: PushV2] = [:] } - private let mutableState = LockIsolated(MutableState()) + @MainActor + private var mutableState = MutableState() let topic: String let config: RealtimeChannelConfig let logger: (any SupabaseLogger)? let socket: RealtimeClientV2 - var joinRef: String? { mutableState.joinRef } + + @MainActor var joinRef: String? { mutableState.joinRef } let callbackManager = CallbackManager() private let statusSubject = AsyncValueSubject(.unsubscribed) @@ -81,6 +83,7 @@ public final class RealtimeChannelV2: Sendable { } /// Subscribes to the channel + @MainActor public func subscribe() async { if socket.status != .connected { if socket.options.connectOnSubscribe != true { @@ -109,7 +112,7 @@ public final class RealtimeChannelV2: Sendable { ) let joinRef = socket.makeRef() - mutableState.withValue { $0.joinRef = joinRef } + mutableState.joinRef = joinRef logger?.debug("Subscribing to channel with body: \(joinConfig)") @@ -497,8 +500,8 @@ public final class RealtimeChannelV2: Sendable { filter: filter ) - mutableState.withValue { - $0.clientChanges.append(config) + Task { @MainActor in + mutableState.clientChanges.append(config) } let id = callbackManager.addPostgresCallback(filter: config, callback: callback) @@ -538,32 +541,28 @@ public final class RealtimeChannelV2: Sendable { self.onSystem { _ in callback() } } + @MainActor @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { - let push = mutableState.withValue { - let message = RealtimeMessageV2( - joinRef: $0.joinRef, - ref: ref ?? socket.makeRef(), - topic: self.topic, - event: event, - payload: payload - ) - - let push = PushV2(channel: self, message: message) - if let ref = message.ref { - $0.pushes[ref] = push - } + let message = RealtimeMessageV2( + joinRef: joinRef, + ref: ref ?? socket.makeRef(), + topic: self.topic, + event: event, + payload: payload + ) - return push + let push = PushV2(channel: self, message: message) + if let ref = message.ref { + mutableState.pushes[ref] = push } return await push.send() } - private func didReceiveReply(ref: String, status: String) async { - let push = mutableState.withValue { - $0.pushes.removeValue(forKey: ref) - } - await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) + @MainActor + private func didReceiveReply(ref: String, status: String) { + let push = mutableState.pushes.removeValue(forKey: ref) + push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) } } diff --git a/Tests/IntegrationTests/RealtimeIntegrationTests.swift b/Tests/IntegrationTests/RealtimeIntegrationTests.swift index 5da85941..0c3c0b7e 100644 --- a/Tests/IntegrationTests/RealtimeIntegrationTests.swift +++ b/Tests/IntegrationTests/RealtimeIntegrationTests.swift @@ -24,247 +24,247 @@ struct TestLogger: SupabaseLogger { } #if !os(Android) && !os(Linux) -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class RealtimeIntegrationTests: XCTestCase { + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + final class RealtimeIntegrationTests: XCTestCase { - let testClock = TestClock() + let testClock = TestClock() - let client = SupabaseClient( - supabaseURL: URL(string: DotEnv.SUPABASE_URL) ?? URL(string: "http://localhost:54321")!, - supabaseKey: DotEnv.SUPABASE_ANON_KEY - ) - - override func setUp() { - super.setUp() + let client = SupabaseClient( + supabaseURL: URL(string: DotEnv.SUPABASE_URL) ?? URL(string: "http://localhost:54321")!, + supabaseKey: DotEnv.SUPABASE_ANON_KEY + ) - _clock = testClock - } + override func setUp() { + super.setUp() - #if !os(Windows) && !os(Linux) && !os(Android) - override func invokeTest() { - withMainSerialExecutor { - super.invokeTest() + _clock = testClock } - } - #endif - func testDisconnectByUser_shouldNotReconnect() async { - await client.realtimeV2.connect() - let status: RealtimeClientStatus = client.realtimeV2.status - XCTAssertEqual(status, .connected) + #if !os(Windows) && !os(Linux) && !os(Android) + override func invokeTest() { + withMainSerialExecutor { + super.invokeTest() + } + } + #endif - client.realtimeV2.disconnect() + func testDisconnectByUser_shouldNotReconnect() async { + await client.realtimeV2.connect() + let status: RealtimeClientStatus = client.realtimeV2.status + XCTAssertEqual(status, .connected) - /// Wait for the reconnection delay - await testClock.advance(by: .seconds(RealtimeClientOptions.defaultReconnectDelay)) + client.realtimeV2.disconnect() - XCTAssertEqual(client.realtimeV2.status, .disconnected) - } + /// Wait for the reconnection delay + await testClock.advance(by: .seconds(RealtimeClientOptions.defaultReconnectDelay)) - func testBroadcast() async throws { - let channel = client.realtimeV2.channel("integration") { - $0.broadcast.receiveOwnBroadcasts = true + XCTAssertEqual(client.realtimeV2.status, .disconnected) } - let receivedMessagesTask = Task { - await channel.broadcastStream(event: "test").prefix(3).collect() - } + func testBroadcast() async throws { + let channel = client.realtimeV2.channel("integration") { + $0.broadcast.receiveOwnBroadcasts = true + } - await Task.yield() + let receivedMessagesTask = Task { + await channel.broadcastStream(event: "test").prefix(3).collect() + } - await channel.subscribe() + await Task.yield() - struct Message: Codable { - var value: Int - } + await channel.subscribe() - try await channel.broadcast(event: "test", message: Message(value: 1)) - try await channel.broadcast(event: "test", message: Message(value: 2)) - try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42]) + struct Message: Codable { + var value: Int + } - let receivedMessages = try await withTimeout(interval: 5) { - await receivedMessagesTask.value - } + try await channel.broadcast(event: "test", message: Message(value: 1)) + try await channel.broadcast(event: "test", message: Message(value: 2)) + try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42]) - assertInlineSnapshot(of: receivedMessages, as: .json) { - """ - [ - { - "event" : "test", - "payload" : { - "value" : 1 - }, - "type" : "broadcast" - }, - { - "event" : "test", - "payload" : { - "value" : 2 + let receivedMessages = try await withTimeout(interval: 5) { + await receivedMessagesTask.value + } + + assertInlineSnapshot(of: receivedMessages, as: .json) { + """ + [ + { + "event" : "test", + "payload" : { + "value" : 1 + }, + "type" : "broadcast" }, - "type" : "broadcast" - }, - { - "event" : "test", - "payload" : { - "another_value" : 42, - "value" : 3 + { + "event" : "test", + "payload" : { + "value" : 2 + }, + "type" : "broadcast" }, - "type" : "broadcast" - } - ] - """ - } - - await channel.unsubscribe() - } - - func testBroadcastWithUnsubscribedChannel() async throws { - let channel = client.realtimeV2.channel("integration") { - $0.broadcast.acknowledgeBroadcasts = true - } - - struct Message: Codable { - var value: Int - } - - try await channel.broadcast(event: "test", message: Message(value: 1)) - try await channel.broadcast(event: "test", message: Message(value: 2)) - try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42]) - } - - func testPresence() async throws { - let channel = client.realtimeV2.channel("integration") { - $0.broadcast.receiveOwnBroadcasts = true - } - - let receivedPresenceChangesTask = Task { - await channel.presenceChange().prefix(4).collect() + { + "event" : "test", + "payload" : { + "another_value" : 42, + "value" : 3 + }, + "type" : "broadcast" + } + ] + """ + } + + await channel.unsubscribe() } - await Task.yield() + func testBroadcastWithUnsubscribedChannel() async throws { + let channel = client.realtimeV2.channel("integration") { + $0.broadcast.acknowledgeBroadcasts = true + } - await channel.subscribe() + struct Message: Codable { + var value: Int + } - struct UserState: Codable, Equatable { - let email: String + try await channel.broadcast(event: "test", message: Message(value: 1)) + try await channel.broadcast(event: "test", message: Message(value: 2)) + try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42]) } - try await channel.track(UserState(email: "test@supabase.com")) - try await channel.track(["email": "test2@supabase.com"]) + func testPresence() async throws { + let channel = client.realtimeV2.channel("integration") { + $0.broadcast.receiveOwnBroadcasts = true + } - await channel.untrack() + let receivedPresenceChangesTask = Task { + await channel.presenceChange().prefix(4).collect() + } - let receivedPresenceChanges = try await withTimeout(interval: 5) { - await receivedPresenceChangesTask.value - } - - let joins = try receivedPresenceChanges.map { try $0.decodeJoins(as: UserState.self) } - let leaves = try receivedPresenceChanges.map { try $0.decodeLeaves(as: UserState.self) } - expectNoDifference( - joins, - [ - [], // This is the first PRESENCE_STATE event. - [UserState(email: "test@supabase.com")], - [UserState(email: "test2@supabase.com")], - [], - ] - ) - - expectNoDifference( - leaves, - [ - [], // This is the first PRESENCE_STATE event. - [], - [UserState(email: "test@supabase.com")], - [UserState(email: "test2@supabase.com")], - ] - ) - - await channel.unsubscribe() - } - - func testPostgresChanges() async throws { - let channel = client.realtimeV2.channel("db-changes") - - let receivedInsertActions = Task { - await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect() - } + await Task.yield() - let receivedUpdateActions = Task { - await channel.postgresChange(UpdateAction.self, schema: "public").prefix(1).collect() - } + await channel.subscribe() - let receivedDeleteActions = Task { - await channel.postgresChange(DeleteAction.self, schema: "public").prefix(1).collect() - } + struct UserState: Codable, Equatable { + let email: String + } - let receivedAnyActionsTask = Task { - await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect() - } + try await channel.track(UserState(email: "test@supabase.com")) + try await channel.track(["email": "test2@supabase.com"]) - await Task.yield() - await channel.subscribe() + await channel.untrack() - struct Entry: Codable, Equatable { - let key: String - let value: AnyJSON - } + let receivedPresenceChanges = try await withTimeout(interval: 5) { + await receivedPresenceChangesTask.value + } - // Wait until a system event for makind sure DB change listeners are set before making DB changes. - _ = await channel.system().first(where: { _ in true }) - - let key = try await - (client.from("key_value_storage") - .insert(["key": AnyJSON.string(UUID().uuidString), "value": "value1"]).select().single() - .execute().value as Entry).key - try await client.from("key_value_storage").update(["value": "value2"]).eq("key", value: key) - .execute() - try await client.from("key_value_storage").delete().eq("key", value: key).execute() - - let insertedEntries = try await receivedInsertActions.value.map { - try $0.decodeRecord( - as: Entry.self, - decoder: JSONDecoder() - ) - } - let updatedEntries = try await receivedUpdateActions.value.map { - try $0.decodeRecord( - as: Entry.self, - decoder: JSONDecoder() + let joins = try receivedPresenceChanges.map { try $0.decodeJoins(as: UserState.self) } + let leaves = try receivedPresenceChanges.map { try $0.decodeLeaves(as: UserState.self) } + expectNoDifference( + joins, + [ + [], // This is the first PRESENCE_STATE event. + [UserState(email: "test@supabase.com")], + [UserState(email: "test2@supabase.com")], + [], + ] ) - } - let deletedEntryIds = await receivedDeleteActions.value.compactMap { - $0.oldRecord["key"]?.stringValue - } - expectNoDifference(insertedEntries, [Entry(key: key, value: "value1")]) - expectNoDifference(updatedEntries, [Entry(key: key, value: "value2")]) - expectNoDifference(deletedEntryIds, [key]) - - let receivedAnyActions = await receivedAnyActionsTask.value - XCTAssertEqual(receivedAnyActions.count, 3) - - if case let .insert(action) = receivedAnyActions[0] { - let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) - expectNoDifference(record, Entry(key: key, value: "value1")) - } else { - XCTFail("Expected a `AnyAction.insert` on `receivedAnyActions[0]`") - } + expectNoDifference( + leaves, + [ + [], // This is the first PRESENCE_STATE event. + [], + [UserState(email: "test@supabase.com")], + [UserState(email: "test2@supabase.com")], + ] + ) - if case let .update(action) = receivedAnyActions[1] { - let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) - expectNoDifference(record, Entry(key: key, value: "value2")) - } else { - XCTFail("Expected a `AnyAction.update` on `receivedAnyActions[1]`") + await channel.unsubscribe() } - if case let .delete(action) = receivedAnyActions[2] { - expectNoDifference(key, action.oldRecord["key"]?.stringValue) - } else { - XCTFail("Expected a `AnyAction.delete` on `receivedAnyActions[2]`") + func testPostgresChanges() async throws { + let channel = client.realtimeV2.channel("db-changes") + + let receivedInsertActions = Task { + await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect() + } + + let receivedUpdateActions = Task { + await channel.postgresChange(UpdateAction.self, schema: "public").prefix(1).collect() + } + + let receivedDeleteActions = Task { + await channel.postgresChange(DeleteAction.self, schema: "public").prefix(1).collect() + } + + let receivedAnyActionsTask = Task { + await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect() + } + + await Task.yield() + await channel.subscribe() + + struct Entry: Codable, Equatable { + let key: String + let value: AnyJSON + } + + // Wait until a system event for makind sure DB change listeners are set before making DB changes. + _ = await channel.system().first(where: { _ in true }) + + let key = try await + (client.from("key_value_storage") + .insert(["key": AnyJSON.string(UUID().uuidString), "value": "value1"]).select().single() + .execute().value as Entry).key + try await client.from("key_value_storage").update(["value": "value2"]).eq("key", value: key) + .execute() + try await client.from("key_value_storage").delete().eq("key", value: key).execute() + + let insertedEntries = try await receivedInsertActions.value.map { + try $0.decodeRecord( + as: Entry.self, + decoder: JSONDecoder() + ) + } + let updatedEntries = try await receivedUpdateActions.value.map { + try $0.decodeRecord( + as: Entry.self, + decoder: JSONDecoder() + ) + } + let deletedEntryIds = await receivedDeleteActions.value.compactMap { + $0.oldRecord["key"]?.stringValue + } + + expectNoDifference(insertedEntries, [Entry(key: key, value: "value1")]) + expectNoDifference(updatedEntries, [Entry(key: key, value: "value2")]) + expectNoDifference(deletedEntryIds, [key]) + + let receivedAnyActions = await receivedAnyActionsTask.value + XCTAssertEqual(receivedAnyActions.count, 3) + + if case let .insert(action) = receivedAnyActions[0] { + let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) + expectNoDifference(record, Entry(key: key, value: "value1")) + } else { + XCTFail("Expected a `AnyAction.insert` on `receivedAnyActions[0]`") + } + + if case let .update(action) = receivedAnyActions[1] { + let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) + expectNoDifference(record, Entry(key: key, value: "value2")) + } else { + XCTFail("Expected a `AnyAction.update` on `receivedAnyActions[1]`") + } + + if case let .delete(action) = receivedAnyActions[2] { + expectNoDifference(key, action.oldRecord["key"]?.stringValue) + } else { + XCTFail("Expected a `AnyAction.delete` on `receivedAnyActions[2]`") + } + + await channel.unsubscribe() } - - await channel.unsubscribe() } -} #endif diff --git a/Tests/RealtimeTests/_PushTests.swift b/Tests/RealtimeTests/_PushTests.swift index b644c819..f9c7f877 100644 --- a/Tests/RealtimeTests/_PushTests.swift +++ b/Tests/RealtimeTests/_PushTests.swift @@ -12,91 +12,85 @@ import XCTest @testable import Realtime -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class _PushTests: XCTestCase { - var ws: FakeWebSocket! - var socket: RealtimeClientV2! +#if !os(Android) && !os(Linux) && !os(Windows) + @MainActor + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + final class _PushTests: XCTestCase { + var ws: FakeWebSocket! + var socket: RealtimeClientV2! - #if !os(Windows) && !os(Linux) && !os(Android) - override func invokeTest() { - withMainSerialExecutor { - super.invokeTest() - } - } - #endif + override func setUp() { + super.setUp() - override func setUp() { - super.setUp() + let (client, server) = FakeWebSocket.fakes() + ws = server - let (client, server) = FakeWebSocket.fakes() - ws = server + socket = RealtimeClientV2( + url: URL(string: "https://localhost:54321/v1/realtime")!, + options: RealtimeClientOptions( + headers: ["apiKey": "apikey"] + ), + wsTransport: { _, _ in client }, + http: HTTPClientMock() + ) + } - socket = RealtimeClientV2( - url: URL(string: "https://localhost:54321/v1/realtime")!, - options: RealtimeClientOptions( - headers: ["apiKey": "apikey"] - ), - wsTransport: { _, _ in client }, - http: HTTPClientMock() - ) - } + func testPushWithoutAck() async { + let channel = RealtimeChannelV2( + topic: "realtime:users", + config: RealtimeChannelConfig( + broadcast: .init(acknowledgeBroadcasts: false), + presence: .init(), + isPrivate: false + ), + socket: socket, + logger: nil + ) + let push = PushV2( + channel: channel, + message: RealtimeMessageV2( + joinRef: nil, + ref: "1", + topic: "realtime:users", + event: "broadcast", + payload: [:] + ) + ) + + let status = await push.send() + XCTAssertEqual(status, .ok) + } - func testPushWithoutAck() async { - let channel = RealtimeChannelV2( - topic: "realtime:users", - config: RealtimeChannelConfig( - broadcast: .init(acknowledgeBroadcasts: false), - presence: .init(), - isPrivate: false - ), - socket: socket, - logger: nil - ) - let push = PushV2( - channel: channel, - message: RealtimeMessageV2( - joinRef: nil, - ref: "1", + func testPushWithAck() async { + let channel = RealtimeChannelV2( topic: "realtime:users", - event: "broadcast", - payload: [:] + config: RealtimeChannelConfig( + broadcast: .init(acknowledgeBroadcasts: true), + presence: .init(), + isPrivate: false + ), + socket: socket, + logger: nil + ) + let push = PushV2( + channel: channel, + message: RealtimeMessageV2( + joinRef: nil, + ref: "1", + topic: "realtime:users", + event: "broadcast", + payload: [:] + ) ) - ) - let status = await push.send() - XCTAssertEqual(status, .ok) - } + let task = Task { + await push.send() + } + await Task.megaYield() + push.didReceive(status: .ok) - // FIXME: Flaky test, it fails some time due the task scheduling, even tho we're using withMainSerialExecutor. - // func testPushWithAck() async { - // let channel = RealtimeChannelV2( - // topic: "realtime:users", - // config: RealtimeChannelConfig( - // broadcast: .init(acknowledgeBroadcasts: true), - // presence: .init(), - // isPrivate: false - // ), - // socket: Socket(client: socket), - // logger: nil - // ) - // let push = PushV2( - // channel: channel, - // message: RealtimeMessageV2( - // joinRef: nil, - // ref: "1", - // topic: "realtime:users", - // event: "broadcast", - // payload: [:] - // ) - // ) - // - // let task = Task { - // await push.send() - // } - // await Task.yield() - // await push.didReceive(status: .ok) - // - // let status = await task.value - // XCTAssertEqual(status, .ok) - // } -} + let status = await task.value + XCTAssertEqual(status, .ok) + } + } +#endif