Skip to content

Commit 6ec13ed

Browse files
committed
feat(s2n-quic-qns): add xdp client
1 parent 0a75a76 commit 6ec13ed

File tree

10 files changed

+240
-140
lines changed

10 files changed

+240
-140
lines changed

quic/s2n-quic-platform/src/io/tokio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::runtime::Handle;
1515

1616
mod builder;
1717
mod clock;
18-
mod task;
18+
pub(crate) mod task;
1919
#[cfg(test)]
2020
mod tests;
2121

quic/s2n-quic-platform/src/io/xdp.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use s2n_quic_core::{
77
};
88

99
pub use s2n_quic_core::{
10-
io::{rx, tx},
10+
io::rx,
1111
sync::{spsc, worker},
1212
xdp::path::Tuple as PathHandle,
1313
};
@@ -38,6 +38,54 @@ pub mod socket {
3838
}
3939
}
4040

41+
pub mod tx {
42+
pub use s2n_quic_core::io::tx::*;
43+
44+
pub fn channel(
45+
socket: ::std::net::UdpSocket,
46+
) -> (
47+
impl Tx<PathHandle = crate::message::default::Handle>,
48+
impl core::future::Future<Output = ::std::io::Result<()>>,
49+
) {
50+
// Initial packets don't need to be bigger than the minimum
51+
let max_mtu = s2n_quic_core::path::MaxMtu::MIN;
52+
53+
// It's unlikely the initial packets will utilize, so just disable it
54+
let gso = crate::features::Gso::default();
55+
gso.disable();
56+
57+
// compute the payload size for each message from the number of GSO segments we can
58+
// fill
59+
let payload_len = {
60+
let max_mtu: u16 = max_mtu.into();
61+
(max_mtu as u32 * gso.max_segments() as u32).min(u16::MAX as u32)
62+
};
63+
64+
// 512Kb
65+
let tx_buffer_size: u32 = 1 << 19;
66+
let entries = tx_buffer_size / payload_len;
67+
let entries = if entries.is_power_of_two() {
68+
entries
69+
} else {
70+
// round up to the nearest power of two, since the ring buffers require it
71+
entries.next_power_of_two()
72+
};
73+
74+
let mut producers = vec![];
75+
76+
let (producer, consumer) = crate::socket::ring::pair(entries, payload_len);
77+
producers.push(producer);
78+
79+
// spawn a task that actually flushes the ring buffer to the socket
80+
let task = crate::io::tokio::task::tx(socket, consumer, gso.clone());
81+
82+
// construct the TX side for the endpoint event loop
83+
let io = crate::socket::io::tx::Tx::new(producers, gso, max_mtu);
84+
85+
(io, task)
86+
}
87+
}
88+
4189
impl From<PathHandle> for crate::message::msg::Handle {
4290
#[inline]
4391
fn from(handle: PathHandle) -> Self {

quic/s2n-quic-qns/src/client/interop.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@ use crate::{
99
Result,
1010
};
1111
use core::time::Duration;
12-
use s2n_quic::{
13-
client::Connect,
14-
provider::{event, io},
15-
Client,
16-
};
12+
use s2n_quic::{client::Connect, provider::event, Client};
1713
use std::{
1814
collections::{hash_map::Entry, HashMap},
1915
path::PathBuf,
@@ -40,15 +36,9 @@ pub struct Interop {
4036
#[structopt(long)]
4137
download_dir: Option<PathBuf>,
4238

43-
#[structopt(long)]
44-
disable_gso: bool,
45-
4639
#[structopt(long, parse(try_from_str = parse_duration))]
4740
keep_alive: Option<Duration>,
4841

49-
#[structopt(short, long, default_value = "::")]
50-
local_ip: std::net::IpAddr,
51-
5242
#[structopt(long, env = "TESTCASE", possible_values = &Testcase::supported(is_supported_testcase))]
5343
testcase: Option<Testcase>,
5444

@@ -57,6 +47,9 @@ pub struct Interop {
5747

5848
#[structopt(long, default_value)]
5949
tls: TlsProviders,
50+
51+
#[structopt(flatten)]
52+
io: crate::io::Client,
6053
}
6154

6255
impl Interop {
@@ -130,14 +123,7 @@ impl Interop {
130123
}
131124

132125
fn client(&self) -> Result<Client> {
133-
let mut io_builder =
134-
io::Default::builder().with_receive_address((self.local_ip, 0u16).into())?;
135-
136-
if self.disable_gso {
137-
io_builder = io_builder.with_gso_disabled()?;
138-
}
139-
140-
let io = io_builder.build()?;
126+
let io = self.io.build()?;
141127

142128
let client = Client::builder()
143129
.with_io(io)?

quic/s2n-quic-qns/src/client/perf.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use crate::{perf, tls, Result};
55
use futures::future::try_join_all;
6-
use s2n_quic::{client, provider::io, Client, Connection};
6+
use s2n_quic::{client, Client, Connection};
77
use std::path::PathBuf;
88
use structopt::StructOpt;
99

@@ -29,15 +29,6 @@ pub struct Perf {
2929
#[structopt(long)]
3030
connections: Option<usize>,
3131

32-
#[structopt(long)]
33-
disable_gso: bool,
34-
35-
#[structopt(long, default_value = "9000")]
36-
max_mtu: u16,
37-
38-
#[structopt(short, long, default_value = "::")]
39-
local_ip: std::net::IpAddr,
40-
4132
#[structopt(long, default_value)]
4233
send: u64,
4334

@@ -50,6 +41,9 @@ pub struct Perf {
5041
/// Logs statistics for the endpoint
5142
#[structopt(long)]
5243
stats: bool,
44+
45+
#[structopt(flatten)]
46+
io: crate::io::Client,
5347
}
5448

5549
impl Perf {
@@ -106,15 +100,7 @@ impl Perf {
106100
}
107101

108102
fn client(&self) -> Result<Client> {
109-
let mut io_builder = io::Default::builder()
110-
.with_receive_address((self.local_ip, 0u16).into())?
111-
.with_max_mtu(self.max_mtu)?;
112-
113-
if self.disable_gso {
114-
io_builder = io_builder.with_gso_disabled()?;
115-
}
116-
117-
let io = io_builder.build()?;
103+
let io = self.io.build()?;
118104

119105
let tls = s2n_quic::provider::tls::default::Client::builder()
120106
.with_certificate(tls::default::ca(self.ca.as_ref())?)?

quic/s2n-quic-qns/src/io.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::Result;
5+
use s2n_quic::provider::io;
6+
use structopt::StructOpt;
7+
8+
#[derive(Debug, StructOpt)]
9+
pub struct Server {
10+
#[structopt(short, long, default_value = "::")]
11+
pub ip: std::net::IpAddr,
12+
13+
#[structopt(short, long, default_value = "443")]
14+
pub port: u16,
15+
16+
#[structopt(long)]
17+
pub disable_gso: bool,
18+
19+
#[structopt(long, default_value = "9000")]
20+
pub max_mtu: u16,
21+
22+
#[cfg(feature = "xdp")]
23+
#[structopt(flatten)]
24+
xdp: crate::xdp::Xdp,
25+
}
26+
27+
impl Server {
28+
#[cfg(feature = "xdp")]
29+
pub fn build(&self) -> Result<impl io::Provider> {
30+
// GSO isn't currently supported for XDP so just read it to avoid a dead_code warning
31+
let _ = self.disable_gso;
32+
let _ = self.max_mtu;
33+
34+
let addr = (self.ip, self.port).into();
35+
36+
self.xdp.server(addr)
37+
}
38+
39+
#[cfg(not(feature = "xdp"))]
40+
pub fn build(&self) -> Result<impl io::Provider> {
41+
let mut io_builder = io::Default::builder()
42+
.with_receive_address((self.ip, self.port).into())?
43+
.with_max_mtu(self.max_mtu)?;
44+
45+
if self.disable_gso {
46+
io_builder = io_builder.with_gso_disabled()?;
47+
}
48+
49+
Ok(io_builder.build()?)
50+
}
51+
}
52+
53+
#[derive(Debug, StructOpt)]
54+
pub struct Client {
55+
#[structopt(long)]
56+
pub disable_gso: bool,
57+
58+
#[structopt(long, default_value = "9000")]
59+
pub max_mtu: u16,
60+
61+
#[structopt(short, long, default_value = "::")]
62+
pub local_ip: std::net::IpAddr,
63+
64+
#[cfg(feature = "xdp")]
65+
#[structopt(flatten)]
66+
xdp: crate::xdp::Xdp,
67+
}
68+
69+
impl Client {
70+
#[cfg(feature = "xdp")]
71+
pub fn build(&self) -> Result<impl io::Provider> {
72+
// GSO isn't currently supported for XDP so just read it to avoid a dead_code warning
73+
let _ = self.disable_gso;
74+
let _ = self.max_mtu;
75+
76+
let addr = (self.local_ip, 0u16).into();
77+
78+
self.xdp.client(addr)
79+
}
80+
81+
#[cfg(not(feature = "xdp"))]
82+
pub fn build(&self) -> Result<impl io::Provider> {
83+
let mut io_builder = io::Default::builder()
84+
.with_receive_address((self.local_ip, 0u16).into())?
85+
.with_max_mtu(self.max_mtu)?;
86+
87+
if self.disable_gso {
88+
io_builder = io_builder.with_gso_disabled()?;
89+
}
90+
91+
Ok(io_builder.build()?)
92+
}
93+
}

quic/s2n-quic-qns/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub type Result<T, E = Error> = core::result::Result<T, E>;
99
mod client;
1010
mod file;
1111
mod interop;
12+
mod io;
1213
mod perf;
1314
mod server;
1415
mod tls;

quic/s2n-quic-qns/src/server/interop.rs

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use s2n_quic::{
1414
provider::{
1515
endpoint_limits,
1616
event::{events, Subscriber},
17-
io,
1817
},
1918
Server,
2019
};
@@ -27,12 +26,6 @@ use tokio::spawn;
2726

2827
#[derive(Debug, StructOpt)]
2928
pub struct Interop {
30-
#[structopt(short, long, default_value = "::")]
31-
ip: std::net::IpAddr,
32-
33-
#[structopt(short, long, default_value = "443")]
34-
port: u16,
35-
3629
#[structopt(long)]
3730
certificate: Option<PathBuf>,
3831

@@ -45,18 +38,14 @@ pub struct Interop {
4538
#[structopt(long, default_value = ".")]
4639
www_dir: PathBuf,
4740

48-
#[structopt(long)]
49-
disable_gso: bool,
50-
5141
#[structopt(long, env = "TESTCASE", possible_values = &Testcase::supported(is_supported_testcase))]
5242
testcase: Option<Testcase>,
5343

5444
#[structopt(long, default_value)]
5545
tls: TlsProviders,
5646

57-
#[cfg(feature = "xdp")]
5847
#[structopt(flatten)]
59-
xdp: crate::xdp::Xdp,
48+
io: crate::io::Server,
6049
}
6150

6251
impl Interop {
@@ -89,28 +78,6 @@ impl Interop {
8978
Err(crate::CRASH_ERROR_MESSAGE.into())
9079
}
9180

92-
#[cfg(feature = "xdp")]
93-
fn io(&self) -> Result<impl io::Provider> {
94-
// GSO isn't currently supported for XDP so just read it to avoid a dead_code warning
95-
let _ = self.disable_gso;
96-
97-
let addr = (self.ip, self.port).into();
98-
99-
self.xdp.server(addr)
100-
}
101-
102-
#[cfg(not(feature = "xdp"))]
103-
fn io(&self) -> Result<impl io::Provider> {
104-
let mut io_builder =
105-
io::Default::builder().with_receive_address((self.ip, self.port).into())?;
106-
107-
if self.disable_gso {
108-
io_builder = io_builder.with_gso_disabled()?;
109-
}
110-
111-
Ok(io_builder.build()?)
112-
}
113-
11481
fn server(&self) -> Result<Server> {
11582
let mut max_handshakes = 100;
11683
if let Some(Testcase::Retry) = self.testcase {
@@ -121,7 +88,7 @@ impl Interop {
12188
.with_inflight_handshake_limit(max_handshakes)?
12289
.build()?;
12390

124-
let io = self.io()?;
91+
let io = self.io.build()?;
12592

12693
let server = Server::builder()
12794
.with_io(io)?
@@ -157,7 +124,7 @@ impl Interop {
157124
}
158125
};
159126

160-
eprintln!("Server listening on port {}", self.port);
127+
eprintln!("Server listening on port {}", self.io.port);
161128

162129
Ok(server)
163130
}

0 commit comments

Comments
 (0)