Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions linkerd/proxy/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ http = "0.2"
http-body = "0.4"
httparse = "1"
hyper = { version = "0.14", features = [
"backports",
"client",
"deprecated",
"http1",
Expand Down
31 changes: 14 additions & 17 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::TracingExecutor;
use futures::prelude::*;
use hyper::{body::HttpBody, client::conn};
use hyper::body::HttpBody;
use linkerd_error::{Error, Result};
use linkerd_stack::{MakeConnection, Service};
use std::{
Expand All @@ -23,8 +23,7 @@ pub struct Connect<C, B> {

#[derive(Debug)]
pub struct Connection<B> {
#[allow(deprecated)] // linkerd/linkerd2#8733
tx: hyper::client::conn::SendRequest<B>,
tx: hyper::client::conn::http2::SendRequest<B>,
}

// === impl Connect ===
Expand Down Expand Up @@ -87,21 +86,19 @@ where
Box::pin(
async move {
let (io, _meta) = connect.err_into::<Error>().await?;
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut builder = conn::Builder::new();
builder.executor(TracingExecutor).http2_only(true);
let mut builder = hyper::client::conn::http2::Builder::new(TracingExecutor);
match flow_control {
None => {}
Some(FlowControl::Adaptive) => {
builder.http2_adaptive_window(true);
builder.adaptive_window(true);
}
Some(FlowControl::Fixed {
initial_stream_window_size,
initial_connection_window_size,
}) => {
builder
.http2_initial_stream_window_size(initial_stream_window_size)
.http2_initial_connection_window_size(initial_connection_window_size);
.initial_stream_window_size(initial_stream_window_size)
.initial_connection_window_size(initial_connection_window_size);
}
}

Expand All @@ -113,17 +110,17 @@ where
}) = keep_alive
{
builder
.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval)
.http2_keep_alive_while_idle(while_idle);
.keep_alive_timeout(timeout)
.keep_alive_interval(interval)
.keep_alive_while_idle(while_idle);
}

builder.http2_max_frame_size(max_frame_size);
builder.max_frame_size(max_frame_size);
if let Some(max) = max_concurrent_reset_streams {
builder.http2_max_concurrent_reset_streams(max);
builder.max_concurrent_reset_streams(max);
}
if let Some(sz) = max_send_buf_size {
builder.http2_max_send_buf_size(sz);
builder.max_send_buf_size(sz);
}

let (tx, conn) = builder
Expand Down Expand Up @@ -153,7 +150,7 @@ where
{
type Response = http::Response<hyper::Body>;
type Error = hyper::Error;
type Future = conn::ResponseFuture;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -175,6 +172,6 @@ where
*req.version_mut() = http::Version::HTTP_11;
}

self.tx.send_request(req)
self.tx.send_request(req).boxed()
}
}
46 changes: 19 additions & 27 deletions linkerd/proxy/http/src/server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::vec;

use super::*;
use bytes::Bytes;
use futures::FutureExt;
use http_body::Body;
use linkerd_stack::CloneParam;
use tokio::time;
Expand All @@ -26,10 +27,9 @@ async fn h2_connection_window_exhaustion() {
h2::ServerParams::default(),
// An HTTP/2 client with constrained connection and stream windows to
// force window exhaustion.
#[allow(deprecated)] // linkerd/linkerd2#8733
hyper::client::conn::Builder::new()
.http2_initial_connection_window_size(CLIENT_CONN_WINDOW)
.http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
hyper::client::conn::http2::Builder::new(TracingExecutor)
.initial_connection_window_size(CLIENT_CONN_WINDOW)
.initial_stream_window_size(CLIENT_STREAM_WINDOW),
)
.await;

Expand Down Expand Up @@ -100,8 +100,8 @@ async fn h2_stream_window_exhaustion() {
// A basic HTTP/2 server configuration with no overrides.
h2::ServerParams::default(),
// An HTTP/2 client with stream windows to force window exhaustion.
#[allow(deprecated)] // linkerd/linkerd2#8733
hyper::client::conn::Builder::new().http2_initial_stream_window_size(CLIENT_STREAM_WINDOW),
hyper::client::conn::http2::Builder::new(TracingExecutor)
.initial_stream_window_size(CLIENT_STREAM_WINDOW),
)
.await;

Expand Down Expand Up @@ -144,8 +144,7 @@ async fn h2_stream_window_exhaustion() {
const LOG_LEVEL: &str = "h2::proto=trace,hyper=trace,linkerd=trace,info";

struct TestServer {
#[allow(deprecated)] // linkerd/linkerd2#8733
client: hyper::client::conn::SendRequest<BoxBody>,
client: hyper::client::conn::http2::SendRequest<BoxBody>,
server: Handle,
}

Expand Down Expand Up @@ -184,8 +183,16 @@ async fn timeout<F: Future>(inner: F) -> Result<F::Output, time::error::Elapsed>

impl TestServer {
#[tracing::instrument(skip_all)]
#[allow(deprecated)] // linkerd/linkerd2#8733
async fn connect(params: Params, client: &mut hyper::client::conn::Builder) -> Self {
async fn connect_h2(
h2: h2::ServerParams,
client: &mut hyper::client::conn::http2::Builder,
) -> Self {
let params = Params {
drain: drain(),
version: Version::H2,
http2: h2,
};

// Build the HTTP server with a mocked inner service so that we can handle
// requests.
let (mock, server) = mock::pair();
Expand All @@ -196,7 +203,6 @@ impl TestServer {

// Build a real HTTP/2 client using the mocked socket.
let (client, task) = client
.executor(crate::TracingExecutor)
.handshake::<_, BoxBody>(cio)
.await
.expect("client connect");
Expand All @@ -205,21 +211,6 @@ impl TestServer {
Self { client, server }
}

#[allow(deprecated)] // linkerd/linkerd2#8733
async fn connect_h2(h2: h2::ServerParams, client: &mut hyper::client::conn::Builder) -> Self {
Self::connect(
// A basic HTTP/2 server configuration with no overrides.
Params {
drain: drain(),
version: Version::H2,
http2: h2,
},
// An HTTP/2 client with constrained connection and stream windows to accomodate
client.http2_only(true),
)
.await
}

/// Issues a request through the client to the mocked server and processes the
/// response. The mocked response body sender and the readable response body are
/// returned.
Expand All @@ -228,7 +219,8 @@ impl TestServer {
self.server.allow(1);
let mut call0 = self
.client
.send_request(http::Request::new(BoxBody::default()));
.send_request(http::Request::new(BoxBody::default()))
.boxed();
let (_req, next) = tokio::select! {
_ = (&mut call0) => unreachable!("client cannot receive a response"),
next = self.server.next_request() => next.expect("server not dropped"),
Expand Down
Loading