diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index d67eede..6c1c41d 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -752,14 +752,15 @@ extension Optional where Wrapped == String { } } -internal func withAsyncTaskCleanupHandler( - _ body: () async throws -> Result, +internal func withAsyncTaskCleanupHandler( + _ body: () async throws -> Success, onCleanup handler: @Sendable @escaping () async -> Void, isolation: isolated (any Actor)? = #isolation -) async rethrows -> Result { +) async rethrows -> Success { + let runCancellationHandlerPromise = Promise() return try await withThrowingTaskGroup( of: Void.self, - returning: Result.self + returning: Success.self ) { group in group.addTask { // Keep this task sleep indefinitely until the parent task is cancelled. @@ -767,16 +768,140 @@ internal func withAsyncTaskCleanupHandler( // before the time ends. We then run the cancel handler. do { while true { try await Task.sleep(nanoseconds: 1_000_000_000) } } catch {} // Run task cancel handler - await handler() + runCancellationHandlerPromise.fulfill(with: true) + } + + group.addTask { + if await runCancellationHandlerPromise.value { + await handler() + } + } + + defer { + group.cancelAll() } do { let result = try await body() - group.cancelAll() + runCancellationHandlerPromise.fulfill(with: false) return result } catch { - await handler() + runCancellationHandlerPromise.fulfill(with: true) throw error } } } + +#if canImport(Darwin) +import os +#else +import Synchronization +#endif + +/// Represents a placeholder for a value which will be provided by synchronous code running in another execution context. +internal final class Promise: Sendable { + private struct State { + var waiters: [CheckedContinuation] = [] + var value: Result? + } + + #if canImport(Darwin) + private let state: OSAllocatedUnfairLock + #else + private let state: Mutex + #endif + + /// Creates a new promise in the unfulfilled state. + public init() { + #if canImport(Darwin) + state = OSAllocatedUnfairLock(initialState: State(value: nil)) + #else + state = Mutex(State(value: nil)) + #endif + } + + deinit { + state.withLock { state in + precondition(state.waiters.isEmpty, "Deallocated with remaining waiters") + } + } + + /// Fulfills the promise with the specified result. + /// + /// - returns: Whether the promise was already fulfilled. + @discardableResult + public func fulfill(with result: Result) -> Bool { + let (waiters, alreadyFulfilled): ([CheckedContinuation], Bool) = state.withLock { state in + if state.value != nil { + return ([], true) + } + state.value = result + let waiters = state.waiters + state.waiters.removeAll() + return (waiters, false) + } + + // Resume the continuations outside the lock to avoid potential deadlock if invoked in a cancellation handler. + for waiter in waiters { + waiter.resume(with: result) + } + + return alreadyFulfilled + } + + /// Fulfills the promise with the specified value. + /// + /// - returns: Whether the promise was already fulfilled. + @discardableResult + public func fulfill(with value: Success) -> Bool { + fulfill(with: .success(value)) + } + + /// Fulfills the promise with the specified error. + /// + /// - returns: Whether the promise was already fulfilled. + @discardableResult + public func fail(throwing error: Failure) -> Bool { + fulfill(with: .failure(error)) + } +} + +extension Promise where Success == Void { + /// Fulfills the promise. + /// + /// - returns: Whether the promise was already fulfilled. + @discardableResult + internal func fulfill() -> Bool { + fulfill(with: ()) + } +} + +extension Promise where Failure == Never { + /// Suspends if the promise is not yet fulfilled, and returns the value once it is. + internal var value: Success { + get async { + await withCheckedContinuation { continuation in + let value: Result? = state.withLock { state in + if let value = state.value { + return value + } else { + state.waiters.append(continuation) + return nil + } + } + + // Resume the continuations outside the lock to avoid potential deadlock if invoked in a cancellation handler. + if let value { + continuation.resume(with: value) + } + } + } + } + + /// Suspends if the promise is not yet fulfilled, and returns the result once it is. + internal var result: Result { + get async { + await .success(value) + } + } +}