diff --git a/Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift b/Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift index 50e50bd6d..a2631df67 100644 --- a/Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift +++ b/Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift @@ -239,7 +239,7 @@ internal final actor AsyncWriter { /// /// The call may be suspend if the writer is paused. /// - /// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks + /// Throws: ``GRPCAsyncWriterError`` if the writer has already been finished or too many write tasks /// have been suspended. @inlinable internal func write(_ element: Element) async throws { @@ -256,7 +256,7 @@ internal final actor AsyncWriter { // - error (the writer is complete or the queue is full). if self._completionState.isPendingOrCompleted { - continuation.resume(throwing: AsyncWriterError.alreadyFinished) + continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished) } else if !self._isPaused, self._pendingElements.isEmpty { self._delegate.write(element) continuation.resume() @@ -264,7 +264,7 @@ internal final actor AsyncWriter { // The continuation will be resumed later. self._pendingElements.append(PendingElement(element, continuation: continuation)) } else { - continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites) + continuation.resume(throwing: GRPCAsyncWriterError.tooManyPendingWrites) } } @@ -279,7 +279,7 @@ internal final actor AsyncWriter { @inlinable internal func _finish(_ end: End, continuation: CheckedContinuation) { if self._completionState.isPendingOrCompleted { - continuation.resume(throwing: AsyncWriterError.alreadyFinished) + continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished) } else if !self._isPaused, self._pendingElements.isEmpty { self._completionState = .completed self._delegate.writeEnd(end) @@ -291,10 +291,35 @@ internal final actor AsyncWriter { } } -@usableFromInline -internal enum AsyncWriterError: Error, Hashable { - case tooManyPendingWrites - case alreadyFinished +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +extension AsyncWriter where End == Void { + @inlinable + internal func finish() async throws { + try await self.finish(()) + } +} + +public struct GRPCAsyncWriterError: Error, Hashable { + private let wrapped: Wrapped + + @usableFromInline + internal enum Wrapped { + case tooManyPendingWrites + case alreadyFinished + } + + @usableFromInline + internal init(_ wrapped: Wrapped) { + self.wrapped = wrapped + } + + /// There are too many writes pending. This may occur when too many Tasks are writing + /// concurrently. + public static let tooManyPendingWrites = Self(.tooManyPendingWrites) + + /// The writer has already finished. This may occur when the RPC completes prematurely, or when + /// a user calls finish more than once. + public static let alreadyFinished = Self(.alreadyFinished) } @usableFromInline diff --git a/Sources/GRPC/AsyncAwaitSupport/Call+AsyncRequestStreamWriter.swift b/Sources/GRPC/AsyncAwaitSupport/Call+AsyncRequestStreamWriter.swift new file mode 100644 index 000000000..f0506b815 --- /dev/null +++ b/Sources/GRPC/AsyncAwaitSupport/Call+AsyncRequestStreamWriter.swift @@ -0,0 +1,33 @@ +/* + * Copyright 2021, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#if compiler(>=5.5) + +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +extension Call { + internal func makeRequestStreamWriter() -> GRPCAsyncRequestStreamWriter { + let delegate = GRPCAsyncRequestStreamWriter.Delegate( + compressionEnabled: self.options.messageEncoding.enabledForRequests + ) { request, metadata in + self.send(.message(request, metadata), promise: nil) + } finish: { + self.send(.end, promise: nil) + } + + return GRPCAsyncRequestStreamWriter(asyncWriter: .init(delegate: delegate)) + } +} + +#endif // compiler(>=5.5) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift index 3450cfcc8..6bb6189e4 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift @@ -15,7 +15,6 @@ */ #if compiler(>=5.5) -import _NIOConcurrency import NIOHPACK /// Async-await variant of BidirectionalStreamingCall. @@ -23,6 +22,10 @@ import NIOHPACK public struct GRPCAsyncBidirectionalStreamingCall { private let call: Call private let responseParts: StreamingResponseParts + private let responseSource: PassthroughMessageSource + + /// A request stream writer for sending messages to the server. + public let requestStream: GRPCAsyncRequestStreamWriter /// The stream of responses from the server. public let responses: GRPCAsyncResponseStream @@ -74,93 +77,59 @@ public struct GRPCAsyncBidirectionalStreamingCall { private init(call: Call) { self.call = call - // Initialise `responseParts` with an empty response handler because we - // provide the responses as an AsyncSequence in `responseStream`. self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in } - - // Call and StreamingResponseParts are reference types so we grab a - // referecence to them here to avoid capturing mutable self in the closure - // passed to the AsyncThrowingStream initializer. - // - // The alternative would be to declare the responseStream as: - // ``` - // public private(set) var responseStream: AsyncThrowingStream! - // ``` - // - // UPDATE: Additionally we expect to replace this soon with an AsyncSequence - // implementation that supports yielding values from outside the closure. - let call = self.call - let responseParts = self.responseParts - let responseStream = AsyncThrowingStream(Response.self) { continuation in - call.invokeStreamingRequests { error in - responseParts.handleError(error) - continuation.finish(throwing: error) - } onResponsePart: { responsePart in - responseParts.handle(responsePart) - switch responsePart { - case let .message(response): - continuation.yield(response) - case .metadata: - break - case .end: - continuation.finish() - } - } - } - self.responses = .init(responseStream) + self.responseSource = PassthroughMessageSource() + self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource)) + self.requestStream = call.makeRequestStreamWriter() } /// We expose this as the only non-private initializer so that the caller /// knows that invocation is part of initialisation. internal static func makeAndInvoke(call: Call) -> Self { - Self(call: call) - } - - // MARK: - Requests - - /// Sends a message to the service. - /// - /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`. - /// - /// - Parameters: - /// - message: The message to send. - /// - compression: Whether compression should be used for this message. Ignored if compression - /// was not enabled for the RPC. - public func sendMessage( - _ message: Request, - compression: Compression = .deferToCallDefault - ) async throws { - let compress = self.call.compress(compression) - let promise = self.call.eventLoop.makePromise(of: Void.self) - self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise) - // TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel? - try await promise.futureResult.get() - } - - /// Sends a sequence of messages to the service. - /// - /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`. - /// - /// - Parameters: - /// - messages: The sequence of messages to send. - /// - compression: Whether compression should be used for this message. Ignored if compression - /// was not enabled for the RPC. - public func sendMessages( - _ messages: S, - compression: Compression = .deferToCallDefault - ) async throws where S: Sequence, S.Element == Request { - let promise = self.call.eventLoop.makePromise(of: Void.self) - self.call.sendMessages(messages, compression: compression, promise: promise) - try await promise.futureResult.get() + let asyncCall = Self(call: call) + + asyncCall.call.invokeStreamingRequests( + onError: { error in + asyncCall.responseParts.handleError(error) + asyncCall.responseSource.finish(throwing: error) + }, + onResponsePart: AsyncCall.makeResponsePartHandler( + responseParts: asyncCall.responseParts, + responseSource: asyncCall.responseSource + ) + ) + + return asyncCall } +} - /// Terminates a stream of messages sent to the service. - /// - /// - Important: This should only ever be called once. - public func sendEnd() async throws { - let promise = self.call.eventLoop.makePromise(of: Void.self) - self.call.send(.end, promise: promise) - try await promise.futureResult.get() +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +internal enum AsyncCall { + internal static func makeResponsePartHandler( + responseParts: StreamingResponseParts, + responseSource: PassthroughMessageSource + ) -> (GRPCClientResponsePart) -> Void { + return { responsePart in + // Handle the metadata, trailers and status. + responseParts.handle(responsePart) + + // Handle the response messages and status. + switch responsePart { + case .metadata: + () + + case let .message(response): + // TODO: when we support backpressure we will need to stop ignoring the return value. + _ = responseSource.yield(response) + + case let .end(status, _): + if status.isOk { + responseSource.finish() + } else { + responseSource.finish(throwing: status) + } + } + } } } diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift index b6b33b74e..4deb03b1a 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift @@ -23,6 +23,9 @@ public struct GRPCAsyncClientStreamingCall { private let call: Call private let responseParts: UnaryResponseParts + /// A request stream writer for sending messages to the server. + public let requestStream: GRPCAsyncRequestStreamWriter + /// The options used to make the RPC. public var options: CallOptions { return self.call.options @@ -81,6 +84,7 @@ public struct GRPCAsyncClientStreamingCall { onError: self.responseParts.handleError(_:), onResponsePart: self.responseParts.handle(_:) ) + self.requestStream = call.makeRequestStreamWriter() } /// We expose this as the only non-private initializer so that the caller @@ -88,53 +92,6 @@ public struct GRPCAsyncClientStreamingCall { internal static func makeAndInvoke(call: Call) -> Self { Self(call: call) } - - // MARK: - Requests - - /// Sends a message to the service. - /// - /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`. - /// - /// - Parameters: - /// - message: The message to send. - /// - compression: Whether compression should be used for this message. Ignored if compression - /// was not enabled for the RPC. - public func sendMessage( - _ message: Request, - compression: Compression = .deferToCallDefault - ) async throws { - let compress = self.call.compress(compression) - let promise = self.call.eventLoop.makePromise(of: Void.self) - self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise) - // TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel? - try await promise.futureResult.get() - } - - /// Sends a sequence of messages to the service. - /// - /// - Important: Callers must terminate the stream of messages by calling `sendEnd()`. - /// - /// - Parameters: - /// - messages: The sequence of messages to send. - /// - compression: Whether compression should be used for this message. Ignored if compression - /// was not enabled for the RPC. - public func sendMessages( - _ messages: S, - compression: Compression = .deferToCallDefault - ) async throws where S: Sequence, S.Element == Request { - let promise = self.call.eventLoop.makePromise(of: Void.self) - self.call.sendMessages(messages, compression: compression, promise: promise) - try await promise.futureResult.get() - } - - /// Terminates a stream of messages sent to the service. - /// - /// - Important: This should only ever be called once. - public func sendEnd() async throws { - let promise = self.call.eventLoop.makePromise(of: Void.self) - self.call.send(.end, promise: promise) - try await promise.futureResult.get() - } } #endif diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift new file mode 100644 index 000000000..2be7795d6 --- /dev/null +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStreamWriter.swift @@ -0,0 +1,127 @@ +/* + * Copyright 2021, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#if compiler(>=5.5) + +/// An object allowing the holder -- a client -- to send requests on an RPC. +/// +/// Requests may be sent using ``send(_:compression:)``. After all requests have been sent +/// the user is responsible for closing the request stream by calling ``finish()``. +/// +/// ``` +/// // Send a request on the request stream, use the compression setting configured for the RPC. +/// try await stream.send(request) +/// +/// // Send a request and explicitly disable compression. +/// try await stream.send(request, compression: .disabled) +/// +/// // Finish the stream to indicate that no more messages will be sent. +/// try await stream.finish() +/// ``` +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +public struct GRPCAsyncRequestStreamWriter { + @usableFromInline + internal let asyncWriter: AsyncWriter> + + @inlinable + internal init(asyncWriter: AsyncWriter>) { + self.asyncWriter = asyncWriter + } + + /// Send a single request. + /// + /// To ensure requests are delivered in order callers should `await` the result of this call + /// before sending another request. Callers who do not need this guarantee do not have to `await` + /// the completion of this call and may send messages concurrently from multiple ``Task``s. + /// However, it is important to note that no more than 16 writes may be pending at any one time + /// and attempting to exceed this will result in an ``GRPCAsyncWriterError.tooManyPendingWrites`` + /// error being thrown. + /// + /// Callers must call ``finish()`` when they have no more requests left to send. + /// + /// - Parameters: + /// - request: The request to send. + /// - compression: Whether the request should be compressed or not. Ignored if compression was + /// not enabled for the RPC. + /// - Throws: ``GRPCAsyncWriterError`` if there are too many pending writes or the request stream + /// has already been finished. + @inlinable + public func send( + _ request: Request, + compression: Compression = .deferToCallDefault + ) async throws { + try await self.asyncWriter.write((request, compression)) + } + + /// Finish the request stream for the RPC. This must be called when there are no more requests to + /// be sent. + /// + /// - Throws: ``GRPCAsyncWriterError`` if the request stream has already been finished. + public func finish() async throws { + try await self.asyncWriter.finish() + } +} + +@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) +extension GRPCAsyncRequestStreamWriter { + /// A delegate for the writer which writes messages to an underlying receiver.` + @usableFromInline + internal final class Delegate: AsyncWriterDelegate { + @usableFromInline + internal typealias Element = (Request, Compression) + + @usableFromInline + internal typealias End = Void + + @usableFromInline + internal let _compressionEnabled: Bool + + @usableFromInline + internal let _send: (Request, MessageMetadata) -> Void + + @usableFromInline + internal let _finish: () -> Void + + @inlinable + internal init( + compressionEnabled: Bool, + send: @escaping (Request, MessageMetadata) -> Void, + finish: @escaping () -> Void + ) { + self._compressionEnabled = compressionEnabled + self._send = send + self._finish = finish + } + + @inlinable + internal func write(_ element: (Request, Compression)) { + let (request, compression) = element + let compress = compression.isEnabled(callDefault: self._compressionEnabled) + + // TODO: be smarter about inserting flushes. + // + // We currently always flush after every write which may trigger more syscalls than necessary. + let metadata = MessageMetadata(compress: compress, flush: true) + self._send(request, metadata) + } + + @inlinable + internal func writeEnd(_ end: Void) { + self._finish() + } + } +} + +#endif // compiler(>=5.5) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift index 247ac34a9..77268d399 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift @@ -20,7 +20,7 @@ @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) public struct GRPCAsyncResponseStream: AsyncSequence { @usableFromInline - internal typealias WrappedStream = AsyncThrowingStream + internal typealias WrappedStream = PassthroughMessageSequence @usableFromInline internal let stream: WrappedStream diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift index 671410689..3ed679f3f 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift @@ -22,6 +22,7 @@ import NIOHPACK public struct GRPCAsyncServerStreamingCall { private let call: Call private let responseParts: StreamingResponseParts + private let responseSource: PassthroughMessageSource /// The stream of responses from the server. public let responses: GRPCAsyncResponseStream @@ -71,45 +72,13 @@ public struct GRPCAsyncServerStreamingCall { } } - private init( - call: Call, - _ request: Request - ) { + private init(call: Call) { self.call = call - // Initialise `responseParts` with an empty response handler because we - // provide the responses as an AsyncSequence in `responseStream`. + // We ignore messages in the closure and instead feed them into the response source when we + // invoke the `call`. self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in } - - // Call and StreamingResponseParts are reference types so we grab a - // referecence to them here to avoid capturing mutable self in the closure - // passed to the AsyncThrowingStream initializer. - // - // The alternative would be to declare the responseStream as: - // ``` - // public private(set) var responseStream: AsyncThrowingStream! - // ``` - // - // UPDATE: Additionally we expect to replace this soon with an AsyncSequence - // implementation that supports yielding values from outside the closure. - let call = self.call - let responseParts = self.responseParts - self - .responses = GRPCAsyncResponseStream(AsyncThrowingStream(Response.self) { continuation in - call.invokeUnaryRequest(request) { error in - responseParts.handleError(error) - continuation.finish(throwing: error) - } onResponsePart: { responsePart in - responseParts.handle(responsePart) - switch responsePart { - case let .message(response): - continuation.yield(response) - case .metadata: - break - case .end: - continuation.finish() - } - } - }) + self.responseSource = PassthroughMessageSource() + self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource)) } /// We expose this as the only non-private initializer so that the caller @@ -118,7 +87,21 @@ public struct GRPCAsyncServerStreamingCall { call: Call, _ request: Request ) -> Self { - Self(call: call, request) + let asyncCall = Self(call: call) + + asyncCall.call.invokeUnaryRequest( + request, + onError: { error in + asyncCall.responseParts.handleError(error) + asyncCall.responseSource.finish(throwing: error) + }, + onResponsePart: AsyncCall.makeResponsePartHandler( + responseParts: asyncCall.responseParts, + responseSource: asyncCall.responseSource + ) + ) + + return asyncCall } } diff --git a/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift b/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift index 9ddaa4a02..7dd7a5be0 100644 --- a/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift +++ b/Tests/GRPCTests/AsyncAwaitSupport/AsyncIntegrationTests.swift @@ -77,10 +77,10 @@ final class AsyncIntegrationTests: GRPCTestCase { XCTAsyncTest { let collect = self.echo.makeCollectCall() - try await collect.sendMessage(.with { $0.text = "boyle" }) - try await collect.sendMessage(.with { $0.text = "jeffers" }) - try await collect.sendMessage(.with { $0.text = "holt" }) - try await collect.sendEnd() + try await collect.requestStream.send(.with { $0.text = "boyle" }) + try await collect.requestStream.send(.with { $0.text = "jeffers" }) + try await collect.requestStream.send(.with { $0.text = "holt" }) + try await collect.requestStream.finish() let initialMetadata = try await collect.initialMetadata initialMetadata.assertFirst("200", forName: ":status") @@ -125,12 +125,12 @@ final class AsyncIntegrationTests: GRPCTestCase { var responseIterator = update.responses.map { $0.text }.makeAsyncIterator() for (i, name) in ["boyle", "jeffers", "holt"].enumerated() { - try await update.sendMessage(.with { $0.text = name }) + try await update.requestStream.send(.with { $0.text = name }) let response = try await responseIterator.next() XCTAssertEqual(response, "Swift echo update (\(i)): \(name)") } - try await update.sendEnd() + try await update.requestStream.finish() // This isn't right after we make the call as servers are not guaranteed to send metadata back // immediately. Concretely, we don't send initial metadata back until the first response diff --git a/Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift b/Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift index dab150044..5f8406c95 100644 --- a/Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift +++ b/Tests/GRPCTests/AsyncAwaitSupport/AsyncWriterTests.swift @@ -67,7 +67,7 @@ internal class AsyncWriterTests: GRPCTestCase { await writer.toggleWritability() await XCTAssertThrowsError(try await writer.write("pontiac")) { error in - XCTAssertEqual(error as? AsyncWriterError, .tooManyPendingWrites) + XCTAssertEqual(error as? GRPCAsyncWriterError, .tooManyPendingWrites) } // resume (we must finish the writer.) @@ -87,7 +87,7 @@ internal class AsyncWriterTests: GRPCTestCase { XCTAssertEqual(delegate.end, 0) await XCTAssertThrowsError(try await writer.write("cheddar")) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } XCTAssertTrue(delegate.elements.isEmpty) @@ -103,7 +103,7 @@ internal class AsyncWriterTests: GRPCTestCase { XCTAssertEqual(delegate.end, 0) await XCTAssertThrowsError(try await writer.finish(1)) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } // Still 0. @@ -151,7 +151,7 @@ internal class AsyncWriterTests: GRPCTestCase { do { try await writer.finish(1) } catch { - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) // Resume. await writer.toggleWritability() } @@ -161,7 +161,7 @@ internal class AsyncWriterTests: GRPCTestCase { do { try await writer.finish(2) } catch { - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) // Resume. await writer.toggleWritability() } @@ -170,7 +170,7 @@ internal class AsyncWriterTests: GRPCTestCase { // We should definitely be finished by this point. await XCTAssertThrowsError(try await writer.finish(3)) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } } } @@ -193,7 +193,7 @@ internal class AsyncWriterTests: GRPCTestCase { } catch is CancellationError { // Cancellation is fine: we cancelled while the write was pending. () - } catch let error as AsyncWriterError { + } catch let error as GRPCAsyncWriterError { // Already finish is also fine: we cancelled before the write was enqueued. XCTAssertEqual(error, .alreadyFinished) } catch { @@ -201,7 +201,7 @@ internal class AsyncWriterTests: GRPCTestCase { } await XCTAssertThrowsError(try await writer.write("bar")) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } XCTAssertTrue(delegate.elements.isEmpty) @@ -227,7 +227,7 @@ internal class AsyncWriterTests: GRPCTestCase { } catch is CancellationError { // Cancellation is fine: we cancelled while the write was pending. () - } catch let error as AsyncWriterError { + } catch let error as GRPCAsyncWriterError { // Already finish is also fine: we cancelled before the write was enqueued. XCTAssertEqual(error, .alreadyFinished) } catch { @@ -235,7 +235,7 @@ internal class AsyncWriterTests: GRPCTestCase { } await XCTAssertThrowsError(try await writer.finish(42)) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } XCTAssertTrue(delegate.elements.isEmpty) @@ -250,13 +250,13 @@ internal class AsyncWriterTests: GRPCTestCase { await writer.cancel() await XCTAssertThrowsError(try await writer.write("1")) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } // Fine, no need to throw. Nothing should change. await writer.cancel() await XCTAssertThrowsError(try await writer.write("2")) { error in - XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) + XCTAssertEqual(error as? GRPCAsyncWriterError, .alreadyFinished) } XCTAssertTrue(delegate.elements.isEmpty) diff --git a/Tests/GRPCTests/GRPCAsyncClientCallTests.swift b/Tests/GRPCTests/GRPCAsyncClientCallTests.swift index 18d6a56d9..02e5f4fe3 100644 --- a/Tests/GRPCTests/GRPCAsyncClientCallTests.swift +++ b/Tests/GRPCTests/GRPCAsyncClientCallTests.swift @@ -96,9 +96,9 @@ class GRPCAsyncClientCallTests: GRPCTestCase { ) for word in ["boyle", "jeffers", "holt"] { - try await collect.sendMessage(.with { $0.text = word }) + try await collect.requestStream.send(.with { $0.text = word }) } - try await collect.sendEnd() + try await collect.requestStream.finish() await assertThat(try await collect.initialMetadata, .is(.equalTo(Self.OKInitialMetadata))) await assertThat(try await collect.response, .doesNotThrow()) @@ -133,9 +133,9 @@ class GRPCAsyncClientCallTests: GRPCTestCase { ) for word in ["boyle", "jeffers", "holt"] { - try await update.sendMessage(.with { $0.text = word }) + try await update.requestStream.send(.with { $0.text = word }) } - try await update.sendEnd() + try await update.requestStream.finish() let numResponses = try await update.responses.map { _ in 1 }.reduce(0, +) @@ -156,11 +156,11 @@ class GRPCAsyncClientCallTests: GRPCTestCase { var responseStreamIterator = update.responses.makeAsyncIterator() for word in ["boyle", "jeffers", "holt"] { - try await update.sendMessage(.with { $0.text = word }) + try await update.requestStream.send(.with { $0.text = word }) await assertThat(try await responseStreamIterator.next(), .is(.notNil())) } - try await update.sendEnd() + try await update.requestStream.finish() await assertThat(try await responseStreamIterator.next(), .is(.nil())) @@ -185,10 +185,10 @@ class GRPCAsyncClientCallTests: GRPCTestCase { // Send requests, then end, in a task. taskGroup.addTask { for word in ["boyle", "jeffers", "holt"] { - try await update.sendMessage(.with { $0.text = word }) + try await update.requestStream.send(.with { $0.text = word }) await counter.incrementRequests() } - try await update.sendEnd() + try await update.requestStream.finish() } // Get responses in a separate task. taskGroup.addTask {