Skip to content

Commit 07eaf9c

Browse files
committed
Server handler interceptor state machine
Motivation: The existing async server handler has a state machine which deals with the interceptor pipeline and the lifecycle of user handler. These are really two separate entities each with its own state machine. This PR is the first in a series which will eventually update the async server handler to make use of the two state machines. Modifications: - Add a 'ServerInterceptorStateMachine' and tests. This acts as a filter for inbound and outbound request and response messages, both as they enter and exit the interceptor pipeline. Message parts delievered in the wrong order lead to the RPC being cancelled, parts delievered after the RPC has finished lead to them being dropped. Cancelling the RPC informs the caller to nil out the associated interceptor pipeline. - Tests - This is not used anywhere (yet). Result: Interceptor state machine is in place.
1 parent c4efc34 commit 07eaf9c

File tree

7 files changed

+960
-0
lines changed

7 files changed

+960
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2022, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.6)
17+
extension ServerInterceptorStateMachine {
18+
enum InterceptAction: Hashable {
19+
/// Forward the message to the interceptor pipeline.
20+
case intercept
21+
/// Cancel the call.
22+
case cancel
23+
/// Drop the message.
24+
case drop
25+
26+
init(from streamFilter: ServerInterceptorStateMachine.StreamFilter) {
27+
switch streamFilter {
28+
case .accept:
29+
self = .intercept
30+
case .reject:
31+
self = .cancel
32+
}
33+
}
34+
}
35+
36+
enum InterceptedAction: Hashable {
37+
/// Forward the message to the network or user handler.
38+
case forward
39+
/// Cancel the call.
40+
case cancel
41+
/// Drop the message.
42+
case drop
43+
44+
init(from streamFilter: ServerInterceptorStateMachine.StreamFilter) {
45+
switch streamFilter {
46+
case .accept:
47+
self = .forward
48+
case .reject:
49+
self = .cancel
50+
}
51+
}
52+
}
53+
54+
enum CancelAction: Hashable {
55+
/// Nil out the interceptor pipeline.
56+
case nilOutInterceptorPipeline
57+
/// Do nothing.
58+
case none
59+
}
60+
}
61+
#endif // compiler(>=5.6)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2022, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.6)
17+
extension ServerInterceptorStateMachine {
18+
/// The 'Finished' state is, as the name suggests, a terminal state. Nothing can happen in this
19+
/// state.
20+
@usableFromInline
21+
struct Finished {
22+
typealias NextStateAndOutput<Output> =
23+
ServerInterceptorStateMachine.NextStateAndOutput<Self.NextState, Output>
24+
25+
init(from state: ServerInterceptorStateMachine.Intercepting) {}
26+
27+
mutating func interceptRequestMetadata() -> Self.NextStateAndOutput<InterceptAction> {
28+
return .init(nextState: .finished(self), output: .drop)
29+
}
30+
31+
mutating func interceptRequestMessage() -> Self.NextStateAndOutput<InterceptAction> {
32+
return .init(nextState: .finished(self), output: .drop)
33+
}
34+
35+
mutating func interceptRequestEnd() -> Self.NextStateAndOutput<InterceptAction> {
36+
return .init(nextState: .finished(self), output: .drop)
37+
}
38+
39+
mutating func interceptedRequestMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
40+
return .init(nextState: .finished(self), output: .drop)
41+
}
42+
43+
mutating func interceptedRequestMessage() -> Self.NextStateAndOutput<InterceptedAction> {
44+
return .init(nextState: .finished(self), output: .drop)
45+
}
46+
47+
mutating func interceptedRequestEnd() -> Self.NextStateAndOutput<InterceptedAction> {
48+
return .init(nextState: .finished(self), output: .drop)
49+
}
50+
51+
mutating func interceptResponseMetadata() -> Self.NextStateAndOutput<InterceptAction> {
52+
return .init(nextState: .finished(self), output: .drop)
53+
}
54+
55+
mutating func interceptResponseMessage() -> Self.NextStateAndOutput<InterceptAction> {
56+
return .init(nextState: .finished(self), output: .drop)
57+
}
58+
59+
mutating func interceptResponseStatus() -> Self.NextStateAndOutput<InterceptAction> {
60+
return .init(nextState: .finished(self), output: .drop)
61+
}
62+
63+
mutating func interceptedResponseMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
64+
return .init(nextState: .finished(self), output: .drop)
65+
}
66+
67+
mutating func interceptedResponseMessage() -> Self.NextStateAndOutput<InterceptedAction> {
68+
return .init(nextState: .finished(self), output: .drop)
69+
}
70+
71+
mutating func interceptedResponseStatus() -> Self.NextStateAndOutput<InterceptedAction> {
72+
return .init(nextState: .finished(self), output: .drop)
73+
}
74+
75+
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
76+
return .init(nextState: .finished(self), output: .nilOutInterceptorPipeline)
77+
}
78+
}
79+
}
80+
#endif // compiler(>=5.6)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2022, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.6)
17+
extension ServerInterceptorStateMachine {
18+
/// The 'Intercepting' state is responsible for validating that appropriate message parts are
19+
/// forwarded to the interceptor pipeline and that messages parts which have been emitted from the
20+
/// interceptors are valid to forward to either the network or the user handler (as interceptors
21+
/// may emit new message parts).
22+
///
23+
/// We only transition to the next state on `cancel` (which happens at the end of every RPC).
24+
@usableFromInline
25+
struct Intercepting {
26+
typealias NextStateAndOutput<Output> =
27+
ServerInterceptorStateMachine.NextStateAndOutput<Self.NextState, Output>
28+
29+
/// From the network into the interceptors.
30+
private var requestStreamIn: InboundStreamState
31+
/// From the interceptors out to the handler.
32+
private var requestStreamOut: InboundStreamState
33+
34+
/// From the handler into the interceptors.
35+
private var responseStreamIn: OutboundStreamState
36+
/// From the interceptors out to the network.
37+
private var responseStreamOut: OutboundStreamState
38+
39+
init() {
40+
self.requestStreamIn = .idle
41+
self.requestStreamOut = .idle
42+
self.responseStreamIn = .idle
43+
self.responseStreamOut = .idle
44+
}
45+
46+
mutating func interceptRequestMetadata() -> Self.NextStateAndOutput<InterceptAction> {
47+
let filter = self.requestStreamIn.receiveMetadata()
48+
return .init(nextState: .intercepting(self), output: .init(from: filter))
49+
}
50+
51+
mutating func interceptRequestMessage() -> Self.NextStateAndOutput<InterceptAction> {
52+
let filter = self.requestStreamIn.receiveMessage()
53+
return .init(nextState: .intercepting(self), output: .init(from: filter))
54+
}
55+
56+
mutating func interceptRequestEnd() -> Self.NextStateAndOutput<InterceptAction> {
57+
let filter = self.requestStreamIn.receiveEnd()
58+
return .init(nextState: .intercepting(self), output: .init(from: filter))
59+
}
60+
61+
mutating func interceptedRequestMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
62+
let filter = self.requestStreamOut.receiveMetadata()
63+
return .init(nextState: .intercepting(self), output: .init(from: filter))
64+
}
65+
66+
mutating func interceptedRequestMessage() -> Self.NextStateAndOutput<InterceptedAction> {
67+
let filter = self.requestStreamOut.receiveMessage()
68+
return .init(nextState: .intercepting(self), output: .init(from: filter))
69+
}
70+
71+
mutating func interceptedRequestEnd() -> Self.NextStateAndOutput<InterceptedAction> {
72+
let filter = self.requestStreamOut.receiveEnd()
73+
return .init(nextState: .intercepting(self), output: .init(from: filter))
74+
}
75+
76+
mutating func interceptResponseMetadata() -> Self.NextStateAndOutput<InterceptAction> {
77+
let filter = self.responseStreamIn.sendMetadata()
78+
return .init(nextState: .intercepting(self), output: .init(from: filter))
79+
}
80+
81+
mutating func interceptResponseMessage() -> Self.NextStateAndOutput<InterceptAction> {
82+
let filter = self.responseStreamIn.sendMessage()
83+
return .init(nextState: .intercepting(self), output: .init(from: filter))
84+
}
85+
86+
mutating func interceptResponseStatus() -> Self.NextStateAndOutput<InterceptAction> {
87+
let filter = self.responseStreamIn.sendEnd()
88+
return .init(nextState: .intercepting(self), output: .init(from: filter))
89+
}
90+
91+
mutating func interceptedResponseMetadata() -> Self.NextStateAndOutput<InterceptedAction> {
92+
let filter = self.responseStreamOut.sendMetadata()
93+
return .init(nextState: .intercepting(self), output: .init(from: filter))
94+
}
95+
96+
mutating func interceptedResponseMessage() -> Self.NextStateAndOutput<InterceptedAction> {
97+
let filter = self.responseStreamOut.sendMessage()
98+
return .init(nextState: .intercepting(self), output: .init(from: filter))
99+
}
100+
101+
mutating func interceptedResponseStatus() -> Self.NextStateAndOutput<InterceptedAction> {
102+
let filter = self.responseStreamOut.sendEnd()
103+
return .init(nextState: .intercepting(self), output: .init(from: filter))
104+
}
105+
106+
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
107+
return .init(nextState: .finished(from: self), output: .nilOutInterceptorPipeline)
108+
}
109+
}
110+
}
111+
#endif // compiler(>=5.6)

0 commit comments

Comments
 (0)