Skip to content

Commit 8f06b41

Browse files
committed
Use new state machines in async server handler
Motivation: In grpc#1394 and grpc#1396 we introduced new state machines for the server interceptors and handler. This change updates the async server handler to make use of them. Modifications: - Add the relevant `@inlinable` and `@usableFromInline` annotations to both state machines. - Refactor the async server handler to use both state machines. - Refactor async handler tests to use a 'real' event loop; the previous mix of embedded and async was unsafe. - Re-enable TSAN Result: - Better separation between interceptors and user func - TSAN is happier - Resolves grpc#1362
1 parent 65f7bee commit 8f06b41

16 files changed

+935
-585
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ jobs:
2525
matrix:
2626
include:
2727
- image: swift:5.6-focal
28+
swift-test-flags: "--sanitize=thread"
2829
- image: swift:5.5-focal
2930
swift-test-flags: "--sanitize=thread"
3031
- image: swift:5.4-focal
@@ -106,8 +107,6 @@ jobs:
106107
GRPC_NO_NIO_SSL: 1
107108
timeout-minutes: 20
108109
- name: Test without NIOSSL
109-
# Skip tests on 5.6: https://bugs.swift.org/browse/SR-15955
110-
if: ${{ matrix.image != 'swift:5.6-focal' }}
111110
run: swift test
112111
env:
113112
GRPC_NO_NIO_SSL: 1

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Actions.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ import NIOHPACK
1818

1919
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
2020
extension ServerHandlerStateMachine {
21+
@usableFromInline
2122
enum HandleMetadataAction {
2223
/// Invoke the user handler.
2324
case invokeHandler(Ref<UserInfo>, CallHandlerContext)
2425
/// Cancel the RPC, the metadata was not expected.
2526
case cancel
2627
}
2728

29+
@usableFromInline
2830
enum HandleMessageAction: Hashable {
2931
/// Forward the message to the interceptors, via the interceptor state machine.
3032
case forward
@@ -33,8 +35,10 @@ extension ServerHandlerStateMachine {
3335
}
3436

3537
/// The same as 'HandleMessageAction.
38+
@usableFromInline
3639
typealias HandleEndAction = HandleMessageAction
3740

41+
@usableFromInline
3842
enum SendMessageAction: Equatable {
3943
/// Intercept the message, but first intercept the headers if they are non-nil. Must go via
4044
/// the interceptor state machine first.
@@ -43,13 +47,15 @@ extension ServerHandlerStateMachine {
4347
case drop
4448
}
4549

50+
@usableFromInline
4651
enum SendStatusAction: Equatable {
4752
/// Intercept the status, providing the given trailers.
4853
case intercept(trailers: HPACKHeaders)
4954
/// Drop the status.
5055
case drop
5156
}
5257

58+
@usableFromInline
5359
enum CancelAction: Hashable {
5460
/// Cancel and nil out the handler 'bits'.
5561
case cancelAndNilOutHandlerComponents

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Draining.swift

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,46 @@ extension ServerHandlerStateMachine {
2222
/// closed (i.e. we have seen 'end' but it has not necessarily been consumed by the user handler).
2323
/// We can transition to a new state either by sending the end of the response stream or by
2424
/// cancelling.
25+
@usableFromInline
2526
internal struct Draining {
27+
@usableFromInline
2628
typealias NextStateAndOutput<Output> =
2729
ServerHandlerStateMachine.NextStateAndOutput<
2830
ServerHandlerStateMachine.Draining.NextState,
2931
Output
3032
>
3133

3234
/// Whether the response headers have been written yet.
33-
private var headersWritten: Bool
35+
@usableFromInline
36+
internal private(set) var headersWritten: Bool
37+
@usableFromInline
3438
internal let context: GRPCAsyncServerCallContext
3539

40+
@inlinable
3641
init(from state: ServerHandlerStateMachine.Handling) {
3742
self.headersWritten = state.headersWritten
3843
self.context = state.context
3944
}
4045

46+
@inlinable
4147
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
4248
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
4349
return .init(nextState: .draining(self), output: .cancel)
4450
}
4551

52+
@inlinable
4653
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
4754
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
4855
return .init(nextState: .draining(self), output: .cancel)
4956
}
5057

58+
@inlinable
5159
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
5260
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
5361
return .init(nextState: .draining(self), output: .cancel)
5462
}
5563

64+
@inlinable
5665
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
5766
let headers: HPACKHeaders?
5867

@@ -66,11 +75,13 @@ extension ServerHandlerStateMachine {
6675
return .init(nextState: .draining(self), output: .intercept(headers: headers))
6776
}
6877

78+
@inlinable
6979
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
7080
let trailers = self.context.trailingResponseMetadata
7181
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers))
7282
}
7383

84+
@inlinable
7485
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
7586
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents)
7687
}

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Finished.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,36 +17,47 @@
1717

1818
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
1919
extension ServerHandlerStateMachine {
20+
@usableFromInline
2021
internal struct Finished {
22+
@usableFromInline
2123
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
2224
ServerHandlerStateMachine.Finished.NextState,
2325
Output
2426
>
2527

28+
@inlinable
2629
internal init(from state: ServerHandlerStateMachine.Idle) {}
30+
@inlinable
2731
internal init(from state: ServerHandlerStateMachine.Handling) {}
32+
@inlinable
2833
internal init(from state: ServerHandlerStateMachine.Draining) {}
2934

35+
@inlinable
3036
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
3137
return .init(nextState: .finished(self), output: .cancel)
3238
}
3339

40+
@inlinable
3441
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
3542
return .init(nextState: .finished(self), output: .cancel)
3643
}
3744

45+
@inlinable
3846
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
3947
return .init(nextState: .finished(self), output: .cancel)
4048
}
4149

50+
@inlinable
4251
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
4352
return .init(nextState: .finished(self), output: .drop)
4453
}
4554

55+
@inlinable
4656
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
4757
return .init(nextState: .finished(self), output: .drop)
4858
}
4959

60+
@inlinable
5061
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
5162
return .init(nextState: .finished(self), output: .cancelAndNilOutHandlerComponents)
5263
}

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Handling.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,43 +22,52 @@ extension ServerHandlerStateMachine {
2222
/// the request metadata has already been seen). We can transition to a new state either by
2323
/// receiving the end of the request stream or by closing the response stream. Cancelling also
2424
/// moves us to the finished state.
25+
@usableFromInline
2526
internal struct Handling {
27+
@usableFromInline
2628
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
2729
ServerHandlerStateMachine.Handling.NextState,
2830
Output
2931
>
3032

3133
/// Whether response headers have been written (they are written lazily rather than on receipt
3234
/// of the request headers).
35+
@usableFromInline
3336
internal private(set) var headersWritten: Bool
3437

3538
/// A context held by user handler which may be used to alter the response headers or trailers.
39+
@usableFromInline
3640
internal let context: GRPCAsyncServerCallContext
3741

3842
/// Transition from the 'Idle' state.
43+
@inlinable
3944
init(from state: ServerHandlerStateMachine.Idle, context: GRPCAsyncServerCallContext) {
4045
self.headersWritten = false
4146
self.context = context
4247
}
4348

49+
@inlinable
4450
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
4551
// We are in the 'Handling' state because we received metadata. If we receive it again we
4652
// should cancel the RPC.
4753
return .init(nextState: .handling(self), output: .cancel)
4854
}
4955

56+
@inlinable
5057
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
5158
// We can always forward a message since receiving the end of the request stream causes a
5259
// transition to the 'draining' state.
5360
return .init(nextState: .handling(self), output: .forward)
5461
}
5562

63+
@inlinable
5664
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
5765
// The request stream is finished: move to the draining state so the user handler can finish
5866
// executing.
5967
return .init(nextState: .draining(from: self), output: .forward)
6068
}
6169

70+
@inlinable
6271
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
6372
let headers: HPACKHeaders?
6473

@@ -73,13 +82,15 @@ extension ServerHandlerStateMachine {
7382
return .init(nextState: .handling(self), output: .intercept(headers: headers))
7483
}
7584

85+
@inlinable
7686
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
7787
// Sending the status is the final action taken by the user handler. We can always send
7888
// them from this state and doing so means the user handler has completed.
7989
let trailers = self.context.trailingResponseMetadata
8090
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers))
8191
}
8292

93+
@inlinable
8394
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
8495
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents)
8596
}

Sources/GRPC/AsyncAwaitSupport/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Idle.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,31 @@ extension ServerHandlerStateMachine {
2121
/// the request headers) and invoke the handler, or we are cancelled.
2222
@usableFromInline
2323
internal struct Idle {
24+
@usableFromInline
2425
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
2526
ServerHandlerStateMachine.Idle.NextState,
2627
Output
2728
>
2829

2930
/// A ref to the `UserInfo`. We hold on to this until we're ready to invoke the handler.
31+
@usableFromInline
3032
let userInfoRef: Ref<UserInfo>
3133
/// A bag of bits required to construct a context passed to the user handler when it is invoked.
34+
@usableFromInline
3235
let callHandlerContext: CallHandlerContext
3336

3437
/// The state of the inbound stream, i.e. the request stream.
38+
@usableFromInline
3539
internal private(set) var inboundState: ServerInterceptorStateMachine.InboundStreamState
3640

41+
@inlinable
3742
init(userInfoRef: Ref<UserInfo>, context: CallHandlerContext) {
3843
self.userInfoRef = userInfoRef
3944
self.callHandlerContext = context
4045
self.inboundState = .idle
4146
}
4247

48+
@inlinable
4349
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
4450
let action: HandleMetadataAction
4551

@@ -55,23 +61,27 @@ extension ServerHandlerStateMachine {
5561
return .init(nextState: .idle(self), output: action)
5662
}
5763

64+
@inlinable
5865
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
5966
// We can't receive a message before the metadata, doing so is a protocol violation.
6067
return .init(nextState: .idle(self), output: .cancel)
6168
}
6269

70+
@inlinable
6371
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
6472
// Receiving 'end' before we start is odd but okay, just cancel.
6573
return .init(nextState: .idle(self), output: .cancel)
6674
}
6775

76+
@inlinable
6877
mutating func handlerInvoked(
6978
context: GRPCAsyncServerCallContext
7079
) -> Self.NextStateAndOutput<Void> {
7180
// The handler was invoked as a result of receiving metadata. Move to the next state.
7281
return .init(nextState: .handling(from: self, context: context))
7382
}
7483

84+
@inlinable
7585
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
7686
// There's no handler to cancel. Move straight to finished.
7787
return .init(nextState: .finished(from: self), output: .none)

0 commit comments

Comments
 (0)