Skip to content

Replace the placeholder client request and response mechanisms #1264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
///
/// The call may be suspend if the writer is paused.
///
/// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks
/// Throws: ``GRPCAsyncWriterError`` if the writer has already been finished or too many write tasks
/// have been suspended.
@inlinable
internal func write(_ element: Element) async throws {
Expand All @@ -256,15 +256,15 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
// - error (the writer is complete or the queue is full).

if self._completionState.isPendingOrCompleted {
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
} else if !self._isPaused, self._pendingElements.isEmpty {
self._delegate.write(element)
continuation.resume()
} else if self._pendingElements.count < self._maxPendingElements {
// The continuation will be resumed later.
self._pendingElements.append(PendingElement(element, continuation: continuation))
} else {
continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
continuation.resume(throwing: GRPCAsyncWriterError.tooManyPendingWrites)
}
}

Expand All @@ -279,7 +279,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
@inlinable
internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
if self._completionState.isPendingOrCompleted {
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
} else if !self._isPaused, self._pendingElements.isEmpty {
self._completionState = .completed
self._delegate.writeEnd(end)
Expand All @@ -291,10 +291,35 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
}
}

@usableFromInline
internal enum AsyncWriterError: Error, Hashable {
case tooManyPendingWrites
case alreadyFinished
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension AsyncWriter where End == Void {
@inlinable
internal func finish() async throws {
try await self.finish(())
}
}

public struct GRPCAsyncWriterError: Error, Hashable {
private let wrapped: Wrapped

@usableFromInline
internal enum Wrapped {
case tooManyPendingWrites
case alreadyFinished
}

@usableFromInline
internal init(_ wrapped: Wrapped) {
self.wrapped = wrapped
}

/// There are too many writes pending. This may occur when too many Tasks are writing
/// concurrently.
public static let tooManyPendingWrites = Self(.tooManyPendingWrites)

/// The writer has already finished. This may occur when the RPC completes prematurely, or when
/// a user calls finish more than once.
public static let alreadyFinished = Self(.alreadyFinished)
}

@usableFromInline
Expand Down
33 changes: 33 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/Call+AsyncRequestStreamWriter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.5)

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension Call {
internal func makeRequestStreamWriter() -> GRPCAsyncRequestStreamWriter<Request> {
let delegate = GRPCAsyncRequestStreamWriter<Request>.Delegate(
compressionEnabled: self.options.messageEncoding.enabledForRequests
) { request, metadata in
self.send(.message(request, metadata), promise: nil)
} finish: {
self.send(.end, promise: nil)
}

return GRPCAsyncRequestStreamWriter(asyncWriter: .init(delegate: delegate))
}
}

#endif // compiler(>=5.5)
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
#if compiler(>=5.5)

import _NIOConcurrency
import NIOHPACK

/// Async-await variant of BidirectionalStreamingCall.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>
private let responseSource: PassthroughMessageSource<Response, Error>

/// A request stream writer for sending messages to the server.
public let requestStream: GRPCAsyncRequestStreamWriter<Request>

/// The stream of responses from the server.
public let responses: GRPCAsyncResponseStream<Response>
Expand Down Expand Up @@ -74,93 +77,59 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {

private init(call: Call<Request, Response>) {
self.call = call
// Initialise `responseParts` with an empty response handler because we
// provide the responses as an AsyncSequence in `responseStream`.
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }

// Call and StreamingResponseParts are reference types so we grab a
// referecence to them here to avoid capturing mutable self in the closure
// passed to the AsyncThrowingStream initializer.
//
// The alternative would be to declare the responseStream as:
// ```
// public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
// ```
//
// UPDATE: Additionally we expect to replace this soon with an AsyncSequence
// implementation that supports yielding values from outside the closure.
let call = self.call
let responseParts = self.responseParts
let responseStream = AsyncThrowingStream(Response.self) { continuation in
call.invokeStreamingRequests { error in
responseParts.handleError(error)
continuation.finish(throwing: error)
} onResponsePart: { responsePart in
responseParts.handle(responsePart)
switch responsePart {
case let .message(response):
continuation.yield(response)
case .metadata:
break
case .end:
continuation.finish()
}
}
}
self.responses = .init(responseStream)
self.responseSource = PassthroughMessageSource<Response, Error>()
self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
self.requestStream = call.makeRequestStreamWriter()
}

/// We expose this as the only non-private initializer so that the caller
/// knows that invocation is part of initialisation.
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
Self(call: call)
}

// MARK: - Requests

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - message: The message to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessage(
_ message: Request,
compression: Compression = .deferToCallDefault
) async throws {
let compress = self.call.compress(compression)
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
try await promise.futureResult.get()
}

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessages<S>(
_ messages: S,
compression: Compression = .deferToCallDefault
) async throws where S: Sequence, S.Element == Request {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.sendMessages(messages, compression: compression, promise: promise)
try await promise.futureResult.get()
let asyncCall = Self(call: call)

asyncCall.call.invokeStreamingRequests(
onError: { error in
asyncCall.responseParts.handleError(error)
asyncCall.responseSource.finish(throwing: error)
},
onResponsePart: AsyncCall.makeResponsePartHandler(
responseParts: asyncCall.responseParts,
responseSource: asyncCall.responseSource
)
)

return asyncCall
}
}

/// Terminates a stream of messages sent to the service.
///
/// - Important: This should only ever be called once.
public func sendEnd() async throws {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.end, promise: promise)
try await promise.futureResult.get()
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
internal enum AsyncCall {
internal static func makeResponsePartHandler<Response>(
responseParts: StreamingResponseParts<Response>,
responseSource: PassthroughMessageSource<Response, Error>
) -> (GRPCClientResponsePart<Response>) -> Void {
return { responsePart in
// Handle the metadata, trailers and status.
responseParts.handle(responsePart)

// Handle the response messages and status.
switch responsePart {
case .metadata:
()

case let .message(response):
// TODO: when we support backpressure we will need to stop ignoring the return value.
_ = responseSource.yield(response)

case let .end(status, _):
if status.isOk {
responseSource.finish()
} else {
responseSource.finish(throwing: status)
}
}
}
}
}

Expand Down
51 changes: 4 additions & 47 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
private let call: Call<Request, Response>
private let responseParts: UnaryResponseParts<Response>

/// A request stream writer for sending messages to the server.
public let requestStream: GRPCAsyncRequestStreamWriter<Request>

/// The options used to make the RPC.
public var options: CallOptions {
return self.call.options
Expand Down Expand Up @@ -81,60 +84,14 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
onError: self.responseParts.handleError(_:),
onResponsePart: self.responseParts.handle(_:)
)
self.requestStream = call.makeRequestStreamWriter()
}

/// We expose this as the only non-private initializer so that the caller
/// knows that invocation is part of initialisation.
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
Self(call: call)
}

// MARK: - Requests

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - message: The message to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessage(
_ message: Request,
compression: Compression = .deferToCallDefault
) async throws {
let compress = self.call.compress(compression)
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
try await promise.futureResult.get()
}

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessages<S>(
_ messages: S,
compression: Compression = .deferToCallDefault
) async throws where S: Sequence, S.Element == Request {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.sendMessages(messages, compression: compression, promise: promise)
try await promise.futureResult.get()
}

/// Terminates a stream of messages sent to the service.
///
/// - Important: This should only ever be called once.
public func sendEnd() async throws {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.end, promise: promise)
try await promise.futureResult.get()
}
}

#endif
Loading