diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index f3457de..9ac41d6 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -43,6 +43,7 @@ public actor ServiceGroup: Sendable { private let cancellationSignals: [UnixSignal] /// The current state of the group. private var state: State + private var injectedChildTaskResultsStream: AsyncStream? = nil /// Initializes a new ``ServiceGroup``. /// @@ -103,6 +104,12 @@ public actor ServiceGroup: Sendable { self.maximumGracefulShutdownDuration = configuration._maximumGracefulShutdownDuration self.maximumCancellationDuration = configuration._maximumCancellationDuration } + + /// For testing, injects ChildTaskResults into the graceful shutdown process as if they were coming from the task group. + /// nil entries or finishing the stream will consume the actual results coming from the task group. + internal func setInjectedChildTaskResultsStream(_ new: AsyncStream) { + self.injectedChildTaskResultsStream = new + } /// Runs all the services by spinning up a child task per service. /// Furthermore, this method sets up the correct signal handlers @@ -177,7 +184,7 @@ public actor ServiceGroup: Sendable { } } - private enum ChildTaskResult { + internal enum ChildTaskResult { case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int) case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error) case signalCaught(UnixSignal) @@ -291,6 +298,7 @@ public actor ServiceGroup: Sendable { // We are storing the services in an optional array now. When a slot in the array is // empty it indicates that the service has been shutdown. var services = services.map { Optional($0) } + var hasShutdownGracefully = false precondition(gracefulShutdownManagers.count == services.count, "We did not create a graceful shutdown manager per service") @@ -451,11 +459,17 @@ public actor ServiceGroup: Sendable { group: &group, gracefulShutdownManagers: gracefulShutdownManagers ) + + hasShutdownGracefully = true } catch { return .failure(error) } case .cancellationCaught: + // can be safely ignored if graceful shutdown has already occurred + guard !hasShutdownGracefully else { + continue + } // We caught cancellation in our child task so we have to spawn // our cancellation timeout task if needed self.logger.debug("Caught cancellation.") @@ -507,30 +521,59 @@ public actor ServiceGroup: Sendable { // We are storing the first error of a service that threw here. var error: Error? - + var runningServices = services + + var injectedChildTaskResultsStreamIterator = self.injectedChildTaskResultsStream?.makeAsyncIterator() + // We have to shutdown the services in reverse. To do this // we are going to signal each child task the graceful shutdown and then wait for // its exit. - for (gracefulShutdownIndex, gracefulShutdownManager) in gracefulShutdownManagers.lazy.enumerated().reversed() { - guard let service = services[gracefulShutdownIndex] else { + var gracefulShutdownManagersIterator = gracefulShutdownManagers.lazy.enumerated().reversed().makeIterator() + var iteratorEntry = gracefulShutdownManagersIterator.next() + var lastServiceShutdownIndex: Int? + + while let (gracefulShutdownIndex, gracefulShutdownManager) = iteratorEntry { + guard let service = runningServices[gracefulShutdownIndex] else { + // if this service had previously shutdown + if lastServiceShutdownIndex != gracefulShutdownIndex { + self.logger.debug( + "Service already finished. Skipping shutdown" + ) + } + + // move to the next service + iteratorEntry = gracefulShutdownManagersIterator.next() + continue + } + + // if graceful shutdown hasn't been triggered for this service yet + if lastServiceShutdownIndex != gracefulShutdownIndex { self.logger.debug( - "Service already finished. Skipping shutdown" + "Triggering graceful shutdown for service", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + ] ) - continue + + gracefulShutdownManager.shutdownGracefully() + lastServiceShutdownIndex = gracefulShutdownIndex + } + + var result: ChildTaskResult? + + // if there is an injected result and that injected result is not a nil entry (which will be ignored) + if let injectedOptionalResult = await injectedChildTaskResultsStreamIterator?.next(), let injectedResult = injectedOptionalResult { + result = injectedResult + } else { + // otherwise get the next result from the task group + result = try await group.next() } - self.logger.debug( - "Triggering graceful shutdown for service", - metadata: [ - self.loggingConfiguration.keys.serviceKey: "\(service.service)", - ] - ) - - gracefulShutdownManager.shutdownGracefully() - - let result = try await group.next() switch result { case .serviceFinished(let service, let index): + // this service is now shutdown + runningServices[index] = nil + if group.isCancelled { // The group is cancelled and we expect all services to finish continue @@ -559,7 +602,12 @@ public actor ServiceGroup: Sendable { throw ServiceGroupError.serviceFinishedUnexpectedly() } - case .serviceThrew(let service, _, let serviceError): + case .serviceThrew(let service, let index, let serviceError): + // we no longer need to wait on this service to shutdown + // if this is the current service being shutdown, waiting for it will be skipped + // on the next iteration, otherwise we will continue waiting for the current service + runningServices[index] = nil + switch service.failureTerminationBehavior.behavior { case .cancelGroup: self.logger.debug( @@ -608,6 +656,8 @@ public actor ServiceGroup: Sendable { ) self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) + } else { + self.logger.debug("Ignoring signal \(signal) during graceful shutdown.") } case .gracefulShutdownTimedOut: @@ -629,8 +679,8 @@ public actor ServiceGroup: Sendable { case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished: // We just have to tolerate this since signals and parent graceful shutdowns downs can race. - continue - + self.logger.debug("Ignoring result \(String(describing: result)) during graceful shutdown.") + case nil: fatalError("Invalid result from group.next().") } @@ -692,7 +742,7 @@ public actor ServiceGroup: Sendable { // This should be removed once we support Swift 5.9+ extension AsyncStream { - fileprivate static func makeStream( + internal static func makeStream( of elementType: Element.Type = Element.self, bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index b227d56..649bb1a 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import Logging -import ServiceLifecycle +@testable import ServiceLifecycle import UnixSignals import XCTest @@ -1171,6 +1171,568 @@ final class ServiceGroupTests: XCTestCase { try await XCTAsyncAssertNoThrow(await group.next()) } } + + func testTriggerGracefulShutdown_concurrentSignalSequenceFinished() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)] + ) + + let (injectedChildTaskResultsStream, injectedChildTaskResultsStreamContinuation) = AsyncStream.makeStream(of: ServiceGroup.ChildTaskResult?.self) + await serviceGroup.setInjectedChildTaskResultsStream(injectedChildTaskResultsStream) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + // this will simulate a `.signalSequenceFinished` result from the task group after graceful shutdown + // has started but before the completion signal of the first service. + injectedChildTaskResultsStreamContinuation.yield(.signalSequenceFinished) + injectedChildTaskResultsStreamContinuation.finish() + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + } + + func testTriggerGracefulShutdown_concurrentGracefulShutdownFinished() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)] + ) + + let (injectedChildTaskResultsStream, injectedChildTaskResultsStreamContinuation) = AsyncStream.makeStream(of: ServiceGroup.ChildTaskResult?.self) + await serviceGroup.setInjectedChildTaskResultsStream(injectedChildTaskResultsStream) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + // this will simulate a `.gracefulShutdownFinished` result from the task group after graceful shutdown + // has started but before the completion signal of the first service. + injectedChildTaskResultsStreamContinuation.yield(.gracefulShutdownFinished) + injectedChildTaskResultsStreamContinuation.finish() + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + } + + func testTriggerGracefulShutdown_concurrentGracefulShutdownCaught() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)] + ) + + let (injectedChildTaskResultsStream, injectedChildTaskResultsStreamContinuation) = AsyncStream.makeStream(of: ServiceGroup.ChildTaskResult?.self) + await serviceGroup.setInjectedChildTaskResultsStream(injectedChildTaskResultsStream) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + // this will simulate a `.gracefulShutdownFinished` result from the task group after graceful shutdown + // has started but before the completion signal of the first service. + injectedChildTaskResultsStreamContinuation.yield(.gracefulShutdownCaught) + injectedChildTaskResultsStreamContinuation.finish() + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + } + + func testTriggerGracefulShutdown_ignoredSignal() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1), .init(service: service2), .init(service: service3)] + ) + + let (injectedChildTaskResultsStream, injectedChildTaskResultsStreamContinuation) = AsyncStream.makeStream(of: ServiceGroup.ChildTaskResult?.self) + await serviceGroup.setInjectedChildTaskResultsStream(injectedChildTaskResultsStream) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + // this will simulate a `.gracefulShutdownFinished` result from the task group after graceful shutdown + // has started but before the completion signal of the first service. + injectedChildTaskResultsStreamContinuation.yield(.signalCaught(.sigusr1)) + injectedChildTaskResultsStreamContinuation.finish() + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + } + + func testTriggerGracefulShutdown_serviceThrows_inOrder_gracefullyShutdownGroup() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service2, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service3, failureTerminationBehavior: .gracefullyShutdownGroup)] + ) + + do { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + + XCTFail("Expected error not thrown") + } catch is ExampleError { + // expected error + } + } + + func testTriggerGracefulShutdown_serviceThrows_inOrder_ignore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .ignore), + .init(service: service2, failureTerminationBehavior: .ignore), + .init(service: service3, failureTerminationBehavior: .ignore)] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + } + + func testTriggerGracefulShutdown_serviceThrows_outOfOrder_gracefullyShutdownGroup() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service2, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service3, failureTerminationBehavior: .gracefullyShutdownGroup)] + ) + + do { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the first service (even though the second service + // is gracefully shutting down) + await service1.resumeRunContinuation(with: .failure(ExampleError())) + + // Waiting to see that the one remaining are still running + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The first service shutdown will be skipped + try await group.waitForAll() + } + + XCTFail("Expected error not thrown") + } catch is ExampleError { + // expected error + } + } + + func testTriggerGracefulShutdown_serviceThrows_outOfOrder_ignore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .ignore), + .init(service: service2, failureTerminationBehavior: .ignore), + .init(service: service3, failureTerminationBehavior: .ignore)] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the first service (even though the second service + // is gracefully shutting down) + await service1.resumeRunContinuation(with: .failure(ExampleError())) + + // Waiting to see that the one remaining are still running + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The first service shutdown will be skipped + try await group.waitForAll() + } + } // MARK: - Helpers