Skip to content

feat(realtime): add closure based methods #345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions Sources/Realtime/RealtimeChannel+AsyncAwait.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//
// RealtimeChannel+AsyncAwait.swift
//
//
// Created by Guilherme Souza on 17/04/24.
//

import Foundation

extension RealtimeChannelV2 {
/// Listen for clients joining / leaving the channel using presences.
public func presenceChange() -> AsyncStream<any PresenceAction> {
let (stream, continuation) = AsyncStream<any PresenceAction>.makeStream()

let subscription = onPresenceChange {
continuation.yield($0)
}

continuation.onTermination = { _ in
subscription.cancel()
}

return stream
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: InsertAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<InsertAction> {
postgresChange(event: .insert, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? InsertAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: UpdateAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<UpdateAction> {
postgresChange(event: .update, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? UpdateAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: DeleteAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<DeleteAction> {
postgresChange(event: .delete, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? DeleteAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: SelectAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<SelectAction> {
postgresChange(event: .select, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? SelectAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: AnyAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<AnyAction> {
postgresChange(event: .all, schema: schema, table: table, filter: filter)
}

private func postgresChange(
event: PostgresChangeEvent,
schema: String,
table: String?,
filter: String?
) -> AsyncStream<AnyAction> {
let (stream, continuation) = AsyncStream<AnyAction>.makeStream()
let subscription = _onPostgresChange(
event: event,
schema: schema,
table: table,
filter: filter
) {
continuation.yield($0)
}
continuation.onTermination = { _ in
subscription.cancel()
}
return stream
}

/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
public func broadcastStream(event: String) -> AsyncStream<JSONObject> {
let (stream, continuation) = AsyncStream<JSONObject>.makeStream()

let subscription = onBroadcast(event: event) {
continuation.yield($0)
}

continuation.onTermination = { _ in
subscription.cancel()
}

return stream
}

/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
@available(*, deprecated, renamed: "broadcastStream(event:)")
public func broadcast(event: String) -> AsyncStream<JSONObject> {
broadcastStream(event: event)
}
}
25 changes: 6 additions & 19 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,15 @@ actor PushV2 {
}

func send() async -> PushStatus {
do {
try await channel?.socket?.ws.send(message)
await channel?.socket?.push(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
return await withCheckedContinuation {
receivedContinuation = $0
}
if channel?.config.broadcast.acknowledgeBroadcasts == true {
return await withCheckedContinuation {
receivedContinuation = $0
}

return .ok
} catch {
await channel?.socket?.config.logger?.debug(
"""
Failed to send message:
\(message)

Error:
\(error)
"""
)
return .error
}

return .ok
}

func didReceive(status: PushStatus) {
Expand Down
146 changes: 59 additions & 87 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public struct RealtimeChannelConfig: Sendable {
}

public actor RealtimeChannelV2 {
public typealias Subscription = ObservationToken

public enum Status: Sendable {
case unsubscribed
case subscribing
Expand Down Expand Up @@ -340,94 +342,85 @@ public actor RealtimeChannelV2 {
}

/// Listen for clients joining / leaving the channel using presences.
public func presenceChange() -> AsyncStream<any PresenceAction> {
let (stream, continuation) = AsyncStream<any PresenceAction>.makeStream()

let id = callbackManager.addPresenceCallback {
continuation.yield($0)
}

let logger = logger

continuation.onTermination = { [weak callbackManager] _ in
public func onPresenceChange(
_ callback: @escaping @Sendable (any PresenceAction) -> Void
) -> Subscription {
let id = callbackManager.addPresenceCallback(callback: callback)
return Subscription { [weak callbackManager, logger] in
logger?.debug("Removing presence callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}

return stream
}

/// Listen for postgres changes in a channel.
public func postgresChange(
public func onPostgresChange(
_: InsertAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<InsertAction> {
postgresChange(event: .insert, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? InsertAction }
.eraseToStream()
filter: String? = nil,
callback: @escaping @Sendable (InsertAction) -> Void
) -> Subscription {
_onPostgresChange(
event: .insert,
schema: schema,
table: table,
filter: filter
) {
guard case let .insert(action) = $0 else { return }
callback(action)
}
}

/// Listen for postgres changes in a channel.
public func postgresChange(
public func onPostgresChange(
_: UpdateAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<UpdateAction> {
postgresChange(event: .update, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? UpdateAction }
.eraseToStream()
filter: String? = nil,
callback: @escaping @Sendable (UpdateAction) -> Void
) -> Subscription {
_onPostgresChange(
event: .update,
schema: schema,
table: table,
filter: filter
) {
guard case let .update(action) = $0 else { return }
callback(action)
}
}

/// Listen for postgres changes in a channel.
public func postgresChange(
public func onPostgresChange(
_: DeleteAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<DeleteAction> {
postgresChange(event: .delete, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? DeleteAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: SelectAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<SelectAction> {
postgresChange(event: .select, schema: schema, table: table, filter: filter)
.compactMap { $0.wrappedAction as? SelectAction }
.eraseToStream()
}

/// Listen for postgres changes in a channel.
public func postgresChange(
_: AnyAction.Type,
schema: String = "public",
table: String? = nil,
filter: String? = nil
) -> AsyncStream<AnyAction> {
postgresChange(event: .all, schema: schema, table: table, filter: filter)
filter: String? = nil,
callback: @escaping @Sendable (DeleteAction) -> Void
) -> Subscription {
_onPostgresChange(
event: .delete,
schema: schema,
table: table,
filter: filter
) {
guard case let .delete(action) = $0 else { return }
callback(action)
}
}

private func postgresChange(
func _onPostgresChange(
event: PostgresChangeEvent,
schema: String,
table: String?,
filter: String?
) -> AsyncStream<AnyAction> {
filter: String?,
callback: @escaping @Sendable (AnyAction) -> Void
) -> Subscription {
precondition(
status != .subscribed,
"You cannot call postgresChange after joining the channel"
)

let (stream, continuation) = AsyncStream<AnyAction>.makeStream()

let config = PostgresJoinConfig(
event: event,
schema: schema,
Expand All @@ -437,44 +430,23 @@ public actor RealtimeChannelV2 {

clientChanges.append(config)

let id = callbackManager.addPostgresCallback(filter: config) { action in
continuation.yield(action)
}

let logger = logger

continuation.onTermination = { [weak callbackManager] _ in
let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
return Subscription { [weak callbackManager, logger] in
logger?.debug("Removing postgres callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}

return stream
}

/// Listen for broadcast messages sent by other clients within the same channel under a specific
/// `event`.
public func broadcastStream(event: String) -> AsyncStream<JSONObject> {
let (stream, continuation) = AsyncStream<JSONObject>.makeStream()

let id = callbackManager.addBroadcastCallback(event: event) {
continuation.yield($0)
}

let logger = logger

continuation.onTermination = { [weak callbackManager] _ in
/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
public func onBroadcast(
event: String,
callback: @escaping @Sendable (JSONObject) -> Void
) -> Subscription {
let id = callbackManager.addBroadcastCallback(event: event, callback: callback)
return Subscription { [weak callbackManager, logger] in
logger?.debug("Removing broadcast callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}

return stream
}

/// Listen for broadcast messages sent by other clients within the same channel under a specific
/// `event`.
@available(*, deprecated, renamed: "broadcastStream(event:)")
public func broadcast(event: String) -> AsyncStream<JSONObject> {
broadcastStream(event: event)
}

@discardableResult
Expand Down
Loading