Skip to content

Commit 2611b09

Browse files
authored
feat(functions): add experimental invoke with streamed responses (#346)
* feat(functions): add experimental invoke with streamed responses * chore: use HTTPClient * fix x-region header after rebase * rename method to `_invokeWithStreamedResponse`
1 parent e62ad89 commit 2611b09

File tree

2 files changed

+95
-28
lines changed

2 files changed

+95
-28
lines changed

Sources/Functions/FunctionsClient.swift

Lines changed: 94 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import _Helpers
2-
import Foundation
2+
@preconcurrency import Foundation
33

44
#if canImport(FoundationNetworking)
55
import FoundationNetworking
@@ -20,21 +20,23 @@ public actor FunctionsClient {
2020
var headers: [String: String]
2121
/// The Region to invoke the functions in.
2222
let region: String?
23-
/// The fetch handler used to make requests.
24-
let fetch: FetchHandler
23+
24+
private let http: HTTPClient
2525

2626
/// Initializes a new instance of `FunctionsClient`.
2727
///
2828
/// - Parameters:
2929
/// - url: The base URL for the functions.
3030
/// - headers: Headers to be included in the requests. (Default: empty dictionary)
3131
/// - region: The Region to invoke the functions in.
32+
/// - logger: SupabaseLogger instance to use.
3233
/// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:))
3334
@_disfavoredOverload
3435
public init(
3536
url: URL,
3637
headers: [String: String] = [:],
3738
region: String? = nil,
39+
logger: (any SupabaseLogger)? = nil,
3840
fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) }
3941
) {
4042
self.url = url
@@ -43,7 +45,7 @@ public actor FunctionsClient {
4345
self.headers["X-Client-Info"] = "functions-swift/\(version)"
4446
}
4547
self.region = region
46-
self.fetch = fetch
48+
http = HTTPClient(logger: logger, fetchHandler: fetch)
4749
}
4850

4951
/// Initializes a new instance of `FunctionsClient`.
@@ -52,20 +54,16 @@ public actor FunctionsClient {
5254
/// - url: The base URL for the functions.
5355
/// - headers: Headers to be included in the requests. (Default: empty dictionary)
5456
/// - region: The Region to invoke the functions in.
57+
/// - logger: SupabaseLogger instance to use.
5558
/// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:))
5659
public init(
5760
url: URL,
5861
headers: [String: String] = [:],
5962
region: FunctionRegion? = nil,
63+
logger: (any SupabaseLogger)? = nil,
6064
fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) }
6165
) {
62-
self.url = url
63-
self.headers = headers
64-
if headers["X-Client-Info"] == nil {
65-
self.headers["X-Client-Info"] = "functions-swift/\(version)"
66-
}
67-
self.region = region?.rawValue
68-
self.fetch = fetch
66+
self.init(url: url, headers: headers, region: region?.rawValue, logger: logger, fetch: fetch)
6967
}
7068

7169
/// Updates the authorization header.
@@ -92,10 +90,10 @@ public actor FunctionsClient {
9290
options: FunctionInvokeOptions = .init(),
9391
decode: (Data, HTTPURLResponse) throws -> Response
9492
) async throws -> Response {
95-
let (data, response) = try await rawInvoke(
93+
let response = try await rawInvoke(
9694
functionName: functionName, invokeOptions: options
9795
)
98-
return try decode(data, response)
96+
return try decode(response.data, response.response)
9997
}
10098

10199
/// Invokes a function and decodes the response as a specific type.
@@ -130,33 +128,101 @@ public actor FunctionsClient {
130128
private func rawInvoke(
131129
functionName: String,
132130
invokeOptions: FunctionInvokeOptions
133-
) async throws -> (Data, HTTPURLResponse) {
131+
) async throws -> Response {
132+
var request = Request(
133+
path: functionName,
134+
method: .post,
135+
headers: invokeOptions.headers.merging(headers) { invoke, _ in invoke },
136+
body: invokeOptions.body
137+
)
138+
139+
if let region = invokeOptions.region ?? region {
140+
request.headers["x-region"] = region
141+
}
142+
143+
let response = try await http.fetch(request, baseURL: url)
144+
145+
guard 200 ..< 300 ~= response.statusCode else {
146+
throw FunctionsError.httpError(code: response.statusCode, data: response.data)
147+
}
148+
149+
let isRelayError = response.response.value(forHTTPHeaderField: "x-relay-error") == "true"
150+
if isRelayError {
151+
throw FunctionsError.relayError
152+
}
153+
154+
return response
155+
}
156+
157+
/// Invokes a function with streamed response.
158+
///
159+
/// Function MUST return a `text/event-stream` content type for this method to work.
160+
///
161+
/// - Parameters:
162+
/// - functionName: The name of the function to invoke.
163+
/// - invokeOptions: Options for invoking the function.
164+
/// - Returns: A stream of Data.
165+
///
166+
/// - Warning: Experimental method.
167+
/// - Note: This method doesn't use the same underlying `URLSession` as the remaining methods in the library.
168+
public func _invokeWithStreamedResponse(
169+
_ functionName: String,
170+
options invokeOptions: FunctionInvokeOptions = .init()
171+
) -> AsyncThrowingStream<Data, any Error> {
172+
let (stream, continuation) = AsyncThrowingStream<Data, any Error>.makeStream()
173+
let delegate = StreamResponseDelegate(continuation: continuation)
174+
175+
let session = URLSession(configuration: .default, delegate: delegate, delegateQueue: nil)
176+
134177
let url = url.appendingPathComponent(functionName)
135178
var urlRequest = URLRequest(url: url)
136179
urlRequest.allHTTPHeaderFields = invokeOptions.headers.merging(headers) { invoke, _ in invoke }
137180
urlRequest.httpMethod = (invokeOptions.method ?? .post).rawValue
138181
urlRequest.httpBody = invokeOptions.body
139182

140-
let region = invokeOptions.region ?? region
141-
if let region {
142-
urlRequest.setValue(region, forHTTPHeaderField: "x-region")
183+
let task = session.dataTask(with: urlRequest) { data, response, _ in
184+
guard let httpResponse = response as? HTTPURLResponse else {
185+
continuation.finish(throwing: URLError(.badServerResponse))
186+
return
187+
}
188+
189+
guard 200 ..< 300 ~= httpResponse.statusCode else {
190+
let error = FunctionsError.httpError(code: httpResponse.statusCode, data: data ?? Data())
191+
continuation.finish(throwing: error)
192+
return
193+
}
194+
195+
let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true"
196+
if isRelayError {
197+
continuation.finish(throwing: FunctionsError.relayError)
198+
}
143199
}
144200

145-
let (data, response) = try await fetch(urlRequest)
201+
task.resume()
146202

147-
guard let httpResponse = response as? HTTPURLResponse else {
148-
throw URLError(.badServerResponse)
149-
}
203+
continuation.onTermination = { _ in
204+
task.cancel()
150205

151-
guard 200 ..< 300 ~= httpResponse.statusCode else {
152-
throw FunctionsError.httpError(code: httpResponse.statusCode, data: data)
206+
// Hold a strong reference to delegate until continuation terminates.
207+
_ = delegate
153208
}
154209

155-
let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true"
156-
if isRelayError {
157-
throw FunctionsError.relayError
158-
}
210+
return stream
211+
}
212+
}
213+
214+
final class StreamResponseDelegate: NSObject, URLSessionDataDelegate, Sendable {
215+
let continuation: AsyncThrowingStream<Data, any Error>.Continuation
216+
217+
init(continuation: AsyncThrowingStream<Data, any Error>.Continuation) {
218+
self.continuation = continuation
219+
}
220+
221+
func urlSession(_: URLSession, dataTask _: URLSessionDataTask, didReceive data: Data) {
222+
continuation.yield(data)
223+
}
159224

160-
return (data, httpResponse)
225+
func urlSession(_: URLSession, task _: URLSessionTask, didCompleteWithError error: (any Error)?) {
226+
continuation.finish(throwing: error)
161227
}
162228
}

Sources/Supabase/SupabaseClient.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public final class SupabaseClient: @unchecked Sendable {
7171
url: functionsURL,
7272
headers: defaultHeaders,
7373
region: options.functions.region,
74+
logger: options.global.logger,
7475
fetch: fetchWithAuth
7576
)
7677

0 commit comments

Comments
 (0)