Skip to content

Commit a04ab1a

Browse files
committed
io: rewrite slab to support compaction
The I/O driver uses a slab to store per-resource state. Doing this provides two benefits. First, allocating state is streamlined. Second, resources may be safetly indexed using a `usize` type. The `usize` is used passed to the OS's selector when registering for receiving events. The original slab implementation used a `Vec` backed by `RwLock`. This primarily caused contention when reading state. This implementation also only **grew** the slab capacity but never shrank. In #1625, the slab was rewritten to use a lock-free strategy. The lock contention was removed but this implementation was still grow-only. This change adds the ability to release memory. Similar to the previous implementation, it structures the slab to use a vector of pages. This enables growing the slab without having to move any previous entries. It also adds the ability to release pages. This is done by introducing a lock when allocating / releasing slab entries. This does not impact benchmarks, primarily due to the existing implementation not being "done" and also having a lock around allocating and releasing. A `Slab::compact()` function is added. Pages are iterated. When a page is found with no slots in use, the page is freed. The `compact()` function is called occassionally by the I/O driver.
1 parent d8490c1 commit a04ab1a

File tree

17 files changed

+881
-1229
lines changed

17 files changed

+881
-1229
lines changed

tokio/src/io/driver/mod.rs

Lines changed: 72 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,30 @@ pub(crate) mod platform;
33
mod scheduled_io;
44
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
55

6-
use crate::loom::sync::atomic::AtomicUsize;
76
use crate::park::{Park, Unpark};
87
use crate::runtime::context;
9-
use crate::util::slab::{Address, Slab};
8+
use crate::util::bit;
9+
use crate::util::slab::{self, Slab};
1010

1111
use mio::event::Evented;
1212
use std::fmt;
1313
use std::io;
14-
use std::sync::atomic::Ordering::SeqCst;
1514
use std::sync::{Arc, Weak};
1615
use std::task::Waker;
1716
use std::time::Duration;
1817

1918
/// I/O driver, backed by Mio
2019
pub(crate) struct Driver {
20+
/// Tracks the number of times `turn` is called. It is safe for this to wrap
21+
/// as it is mostly used to determine when to call `compact()`
22+
tick: u16,
23+
2124
/// Reuse the `mio::Events` value across calls to poll.
22-
events: mio::Events,
25+
events: Option<mio::Events>,
26+
27+
/// Primary slab handle containing the state for each resource registered
28+
/// with this driver.
29+
resources: Slab<ScheduledIo>,
2330

2431
/// State shared between the reactor and the handles.
2532
inner: Arc<Inner>,
@@ -37,11 +44,8 @@ pub(super) struct Inner {
3744
/// The underlying system event queue.
3845
io: mio::Poll,
3946

40-
/// Dispatch slabs for I/O and futures events
41-
pub(super) io_dispatch: Slab<ScheduledIo>,
42-
43-
/// The number of sources in `io_dispatch`.
44-
n_sources: AtomicUsize,
47+
/// Allocates `ScheduledIo` handles when creating new resources.
48+
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
4549

4650
/// Used to wake up the reactor from a call to `turn`
4751
wakeup: mio::SetReadiness,
@@ -53,7 +57,19 @@ pub(super) enum Direction {
5357
Write,
5458
}
5559

56-
const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
60+
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
61+
// token.
62+
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
63+
64+
const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
65+
66+
// Packs the generation value in the `readiness` field.
67+
//
68+
// The generation prevents a race condition where a slab slot is reused for a
69+
// new socket while the I/O driver is about to apply a readiness event. The
70+
// generaton value is checked when setting new readiness. If the generation do
71+
// not match, then the readiness event is discarded.
72+
const GENERATION: bit::Pack = ADDRESS.then(7);
5773

5874
fn _assert_kinds() {
5975
fn _assert<T: Send + Sync>() {}
@@ -69,6 +85,8 @@ impl Driver {
6985
pub(crate) fn new() -> io::Result<Driver> {
7086
let io = mio::Poll::new()?;
7187
let wakeup_pair = mio::Registration::new2();
88+
let slab = Slab::new();
89+
let allocator = slab.allocator();
7290

7391
io.register(
7492
&wakeup_pair.0,
@@ -78,12 +96,13 @@ impl Driver {
7896
)?;
7997

8098
Ok(Driver {
81-
events: mio::Events::with_capacity(1024),
99+
tick: 0,
100+
events: Some(mio::Events::with_capacity(1024)),
101+
resources: slab,
82102
_wakeup_registration: wakeup_pair.0,
83103
inner: Arc::new(Inner {
84104
io,
85-
io_dispatch: Slab::new(),
86-
n_sources: AtomicUsize::new(0),
105+
io_dispatch: allocator,
87106
wakeup: wakeup_pair.1,
88107
}),
89108
})
@@ -102,16 +121,27 @@ impl Driver {
102121
}
103122

104123
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
124+
// How often to call `compact()` on the resource slab
125+
const COMPACT_INTERVAL: u16 = 256;
126+
127+
self.tick = self.tick.wrapping_add(1);
128+
129+
if self.tick % COMPACT_INTERVAL == 0 {
130+
self.resources.compact();
131+
}
132+
133+
let mut events = self.events.take().expect("i/o driver event store missing");
134+
105135
// Block waiting for an event to happen, peeling out how many events
106136
// happened.
107-
match self.inner.io.poll(&mut self.events, max_wait) {
137+
match self.inner.io.poll(&mut events, max_wait) {
108138
Ok(_) => {}
109139
Err(e) => return Err(e),
110140
}
111141

112142
// Process all the events that came in, dispatching appropriately
113143

114-
for event in self.events.iter() {
144+
for event in events.iter() {
115145
let token = event.token();
116146

117147
if token == TOKEN_WAKEUP {
@@ -124,22 +154,24 @@ impl Driver {
124154
}
125155
}
126156

157+
self.events = Some(events);
158+
127159
Ok(())
128160
}
129161

130-
fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
162+
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
131163
let mut rd = None;
132164
let mut wr = None;
133165

134-
let address = Address::from_usize(token.0);
166+
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
135167

136-
let io = match self.inner.io_dispatch.get(address) {
168+
let io = match self.resources.get(addr) {
137169
Some(io) => io,
138170
None => return,
139171
};
140172

141173
if io
142-
.set_readiness(address, |curr| curr | ready.as_usize())
174+
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
143175
.is_err()
144176
{
145177
// token no longer valid!
@@ -164,6 +196,18 @@ impl Driver {
164196
}
165197
}
166198

199+
impl Drop for Driver {
200+
fn drop(&mut self) {
201+
self.resources.for_each(|io| {
202+
// If a task is waiting on the I/O resource, notify it. The task
203+
// will then attempt to use the I/O resource and fail due to the
204+
// driver being shutdown.
205+
io.reader.wake();
206+
io.writer.wake();
207+
})
208+
}
209+
}
210+
167211
impl Park for Driver {
168212
type Unpark = Handle;
169213
type Error = io::Error;
@@ -246,46 +290,32 @@ impl Inner {
246290
&self,
247291
source: &dyn Evented,
248292
ready: mio::Ready,
249-
) -> io::Result<Address> {
250-
let address = self.io_dispatch.alloc().ok_or_else(|| {
293+
) -> io::Result<slab::Ref<ScheduledIo>> {
294+
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
251295
io::Error::new(
252296
io::ErrorKind::Other,
253297
"reactor at max registered I/O resources",
254298
)
255299
})?;
256300

257-
self.n_sources.fetch_add(1, SeqCst);
301+
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
258302

259-
self.io.register(
260-
source,
261-
mio::Token(address.to_usize()),
262-
ready,
263-
mio::PollOpt::edge(),
264-
)?;
303+
self.io
304+
.register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
265305

266-
Ok(address)
306+
Ok(shared)
267307
}
268308

269309
/// Deregisters an I/O resource from the reactor.
270310
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
271311
self.io.deregister(source)
272312
}
273313

274-
pub(super) fn drop_source(&self, address: Address) {
275-
self.io_dispatch.remove(address);
276-
self.n_sources.fetch_sub(1, SeqCst);
277-
}
278-
279314
/// Registers interest in the I/O resource associated with `token`.
280-
pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
281-
let sched = self
282-
.io_dispatch
283-
.get(token)
284-
.unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
285-
315+
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
286316
let waker = match dir {
287-
Direction::Read => &sched.reader,
288-
Direction::Write => &sched.writer,
317+
Direction::Read => &io.reader,
318+
Direction::Write => &io.writer,
289319
};
290320

291321
waker.register(w);

tokio/src/io/driver/scheduled_io.rs

Lines changed: 29 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,30 @@
11
use crate::loom::future::AtomicWaker;
22
use crate::loom::sync::atomic::AtomicUsize;
3-
use crate::util::bit;
4-
use crate::util::slab::{Address, Entry, Generation};
3+
use crate::util::slab::Entry;
54

6-
use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst};
5+
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
76

7+
/// Stored in the I/O driver resource slab.
88
#[derive(Debug)]
99
pub(crate) struct ScheduledIo {
10+
/// Packs the resource's readiness with the resource's generation.
1011
readiness: AtomicUsize,
12+
13+
/// Task waiting on read readiness
1114
pub(crate) reader: AtomicWaker,
15+
16+
/// Task waiting on write readiness
1217
pub(crate) writer: AtomicWaker,
1318
}
1419

15-
const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH);
16-
1720
impl Entry for ScheduledIo {
18-
fn generation(&self) -> Generation {
19-
unpack_generation(self.readiness.load(SeqCst))
20-
}
21-
22-
fn reset(&self, generation: Generation) -> bool {
23-
let mut current = self.readiness.load(Acquire);
24-
25-
loop {
26-
if unpack_generation(current) != generation {
27-
return false;
28-
}
29-
30-
let next = PACK.pack(generation.next().to_usize(), 0);
21+
fn reset(&self) {
22+
let state = self.readiness.load(Acquire);
3123

32-
match self
33-
.readiness
34-
.compare_exchange(current, next, AcqRel, Acquire)
35-
{
36-
Ok(_) => break,
37-
Err(actual) => current = actual,
38-
}
39-
}
24+
let generation = super::GENERATION.unpack(state);
25+
let next = super::GENERATION.pack_lossy(generation + 1, 0);
4026

41-
drop(self.reader.take_waker());
42-
drop(self.writer.take_waker());
43-
44-
true
27+
self.readiness.store(next, Release);
4528
}
4629
}
4730

@@ -56,6 +39,10 @@ impl Default for ScheduledIo {
5639
}
5740

5841
impl ScheduledIo {
42+
pub(crate) fn generation(&self) -> usize {
43+
super::GENERATION.unpack(self.readiness.load(Acquire))
44+
}
45+
5946
#[cfg(all(test, loom))]
6047
/// Returns the current readiness value of this `ScheduledIo`, if the
6148
/// provided `token` is still a valid access.
@@ -92,32 +79,35 @@ impl ScheduledIo {
9279
/// Otherwise, this returns the previous readiness.
9380
pub(crate) fn set_readiness(
9481
&self,
95-
address: Address,
82+
token: Option<usize>,
9683
f: impl Fn(usize) -> usize,
9784
) -> Result<usize, ()> {
98-
let generation = address.generation();
99-
10085
let mut current = self.readiness.load(Acquire);
10186

10287
loop {
103-
// Check that the generation for this access is still the current
104-
// one.
105-
if unpack_generation(current) != generation {
106-
return Err(());
88+
let current_generation = super::GENERATION.unpack(current);
89+
90+
if let Some(token) = token {
91+
// Check that the generation for this access is still the
92+
// current one.
93+
if super::GENERATION.unpack(token) != current_generation {
94+
return Err(());
95+
}
10796
}
97+
10898
// Mask out the generation bits so that the modifying function
10999
// doesn't see them.
110100
let current_readiness = current & mio::Ready::all().as_usize();
111101
let new = f(current_readiness);
112102

113103
debug_assert!(
114-
new <= !PACK.max_value(),
104+
new <= super::ADDRESS.max_value(),
115105
"new readiness value would overwrite generation bits!"
116106
);
117107

118108
match self.readiness.compare_exchange(
119109
current,
120-
PACK.pack(generation.to_usize(), new),
110+
super::GENERATION.pack(current_generation, new),
121111
AcqRel,
122112
Acquire,
123113
) {
@@ -135,7 +125,3 @@ impl Drop for ScheduledIo {
135125
self.reader.wake();
136126
}
137127
}
138-
139-
fn unpack_generation(src: usize) -> Generation {
140-
Generation::new(PACK.unpack(src))
141-
}

0 commit comments

Comments
 (0)