diff --git a/linkerd/proxy/http/Cargo.toml b/linkerd/proxy/http/Cargo.toml index 1cf58a2faf..f2cc7bd610 100644 --- a/linkerd/proxy/http/Cargo.toml +++ b/linkerd/proxy/http/Cargo.toml @@ -21,6 +21,7 @@ http = "0.2" http-body = "0.4" httparse = "1" hyper = { version = "0.14", features = [ + "backports", "client", "deprecated", "http1", diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index a8699068b0..27477aa223 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -1,6 +1,6 @@ use crate::TracingExecutor; use futures::prelude::*; -use hyper::{body::HttpBody, client::conn}; +use hyper::body::HttpBody; use linkerd_error::{Error, Result}; use linkerd_stack::{MakeConnection, Service}; use std::{ @@ -23,8 +23,7 @@ pub struct Connect { #[derive(Debug)] pub struct Connection { - #[allow(deprecated)] // linkerd/linkerd2#8733 - tx: hyper::client::conn::SendRequest, + tx: hyper::client::conn::http2::SendRequest, } // === impl Connect === @@ -87,21 +86,19 @@ where Box::pin( async move { let (io, _meta) = connect.err_into::().await?; - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut builder = conn::Builder::new(); - builder.executor(TracingExecutor).http2_only(true); + let mut builder = hyper::client::conn::http2::Builder::new(TracingExecutor); match flow_control { None => {} Some(FlowControl::Adaptive) => { - builder.http2_adaptive_window(true); + builder.adaptive_window(true); } Some(FlowControl::Fixed { initial_stream_window_size, initial_connection_window_size, }) => { builder - .http2_initial_stream_window_size(initial_stream_window_size) - .http2_initial_connection_window_size(initial_connection_window_size); + .initial_stream_window_size(initial_stream_window_size) + .initial_connection_window_size(initial_connection_window_size); } } @@ -113,17 +110,17 @@ where }) = keep_alive { builder - .http2_keep_alive_timeout(timeout) - .http2_keep_alive_interval(interval) - .http2_keep_alive_while_idle(while_idle); + .keep_alive_timeout(timeout) + .keep_alive_interval(interval) + .keep_alive_while_idle(while_idle); } - builder.http2_max_frame_size(max_frame_size); + builder.max_frame_size(max_frame_size); if let Some(max) = max_concurrent_reset_streams { - builder.http2_max_concurrent_reset_streams(max); + builder.max_concurrent_reset_streams(max); } if let Some(sz) = max_send_buf_size { - builder.http2_max_send_buf_size(sz); + builder.max_send_buf_size(sz); } let (tx, conn) = builder @@ -153,7 +150,7 @@ where { type Response = http::Response; type Error = hyper::Error; - type Future = conn::ResponseFuture; + type Future = Pin>>>; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { @@ -175,6 +172,6 @@ where *req.version_mut() = http::Version::HTTP_11; } - self.tx.send_request(req) + self.tx.send_request(req).boxed() } } diff --git a/linkerd/proxy/http/src/server/tests.rs b/linkerd/proxy/http/src/server/tests.rs index 1dbf207033..7615ad8a93 100644 --- a/linkerd/proxy/http/src/server/tests.rs +++ b/linkerd/proxy/http/src/server/tests.rs @@ -2,6 +2,7 @@ use std::vec; use super::*; use bytes::Bytes; +use futures::FutureExt; use http_body::Body; use linkerd_stack::CloneParam; use tokio::time; @@ -26,10 +27,9 @@ async fn h2_connection_window_exhaustion() { h2::ServerParams::default(), // An HTTP/2 client with constrained connection and stream windows to // force window exhaustion. - #[allow(deprecated)] // linkerd/linkerd2#8733 - hyper::client::conn::Builder::new() - .http2_initial_connection_window_size(CLIENT_CONN_WINDOW) - .http2_initial_stream_window_size(CLIENT_STREAM_WINDOW), + hyper::client::conn::http2::Builder::new(TracingExecutor) + .initial_connection_window_size(CLIENT_CONN_WINDOW) + .initial_stream_window_size(CLIENT_STREAM_WINDOW), ) .await; @@ -100,8 +100,8 @@ async fn h2_stream_window_exhaustion() { // A basic HTTP/2 server configuration with no overrides. h2::ServerParams::default(), // An HTTP/2 client with stream windows to force window exhaustion. - #[allow(deprecated)] // linkerd/linkerd2#8733 - hyper::client::conn::Builder::new().http2_initial_stream_window_size(CLIENT_STREAM_WINDOW), + hyper::client::conn::http2::Builder::new(TracingExecutor) + .initial_stream_window_size(CLIENT_STREAM_WINDOW), ) .await; @@ -144,8 +144,7 @@ async fn h2_stream_window_exhaustion() { const LOG_LEVEL: &str = "h2::proto=trace,hyper=trace,linkerd=trace,info"; struct TestServer { - #[allow(deprecated)] // linkerd/linkerd2#8733 - client: hyper::client::conn::SendRequest, + client: hyper::client::conn::http2::SendRequest, server: Handle, } @@ -184,8 +183,16 @@ async fn timeout(inner: F) -> Result impl TestServer { #[tracing::instrument(skip_all)] - #[allow(deprecated)] // linkerd/linkerd2#8733 - async fn connect(params: Params, client: &mut hyper::client::conn::Builder) -> Self { + async fn connect_h2( + h2: h2::ServerParams, + client: &mut hyper::client::conn::http2::Builder, + ) -> Self { + let params = Params { + drain: drain(), + version: Version::H2, + http2: h2, + }; + // Build the HTTP server with a mocked inner service so that we can handle // requests. let (mock, server) = mock::pair(); @@ -196,7 +203,6 @@ impl TestServer { // Build a real HTTP/2 client using the mocked socket. let (client, task) = client - .executor(crate::TracingExecutor) .handshake::<_, BoxBody>(cio) .await .expect("client connect"); @@ -205,21 +211,6 @@ impl TestServer { Self { client, server } } - #[allow(deprecated)] // linkerd/linkerd2#8733 - async fn connect_h2(h2: h2::ServerParams, client: &mut hyper::client::conn::Builder) -> Self { - Self::connect( - // A basic HTTP/2 server configuration with no overrides. - Params { - drain: drain(), - version: Version::H2, - http2: h2, - }, - // An HTTP/2 client with constrained connection and stream windows to accomodate - client.http2_only(true), - ) - .await - } - /// Issues a request through the client to the mocked server and processes the /// response. The mocked response body sender and the readable response body are /// returned. @@ -228,7 +219,8 @@ impl TestServer { self.server.allow(1); let mut call0 = self .client - .send_request(http::Request::new(BoxBody::default())); + .send_request(http::Request::new(BoxBody::default())) + .boxed(); let (_req, next) = tokio::select! { _ = (&mut call0) => unreachable!("client cannot receive a response"), next = self.server.next_request() => next.expect("server not dropped"),