diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index 0749922d1..475cea840 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -423,6 +423,14 @@ extension ClientConnection { /// Defaults to 30 minutes. public var connectionIdleTimeout: TimeAmount = .minutes(30) + /// The maximum allowed age of a connection. + /// + /// If set, no new RPCs will be started on the connection after the connection has been opened + /// for this period of time. Existing RPCs will be allowed to continue and the connection will + /// close once all RPCs on the connection have finished. If this isn't set then connections have + /// no limit on their lifetime. + public var connectionMaxAge: TimeAmount? = nil + /// The behavior used to determine when an RPC should start. That is, whether it should wait for /// an active connection or fail quickly if no connection is currently available. /// @@ -635,6 +643,7 @@ extension ChannelPipeline.SynchronousOperations { connectionManager: ConnectionManager, connectionKeepalive: ClientConnectionKeepalive, connectionIdleTimeout: TimeAmount, + connectionMaxAge: TimeAmount?, httpTargetWindowSize: Int, httpMaxFrameSize: Int, httpMaxResetStreams: Int, @@ -672,6 +681,7 @@ extension ChannelPipeline.SynchronousOperations { connectionManager: connectionManager, multiplexer: h2Multiplexer, idleTimeout: connectionIdleTimeout, + maxAge: connectionMaxAge, keepalive: connectionKeepalive, logger: logger ) diff --git a/Sources/GRPC/ConnectionManagerChannelProvider.swift b/Sources/GRPC/ConnectionManagerChannelProvider.swift index 3a23e85c2..6bae5c516 100644 --- a/Sources/GRPC/ConnectionManagerChannelProvider.swift +++ b/Sources/GRPC/ConnectionManagerChannelProvider.swift @@ -60,6 +60,8 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { internal var connectionKeepalive: ClientConnectionKeepalive @usableFromInline internal var connectionIdleTimeout: TimeAmount + @usableFromInline + internal var connectionMaxAge: TimeAmount? @usableFromInline internal var tlsMode: TLSMode @@ -100,6 +102,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: ConnectionTarget, connectionKeepalive: ClientConnectionKeepalive, connectionIdleTimeout: TimeAmount, + connectionMaxAge: TimeAmount?, tlsMode: TLSMode, tlsConfiguration: GRPCTLSConfiguration?, httpTargetWindowSize: Int, @@ -113,6 +116,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: connectionTarget, connectionKeepalive: connectionKeepalive, connectionIdleTimeout: connectionIdleTimeout, + connectionMaxAge: connectionMaxAge, tlsMode: tlsMode, tlsConfiguration: tlsConfiguration, httpTargetWindowSize: httpTargetWindowSize, @@ -131,6 +135,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: ConnectionTarget, connectionKeepalive: ClientConnectionKeepalive, connectionIdleTimeout: TimeAmount, + connectionMaxAge: TimeAmount?, tlsMode: TLSMode, tlsConfiguration: GRPCTLSConfiguration?, httpTargetWindowSize: Int, @@ -142,6 +147,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { self.connectionTarget = connectionTarget self.connectionKeepalive = connectionKeepalive self.connectionIdleTimeout = connectionIdleTimeout + self.connectionMaxAge = connectionMaxAge self.tlsMode = tlsMode self.tlsConfiguration = tlsConfiguration @@ -182,6 +188,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionTarget: configuration.target, connectionKeepalive: configuration.connectionKeepalive, connectionIdleTimeout: configuration.connectionIdleTimeout, + connectionMaxAge: configuration.connectionMaxAge, tlsMode: tlsMode, tlsConfiguration: configuration.tlsConfiguration, httpTargetWindowSize: configuration.httpTargetWindowSize, @@ -264,6 +271,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionManager: connectionManager, connectionKeepalive: self.connectionKeepalive, connectionIdleTimeout: self.connectionIdleTimeout, + connectionMaxAge: self.connectionMaxAge, httpTargetWindowSize: self.httpTargetWindowSize, httpMaxFrameSize: self.httpMaxFrameSize, httpMaxResetStreams: self.httpMaxResetStreams, diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift index e84b903db..53f966d8f 100644 --- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift +++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift @@ -156,6 +156,14 @@ extension GRPCChannelPool { /// If a connection becomes idle, starting a new RPC will automatically create a new connection. public var idleTimeout = TimeAmount.minutes(30) + /// The maximum allowed age of a connection. + /// + /// If set, no new RPCs will be started on the connection after the connection has been opened + /// for this period of time. Existing RPCs will be allowed to continue and the connection will + /// close once all RPCs on the connection have finished. If this isn't set then connections have + /// no limit on their lifetime. + public var maxConnectionAge: TimeAmount? = nil + /// The connection keepalive configuration. public var keepalive = ClientConnectionKeepalive() diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index 0c7b95fcd..255ea3142 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -86,6 +86,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, + connectionMaxAge: configuration.maxConnectionAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, @@ -100,6 +101,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, + connectionMaxAge: configuration.maxConnectionAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, @@ -114,6 +116,7 @@ internal final class PooledChannel: GRPCChannel { connectionTarget: configuration.target, connectionKeepalive: configuration.keepalive, connectionIdleTimeout: configuration.idleTimeout, + connectionMaxAge: configuration.maxConnectionAge, tlsMode: tlsMode, tlsConfiguration: configuration.transportSecurity.tlsConfiguration, httpTargetWindowSize: configuration.http2.targetWindowSize, diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index 0f9492163..4d86f226d 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -27,11 +27,18 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { /// If nil, then we shouldn't schedule idle tasks. private let idleTimeout: TimeAmount? + /// The maximum amount of time the connection is allowed to live before quiescing. + private let maxAge: TimeAmount? + /// The ping handler. private var pingHandler: PingHandler + /// The scheduled task which will close the connection gently after the max connection age + /// has been reached. + private var scheduledMaxAgeClose: Scheduled? + /// The scheduled task which will close the connection after the keep-alive timeout has expired. - private var scheduledClose: Scheduled? + private var scheduledKeepAliveClose: Scheduled? /// The scheduled task which will ping. private var scheduledPing: RepeatedTask? @@ -75,6 +82,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { connectionManager: ConnectionManager, multiplexer: HTTP2StreamMultiplexer, idleTimeout: TimeAmount, + maxAge: TimeAmount?, keepalive configuration: ClientConnectionKeepalive, logger: Logger ) { @@ -95,6 +103,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { minimumSentPingIntervalWithoutData: configuration.minimumSentPingIntervalWithoutData ) self.creationTime = .now() + self.maxAge = maxAge } init( @@ -116,6 +125,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { maximumPingStrikes: configuration.maximumPingStrikes ) self.creationTime = .now() + self.maxAge = nil } private func perform(operations: GRPCIdleHandlerStateMachine.Operations) { @@ -218,8 +228,8 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { ) case .cancelScheduledTimeout: - self.scheduledClose?.cancel() - self.scheduledClose = nil + self.scheduledKeepAliveClose?.cancel() + self.scheduledKeepAliveClose = nil case let .schedulePing(delay, timeout): self.schedulePing(in: delay, timeout: timeout) @@ -267,7 +277,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { } private func scheduleClose(in timeout: TimeAmount) { - self.scheduledClose = self.context?.eventLoop.scheduleTask(in: timeout) { + self.scheduledKeepAliveClose = self.context?.eventLoop.scheduleTask(in: timeout) { self.stateMachine.logger.debug("keepalive timer expired") self.perform(operations: self.stateMachine.shutdownNow()) } @@ -334,6 +344,16 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { remote: context.remoteAddress ) + // If a max age has been set then start a timer. This will only be cancelled when it fires or when + // the channel eventually becomes inactive. + if let maxAge = self.maxAge { + assert(self.scheduledMaxAgeClose == nil) + self.scheduledMaxAgeClose = context.eventLoop.scheduleTask(in: maxAge) { + let operations = self.stateMachine.reachedMaxAge() + self.perform(operations: operations) + } + } + // No state machine action here. switch self.mode { case let .client(connectionManager, multiplexer): @@ -341,15 +361,18 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { case .server: () } + context.fireChannelActive() } func channelInactive(context: ChannelHandlerContext) { self.perform(operations: self.stateMachine.channelInactive()) self.scheduledPing?.cancel() - self.scheduledClose?.cancel() + self.scheduledKeepAliveClose?.cancel() + self.scheduledMaxAgeClose?.cancel() self.scheduledPing = nil - self.scheduledClose = nil + self.scheduledKeepAliveClose = nil + self.scheduledMaxAgeClose = nil context.fireChannelInactive() } diff --git a/Sources/GRPC/GRPCIdleHandlerStateMachine.swift b/Sources/GRPC/GRPCIdleHandlerStateMachine.swift index 5fbbe5c71..fc1134591 100644 --- a/Sources/GRPC/GRPCIdleHandlerStateMachine.swift +++ b/Sources/GRPC/GRPCIdleHandlerStateMachine.swift @@ -465,6 +465,13 @@ struct GRPCIdleHandlerStateMachine { return operations } + /// The connection has reached it's max allowable age. Let existing RPCs continue, but don't + /// allow any new ones. + mutating func reachedMaxAge() -> Operations { + // Treat this as if the other side sent us a GOAWAY: gently shutdown the connection. + self.receiveGoAway() + } + /// We've received a GOAWAY frame from the remote peer. Either the remote peer wants to close the /// connection or they're responding to us shutting down the connection. mutating func receiveGoAway() -> Operations { diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift index 77486b0c9..db3127f54 100644 --- a/Tests/GRPCTests/ConnectionManagerTests.swift +++ b/Tests/GRPCTests/ConnectionManagerTests.swift @@ -165,6 +165,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -217,6 +218,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -273,6 +275,7 @@ extension ConnectionManagerTests { inboundStreamInitializer: nil ), idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -322,6 +325,7 @@ extension ConnectionManagerTests { inboundStreamInitializer: nil ), idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -350,6 +354,7 @@ extension ConnectionManagerTests { inboundStreamInitializer: nil ), idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -391,6 +396,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -464,6 +470,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -536,6 +543,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -654,6 +662,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -730,6 +739,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -807,6 +817,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: firstH2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -855,6 +866,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: secondH2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -905,6 +917,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -1063,6 +1076,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -1120,6 +1134,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: h2mux, idleTimeout: .minutes(5), + maxAge: nil, keepalive: .init(), logger: self.logger ) @@ -1201,6 +1216,7 @@ extension ConnectionManagerTests { connectionManager: manager, multiplexer: multiplexer, idleTimeout: .minutes(5), + maxAge: nil, keepalive: ClientConnectionKeepalive(), logger: self.logger ) @@ -1314,6 +1330,7 @@ extension ConnectionManagerTests { connectionManager: connectionManager, multiplexer: multiplexer, idleTimeout: .minutes(60), + maxAge: nil, keepalive: .init(), logger: self.clientLogger ) diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift index d25489630..2a079183c 100644 --- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift @@ -1299,6 +1299,7 @@ extension ChannelController: ConnectionManagerChannelProvider { connectionManager: connectionManager, multiplexer: multiplexer, idleTimeout: .minutes(5), + maxAge: nil, keepalive: ClientConnectionKeepalive(), logger: logger ) diff --git a/Tests/GRPCTests/MaxAgeTests.swift b/Tests/GRPCTests/MaxAgeTests.swift new file mode 100644 index 000000000..8b4cc1120 --- /dev/null +++ b/Tests/GRPCTests/MaxAgeTests.swift @@ -0,0 +1,241 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import EchoImplementation +import EchoModel +import GRPC +import NIOConcurrencyHelpers +import NIOCore +import NIOPosix +import XCTest + +final class MaxAgeTests: XCTestCase { + private func withEchoClient( + group: any EventLoopGroup, + configure: (inout GRPCChannelPool.Configuration) -> Void, + test: (Echo_EchoNIOClient) throws -> Void + ) throws { + let eventLoop = MultiThreadedEventLoopGroup.singleton.next() + + let server = try Server.insecure(group: group) + .withServiceProviders([EchoProvider()]) + .bind(host: "127.0.0.1", port: 0) + .wait() + + defer { + try? server.close().wait() + } + + let port = server.channel.localAddress!.port! + + let pool = try GRPCChannelPool.with( + target: .host("127.0.0.1", port: port), + transportSecurity: .plaintext, + eventLoopGroup: eventLoop, + configure + ) + + defer { + try? pool.close().wait() + } + + try test(Echo_EchoNIOClient(channel: pool)) + } + + func testMaxAgeIsRespected() throws { + // Verifies that the max-age config is respected by using the connection pool delegate to + // start new RPCs when each connection closes (which close by aging out). It'll also record + // various events that happen as part of the lifecycle of each connection. + + // The pool creates one sub-pool per event loop. Use a single loop to simplify connection + // counting. + let eventLoop = MultiThreadedEventLoopGroup.singleton.next() + let done = eventLoop.makePromise(of: [RPCOnConnectionClosedDelegate.Event].self) + let iterations = 2 + let delegate = RPCOnConnectionClosedDelegate(iterations: iterations, done: done) + // This needs to be relatively short so the test doesn't take too long but not so short that + // the connection is closed before it's actually used. + let maxConnectionAge: TimeAmount = .milliseconds(50) + + try withEchoClient(group: eventLoop) { config in + config.maxConnectionAge = maxConnectionAge + config.delegate = delegate + } test: { echo in + // This creates a retain cycle (delegate → echo → channel → delegate), break it when the + // test is done. + delegate.setEcho(echo) + defer { delegate.setEcho(nil) } + + let startTime = NIODeadline.now() + + // Do an RPC to kick things off. + let rpc = try echo.get(.with { $0.text = "hello" }).response.wait() + XCTAssertEqual(rpc.text, "Swift echo get: hello") + + // Wait for the delegate to finish driving the RPCs. + let events = try done.futureResult.wait() + let endTime = NIODeadline.now() + + // Add an iteration as one is done by the test (as opposed to the delegate). Each iteration + // has three events: connected, quiescing, closed. + XCTAssertEqual(events.count, (iterations + 1) * 3) + + // Check each triplet is as expected: connected, quiescing, then closed. + for startIndex in stride(from: events.startIndex, to: events.endIndex, by: 3) { + switch (events[startIndex], events[startIndex + 1], events[startIndex + 2]) { + case (.connectSucceeded(let id1), .connectionQuiescing(let id2), .connectionClosed(let id3)): + XCTAssertEqual(id1, id2) + XCTAssertEqual(id2, id3) + default: + XCTFail("Invalid event triplet: \(events[startIndex ... startIndex + 2])") + } + } + + // Check the duration was in the right ballpark. + let duration = (endTime - startTime) + let minDuration = iterations * maxConnectionAge + XCTAssertGreaterThanOrEqual(duration, minDuration) + // Allow a few seconds of slack for max duration as some CI systems can be slow. + let maxDuration = iterations * maxConnectionAge + .seconds(5) + XCTAssertLessThanOrEqual(duration, maxDuration) + } + } + + private final class RPCOnConnectionClosedDelegate: GRPCConnectionPoolDelegate { + enum Event: Sendable, Hashable { + case connectSucceeded(GRPCConnectionID) + case connectionQuiescing(GRPCConnectionID) + case connectionClosed(GRPCConnectionID) + } + + private struct State { + var events: [Event] = [] + var echo: Echo_EchoNIOClient? = nil + var iterations: Int + } + + private let state: NIOLockedValueBox + private let done: EventLoopPromise<[Event]> + + func setEcho(_ echo: Echo_EchoNIOClient?) { + self.state.withLockedValue { state in + state.echo = echo + } + } + + init(iterations: Int, done: EventLoopPromise<[Event]>) { + self.state = NIOLockedValueBox(State(iterations: iterations)) + self.done = done + } + + func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) { + self.state.withLockedValue { state in + state.events.append(.connectSucceeded(id)) + } + } + + func connectionQuiescing(id: GRPCConnectionID) { + self.state.withLockedValue { state in + state.events.append(.connectionQuiescing(id)) + } + } + + func connectionClosed(id: GRPCConnectionID, error: (any Error)?) { + enum Action { + case doNextRPC(Echo_EchoNIOClient) + case done([Event]) + } + + let action: Action = self.state.withLockedValue { state in + state.events.append(.connectionClosed(id)) + + if state.iterations > 0 { + state.iterations -= 1 + return .doNextRPC(state.echo!) + } else { + return .done(state.events) + } + } + + switch action { + case .doNextRPC(let echo): + // Start an RPC to trigger a connect. The result doesn't matter: + _ = echo.get(.with { $0.text = "hello" }) + case .done(let events): + self.done.succeed(events) + } + } + + func connectionAdded(id: GRPCConnectionID) {} + func connectionRemoved(id: GRPCConnectionID) {} + func startedConnecting(id: GRPCConnectionID) {} + func connectFailed(id: GRPCConnectionID, error: any Error) {} + func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) { + } + } + + func testRPCContinuesAfterQuiescing() throws { + // Check that an in-flight RPC can continue to run after the connection is quiescing as a result + // of aging out. + + // The pool creates one sub-pool per event loop. Use a single loop to simplify connection + // counting. + let eventLoop = MultiThreadedEventLoopGroup.singleton.next() + let isQuiescing = eventLoop.makePromise(of: Void.self) + + try withEchoClient(group: eventLoop) { config in + config.maxConnectionAge = .milliseconds(50) + config.delegate = SucceedOnQuiescing(promise: isQuiescing) + } test: { echo in + // Send an initial message. + let rpc = echo.collect() + try rpc.sendMessage(.with { $0.text = "1" }).wait() + + // Wait for the connection to quiesce. + try isQuiescing.futureResult.wait() + + // Send a few more messages then end. + try rpc.sendMessage(.with { $0.text = "2" }).wait() + try rpc.sendMessage(.with { $0.text = "3" }).wait() + try rpc.sendEnd().wait() + + let response = try rpc.response.wait() + XCTAssertEqual(response.text, "Swift echo collect: 1 2 3") + } + } + + final class SucceedOnQuiescing: GRPCConnectionPoolDelegate { + private let quiescingPromise: EventLoopPromise + + init(promise: EventLoopPromise) { + self.quiescingPromise = promise + } + + func connectionQuiescing(id: GRPCConnectionID) { + self.quiescingPromise.succeed() + } + + func connectionAdded(id: GRPCConnectionID) {} + func connectionRemoved(id: GRPCConnectionID) {} + func startedConnecting(id: GRPCConnectionID) {} + func connectFailed(id: GRPCConnectionID, error: any Error) {} + func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {} + func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) { + } + func connectionClosed(id: GRPCConnectionID, error: (any Error)?) {} + } + +}