Skip to content

Commit da674ff

Browse files
authored
Merge pull request #40 from Clearcals/master
Added support for PubSub
2 parents 66f1af0 + 13cc926 commit da674ff

16 files changed

+664
-1
lines changed

Package.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ let package = Package(
88
products: [
99
.library(
1010
name: "GoogleCloudKit",
11-
targets: ["Core", "Storage", "Datastore", "SecretManager"]
11+
targets: ["Core", "Storage", "Datastore", "SecretManager", "PubSub"]
1212
),
1313
.library(
1414
name: "GoogleCloudCore",
@@ -30,6 +30,10 @@ let package = Package(
3030
name: "GoogleCloudTranslation",
3131
targets: ["Translation"]
3232
),
33+
.library(
34+
name: "GoogleCloudPubSub",
35+
targets: ["PubSub"]
36+
),
3337
],
3438
dependencies: [
3539
.package(url: "https://github.com/swift-server/async-http-client.git", from: "1.2.0"),
@@ -72,6 +76,13 @@ let package = Package(
7276
],
7377
path: "Translation/Sources"
7478
),
79+
.target(
80+
name: "PubSub",
81+
dependencies: [
82+
.target(name: "Core")
83+
],
84+
path: "PubSub/Sources/"
85+
),
7586
.testTarget(
7687
name: "CoreTests",
7788
dependencies: [
@@ -101,5 +112,12 @@ let package = Package(
101112
],
102113
path: "Translation/Tests/"
103114
),
115+
.testTarget(
116+
name: "PubSubTests",
117+
dependencies: [
118+
.target(name: "PubSub")
119+
],
120+
path: "PubSub/Tests/"
121+
),
104122
]
105123
)
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
//
2+
// SubscriptionsAPI.swift
3+
//
4+
//
5+
// Created by Susheel Athmakuri on 6/21/21.
6+
//
7+
8+
import Core
9+
import NIO
10+
import AsyncHTTPClient
11+
import Foundation
12+
13+
public protocol SubscriptionsAPI {
14+
/// Gets the configuration details of a subscription.
15+
///
16+
/// - parameter `subscriptionId`: The name of the subscription to get
17+
/// - returns: An instance of the `Subscription`
18+
func get(subscriptionId: String) -> EventLoopFuture<GoogleCloudPubSubSubscription>
19+
20+
/// Acknowledges the messages associated with the ackIds in the AcknowledgeRequest.
21+
///
22+
/// - parameters `subscriptionId`: ID of the subscription whose message is being acknowledged
23+
/// `ackIds`: The acknowledgment ID for the messages being acknowledged that was returned by the Pub/Sub system in the subscriptions.pull response. Must not be empty.
24+
func acknowledge(subscriptionId: String, ackIds: [String]) -> EventLoopFuture<EmptyResponse>
25+
26+
/// Creates a subscription to a given topic.
27+
/// - parameter `subscriptionId`: The name of the subscription to be created.
28+
/// `topicId`: The name of the topic from which this subscription is receiving messages.
29+
/// `pushEndpoint`: A URL locating the endpoint to which messages should be pushed.
30+
/// `pushConfigAttributes`: Endpoint configuration attributes that can be used to control different aspects of the message delivery.
31+
/// `pushConfigOidcTokenServiceAccountEmail`:Service account email to be used for generating the OIDC token.
32+
/// `pushConfigOidcTokenAudience`: Audience to be used when generating OIDC token.
33+
/// `ackDeadlineSeconds`: The approximate amount of time (on a best-effort basis) Pub/Sub waits for the subscriber to acknowledge receipt before resending the message
34+
/// `retainAckedMessages`: Indicates whether to retain acknowledged messages.
35+
/// `messageRetentionDuration`: How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published
36+
/// `labels`: An object containing a list of "key": value pairs.
37+
/// `enableMessageOrdering`: If true, messages published with the same orderingKey in PubsubMessage will be delivered to the subscribers in the order in which they are received by the Pub/Sub system. Otherwise, they may be delivered in any order.
38+
/// `expirationPolicyTTL`: A policy that specifies the conditions for this subscription's expiration.
39+
/// `filter`: An expression written in the Pub/Sub filter language.
40+
/// `deadLetterPolicyTopic`: The name of the topic to which dead letter messages should be published.
41+
/// `deadLetterPolicyMaxDeliveryAttempts`: The maximum number of delivery attempts for any message.
42+
/// `retryPolicyMinimumBackoff`: The minimum delay between consecutive deliveries of a given message.
43+
/// `retryPolicyMaximumBackoff`: The maximum delay between consecutive deliveries of a given message.
44+
/// `detached`: Indicates whether the subscription is detached from its topic.
45+
///
46+
/// - returns: If successful, the response body contains a newly created instance of Subscription.
47+
/// If the subscription already exists, returns ALREADY_EXISTS. If the corresponding topic doesn't exist, returns NOT_FOUND.
48+
func create(subscriptionId: String,
49+
topicId: String,
50+
pushEndpoint: String?,
51+
pushConfigAttributes: [String: String]?,
52+
pushConfigOidcTokenServiceAccountEmail: String?,
53+
pushConfigOidcTokenAudience: String?,
54+
ackDeadlineSeconds: Int?,
55+
retainAckedMessages: Bool?,
56+
messageRetentionDuration: String?,
57+
labels: [String: String]?,
58+
enableMessageOrdering: Bool?,
59+
expirationPolicyTTL: String?,
60+
filter: String?,
61+
deadLetterPolicyTopic: String?,
62+
deadLetterPolicyMaxDeliveryAttempts: Int?,
63+
retryPolicyMinimumBackoff: String?,
64+
retryPolicyMaximumBackoff: String?,
65+
detached: Bool?) -> EventLoopFuture<GoogleCloudPubSubSubscription>
66+
}
67+
68+
public final class GoogleCloudPubSubSubscriptionsAPI: SubscriptionsAPI {
69+
let endpoint: String
70+
let request: GoogleCloudPubSubRequest
71+
let encoder = JSONEncoder()
72+
73+
init(request: GoogleCloudPubSubRequest, endpoint: String) {
74+
self.request = request
75+
self.endpoint = endpoint
76+
}
77+
78+
public func get(subscriptionId: String) -> EventLoopFuture<GoogleCloudPubSubSubscription> {
79+
return request.send(method: .GET, path: "\(endpoint)/v1/projects/\(request.project)/subscriptions/\(subscriptionId)")
80+
}
81+
82+
public func acknowledge(subscriptionId: String, ackIds: [String]) -> EventLoopFuture<EmptyResponse> {
83+
do {
84+
let acks = AcknowledgeRequest(ackIds: ackIds)
85+
let body = try HTTPClient.Body.data(encoder.encode(acks))
86+
return request.send(method: .POST,
87+
path: "\(endpoint)/v1/projects/\(request.project)/subscriptions/\(subscriptionId):acknowledge",
88+
body: body)
89+
} catch {
90+
return request.eventLoop.makeFailedFuture(error)
91+
}
92+
}
93+
94+
public func create(subscriptionId: String,
95+
topicId: String,
96+
pushEndpoint: String?,
97+
pushConfigAttributes: [String: String]?,
98+
pushConfigOidcTokenServiceAccountEmail: String?,
99+
pushConfigOidcTokenAudience: String?,
100+
ackDeadlineSeconds: Int?,
101+
retainAckedMessages: Bool?,
102+
messageRetentionDuration: String?,
103+
labels: [String: String]?,
104+
enableMessageOrdering: Bool?,
105+
expirationPolicyTTL: String?,
106+
filter: String?,
107+
deadLetterPolicyTopic: String?,
108+
deadLetterPolicyMaxDeliveryAttempts: Int?,
109+
retryPolicyMinimumBackoff: String?,
110+
retryPolicyMaximumBackoff: String?,
111+
detached: Bool?) -> EventLoopFuture<GoogleCloudPubSubSubscription> {
112+
do {
113+
var pushConfig: PushConfig? = nil
114+
if let pushEndpoint = pushEndpoint {
115+
var oidcToken: OidcToken? = nil
116+
if let serviceAccountEmail = pushConfigOidcTokenServiceAccountEmail, let audience = pushConfigOidcTokenAudience {
117+
oidcToken = OidcToken(serviceAccountEmail: serviceAccountEmail, audience: audience)
118+
}
119+
120+
pushConfig = PushConfig(pushEndpoint: pushEndpoint,
121+
attributes: pushConfigAttributes,
122+
oidcToken: oidcToken)
123+
}
124+
125+
var expirationPolicy: ExpirationPolicy? = nil
126+
if let ttl = expirationPolicyTTL {
127+
expirationPolicy = ExpirationPolicy(ttl: ttl)
128+
}
129+
130+
var deadLetterPolicy: DeadLetterPolicy? = nil
131+
if let deadLetterPolicyTopic = deadLetterPolicyTopic {
132+
deadLetterPolicy = DeadLetterPolicy(deadLetterTopic: deadLetterPolicyTopic,
133+
maxDeliveryAttempts: deadLetterPolicyMaxDeliveryAttempts)
134+
}
135+
136+
var retryPolicy: RetryPolicy? = nil
137+
if let min = retryPolicyMinimumBackoff, let max = retryPolicyMaximumBackoff {
138+
retryPolicy = RetryPolicy(minimumBackoff: min,
139+
maximumBackoff: max)
140+
}
141+
142+
let subscription = GoogleCloudPubSubSubscription(name: subscriptionId,
143+
topic: "projects/\(request.project)/topics/\(topicId)",
144+
pushConfig: pushConfig,
145+
ackDeadlineSeconds: ackDeadlineSeconds,
146+
retainAckedMessages: retainAckedMessages,
147+
messageRetentionDuration: messageRetentionDuration,
148+
labels: labels,
149+
enableMessageOrdering: enableMessageOrdering,
150+
expirationPolicy: expirationPolicy,
151+
filter: filter,
152+
deadLetterPolicy: deadLetterPolicy,
153+
retryPolicy: retryPolicy,
154+
detached: detached)
155+
let body = try HTTPClient.Body.data(encoder.encode(subscription))
156+
return request.send(method: .PUT,
157+
path: "\(endpoint)/v1/projects/\(request.project)/subscriptions/\(subscriptionId)",
158+
body: body)
159+
} catch {
160+
return request.eventLoop.makeFailedFuture(error)
161+
}
162+
}
163+
}

PubSub/Sources/API/TopicsAPI.swift

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import Core
2+
import NIO
3+
import AsyncHTTPClient
4+
import Foundation
5+
6+
public protocol TopicsAPI {
7+
/// Gets the configuration of a topic.
8+
///
9+
/// - parameter `topicId`: Name of the topic
10+
/// - returns: If successful, the response body contains an instance of `Topic`.
11+
func get(topicId: String) -> EventLoopFuture<GoogleCloudPubSubTopic>
12+
13+
/// Lists matching topics.
14+
///
15+
/// - parameter `pageSize`: Maximum number of topics to return.
16+
/// `pageToken`: The value returned by the last ListTopicsResponse; indicates that this is a
17+
/// continuation of a prior topics.list call, and that the system should return the next page of data
18+
/// - returns: Returns a list of topics and the `nextPageToken`
19+
func list(pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubListTopicResponse>
20+
21+
/// Adds one or more messages to the topic.
22+
///
23+
/// - parameter `topidId`: Name of the topic
24+
/// `data`: Data to be passed in the message
25+
/// `attributes`: Attributes for this message
26+
/// `orderingKey`: Identifies related messages for which publish order should be respected
27+
/// - returns: Returns an array of `messageId`. `MessageId` is the server-assigned ID of each published message, in the same order as the messages in the request. IDs are guaranteed to be unique within the topic.
28+
func publish(topicId: String, data: String, attributes: [String: String]?, orderingKey: String?) -> EventLoopFuture<GoogleCloudPublishResponse>
29+
30+
/// Lists the names of the attached subscriptions on this topic.
31+
func getSubscriptionsList(topicId: String, pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubTopicSubscriptionListResponse>
32+
}
33+
34+
public final class GoogleCloudPubSubTopicsAPI: TopicsAPI {
35+
let endpoint: String
36+
let request: GoogleCloudPubSubRequest
37+
let encoder = JSONEncoder()
38+
39+
init(request: GoogleCloudPubSubRequest,
40+
endpoint: String) {
41+
self.request = request
42+
self.endpoint = endpoint
43+
}
44+
45+
public func get(topicId: String) -> EventLoopFuture<GoogleCloudPubSubTopic> {
46+
return request.send(method: .GET, path: "\(endpoint)/v1/projects/\(request.project)/topics/\(topicId)")
47+
}
48+
49+
public func list(pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubListTopicResponse> {
50+
var query = "pageSize=\(pageSize ?? 10)"
51+
if let pageToken = pageToken {
52+
query.append(contentsOf: "&pageToken=\(pageToken)")
53+
}
54+
55+
return request.send(method: .GET,
56+
path: "\(endpoint)/v1/projects/\(request.project)/topics",
57+
query: query)
58+
}
59+
60+
public func publish(topicId: String, data: String, attributes: [String: String]?, orderingKey: String?) -> EventLoopFuture<GoogleCloudPublishResponse> {
61+
do {
62+
let message = GoogleCloudPubSubMessage(data: data, attributes: attributes, orderingKey: orderingKey)
63+
let publishRequest = GoogleCloudPublishRequest(messages: [message])
64+
let body = try HTTPClient.Body.data(encoder.encode(publishRequest))
65+
let path = "\(endpoint)/v1/projects/\(request.project)/topics/\(topicId):publish"
66+
67+
print("<<<--- Publish on: \(path) --->")
68+
69+
return request.send(method: .POST,
70+
path: path,
71+
body: body)
72+
} catch {
73+
return request.eventLoop.makeFailedFuture(error)
74+
}
75+
}
76+
77+
public func getSubscriptionsList(topicId: String, pageSize: Int?, pageToken: String?) -> EventLoopFuture<GooglePubSubTopicSubscriptionListResponse> {
78+
var query = "pageSize=\(pageSize ?? 10)"
79+
if let pageToken = pageToken {
80+
query.append(contentsOf: "&pageToken=\(pageToken)")
81+
}
82+
83+
return request.send(method: .GET,
84+
path: "\(endpoint)/v1/projects/\(request.project)/topics/subscriptions",
85+
query: query)
86+
}
87+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//
2+
// AcknowledgeRequest.swift
3+
//
4+
//
5+
// Created by Susheel Athmakuri on 6/21/21.
6+
//
7+
8+
import Core
9+
import Foundation
10+
11+
public struct AcknowledgeRequest: GoogleCloudModel {
12+
public var ackIds: [String]
13+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
//
2+
// ListTopicResponse.swift
3+
//
4+
//
5+
// Created by Susheel Athmakuri on 6/20/21.
6+
//
7+
8+
import Core
9+
import Foundation
10+
11+
public struct GooglePubSubListTopicResponse: GoogleCloudModel {
12+
/// The resulting topics.
13+
public var topics: [GoogleCloudPubSubTopic]
14+
15+
/// If not empty, indicates that there may be more topics that match the request; this value should be passed in a new ListTopicsRequest.
16+
public var nextPageToken: String?
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
//
2+
// PubSubMessage.swift
3+
//
4+
//
5+
// Created by Susheel Athmakuri on 6/20/21.
6+
//
7+
8+
import Core
9+
import Foundation
10+
11+
public struct GoogleCloudPubSubMessage: GoogleCloudModel {
12+
public var data: String?
13+
public var attributes: [String: String]?
14+
public var messageId: String?
15+
public var publishTime: String?
16+
public var orderingKey: String?
17+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//
2+
// PublishRequest.swift
3+
//
4+
//
5+
// Created by Susheel Athmakuri on 6/21/21.
6+
//
7+
8+
import Core
9+
import Foundation
10+
11+
public struct GoogleCloudPublishRequest: GoogleCloudModel {
12+
public let messages: [GoogleCloudPubSubMessage]
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//
2+
// PublishResponse.swift
3+
//
4+
//
5+
// Created by Susheel Athmakuri on 6/21/21.
6+
//
7+
8+
import Core
9+
import Foundation
10+
11+
public struct GoogleCloudPublishResponse: GoogleCloudModel {
12+
public let messageIds: [String]
13+
}

0 commit comments

Comments
 (0)