Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 72 additions & 139 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ pub(crate) mod platform;
mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests

use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
use crate::runtime::context;
use crate::util::slab::{Address, Slab};
use crate::util::bit;
use crate::util::slab::{self, Slab};

use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
tick: u16,

/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
events: Option<mio::Events>,

/// Primary slab handle containing the state for each resource registered
/// with this driver.
resources: Slab<ScheduledIo>,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,
Expand All @@ -37,11 +44,8 @@ pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,

/// Dispatch slabs for I/O and futures events
pub(super) io_dispatch: Slab<ScheduledIo>,

/// The number of sources in `io_dispatch`.
n_sources: AtomicUsize,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,

/// Used to wake up the reactor from a call to `turn`
wakeup: mio::SetReadiness,
Expand All @@ -53,7 +57,19 @@ pub(super) enum Direction {
Write,
}

const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);

const ADDRESS: bit::Pack = bit::Pack::least_significant(24);

// Packs the generation value in the `readiness` field.
//
// The generation prevents a race condition where a slab slot is reused for a
// new socket while the I/O driver is about to apply a readiness event. The
// generaton value is checked when setting new readiness. If the generation do
// not match, then the readiness event is discarded.
const GENERATION: bit::Pack = ADDRESS.then(7);

fn _assert_kinds() {
fn _assert<T: Send + Sync>() {}
Expand All @@ -69,6 +85,8 @@ impl Driver {
pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();
let slab = Slab::new();
let allocator = slab.allocator();

io.register(
&wakeup_pair.0,
Expand All @@ -78,12 +96,13 @@ impl Driver {
)?;

Ok(Driver {
events: mio::Events::with_capacity(1024),
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
_wakeup_registration: wakeup_pair.0,
inner: Arc::new(Inner {
io,
io_dispatch: Slab::new(),
n_sources: AtomicUsize::new(0),
io_dispatch: allocator,
wakeup: wakeup_pair.1,
}),
})
Expand All @@ -102,16 +121,27 @@ impl Driver {
}

fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256;

self.tick = self.tick.wrapping_add(1);

if self.tick % COMPACT_INTERVAL == 0 {
self.resources.compact();
}

let mut events = self.events.take().expect("i/o driver event store missing");

// Block waiting for an event to happen, peeling out how many events
// happened.
match self.inner.io.poll(&mut self.events, max_wait) {
match self.inner.io.poll(&mut events, max_wait) {
Ok(_) => {}
Err(e) => return Err(e),
}

// Process all the events that came in, dispatching appropriately

for event in self.events.iter() {
for event in events.iter() {
let token = event.token();

if token == TOKEN_WAKEUP {
Expand All @@ -124,22 +154,24 @@ impl Driver {
}
}

self.events = Some(events);

Ok(())
}

fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;

let address = Address::from_usize(token.0);
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.inner.io_dispatch.get(address) {
let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

if io
.set_readiness(address, |curr| curr | ready.as_usize())
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
// token no longer valid!
Expand All @@ -164,6 +196,18 @@ impl Driver {
}
}

impl Drop for Driver {
fn drop(&mut self) {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.reader.wake();
io.writer.wake();
})
}
}

impl Park for Driver {
type Unpark = Handle;
type Error = io::Error;
Expand Down Expand Up @@ -246,46 +290,32 @@ impl Inner {
&self,
source: &dyn Evented,
ready: mio::Ready,
) -> io::Result<Address> {
let address = self.io_dispatch.alloc().ok_or_else(|| {
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})?;

self.n_sources.fetch_add(1, SeqCst);
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

self.io.register(
source,
mio::Token(address.to_usize()),
ready,
mio::PollOpt::edge(),
)?;
self.io
.register(source, mio::Token(token), ready, mio::PollOpt::edge())?;

Ok(address)
Ok(shared)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}

pub(super) fn drop_source(&self, address: Address) {
self.io_dispatch.remove(address);
self.n_sources.fetch_sub(1, SeqCst);
}

/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
let sched = self
.io_dispatch
.get(token)
.unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));

pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &sched.reader,
Direction::Write => &sched.writer,
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};

waker.register(w);
Expand All @@ -303,100 +333,3 @@ impl Direction {
}
}
}

#[cfg(all(test, loom))]
mod tests {
use super::*;
use loom::thread;

// No-op `Evented` impl just so we can have something to pass to `add_source`.
struct NotEvented;

impl Evented for NotEvented {
fn register(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}

fn reregister(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}

fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
Ok(())
}
}

#[test]
fn tokens_unique_when_dropped() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();

let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
})
}

#[test]
fn tokens_unique_when_dropped_on_full_page() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
// add sources to fill up the first page so that the dropped index
// may be reused.
for _ in 0..31 {
inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
}

let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
})
}

#[test]
fn tokens_unique_concurrent_add() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();

let thread = thread::spawn(move || {
let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
token_2
});

let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let token_2 = thread.join().unwrap();

assert!(token_1 != token_2);
})
}
}
Loading