Skip to content

Commit dfe09bc

Browse files
authored
feat(realtime): add closure based methods (#345)
* feat(realtime): add closure based methods * feat(realtime): add closure based method for broadcast * test: add test for closure based methods * fix ObservationTokenTests
1 parent f470874 commit dfe09bc

File tree

7 files changed

+254
-118
lines changed

7 files changed

+254
-118
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
//
2+
// RealtimeChannel+AsyncAwait.swift
3+
//
4+
//
5+
// Created by Guilherme Souza on 17/04/24.
6+
//
7+
8+
import Foundation
9+
10+
extension RealtimeChannelV2 {
11+
/// Listen for clients joining / leaving the channel using presences.
12+
public func presenceChange() -> AsyncStream<any PresenceAction> {
13+
let (stream, continuation) = AsyncStream<any PresenceAction>.makeStream()
14+
15+
let subscription = onPresenceChange {
16+
continuation.yield($0)
17+
}
18+
19+
continuation.onTermination = { _ in
20+
subscription.cancel()
21+
}
22+
23+
return stream
24+
}
25+
26+
/// Listen for postgres changes in a channel.
27+
public func postgresChange(
28+
_: InsertAction.Type,
29+
schema: String = "public",
30+
table: String? = nil,
31+
filter: String? = nil
32+
) -> AsyncStream<InsertAction> {
33+
postgresChange(event: .insert, schema: schema, table: table, filter: filter)
34+
.compactMap { $0.wrappedAction as? InsertAction }
35+
.eraseToStream()
36+
}
37+
38+
/// Listen for postgres changes in a channel.
39+
public func postgresChange(
40+
_: UpdateAction.Type,
41+
schema: String = "public",
42+
table: String? = nil,
43+
filter: String? = nil
44+
) -> AsyncStream<UpdateAction> {
45+
postgresChange(event: .update, schema: schema, table: table, filter: filter)
46+
.compactMap { $0.wrappedAction as? UpdateAction }
47+
.eraseToStream()
48+
}
49+
50+
/// Listen for postgres changes in a channel.
51+
public func postgresChange(
52+
_: DeleteAction.Type,
53+
schema: String = "public",
54+
table: String? = nil,
55+
filter: String? = nil
56+
) -> AsyncStream<DeleteAction> {
57+
postgresChange(event: .delete, schema: schema, table: table, filter: filter)
58+
.compactMap { $0.wrappedAction as? DeleteAction }
59+
.eraseToStream()
60+
}
61+
62+
/// Listen for postgres changes in a channel.
63+
public func postgresChange(
64+
_: SelectAction.Type,
65+
schema: String = "public",
66+
table: String? = nil,
67+
filter: String? = nil
68+
) -> AsyncStream<SelectAction> {
69+
postgresChange(event: .select, schema: schema, table: table, filter: filter)
70+
.compactMap { $0.wrappedAction as? SelectAction }
71+
.eraseToStream()
72+
}
73+
74+
/// Listen for postgres changes in a channel.
75+
public func postgresChange(
76+
_: AnyAction.Type,
77+
schema: String = "public",
78+
table: String? = nil,
79+
filter: String? = nil
80+
) -> AsyncStream<AnyAction> {
81+
postgresChange(event: .all, schema: schema, table: table, filter: filter)
82+
}
83+
84+
private func postgresChange(
85+
event: PostgresChangeEvent,
86+
schema: String,
87+
table: String?,
88+
filter: String?
89+
) -> AsyncStream<AnyAction> {
90+
let (stream, continuation) = AsyncStream<AnyAction>.makeStream()
91+
let subscription = _onPostgresChange(
92+
event: event,
93+
schema: schema,
94+
table: table,
95+
filter: filter
96+
) {
97+
continuation.yield($0)
98+
}
99+
continuation.onTermination = { _ in
100+
subscription.cancel()
101+
}
102+
return stream
103+
}
104+
105+
/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
106+
public func broadcastStream(event: String) -> AsyncStream<JSONObject> {
107+
let (stream, continuation) = AsyncStream<JSONObject>.makeStream()
108+
109+
let subscription = onBroadcast(event: event) {
110+
continuation.yield($0)
111+
}
112+
113+
continuation.onTermination = { _ in
114+
subscription.cancel()
115+
}
116+
117+
return stream
118+
}
119+
120+
/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
121+
@available(*, deprecated, renamed: "broadcastStream(event:)")
122+
public func broadcast(event: String) -> AsyncStream<JSONObject> {
123+
broadcastStream(event: event)
124+
}
125+
}

Sources/Realtime/V2/PushV2.swift

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,15 @@ actor PushV2 {
2020
}
2121

2222
func send() async -> PushStatus {
23-
do {
24-
try await channel?.socket?.ws.send(message)
23+
await channel?.socket?.push(message)
2524

26-
if channel?.config.broadcast.acknowledgeBroadcasts == true {
27-
return await withCheckedContinuation {
28-
receivedContinuation = $0
29-
}
25+
if channel?.config.broadcast.acknowledgeBroadcasts == true {
26+
return await withCheckedContinuation {
27+
receivedContinuation = $0
3028
}
31-
32-
return .ok
33-
} catch {
34-
await channel?.socket?.config.logger?.debug(
35-
"""
36-
Failed to send message:
37-
\(message)
38-
39-
Error:
40-
\(error)
41-
"""
42-
)
43-
return .error
4429
}
30+
31+
return .ok
4532
}
4633

4734
func didReceive(status: PushStatus) {

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 59 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public struct RealtimeChannelConfig: Sendable {
1515
}
1616

1717
public actor RealtimeChannelV2 {
18+
public typealias Subscription = ObservationToken
19+
1820
public enum Status: Sendable {
1921
case unsubscribed
2022
case subscribing
@@ -340,94 +342,85 @@ public actor RealtimeChannelV2 {
340342
}
341343

342344
/// Listen for clients joining / leaving the channel using presences.
343-
public func presenceChange() -> AsyncStream<any PresenceAction> {
344-
let (stream, continuation) = AsyncStream<any PresenceAction>.makeStream()
345-
346-
let id = callbackManager.addPresenceCallback {
347-
continuation.yield($0)
348-
}
349-
350-
let logger = logger
351-
352-
continuation.onTermination = { [weak callbackManager] _ in
345+
public func onPresenceChange(
346+
_ callback: @escaping @Sendable (any PresenceAction) -> Void
347+
) -> Subscription {
348+
let id = callbackManager.addPresenceCallback(callback: callback)
349+
return Subscription { [weak callbackManager, logger] in
353350
logger?.debug("Removing presence callback with id: \(id)")
354351
callbackManager?.removeCallback(id: id)
355352
}
356-
357-
return stream
358353
}
359354

360355
/// Listen for postgres changes in a channel.
361-
public func postgresChange(
356+
public func onPostgresChange(
362357
_: InsertAction.Type,
363358
schema: String = "public",
364359
table: String? = nil,
365-
filter: String? = nil
366-
) -> AsyncStream<InsertAction> {
367-
postgresChange(event: .insert, schema: schema, table: table, filter: filter)
368-
.compactMap { $0.wrappedAction as? InsertAction }
369-
.eraseToStream()
360+
filter: String? = nil,
361+
callback: @escaping @Sendable (InsertAction) -> Void
362+
) -> Subscription {
363+
_onPostgresChange(
364+
event: .insert,
365+
schema: schema,
366+
table: table,
367+
filter: filter
368+
) {
369+
guard case let .insert(action) = $0 else { return }
370+
callback(action)
371+
}
370372
}
371373

372374
/// Listen for postgres changes in a channel.
373-
public func postgresChange(
375+
public func onPostgresChange(
374376
_: UpdateAction.Type,
375377
schema: String = "public",
376378
table: String? = nil,
377-
filter: String? = nil
378-
) -> AsyncStream<UpdateAction> {
379-
postgresChange(event: .update, schema: schema, table: table, filter: filter)
380-
.compactMap { $0.wrappedAction as? UpdateAction }
381-
.eraseToStream()
379+
filter: String? = nil,
380+
callback: @escaping @Sendable (UpdateAction) -> Void
381+
) -> Subscription {
382+
_onPostgresChange(
383+
event: .update,
384+
schema: schema,
385+
table: table,
386+
filter: filter
387+
) {
388+
guard case let .update(action) = $0 else { return }
389+
callback(action)
390+
}
382391
}
383392

384393
/// Listen for postgres changes in a channel.
385-
public func postgresChange(
394+
public func onPostgresChange(
386395
_: DeleteAction.Type,
387396
schema: String = "public",
388397
table: String? = nil,
389-
filter: String? = nil
390-
) -> AsyncStream<DeleteAction> {
391-
postgresChange(event: .delete, schema: schema, table: table, filter: filter)
392-
.compactMap { $0.wrappedAction as? DeleteAction }
393-
.eraseToStream()
394-
}
395-
396-
/// Listen for postgres changes in a channel.
397-
public func postgresChange(
398-
_: SelectAction.Type,
399-
schema: String = "public",
400-
table: String? = nil,
401-
filter: String? = nil
402-
) -> AsyncStream<SelectAction> {
403-
postgresChange(event: .select, schema: schema, table: table, filter: filter)
404-
.compactMap { $0.wrappedAction as? SelectAction }
405-
.eraseToStream()
406-
}
407-
408-
/// Listen for postgres changes in a channel.
409-
public func postgresChange(
410-
_: AnyAction.Type,
411-
schema: String = "public",
412-
table: String? = nil,
413-
filter: String? = nil
414-
) -> AsyncStream<AnyAction> {
415-
postgresChange(event: .all, schema: schema, table: table, filter: filter)
398+
filter: String? = nil,
399+
callback: @escaping @Sendable (DeleteAction) -> Void
400+
) -> Subscription {
401+
_onPostgresChange(
402+
event: .delete,
403+
schema: schema,
404+
table: table,
405+
filter: filter
406+
) {
407+
guard case let .delete(action) = $0 else { return }
408+
callback(action)
409+
}
416410
}
417411

418-
private func postgresChange(
412+
func _onPostgresChange(
419413
event: PostgresChangeEvent,
420414
schema: String,
421415
table: String?,
422-
filter: String?
423-
) -> AsyncStream<AnyAction> {
416+
filter: String?,
417+
callback: @escaping @Sendable (AnyAction) -> Void
418+
) -> Subscription {
424419
precondition(
425420
status != .subscribed,
426421
"You cannot call postgresChange after joining the channel"
427422
)
428423

429-
let (stream, continuation) = AsyncStream<AnyAction>.makeStream()
430-
431424
let config = PostgresJoinConfig(
432425
event: event,
433426
schema: schema,
@@ -437,44 +430,23 @@ public actor RealtimeChannelV2 {
437430

438431
clientChanges.append(config)
439432

440-
let id = callbackManager.addPostgresCallback(filter: config) { action in
441-
continuation.yield(action)
442-
}
443-
444-
let logger = logger
445-
446-
continuation.onTermination = { [weak callbackManager] _ in
433+
let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
434+
return Subscription { [weak callbackManager, logger] in
447435
logger?.debug("Removing postgres callback with id: \(id)")
448436
callbackManager?.removeCallback(id: id)
449437
}
450-
451-
return stream
452438
}
453439

454-
/// Listen for broadcast messages sent by other clients within the same channel under a specific
455-
/// `event`.
456-
public func broadcastStream(event: String) -> AsyncStream<JSONObject> {
457-
let (stream, continuation) = AsyncStream<JSONObject>.makeStream()
458-
459-
let id = callbackManager.addBroadcastCallback(event: event) {
460-
continuation.yield($0)
461-
}
462-
463-
let logger = logger
464-
465-
continuation.onTermination = { [weak callbackManager] _ in
440+
/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
441+
public func onBroadcast(
442+
event: String,
443+
callback: @escaping @Sendable (JSONObject) -> Void
444+
) -> Subscription {
445+
let id = callbackManager.addBroadcastCallback(event: event, callback: callback)
446+
return Subscription { [weak callbackManager, logger] in
466447
logger?.debug("Removing broadcast callback with id: \(id)")
467448
callbackManager?.removeCallback(id: id)
468449
}
469-
470-
return stream
471-
}
472-
473-
/// Listen for broadcast messages sent by other clients within the same channel under a specific
474-
/// `event`.
475-
@available(*, deprecated, renamed: "broadcastStream(event:)")
476-
public func broadcast(event: String) -> AsyncStream<JSONObject> {
477-
broadcastStream(event: event)
478450
}
479451

480452
@discardableResult

0 commit comments

Comments
 (0)