Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub use crate::exp_backoff::ExponentialBackoff;
use crate::{
proxy::http::{h1, h2},
svc::{queue, ExtractParam, Param},
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout},
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout, Backlog},
};
use std::time::Duration;

Expand All @@ -11,6 +11,7 @@ pub struct ServerConfig {
pub addr: DualListenAddr,
pub keepalive: Keepalive,
pub user_timeout: UserTimeout,
pub backlog: Backlog,
pub http2: h2::ServerParams,
}

Expand Down Expand Up @@ -84,3 +85,9 @@ impl Param<UserTimeout> for ServerConfig {
self.user_timeout
}
}

impl Param<Backlog> for ServerConfig {
fn param(&self) -> Backlog {
self.backlog
}
}
3 changes: 2 additions & 1 deletion linkerd/app/inbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{DualListenAddr, Keepalive, UserTimeout},
transport::{DualListenAddr, Keepalive, UserTimeout, Backlog},
ProxyRuntime,
};
pub use linkerd_app_test as support;
Expand Down Expand Up @@ -59,6 +59,7 @@ pub fn default_config() -> Config {
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
user_timeout: UserTimeout(None),
backlog: Backlog::default(),
http2: h2::ServerParams::default(),
},
connect: config::ConnectConfig {
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;
use linkerd_app_core::{
svc::Param,
transport::{
listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr, UserTimeout,
listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr, UserTimeout, Backlog,
},
Result,
};
Expand Down Expand Up @@ -70,7 +70,7 @@ struct MockDualOrigDst {

impl<T> listen::Bind<T> for MockOrigDst
where
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr>,
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr> + Param<Backlog>,
{
type Addrs = orig_dst::Addrs;
type BoundAddrs = Local<ServerAddr>;
Expand Down Expand Up @@ -120,7 +120,7 @@ impl fmt::Debug for MockOrigDst {

impl<T> listen::Bind<T> for MockDualOrigDst
where
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr>,
T: Param<Keepalive> + Param<UserTimeout> + Param<ListenAddr> + Param<Backlog>,
{
type Addrs = orig_dst::Addrs;
type BoundAddrs = (Local<ServerAddr>, Option<Local<ServerAddr>>);
Expand Down
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{DualListenAddr, Keepalive, UserTimeout},
transport::{DualListenAddr, Keepalive, UserTimeout, Backlog},
IpMatch, IpNet, ProxyRuntime,
};
pub use linkerd_app_test as support;
Expand All @@ -27,6 +27,7 @@ pub(crate) fn default_config() -> Config {
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
user_timeout: UserTimeout(None),
backlog: Backlog::default(),
http2: h2::ServerParams::default(),
},
connect: config::ConnectConfig {
Expand Down
14 changes: 13 additions & 1 deletion linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use linkerd_app_core::{
control::{Config as ControlConfig, ControlAddr},
proxy::http::{h1, h2},
tls,
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout},
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout, Backlog},
AddrMatch, Conditional, IpNet,
};
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, time::Duration};
Expand Down Expand Up @@ -133,6 +133,9 @@ const ENV_OUTBOUND_ACCEPT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_U
const ENV_INBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_CONNECT_USER_TIMEOUT";
const ENV_OUTBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_USER_TIMEOUT";

const ENV_INBOUND_TCP_LISTEN_BACKLOG: &str = "LINKERD2_PROXY_INBOUND_TCP_LISTEN_BACKLOG";
const ENV_OUTBOUND_TCP_LISTEN_BACKLOG: &str = "LINKERD2_PROXY_OUTBOUND_TCP_LISTEN_BACKLOG";

const ENV_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_MAX_IDLE_CONNS_PER_ENDPOINT";
const ENV_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str =
"LINKERD2_PROXY_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT";
Expand Down Expand Up @@ -386,6 +389,9 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let inbound_accept_keepalive = parse(strings, ENV_INBOUND_ACCEPT_KEEPALIVE, parse_duration);
let outbound_accept_keepalive = parse(strings, ENV_OUTBOUND_ACCEPT_KEEPALIVE, parse_duration);

let inbound_tcp_listen_backlog = parse(strings, ENV_INBOUND_TCP_LISTEN_BACKLOG, parse_number::<u32>);
let outbound_tcp_listen_backlog = parse(strings, ENV_OUTBOUND_TCP_LISTEN_BACKLOG, parse_number::<u32>);

let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration);
let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration);

Expand Down Expand Up @@ -499,10 +505,12 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>

let keepalive = Keepalive(outbound_accept_keepalive?);
let user_timeout = UserTimeout(outbound_accept_user_timeout?);
let backlog = Backlog(outbound_tcp_listen_backlog?.unwrap_or(128));
let server = ServerConfig {
addr,
keepalive,
user_timeout,
backlog,
http2: http2::parse_server(strings, "LINKERD2_PROXY_OUTBOUND_SERVER_HTTP2")?,
};
let discovery_idle_timeout =
Expand Down Expand Up @@ -591,10 +599,12 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
);
let keepalive = Keepalive(inbound_accept_keepalive?);
let user_timeout = UserTimeout(inbound_accept_user_timeout?);
let backlog = Backlog(inbound_tcp_listen_backlog?.unwrap_or(128));
let server = ServerConfig {
addr,
keepalive,
user_timeout,
backlog,
http2: http2::parse_server(strings, "LINKERD2_PROXY_INBOUND_SERVER_HTTP2")?,
};
let discovery_idle_timeout =
Expand Down Expand Up @@ -814,6 +824,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
addr: DualListenAddr(admin_listener_addr, None),
keepalive: inbound.proxy.server.keepalive,
user_timeout: inbound.proxy.server.user_timeout,
backlog: inbound.proxy.server.backlog,
http2: inbound.proxy.server.http2.clone(),
},

Expand Down Expand Up @@ -868,6 +879,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
addr: DualListenAddr(addr, None),
keepalive: inbound.proxy.server.keepalive,
user_timeout: inbound.proxy.server.user_timeout,
backlog: inbound.proxy.server.backlog,
http2: inbound.proxy.server.http2.clone(),
},
})
Expand Down
18 changes: 18 additions & 0 deletions linkerd/app/src/env/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,22 @@ mod tests {
assert!(dbg!(parse_port_range_set("69420")).is_err());
assert!(dbg!(parse_port_range_set("1-69420")).is_err());
}

#[test]
fn parse_backlog_valid() {
assert_eq!(parse_number::<u32>("128"), Ok(128));
assert_eq!(parse_number::<u32>("512"), Ok(512));
assert_eq!(parse_number::<u32>("1024"), Ok(1024));
assert_eq!(parse_number::<u32>("65535"), Ok(65535));
assert_eq!(parse_number::<u32>("0"), Ok(0));
}

#[test]
fn parse_backlog_invalid() {
assert!(parse_number::<u32>("").is_err());
assert!(parse_number::<u32>("abc").is_err());
assert!(parse_number::<u32>("-1").is_err());
assert!(parse_number::<u32>("12.5").is_err());
assert!(parse_number::<u32>("999999999999999999999999").is_err());
}
}
7 changes: 6 additions & 1 deletion linkerd/meshtls/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use linkerd_meshtls::{self as meshtls, watch};
use linkerd_proxy_transport::{
addrs::*,
listen::{Addrs, Bind, BindTcp},
ConnectTcp, Keepalive, UserTimeout,
ConnectTcp, Keepalive, UserTimeout, Backlog,
};
use linkerd_stack::{
layer::Layer, service_fn, ExtractParam, InsertParam, NewService, Param, ServiceExt,
Expand Down Expand Up @@ -408,6 +408,11 @@ impl Param<UserTimeout> for Server {
UserTimeout(None)
}
}
impl Param<Backlog> for Server {
fn param(&self) -> Backlog {
Backlog::default()
}
}

// === impl ServerParams ===

Expand Down
10 changes: 10 additions & 0 deletions linkerd/proxy/transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ impl From<UserTimeout> for Option<Duration> {
}
}

#[derive(Copy, Clone, Debug)]
pub struct Backlog(pub u32);

impl Default for Backlog {
fn default() -> Self {
// Use Rust's default backlog value (128 on most systems)
Backlog(128)
}
}

// Misc.

fn set_nodelay_or_warn(socket: &TcpStream) {
Expand Down
98 changes: 92 additions & 6 deletions linkerd/proxy/transport/src/listen.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod dual_bind;

use crate::{addrs::*, Keepalive, UserTimeout};
use crate::{addrs::*, Keepalive, UserTimeout, Backlog};
use dual_bind::DualBind;
use futures::prelude::*;
use linkerd_error::Result;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl BindTcp {

impl<T> Bind<T> for BindTcp
where
T: Param<ListenAddr> + Param<Keepalive> + Param<UserTimeout>,
T: Param<ListenAddr> + Param<Keepalive> + Param<UserTimeout> + Param<Backlog>,
{
type Addrs = Addrs;
type BoundAddrs = Local<ServerAddr>;
Expand All @@ -81,10 +81,25 @@ where
fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> {
let listen = {
let ListenAddr(addr) = params.param();
let l = std::net::TcpListener::bind(addr)?;
// Ensure that O_NONBLOCK is set on the socket before using it with Tokio.
l.set_nonblocking(true)?;
tokio::net::TcpListener::from_std(l).expect("listener must be valid")
let Backlog(backlog) = params.param();

// Use TcpSocket to create and configure the listener socket before binding.
// This allows us to set socket options and configure the listen backlog.
// TcpSocket::new_v4/v6 automatically sets O_NONBLOCK, which is required
// for Tokio's async I/O operations.
let socket = if addr.is_ipv4() {
tokio::net::TcpSocket::new_v4()?
} else {
tokio::net::TcpSocket::new_v6()?
};

// Enable SO_REUSEADDR to match std::net::TcpListener::bind behavior.
// This allows the server to bind to an address in TIME_WAIT state,
// enabling faster restarts without "address already in use" errors.
socket.set_reuseaddr(true)?;

socket.bind(addr)?;
socket.listen(backlog)?
};
let server = Local(ServerAddr(listen.local_addr()?));
let Keepalive(keepalive) = params.param();
Expand Down Expand Up @@ -138,3 +153,74 @@ impl Param<AddrPair> for Addrs {
AddrPair(client, server)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Clone)]
struct TestParams {
addr: ListenAddr,
keepalive: Keepalive,
user_timeout: UserTimeout,
backlog: crate::Backlog,
}

impl Param<ListenAddr> for TestParams {
fn param(&self) -> ListenAddr {
self.addr
}
}

impl Param<Keepalive> for TestParams {
fn param(&self) -> Keepalive {
self.keepalive
}
}

impl Param<UserTimeout> for TestParams {
fn param(&self) -> UserTimeout {
self.user_timeout
}
}

impl Param<crate::Backlog> for TestParams {
fn param(&self) -> crate::Backlog {
self.backlog
}
}

#[tokio::test]
#[cfg(target_os = "linux")]
async fn listener_is_nonblocking() {
use std::os::unix::io::AsRawFd;

let params = TestParams {
addr: ListenAddr("127.0.0.1:0".parse().unwrap()),
keepalive: Keepalive(None),
user_timeout: UserTimeout(None),
backlog: crate::Backlog(1024),
};

let bind = BindTcp::default();
let (bound_addr, _incoming) = bind.bind(&params).expect("failed to bind");

// Verify the socket is non-blocking by checking the O_NONBLOCK flag.
// Create a new listener to the bound address to check its flags.
let listener = tokio::net::TcpListener::bind(bound_addr.0 .0)
.await
.expect("failed to bind test listener");

let fd = listener.as_raw_fd();
// Safety: We're just reading the file descriptor flags with fcntl(F_GETFL),
// which is safe as long as the fd is valid (guaranteed by the listener being alive).
#[allow(unsafe_code)]
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
assert_ne!(flags, -1, "fcntl failed to get flags");
assert_ne!(
flags & libc::O_NONBLOCK,
0,
"O_NONBLOCK flag is not set on the listener socket"
);
}
}
10 changes: 8 additions & 2 deletions linkerd/proxy/transport/src/listen/dual_bind.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr, UserTimeout};
use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr, UserTimeout, Backlog};
use futures::Stream;
use linkerd_error::Result;
use linkerd_stack::Param;
Expand Down Expand Up @@ -26,7 +26,7 @@ impl<B> From<B> for DualBind<B> {

impl<T, B> Bind<T> for DualBind<B>
where
T: Param<DualListenAddr> + Param<Keepalive> + Param<UserTimeout> + Clone,
T: Param<DualListenAddr> + Param<Keepalive> + Param<UserTimeout> + Param<Backlog> + Clone,
B: Bind<Listen<T>, Io = TcpStream> + Clone + 'static,
{
type Addrs = B::Addrs;
Expand Down Expand Up @@ -68,6 +68,12 @@ impl<T: Param<UserTimeout>> Param<UserTimeout> for Listen<T> {
}
}

impl<T: Param<Backlog>> Param<Backlog> for Listen<T> {
fn param(&self) -> Backlog {
self.parent.param()
}
}

impl<T> Param<ListenAddr> for Listen<T> {
fn param(&self) -> ListenAddr {
ListenAddr(self.addr)
Expand Down