quic/s2n-quic-platform/src/io/turmoil.rs (132 additions, 148 deletions)
@@ -1,17 +1,21 @@
 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::{buffer::default as buffer, io::tokio::Clock, socket::std as socket};
+use crate::{
+    io::tokio::Clock,
+    message::{simple::Message, Message as _},
+    socket::{
+        io::{rx, tx},
+        ring::{self, Consumer, Producer},
+    },
+};
+use core::future::Future;
+use futures::future::poll_fn;
 use s2n_quic_core::{
     endpoint::Endpoint,
-    event::{self, EndpointPublisher as _},
-    inet::SocketAddress,
-    io::event_loop::select::{self, Select},
-    path::MaxMtu,
-    time::{
-        clock::{ClockWithTimer as _, Timer as _},
-        Clock as ClockTrait,
-    },
+    inet::{self, SocketAddress},
+    io::event_loop::{select::Select, EventLoop},
+    path::{self, MaxMtu},
 };
 use std::{convert::TryInto, io, io::ErrorKind};
 use tokio::runtime::Handle;
@@ -22,22 +26,7 @@ mod builder;
 mod tests;
 
 pub use builder::Builder;
 
-impl crate::socket::std::Socket for UdpSocket {
-    type Error = io::Error;
-
-    fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Option<SocketAddress>), Self::Error> {
-        let (len, addr) = self.try_recv_from(buf)?;
-        Ok((len, Some(addr.into())))
-    }
-
-    fn send_to(&self, buf: &[u8], addr: &SocketAddress) -> Result<usize, Self::Error> {
-        let addr: std::net::SocketAddr = (*addr).into();
-        self.try_send_to(buf, addr)
-    }
-}
-
-pub type PathHandle = socket::Handle;
+pub type PathHandle = path::Tuple;
 
 #[derive(Default)]
 pub struct Io {
@@ -57,7 +46,7 @@ impl Io {
     async fn setup<E: Endpoint<PathHandle = PathHandle>>(
         self,
         mut endpoint: E,
-    ) -> io::Result<(Instance<E>, SocketAddress)> {
+    ) -> io::Result<(impl Future<Output = ()>, SocketAddress)> {
         let Builder {
             handle: _,
             socket,
@@ -80,26 +69,56 @@ impl Io {
             ));
         };
 
-        let rx_buffer = buffer::Buffer::new_with_mtu(max_mtu.into());
-        let mut rx = socket::Queue::new(rx_buffer);
+        let local_addr = socket.local_addr()?;
+        let local_addr: inet::SocketAddress = local_addr.into();
+        let payload_len: usize = max_mtu.into();
+        let payload_len = payload_len as u32;
+
+        // This number is somewhat arbitrary, but it's a decent number of messages without
+        // consuming a large amount of memory. Eventually, it might be a good idea to expose
+        // this value in the builder, but we'll wait until someone asks for it :).
+        let entries = 1024;
+
+        let (rx, rx_producer) = {
+            let mut consumers = vec![];
+
+            let (producer, consumer) = ring::pair(entries, payload_len);
+            consumers.push(consumer);
+
+            let rx = rx::Rx::new(consumers, max_mtu, local_addr.into());
+
+            (rx, producer)
+        };
+
+        let (tx, tx_consumer) = {
+            let mut producers = vec![];
+
+            let (producer, consumer) = ring::pair(entries, payload_len);
+            producers.push(producer);
 
-        let tx_buffer = buffer::Buffer::new_with_mtu(max_mtu.into());
-        let tx = socket::Queue::new(tx_buffer);
+            let gso = crate::features::Gso::default();
 
-        let local_addr: SocketAddress = socket.local_addr()?.into();
+            // GSO is not supported by turmoil, so disable it
+            gso.disable();
 
-        // tell the queue the local address so it can fill it in on each message
-        rx.set_local_address(local_addr.into());
+            let tx = tx::Tx::new(producers, gso, max_mtu);
 
-        let instance = Instance {
+            (tx, consumer)
+        };
+
+        // Spawn a task that performs the actual socket calls and coordinates with the event
+        // loop through the ring buffers
+        tokio::spawn(run_io(socket, rx_producer, tx_consumer));
+
+        let event_loop = EventLoop {
             clock,
-            socket,
             rx,
             tx,
             endpoint,
-        };
+        }
+        .start();
 
-        Ok((instance, local_addr))
+        Ok((event_loop, local_addr))
     }
 
     pub fn start<E: Endpoint<PathHandle = PathHandle>>(
@@ -117,14 +136,7 @@ impl Io {
         let task = handle.spawn(async move {
             let (instance, _local_addr) = self.setup(endpoint).await.unwrap();
 
-            if let Err(err) = instance.event_loop().await {
-                let debug = format!("A fatal IO error occurred ({:?}): {err}", err.kind());
-                if cfg!(test) {
-                    panic!("{debug}");
-                } else {
-                    eprintln!("{debug}");
-                }
-            }
+            instance.await;
         });
 
         drop(guard);
@@ -136,117 +148,89 @@ impl Io {
 }
 }
 
-struct Instance<E> {
-    clock: Clock,
-    socket: turmoil::net::UdpSocket,
-    rx: socket::Queue<buffer::Buffer>,
-    tx: socket::Queue<buffer::Buffer>,
-    endpoint: E,
-}
-
-impl<E: Endpoint<PathHandle = PathHandle>> Instance<E> {
-    async fn event_loop(self) -> io::Result<()> {
-        let Self {
-            clock,
-            socket,
-            mut rx,
-            mut tx,
-            mut endpoint,
-        } = self;
-
-        let mut timer = clock.timer();
-
-        loop {
-            // Poll for readability if we have free slots available
-            let rx_interest = rx.free_len() > 0;
-            let rx_task = async {
-                if rx_interest {
-                    socket.readable().await
-                } else {
-                    futures::future::pending().await
-                }
-            };
-
-            // Poll for writability if we have occupied slots available
-            let tx_interest = tx.occupied_len() > 0;
-            let tx_task = async {
-                if tx_interest {
-                    socket.writable().await
-                } else {
-                    futures::future::pending().await
-                }
-            };
-
-            let wakeups = endpoint.wakeups(&clock);
-            // pin the wakeups future so we don't have to move it into the Select future.
-            tokio::pin!(wakeups);
-
-            let timer_ready = timer.ready();
-
-            let select::Outcome {
-                rx_result,
-                tx_result,
-                timeout_expired,
-                application_wakeup,
-            } = if let Ok(res) = Select::new(rx_task, tx_task, &mut wakeups, timer_ready).await {
-                res
+/// Turmoil doesn't allow splitting sockets for Tx and Rx, so we spawn a single task to
+/// handle both jobs
+async fn run_io(
+    socket: UdpSocket,
+    mut producer: Producer<Message>,
+    mut consumer: Consumer<Message>,
+) -> io::Result<()> {
+    let mut poll_producer = false;
+
+    loop {
+        let socket_ready = socket.readable();
+        let consumer_ready = poll_fn(|cx| consumer.poll_acquire(u32::MAX, cx));
+        let producer_ready = async {
+            // Only poll the producer if we need more capacity - otherwise we would
+            // constantly wake up
+            if poll_producer {
+                poll_fn(|cx| producer.poll_acquire(u32::MAX, cx)).await
             } else {
-                // The endpoint has shut down
-                return Ok(());
-            };
-
-            let wakeup_timestamp = clock.get_time();
-            let subscriber = endpoint.subscriber();
-            let mut publisher = event::EndpointPublisherSubscriber::new(
-                event::builder::EndpointMeta {
-                    endpoint_type: E::ENDPOINT_TYPE,
-                    timestamp: wakeup_timestamp,
-                },
-                None,
-                subscriber,
-            );
-
-            publisher.on_platform_event_loop_wakeup(event::builder::PlatformEventLoopWakeup {
-                timeout_expired,
-                rx_ready: rx_result.is_some(),
-                tx_ready: tx_result.is_some(),
-                application_wakeup,
-            });
-
-            if tx_result.is_some() {
-                tx.tx(&socket, &mut publisher)?;
+                core::future::pending().await
             }
-
-            if rx_result.is_some() {
-                rx.rx(&socket, &mut publisher)?;
-                endpoint.receive(&mut rx.rx_queue(), &clock);
-            };
+        };
+
+        // The socket task doesn't have any application wakeups to handle so just make it
+        // pending
+        let application_wakeup = core::future::pending();
+
+        // We pass `socket_ready` in place of the timer future, since we don't have a timer
+        // here. Other than the application wakeup, Select doesn't really treat any of the
+        // futures specially.
+        let is_readable = Select::new(
+            consumer_ready,
+            producer_ready,
+            application_wakeup,
+            socket_ready,
+        )
+        .await
+        .unwrap()
+        .timeout_expired;
+
+        if is_readable {
+            let mut count = 0;
+            for entry in producer.data() {
+                // Since UDP sockets are stateless, the only error we should get back is
+                // WouldBlock. If we get any errors, we'll try again later.
+                if let Ok((len, addr)) = socket.try_recv_from(entry.payload_mut()) {
+                    count += 1;
+                    // update the packet information
+                    entry.set_remote_address(&(addr.into()));
+                    unsafe {
+                        entry.set_payload_len(len);
+                    }
+                } else {
+                    break;
+                }
+            }
 
-            endpoint.transmit(&mut tx.tx_queue(), &clock);
+            // release the received messages to the consumer
+            producer.release(count);
 
-            let timeout = endpoint.timeout();
+            // only poll the producer if we need entries
+            poll_producer = producer.data().is_empty();
+        }
 
-            if let Some(timeout) = timeout {
-                timer.update(timeout);
-            }
+        {
+            let mut count = 0;
+            for entry in consumer.data() {
+                let addr = *entry.remote_address();
+                let addr: std::net::SocketAddr = addr.into();
+                let payload = entry.payload_mut();
+                // Since UDP sockets are stateless, the only error we should get back is
+                // WouldBlock. If we get any errors, we'll try again later.
+                if socket.try_send_to(payload, addr).is_ok() {
+                    count += 1;
+                } else {
+                    break;
+                }
+            }
 
-            let timestamp = clock.get_time();
-            let subscriber = endpoint.subscriber();
-            let mut publisher = event::EndpointPublisherSubscriber::new(
-                event::builder::EndpointMeta {
-                    endpoint_type: E::ENDPOINT_TYPE,
-                    timestamp,
-                },
-                None,
-                subscriber,
-            );
-
-            // notify the application that we're going to sleep
-            let timeout = timeout.map(|t| t.saturating_duration_since(timestamp));
-            publisher.on_platform_event_loop_sleep(event::builder::PlatformEventLoopSleep {
-                timeout,
-                processing_duration: timestamp.saturating_duration_since(wakeup_timestamp),
-            });
-        }
-    }
-}
+            // release capacity back to the producer
+            consumer.release(count);
+        }
+
+        // check to see if the rings are open, otherwise we need to shut down the task
+        if !(producer.is_open() && consumer.is_open()) {
+            return Ok(());
+        }
+    }
+}
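
The heart of this change is the pair of ring buffers that decouple the socket task from the endpoint's event loop: `run_io` fills free rx entries from the socket and `release`s them to the endpoint, and drains tx entries queued by the endpoint back into the socket. The sketch below is a minimal, single-threaded model of that handoff, not the real API: the `Ring` type and its `free`/`filled` queues are hypothetical stand-ins for `ring::pair`, `Producer`, and `Consumer` from s2n-quic-platform.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one direction of `ring::pair(entries, payload_len)`:
/// `free` holds slots the producer may fill, `filled` holds slots the consumer
/// may drain; releasing moves slots from one side to the other.
struct Ring {
    free: VecDeque<Vec<u8>>,
    filled: VecDeque<Vec<u8>>,
}

impl Ring {
    fn new(entries: usize, payload_len: usize) -> Self {
        Self {
            free: (0..entries).map(|_| vec![0u8; payload_len]).collect(),
            filled: VecDeque::new(),
        }
    }
}

fn main() {
    // rx direction: the socket task produces, the endpoint consumes
    let mut rx = Ring::new(4, 1500);

    // The socket became readable; mirror the `is_readable` branch of `run_io`:
    // copy each datagram into a free slot until recv would block or we run out.
    let datagrams: [&[u8]; 2] = [b"hello", b"turmoil"];
    let mut count = 0;
    for payload in datagrams {
        match rx.free.pop_front() {
            Some(mut slot) => {
                slot[..payload.len()].copy_from_slice(payload);
                slot.truncate(payload.len()); // plays the role of set_payload_len
                rx.filled.push_back(slot);
                count += 1;
            }
            // No free entries: `run_io` would set `poll_producer = true` and
            // wait on `producer.poll_acquire` instead of spinning.
            None => break,
        }
    }
    // `producer.release(count)` hands the filled slots to the endpoint
    println!("released {count} packets to the endpoint");

    // Endpoint side: drain the filled slots, then hand capacity back so the
    // socket task can receive again (`consumer.release` in the real code).
    // The real ring reuses the slot's storage; we allocate fresh for simplicity.
    while let Some(slot) = rx.filled.pop_front() {
        println!("endpoint received {} bytes", slot.len());
        rx.free.push_back(vec![0u8; 1500]);
    }
}
```

A single task drives both directions because turmoil's `UdpSocket` can't be split into separate Tx and Rx halves the way the tokio provider's sockets can, and the `poll_producer` flag keeps that task from waking up for producer capacity it doesn't currently need.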
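
The `Select` call in `run_io` is also worth a note: it reuses the endpoint event loop's four-slot `Select`, passing `socket_ready` in the slot normally occupied by the timer, so `timeout_expired` really means "the socket is readable". A rough model of that trick, with `tokio::select!` standing in for `s2n_quic_core`'s `Select` and assumed future names (this is not the real API; it assumes tokio with the `macros`, `rt`, and `time` features):

```rust
use std::{future::pending, time::Duration};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Pretend the socket becomes readable after 10ms; the other slots stay idle.
    let socket_ready = tokio::time::sleep(Duration::from_millis(10));
    let consumer_ready = pending::<()>(); // no tx packets queued by the endpoint
    let producer_ready = pending::<()>(); // rx ring still has free entries
    let application_wakeup = pending::<()>(); // run_io has no application wakeups

    // socket_ready sits where the event loop's timer normally goes, so the
    // "timeout expired" outcome doubles as "socket is readable".
    let is_readable = tokio::select! {
        _ = socket_ready => true,
        _ = consumer_ready => false,
        _ = producer_ready => false,
        _ = application_wakeup => false,
    };

    assert!(is_readable);
}
```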