From 52f2ca720f573a83b06122570eec5e4c41f72eb3 Mon Sep 17 00:00:00 2001 From: Aurel Canciu Date: Fri, 12 Dec 2025 21:05:52 +0100 Subject: [PATCH] feat: Allow configuring TCP listen backlog Signed-off-by: Aurel Canciu --- linkerd/app/core/src/config.rs | 9 +- linkerd/app/inbound/src/test_util.rs | 3 +- linkerd/app/integration/src/proxy.rs | 6 +- linkerd/app/outbound/src/test_util.rs | 3 +- linkerd/app/src/env.rs | 14 ++- linkerd/app/src/env/types.rs | 18 ++++ linkerd/meshtls/tests/util.rs | 7 +- linkerd/proxy/transport/src/lib.rs | 10 ++ linkerd/proxy/transport/src/listen.rs | 98 +++++++++++++++++-- .../proxy/transport/src/listen/dual_bind.rs | 10 +- 10 files changed, 162 insertions(+), 16 deletions(-) diff --git a/linkerd/app/core/src/config.rs b/linkerd/app/core/src/config.rs index d27467ba8f..e9cd14ac82 100644 --- a/linkerd/app/core/src/config.rs +++ b/linkerd/app/core/src/config.rs @@ -2,7 +2,7 @@ pub use crate::exp_backoff::ExponentialBackoff; use crate::{ proxy::http::{h1, h2}, svc::{queue, ExtractParam, Param}, - transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout}, + transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout, Backlog}, }; use std::time::Duration; @@ -11,6 +11,7 @@ pub struct ServerConfig { pub addr: DualListenAddr, pub keepalive: Keepalive, pub user_timeout: UserTimeout, + pub backlog: Backlog, pub http2: h2::ServerParams, } @@ -84,3 +85,9 @@ impl Param for ServerConfig { self.user_timeout } } + +impl Param for ServerConfig { + fn param(&self) -> Backlog { + self.backlog + } +} diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index 1fb420351c..da514df7c0 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -8,7 +8,7 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{DualListenAddr, Keepalive, UserTimeout}, + transport::{DualListenAddr, Keepalive, UserTimeout, Backlog}, ProxyRuntime, }; pub use linkerd_app_test as support; @@ -59,6 +59,7 @@ pub fn default_config() -> Config { addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None), keepalive: Keepalive(None), user_timeout: UserTimeout(None), + backlog: Backlog::default(), http2: h2::ServerParams::default(), }, connect: config::ConnectConfig { diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index f89bd76621..f91ad9c6dd 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -2,7 +2,7 @@ use super::*; use linkerd_app_core::{ svc::Param, transport::{ - listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr, UserTimeout, + listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr, UserTimeout, Backlog, }, Result, }; @@ -70,7 +70,7 @@ struct MockDualOrigDst { impl listen::Bind for MockOrigDst where - T: Param + Param + Param, + T: Param + Param + Param + Param, { type Addrs = orig_dst::Addrs; type BoundAddrs = Local; @@ -120,7 +120,7 @@ impl fmt::Debug for MockOrigDst { impl listen::Bind for MockDualOrigDst where - T: Param + Param + Param, + T: Param + Param + Param + Param, { type Addrs = orig_dst::Addrs; type BoundAddrs = (Local, Option>); diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index b77a3909f1..72dd498fdb 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -7,7 +7,7 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{DualListenAddr, Keepalive, UserTimeout}, + transport::{DualListenAddr, Keepalive, UserTimeout, Backlog}, IpMatch, IpNet, ProxyRuntime, }; pub use linkerd_app_test as support; @@ -27,6 +27,7 @@ pub(crate) fn default_config() -> Config { addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None), keepalive: Keepalive(None), user_timeout: UserTimeout(None), + backlog: Backlog::default(), http2: h2::ServerParams::default(), }, connect: config::ConnectConfig { diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 26d6781d69..f5c480133b 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -5,7 +5,7 @@ use linkerd_app_core::{ control::{Config as ControlConfig, ControlAddr}, proxy::http::{h1, h2}, tls, - transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout}, + transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout, Backlog}, AddrMatch, Conditional, IpNet, }; use std::{collections::HashSet, net::SocketAddr, path::PathBuf, time::Duration}; @@ -133,6 +133,9 @@ const ENV_OUTBOUND_ACCEPT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_U const ENV_INBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_CONNECT_USER_TIMEOUT"; const ENV_OUTBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_USER_TIMEOUT"; +const ENV_INBOUND_TCP_LISTEN_BACKLOG: &str = "LINKERD2_PROXY_INBOUND_TCP_LISTEN_BACKLOG"; +const ENV_OUTBOUND_TCP_LISTEN_BACKLOG: &str = "LINKERD2_PROXY_OUTBOUND_TCP_LISTEN_BACKLOG"; + const ENV_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_MAX_IDLE_CONNS_PER_ENDPOINT"; const ENV_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT"; @@ -386,6 +389,9 @@ pub fn parse_config(strings: &S) -> Result let inbound_accept_keepalive = parse(strings, ENV_INBOUND_ACCEPT_KEEPALIVE, parse_duration); let outbound_accept_keepalive = parse(strings, ENV_OUTBOUND_ACCEPT_KEEPALIVE, parse_duration); + let inbound_tcp_listen_backlog = parse(strings, ENV_INBOUND_TCP_LISTEN_BACKLOG, parse_number::); + let outbound_tcp_listen_backlog = parse(strings, ENV_OUTBOUND_TCP_LISTEN_BACKLOG, parse_number::); + let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration); let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration); @@ -499,10 +505,12 @@ pub fn parse_config(strings: &S) -> Result let keepalive = Keepalive(outbound_accept_keepalive?); let user_timeout = UserTimeout(outbound_accept_user_timeout?); + let backlog = Backlog(outbound_tcp_listen_backlog?.unwrap_or(128)); let server = ServerConfig { addr, keepalive, user_timeout, + backlog, http2: http2::parse_server(strings, "LINKERD2_PROXY_OUTBOUND_SERVER_HTTP2")?, }; let discovery_idle_timeout = @@ -591,10 +599,12 @@ pub fn parse_config(strings: &S) -> Result ); let keepalive = Keepalive(inbound_accept_keepalive?); let user_timeout = UserTimeout(inbound_accept_user_timeout?); + let backlog = Backlog(inbound_tcp_listen_backlog?.unwrap_or(128)); let server = ServerConfig { addr, keepalive, user_timeout, + backlog, http2: http2::parse_server(strings, "LINKERD2_PROXY_INBOUND_SERVER_HTTP2")?, }; let discovery_idle_timeout = @@ -814,6 +824,7 @@ pub fn parse_config(strings: &S) -> Result addr: DualListenAddr(admin_listener_addr, None), keepalive: inbound.proxy.server.keepalive, user_timeout: inbound.proxy.server.user_timeout, + backlog: inbound.proxy.server.backlog, http2: inbound.proxy.server.http2.clone(), }, @@ -868,6 +879,7 @@ pub fn parse_config(strings: &S) -> Result addr: DualListenAddr(addr, None), keepalive: inbound.proxy.server.keepalive, user_timeout: inbound.proxy.server.user_timeout, + backlog: inbound.proxy.server.backlog, http2: inbound.proxy.server.http2.clone(), }, }) diff --git a/linkerd/app/src/env/types.rs b/linkerd/app/src/env/types.rs index 0255d5c285..741d1738cb 100644 --- a/linkerd/app/src/env/types.rs +++ b/linkerd/app/src/env/types.rs @@ -357,4 +357,22 @@ mod tests { assert!(dbg!(parse_port_range_set("69420")).is_err()); assert!(dbg!(parse_port_range_set("1-69420")).is_err()); } + + #[test] + fn parse_backlog_valid() { + assert_eq!(parse_number::("128"), Ok(128)); + assert_eq!(parse_number::("512"), Ok(512)); + assert_eq!(parse_number::("1024"), Ok(1024)); + assert_eq!(parse_number::("65535"), Ok(65535)); + assert_eq!(parse_number::("0"), Ok(0)); + } + + #[test] + fn parse_backlog_invalid() { + assert!(parse_number::("").is_err()); + assert!(parse_number::("abc").is_err()); + assert!(parse_number::("-1").is_err()); + assert!(parse_number::("12.5").is_err()); + assert!(parse_number::("999999999999999999999999").is_err()); + } } diff --git a/linkerd/meshtls/tests/util.rs b/linkerd/meshtls/tests/util.rs index b5f033b4a8..f2ff3b46bc 100644 --- a/linkerd/meshtls/tests/util.rs +++ b/linkerd/meshtls/tests/util.rs @@ -11,7 +11,7 @@ use linkerd_meshtls::{self as meshtls, watch}; use linkerd_proxy_transport::{ addrs::*, listen::{Addrs, Bind, BindTcp}, - ConnectTcp, Keepalive, UserTimeout, + ConnectTcp, Keepalive, UserTimeout, Backlog, }; use linkerd_stack::{ layer::Layer, service_fn, ExtractParam, InsertParam, NewService, Param, ServiceExt, @@ -408,6 +408,11 @@ impl Param for Server { UserTimeout(None) } } +impl Param for Server { + fn param(&self) -> Backlog { + Backlog::default() + } +} // === impl ServerParams === diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index 6a8078bae2..f1eeef0469 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -45,6 +45,16 @@ impl From for Option { } } +#[derive(Copy, Clone, Debug)] +pub struct Backlog(pub u32); + +impl Default for Backlog { + fn default() -> Self { + // Use Rust's default backlog value (128 on most systems) + Backlog(128) + } +} + // Misc. fn set_nodelay_or_warn(socket: &TcpStream) { diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index fe2031a38b..0a895d0650 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -1,6 +1,6 @@ mod dual_bind; -use crate::{addrs::*, Keepalive, UserTimeout}; +use crate::{addrs::*, Keepalive, UserTimeout, Backlog}; use dual_bind::DualBind; use futures::prelude::*; use linkerd_error::Result; @@ -71,7 +71,7 @@ impl BindTcp { impl Bind for BindTcp where - T: Param + Param + Param, + T: Param + Param + Param + Param, { type Addrs = Addrs; type BoundAddrs = Local; @@ -81,10 +81,25 @@ where fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> { let listen = { let ListenAddr(addr) = params.param(); - let l = std::net::TcpListener::bind(addr)?; - // Ensure that O_NONBLOCK is set on the socket before using it with Tokio. - l.set_nonblocking(true)?; - tokio::net::TcpListener::from_std(l).expect("listener must be valid") + let Backlog(backlog) = params.param(); + + // Use TcpSocket to create and configure the listener socket before binding. + // This allows us to set socket options and configure the listen backlog. + // TcpSocket::new_v4/v6 automatically sets O_NONBLOCK, which is required + // for Tokio's async I/O operations. + let socket = if addr.is_ipv4() { + tokio::net::TcpSocket::new_v4()? + } else { + tokio::net::TcpSocket::new_v6()? + }; + + // Enable SO_REUSEADDR to match std::net::TcpListener::bind behavior. + // This allows the server to bind to an address in TIME_WAIT state, + // enabling faster restarts without "address already in use" errors. + socket.set_reuseaddr(true)?; + + socket.bind(addr)?; + socket.listen(backlog)? }; let server = Local(ServerAddr(listen.local_addr()?)); let Keepalive(keepalive) = params.param(); @@ -138,3 +153,74 @@ impl Param for Addrs { AddrPair(client, server) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Clone)] + struct TestParams { + addr: ListenAddr, + keepalive: Keepalive, + user_timeout: UserTimeout, + backlog: crate::Backlog, + } + + impl Param for TestParams { + fn param(&self) -> ListenAddr { + self.addr + } + } + + impl Param for TestParams { + fn param(&self) -> Keepalive { + self.keepalive + } + } + + impl Param for TestParams { + fn param(&self) -> UserTimeout { + self.user_timeout + } + } + + impl Param for TestParams { + fn param(&self) -> crate::Backlog { + self.backlog + } + } + + #[tokio::test] + #[cfg(target_os = "linux")] + async fn listener_is_nonblocking() { + use std::os::unix::io::AsRawFd; + + let params = TestParams { + addr: ListenAddr("127.0.0.1:0".parse().unwrap()), + keepalive: Keepalive(None), + user_timeout: UserTimeout(None), + backlog: crate::Backlog(1024), + }; + + let bind = BindTcp::default(); + let (bound_addr, _incoming) = bind.bind(¶ms).expect("failed to bind"); + + // Verify the socket is non-blocking by checking the O_NONBLOCK flag. + // Create a new listener to the bound address to check its flags. + let listener = tokio::net::TcpListener::bind(bound_addr.0 .0) + .await + .expect("failed to bind test listener"); + + let fd = listener.as_raw_fd(); + // Safety: We're just reading the file descriptor flags with fcntl(F_GETFL), + // which is safe as long as the fd is valid (guaranteed by the listener being alive). + #[allow(unsafe_code)] + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + assert_ne!(flags, -1, "fcntl failed to get flags"); + assert_ne!( + flags & libc::O_NONBLOCK, + 0, + "O_NONBLOCK flag is not set on the listener socket" + ); + } +} diff --git a/linkerd/proxy/transport/src/listen/dual_bind.rs b/linkerd/proxy/transport/src/listen/dual_bind.rs index f9dde45625..fd3a28fcc5 100644 --- a/linkerd/proxy/transport/src/listen/dual_bind.rs +++ b/linkerd/proxy/transport/src/listen/dual_bind.rs @@ -1,4 +1,4 @@ -use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr, UserTimeout}; +use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr, UserTimeout, Backlog}; use futures::Stream; use linkerd_error::Result; use linkerd_stack::Param; @@ -26,7 +26,7 @@ impl From for DualBind { impl Bind for DualBind where - T: Param + Param + Param + Clone, + T: Param + Param + Param + Param + Clone, B: Bind, Io = TcpStream> + Clone + 'static, { type Addrs = B::Addrs; @@ -68,6 +68,12 @@ impl> Param for Listen { } } +impl> Param for Listen { + fn param(&self) -> Backlog { + self.parent.param() + } +} + impl Param for Listen { fn param(&self) -> ListenAddr { ListenAddr(self.addr)