Skip to content

Commit 7eda823

Browse files
alceLucioFranco
authored andcommitted
fix(client): Use Stream instead of TrySteam for client calls (#61)
1 parent 8cddf8a commit 7eda823

File tree

5 files changed

+16
-21
lines changed

5 files changed

+16
-21
lines changed

tonic-build/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ fn generate_client_streaming(method: &Method, proto: &str, path: String) -> Toke
133133
quote! {
134134
pub async fn #ident<S>(&mut self, request: tonic::Request<S>)
135135
-> Result<tonic::Response<#response>, tonic::Status>
136-
where S: Stream<Item = Result<#request, tonic::Status>> + Send + 'static,
136+
where S: Stream<Item = #request> + Send + 'static,
137137
{
138138
self.ready().await?;
139139
let codec = tonic::codec::ProstCodec::new();
@@ -151,7 +151,7 @@ fn generate_streaming(method: &Method, proto: &str, path: String) -> TokenStream
151151
quote! {
152152
pub async fn #ident<S>(&mut self, request: tonic::Request<S>)
153153
-> Result<tonic::Response<tonic::codec::Streaming<#response>>, tonic::Status>
154-
where S: Stream<Item = Result<#request, tonic::Status>> + Send + 'static,
154+
where S: Stream<Item = #request> + Send + 'static,
155155
{
156156
self.ready().await?;
157157
let codec = tonic::codec::ProstCodec::new();

tonic-examples/src/routeguide/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2525

2626
println!("FEATURE = {:?}", response);
2727

28-
let outbound = async_stream::try_stream! {
28+
let outbound = async_stream::stream! {
2929
let mut interval = Interval::new_interval(Duration::from_secs(1));
3030

3131
while let Some(time) = interval.next().await {

tonic-interop/src/client.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,10 @@ pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAsser
7979
// }
8080

8181
pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
82-
let requests = REQUEST_LENGTHS
83-
.iter()
84-
.map(|len| StreamingInputCallRequest {
85-
payload: Some(crate::client_payload(*len as usize)),
86-
..Default::default()
87-
})
88-
.map(|v| Ok(v));
82+
let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest {
83+
payload: Some(crate::client_payload(*len as usize)),
84+
..Default::default()
85+
});
8986

9087
let stream = stream::iter(requests);
9188

@@ -154,9 +151,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
154151
let (mut tx, rx) = mpsc::unbounded_channel();
155152
tx.try_send(make_ping_pong_request(0)).unwrap();
156153

157-
let result = client
158-
.full_duplex_call(Request::new(rx.map(|s| Ok(s))))
159-
.await;
154+
let result = client.full_duplex_call(Request::new(rx)).await;
160155

161156
assertions.push(test_assert!(
162157
"call must be successful",
@@ -272,7 +267,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V
272267
let result = client.unary_call(Request::new(simple_req)).await;
273268
validate_response(result, assertions);
274269

275-
let stream = stream::iter(vec![Ok(duplex_req)]);
270+
let stream = stream::iter(vec![duplex_req]);
276271
let result = match client.full_duplex_call(Request::new(stream)).await {
277272
Ok(response) => {
278273
let stream = response.into_inner();
@@ -359,7 +354,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestA
359354
req_unary.metadata_mut().insert(key1, value1.clone());
360355
req_unary.metadata_mut().insert_bin(key2, value2.clone());
361356

362-
let stream = stream::iter(vec![Ok(make_ping_pong_request(0))]);
357+
let stream = stream::iter(vec![make_ping_pong_request(0)]);
363358
let mut req_stream = Request::new(stream);
364359
req_stream.metadata_mut().insert(key1, value1.clone());
365360
req_stream.metadata_mut().insert_bin(key2, value2.clone());

tonic/src/client/grpc.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ impl<T> Grpc<T> {
6565
M1: Send + 'static,
6666
M2: Send + 'static,
6767
{
68-
let request = request.map(|m| stream::once(future::ok(m)));
68+
let request = request.map(|m| stream::once(future::ready(m)));
6969
self.client_streaming(request, path, codec).await
7070
}
7171

@@ -81,7 +81,7 @@ impl<T> Grpc<T> {
8181
T::ResponseBody: Body + HttpBody + Send + 'static,
8282
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
8383
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
84-
S: Stream<Item = Result<M1, Status>> + Send + 'static,
84+
S: Stream<Item = M1> + Send + 'static,
8585
C: Codec<Encode = M1, Decode = M2>,
8686
M1: Send + 'static,
8787
M2: Send + 'static,
@@ -118,7 +118,7 @@ impl<T> Grpc<T> {
118118
M1: Send + 'static,
119119
M2: Send + 'static,
120120
{
121-
let request = request.map(|m| stream::once(future::ok(m)));
121+
let request = request.map(|m| stream::once(future::ready(m)));
122122
self.streaming(request, path, codec).await
123123
}
124124

@@ -134,7 +134,7 @@ impl<T> Grpc<T> {
134134
T::ResponseBody: Body + HttpBody + Send + 'static,
135135
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
136136
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
137-
S: Stream<Item = Result<M1, Status>> + Send + 'static,
137+
S: Stream<Item = M1> + Send + 'static,
138138
C: Codec<Encode = M1, Decode = M2>,
139139
M1: Send + 'static,
140140
M2: Send + 'static,

tonic/src/codec/encode.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ pub(crate) fn encode_client<T, U>(
2929
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
3030
where
3131
T: Encoder<Error = Status>,
32-
U: Stream<Item = Result<T::Item, Status>>,
32+
U: Stream<Item = T::Item>,
3333
{
34-
let stream = encode(encoder, source).into_stream();
34+
let stream = encode(encoder, source.map(|x| Ok(x))).into_stream();
3535
EncodeBody::new_client(stream)
3636
}
3737

0 commit comments

Comments
 (0)