Skip to content

Address some race conditions during the graceful shutdown process. #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 70 additions & 20 deletions Sources/ServiceLifecycle/ServiceGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public actor ServiceGroup: Sendable {
private let cancellationSignals: [UnixSignal]
/// The current state of the group.
private var state: State
private var injectedChildTaskResultsStream: AsyncStream<ChildTaskResult?>? = nil

/// Initializes a new ``ServiceGroup``.
///
Expand Down Expand Up @@ -103,6 +104,12 @@ public actor ServiceGroup: Sendable {
self.maximumGracefulShutdownDuration = configuration._maximumGracefulShutdownDuration
self.maximumCancellationDuration = configuration._maximumCancellationDuration
}

/// For testing, injects ChildTaskResults into the graceful shutdown process as if they were coming from the task group.
/// nil entries or finishing the stream will consume the actual results coming from the task group.
internal func setInjectedChildTaskResultsStream(_ new: AsyncStream<ChildTaskResult?>) {
self.injectedChildTaskResultsStream = new
}

/// Runs all the services by spinning up a child task per service.
/// Furthermore, this method sets up the correct signal handlers
Expand Down Expand Up @@ -177,7 +184,7 @@ public actor ServiceGroup: Sendable {
}
}

private enum ChildTaskResult {
internal enum ChildTaskResult {
case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int)
case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error)
case signalCaught(UnixSignal)
Expand Down Expand Up @@ -291,6 +298,7 @@ public actor ServiceGroup: Sendable {
// We are storing the services in an optional array now. When a slot in the array is
// empty it indicates that the service has been shutdown.
var services = services.map { Optional($0) }
var hasShutdownGracefully = false

precondition(gracefulShutdownManagers.count == services.count, "We did not create a graceful shutdown manager per service")

Expand Down Expand Up @@ -451,11 +459,17 @@ public actor ServiceGroup: Sendable {
group: &group,
gracefulShutdownManagers: gracefulShutdownManagers
)

hasShutdownGracefully = true
} catch {
return .failure(error)
}

case .cancellationCaught:
// can be safely ignored if graceful shutdown has already occurred
guard !hasShutdownGracefully else {
continue
}
// We caught cancellation in our child task so we have to spawn
// our cancellation timeout task if needed
self.logger.debug("Caught cancellation.")
Expand Down Expand Up @@ -507,30 +521,59 @@ public actor ServiceGroup: Sendable {

// We are storing the first error of a service that threw here.
var error: Error?

var runningServices = services

var injectedChildTaskResultsStreamIterator = self.injectedChildTaskResultsStream?.makeAsyncIterator()

// We have to shutdown the services in reverse. To do this
// we are going to signal each child task the graceful shutdown and then wait for
// its exit.
for (gracefulShutdownIndex, gracefulShutdownManager) in gracefulShutdownManagers.lazy.enumerated().reversed() {
guard let service = services[gracefulShutdownIndex] else {
var gracefulShutdownManagersIterator = gracefulShutdownManagers.lazy.enumerated().reversed().makeIterator()
var iteratorEntry = gracefulShutdownManagersIterator.next()
var lastServiceShutdownIndex: Int?

while let (gracefulShutdownIndex, gracefulShutdownManager) = iteratorEntry {
guard let service = runningServices[gracefulShutdownIndex] else {
// if this service had previously shutdown
if lastServiceShutdownIndex != gracefulShutdownIndex {
self.logger.debug(
"Service already finished. Skipping shutdown"
)
}

// move to the next service
iteratorEntry = gracefulShutdownManagersIterator.next()
continue
}

// if graceful shutdown hasn't been triggered for this service yet
if lastServiceShutdownIndex != gracefulShutdownIndex {
self.logger.debug(
"Service already finished. Skipping shutdown"
"Triggering graceful shutdown for service",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)
continue

gracefulShutdownManager.shutdownGracefully()
lastServiceShutdownIndex = gracefulShutdownIndex
}

var result: ChildTaskResult?

// if there is an injected result and that injected result is not a nil entry (which will be ignored)
if let injectedOptionalResult = await injectedChildTaskResultsStreamIterator?.next(), let injectedResult = injectedOptionalResult {
result = injectedResult
} else {
// otherwise get the next result from the task group
result = try await group.next()
}
self.logger.debug(
"Triggering graceful shutdown for service",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)

gracefulShutdownManager.shutdownGracefully()

let result = try await group.next()

switch result {
case .serviceFinished(let service, let index):
// this service is now shutdown
runningServices[index] = nil

if group.isCancelled {
// The group is cancelled and we expect all services to finish
continue
Expand Down Expand Up @@ -559,7 +602,12 @@ public actor ServiceGroup: Sendable {
throw ServiceGroupError.serviceFinishedUnexpectedly()
}

case .serviceThrew(let service, _, let serviceError):
case .serviceThrew(let service, let index, let serviceError):
// we no longer need to wait on this service to shutdown
// if this is the current service being shutdown, waiting for it will be skipped
// on the next iteration, otherwise we will continue waiting for the current service
runningServices[index] = nil

switch service.failureTerminationBehavior.behavior {
case .cancelGroup:
self.logger.debug(
Expand Down Expand Up @@ -608,6 +656,8 @@ public actor ServiceGroup: Sendable {
)

self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)
} else {
self.logger.debug("Ignoring signal \(signal) during graceful shutdown.")
}

case .gracefulShutdownTimedOut:
Expand All @@ -629,8 +679,8 @@ public actor ServiceGroup: Sendable {

case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished:
// We just have to tolerate this since signals and parent graceful shutdowns downs can race.
continue

self.logger.debug("Ignoring result \(String(describing: result)) during graceful shutdown.")
case nil:
fatalError("Invalid result from group.next().")
}
Expand Down Expand Up @@ -692,7 +742,7 @@ public actor ServiceGroup: Sendable {

// This should be removed once we support Swift 5.9+
extension AsyncStream {
fileprivate static func makeStream(
internal static func makeStream(
of elementType: Element.Type = Element.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
Expand Down
Loading