Skip to content

Commit 3ce61d9

Browse files
authored
fix(codec): Enforce encoders/decoders are Sync (#84)
Closes #81
1 parent 5b4f468 commit 3ce61d9

File tree

11 files changed

+62
-53
lines changed

11 files changed

+62
-53
lines changed

tonic-build/src/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream {
124124

125125
quote! {
126126
#stream_doc
127-
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + 'static;
127+
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + Sync + 'static;
128128

129129
#method_doc
130130
async fn #name(&self, request: tonic::Request<#req_message>)
@@ -142,7 +142,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream {
142142

143143
quote! {
144144
#stream_doc
145-
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + 'static;
145+
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + Sync + 'static;
146146

147147
#method_doc
148148
async fn #name(&self, request: tonic::Request<tonic::Streaming<#req_message>>)

tonic-examples/src/routeguide/server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ impl server::RouteGuide for RouteGuide {
108108
Ok(Response::new(summary))
109109
}
110110

111-
type RouteChatStream = Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>>;
111+
type RouteChatStream =
112+
Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>>;
112113

113114
async fn route_chat(
114115
&self,
@@ -138,7 +139,7 @@ impl server::RouteGuide for RouteGuide {
138139

139140
Ok(Response::new(Box::pin(output)
140141
as Pin<
141-
Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>,
142+
Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>,
142143
>))
143144
}
144145
}

tonic-interop/src/server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ pub struct TestService;
1212

1313
type Result<T> = std::result::Result<Response<T>, Status>;
1414
type Streaming<T> = Request<tonic::Streaming<T>>;
15-
type Stream<T> =
16-
Pin<Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
15+
type Stream<T> = Pin<
16+
Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + Sync + 'static>,
17+
>;
1718

1819
#[tonic::async_trait]
1920
impl pb::server::TestService for TestService {

tonic/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ tls = []
4141
name = "bench_main"
4242
harness = false
4343

44-
[dev-dependencies]
45-
rand = "0.7.2"
46-
criterion = "0.3"
47-
4844
[dependencies]
4945
bytes = "0.4"
5046
futures-core-preview = "=0.3.0-alpha.19"
@@ -83,6 +79,11 @@ openssl1 = { package = "openssl", version = "0.10", optional = true }
8379
# rustls
8480
tokio-rustls = { version = "=0.12.0-alpha.4", optional = true }
8581

82+
[dev-dependencies]
83+
static_assertions = "1.0"
84+
rand = "0.7.2"
85+
criterion = "0.3"
86+
8687
[package.metadata.docs.rs]
8788
all-features = true
8889
rustdoc-args = ["--cfg", "docsrs"]

tonic/src/body.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::{
1515
pub(crate) type BytesBuf = <Bytes as IntoBuf>::Buf;
1616

1717
/// A trait alias for [`http_body::Body`].
18-
pub trait Body: sealed::Sealed {
18+
pub trait Body: sealed::Sealed + Send + Sync {
1919
/// The body data type.
2020
type Data: Buf;
2121
/// The errors produced from the body.
@@ -45,7 +45,7 @@ pub trait Body: sealed::Sealed {
4545

4646
impl<T> Body for T
4747
where
48-
T: HttpBody,
48+
T: HttpBody + Send + Sync + 'static,
4949
T::Error: Into<Error>,
5050
{
5151
type Data = T::Data;
@@ -83,7 +83,7 @@ mod sealed {
8383

8484
/// A type erased http body.
8585
pub struct BoxBody {
86-
inner: Pin<Box<dyn Body<Data = BytesBuf, Error = Status> + Send + 'static>>,
86+
inner: Pin<Box<dyn Body<Data = BytesBuf, Error = Status> + Send + Sync + 'static>>,
8787
}
8888

8989
struct MapBody<B>(B);
@@ -92,7 +92,7 @@ impl BoxBody {
9292
/// Create a new `BoxBody` mapping item and error to the default types.
9393
pub fn new<B>(inner: B) -> Self
9494
where
95-
B: Body<Data = BytesBuf, Error = Status> + Send + 'static,
95+
B: Body<Data = BytesBuf, Error = Status> + Send + Sync + 'static,
9696
{
9797
BoxBody {
9898
inner: Box::pin(inner),
@@ -102,7 +102,7 @@ impl BoxBody {
102102
/// Create a new `BoxBody` mapping item and error to the default types.
103103
pub fn map_from<B>(inner: B) -> Self
104104
where
105-
B: Body + Send + 'static,
105+
B: Body + Send + Sync + 'static,
106106
B::Data: Into<Bytes>,
107107
B::Error: Into<crate::Error>,
108108
{

tonic/src/client/grpc.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ impl<T> Grpc<T> {
6262
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
6363
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
6464
C: Codec<Encode = M1, Decode = M2>,
65-
M1: Send + 'static,
66-
M2: Send + 'static,
65+
M1: Send + Sync + 'static,
66+
M2: Send + Sync + 'static,
6767
{
6868
let request = request.map(|m| stream::once(future::ready(m)));
6969
self.client_streaming(request, path, codec).await
@@ -81,10 +81,10 @@ impl<T> Grpc<T> {
8181
T::ResponseBody: Body + HttpBody + Send + 'static,
8282
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
8383
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
84-
S: Stream<Item = M1> + Send + 'static,
84+
S: Stream<Item = M1> + Send + Sync + 'static,
8585
C: Codec<Encode = M1, Decode = M2>,
86-
M1: Send + 'static,
87-
M2: Send + 'static,
86+
M1: Send + Sync + 'static,
87+
M2: Send + Sync + 'static,
8888
{
8989
let (mut parts, body) = self.streaming(request, path, codec).await?.into_parts();
9090

@@ -115,8 +115,8 @@ impl<T> Grpc<T> {
115115
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
116116
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
117117
C: Codec<Encode = M1, Decode = M2>,
118-
M1: Send + 'static,
119-
M2: Send + 'static,
118+
M1: Send + Sync + 'static,
119+
M2: Send + Sync + 'static,
120120
{
121121
let request = request.map(|m| stream::once(future::ready(m)));
122122
self.streaming(request, path, codec).await
@@ -134,10 +134,10 @@ impl<T> Grpc<T> {
134134
T::ResponseBody: Body + HttpBody + Send + 'static,
135135
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
136136
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
137-
S: Stream<Item = M1> + Send + 'static,
137+
S: Stream<Item = M1> + Send + Sync + 'static,
138138
C: Codec<Encode = M1, Decode = M2>,
139-
M1: Send + 'static,
140-
M2: Send + 'static,
139+
M1: Send + Sync + 'static,
140+
M2: Send + Sync + 'static,
141141
{
142142
let mut parts = Parts::default();
143143
parts.path_and_query = Some(path);

tonic/src/codec/decode.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const BUFFER_SIZE: usize = 8 * 1024;
1919
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
2020
/// to fetch the message stream and trailing metadata
2121
pub struct Streaming<T> {
22-
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,
22+
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + Sync + 'static>,
2323
body: BoxBody,
2424
state: State,
2525
direction: Direction,
@@ -45,40 +45,40 @@ enum Direction {
4545
impl<T> Streaming<T> {
4646
pub(crate) fn new_response<B, D>(decoder: D, body: B, status_code: StatusCode) -> Self
4747
where
48-
B: Body + Send + 'static,
48+
B: Body + Send + Sync + 'static,
4949
B::Data: Into<Bytes>,
5050
B::Error: Into<crate::Error>,
51-
D: Decoder<Item = T, Error = Status> + Send + 'static,
51+
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
5252
{
5353
Self::new(decoder, body, Direction::Response(status_code))
5454
}
5555

5656
pub(crate) fn new_empty<B, D>(decoder: D, body: B) -> Self
5757
where
58-
B: Body + Send + 'static,
58+
B: Body + Send + Sync + 'static,
5959
B::Data: Into<Bytes>,
6060
B::Error: Into<crate::Error>,
61-
D: Decoder<Item = T, Error = Status> + Send + 'static,
61+
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
6262
{
6363
Self::new(decoder, body, Direction::EmptyResponse)
6464
}
6565

6666
pub(crate) fn new_request<B, D>(decoder: D, body: B) -> Self
6767
where
68-
B: Body + Send + 'static,
68+
B: Body + Send + Sync + 'static,
6969
B::Data: Into<Bytes>,
7070
B::Error: Into<crate::Error>,
71-
D: Decoder<Item = T, Error = Status> + Send + 'static,
71+
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
7272
{
7373
Self::new(decoder, body, Direction::Request)
7474
}
7575

7676
fn new<B, D>(decoder: D, body: B, direction: Direction) -> Self
7777
where
78-
B: Body + Send + 'static,
78+
B: Body + Send + Sync + 'static,
7979
B::Data: Into<Bytes>,
8080
B::Error: Into<crate::Error>,
81-
D: Decoder<Item = T, Error = Status> + Send + 'static,
81+
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
8282
{
8383
Self {
8484
decoder: Box::new(decoder),
@@ -291,3 +291,6 @@ impl<T> fmt::Debug for Streaming<T> {
291291
f.debug_struct("Streaming").finish()
292292
}
293293
}
294+
295+
#[cfg(test)]
296+
static_assertions::assert_impl_all!(Streaming<()>: Send, Sync);

tonic/src/codec/encode.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ pub(crate) fn encode_server<T, U>(
1616
source: U,
1717
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
1818
where
19-
T: Encoder<Error = Status>,
20-
U: Stream<Item = Result<T::Item, Status>>,
19+
T: Encoder<Error = Status> + Send + Sync + 'static,
20+
T::Item: Send + Sync,
21+
U: Stream<Item = Result<T::Item, Status>> + Send + Sync + 'static,
2122
{
2223
let stream = encode(encoder, source).into_stream();
2324
EncodeBody::new_server(stream)
@@ -28,8 +29,9 @@ pub(crate) fn encode_client<T, U>(
2829
source: U,
2930
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
3031
where
31-
T: Encoder<Error = Status>,
32-
U: Stream<Item = T::Item>,
32+
T: Encoder<Error = Status> + Send + Sync + 'static,
33+
T::Item: Send + Sync,
34+
U: Stream<Item = T::Item> + Send + Sync + 'static,
3335
{
3436
let stream = encode(encoder, source.map(|x| Ok(x))).into_stream();
3537
EncodeBody::new_client(stream)
@@ -88,7 +90,7 @@ pub(crate) struct EncodeBody<S> {
8890

8991
impl<S> EncodeBody<S>
9092
where
91-
S: Stream<Item = Result<crate::body::BytesBuf, Status>>,
93+
S: Stream<Item = Result<crate::body::BytesBuf, Status>> + Send + Sync + 'static,
9294
{
9395
pub(crate) fn new_client(inner: S) -> Self {
9496
Self {

tonic/src/codec/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ pub trait Codec: Default {
2828
type Decode: Send + 'static;
2929

3030
/// The encoder that can encode a message.
31-
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + 'static;
31+
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + Sync + 'static;
3232
/// The encoder that can decode a message.
33-
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + 'static;
33+
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + Sync + 'static;
3434

3535
/// Fetch the encoder.
3636
fn encoder(&mut self) -> Self::Encoder;

tonic/src/request.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub trait IntoRequest<T>: sealed::Sealed {
7777
/// ```
7878
pub trait IntoStreamingRequest: sealed::Sealed {
7979
/// The RPC request stream type
80-
type Stream: Stream<Item = Self::Message> + Send + 'static;
80+
type Stream: Stream<Item = Self::Message> + Send + Sync + 'static;
8181

8282
/// The RPC request type
8383
type Message;
@@ -182,7 +182,7 @@ impl<T> IntoRequest<T> for Request<T> {
182182

183183
impl<T> IntoStreamingRequest for T
184184
where
185-
T: Stream + Send + 'static,
185+
T: Stream + Send + Sync + 'static,
186186
{
187187
type Stream = T;
188188
type Message = T::Item;
@@ -194,7 +194,7 @@ where
194194

195195
impl<T> IntoStreamingRequest for Request<T>
196196
where
197-
T: Stream + Send + 'static,
197+
T: Stream + Send + Sync + 'static,
198198
{
199199
type Stream = T;
200200
type Message = T::Item;

0 commit comments

Comments
 (0)