diff --git a/linkerd/app/integration/src/client.rs b/linkerd/app/integration/src/client.rs index a7c108d43b..bdca51f462 100644 --- a/linkerd/app/integration/src/client.rs +++ b/linkerd/app/integration/src/client.rs @@ -131,8 +131,7 @@ impl Client { pub fn request( &self, builder: http::request::Builder, - ) -> impl Future, ClientError>> + Send + Sync + 'static - { + ) -> impl Future, ClientError>> + Send + 'static { self.send_req(builder.body(Bytes::new().into()).unwrap()) } @@ -156,8 +155,7 @@ impl Client { pub(crate) fn send_req( &self, mut req: Request, - ) -> impl Future, ClientError>> + Send + Sync + 'static - { + ) -> impl Future, ClientError>> + Send + 'static { if req.uri().scheme().is_none() { if self.tls.is_some() { *req.uri_mut() = format!("https://{}{}", self.authority, req.uri().path()) @@ -172,7 +170,7 @@ impl Client { tracing::debug!(headers = ?req.headers(), "request"); let (tx, rx) = oneshot::channel(); let _ = self.tx.send((req.map(Into::into), tx)); - async { rx.await.expect("request cancelled") }.in_current_span() + async move { rx.await.expect("request cancelled") }.in_current_span() } pub async fn wait_for_closed(self) { @@ -221,7 +219,7 @@ enum Run { Http2, } -pub type Running = Pin + Send + Sync + 'static>>; +pub type Running = Pin + Send + 'static>>; fn run( addr: SocketAddr, diff --git a/linkerd/app/integration/src/lib.rs b/linkerd/app/integration/src/lib.rs index 86e07bb7e0..1b19ff5856 100644 --- a/linkerd/app/integration/src/lib.rs +++ b/linkerd/app/integration/src/lib.rs @@ -256,7 +256,7 @@ impl fmt::Display for HumanDuration { pub async fn cancelable( drain: drain::Watch, - f: impl Future> + Send + 'static, + f: impl Future>, ) -> Result<(), E> { tokio::select! { res = f => res, diff --git a/linkerd/app/integration/src/server.rs b/linkerd/app/integration/src/server.rs index ef5e331fc0..44f9be5279 100644 --- a/linkerd/app/integration/src/server.rs +++ b/linkerd/app/integration/src/server.rs @@ -59,7 +59,7 @@ pub struct Listening { } type RspFuture = - Pin, Error>> + Send + Sync + 'static>>; + Pin, Error>> + Send + 'static>>; impl Listening { pub fn connections(&self) -> usize { @@ -128,7 +128,7 @@ impl Server { pub fn route_async(mut self, path: &str, cb: F) -> Self where F: Fn(Request) -> U + Send + Sync + 'static, - U: TryFuture> + Send + Sync + 'static, + U: TryFuture> + Send + 'static, U::Error: Into + Send + 'static, { let func = move |req| Box::pin(cb(req).map_err(Into::into)) as RspFuture; @@ -219,9 +219,17 @@ impl Server { tracing::trace!(?result, "serve done"); result }; - tokio::spawn( - cancelable(drain.clone(), f).instrument(span.clone().or_current()), - ); + // let fut = Box::pin(cancelable(drain.clone(), f).instrument(span.clone().or_current())) + let drain = drain.clone(); + tokio::spawn(async move { + tokio::select! { + res = f => res, + _ = drain.signaled() => { + tracing::debug!("canceled!"); + Ok(()) + } + } + }); } } .instrument( diff --git a/linkerd/app/integration/src/tests/profiles.rs b/linkerd/app/integration/src/tests/profiles.rs index c0e90e040f..067fc19b94 100644 --- a/linkerd/app/integration/src/tests/profiles.rs +++ b/linkerd/app/integration/src/tests/profiles.rs @@ -293,7 +293,8 @@ mod cross_version { .method("POST") .body(body) .unwrap(); - let res = tokio::spawn(async move { client.request_body(req).await }); + let fut = client.send_req(req); + let res = tokio::spawn(fut); tx.send_data(Bytes::from_static(b"hello")) .await .expect("the whole body should be read"); @@ -301,7 +302,7 @@ mod cross_version { .await .expect("the whole body should be read"); drop(tx); - let res = res.await.unwrap(); + let res = res.await.unwrap().unwrap(); assert_eq!(res.status(), 200); } @@ -392,7 +393,8 @@ mod cross_version { .method("POST") .body(body) .unwrap(); - let res = tokio::spawn(async move { client.request_body(req).await }); + let fut = client.send_req(req); + let res = tokio::spawn(fut); // send a 32k chunk tx.send_data(Bytes::from(&[1u8; 32 * 1024][..])) .await @@ -406,7 +408,7 @@ mod cross_version { .await .expect("the whole body should be read"); drop(tx); - let res = res.await.unwrap(); + let res = res.await.unwrap().unwrap(); assert_eq!(res.status(), 533); }