Skip to content

Commit e433282

Browse files
author
Stjepan Glavina
committed
Rewrite the park/unpark mechanism
1 parent 0f76470 commit e433282

File tree

2 files changed

+108
-68
lines changed

2 files changed

+108
-68
lines changed

tokio-executor/src/park.rs

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
use std::marker::PhantomData;
4848
use std::rc::Rc;
4949
use std::sync::{Arc, Mutex, Condvar};
50-
use std::sync::atomic::{AtomicUsize, Ordering};
50+
use std::sync::atomic::AtomicUsize;
51+
use std::sync::atomic::Ordering::SeqCst;
5152
use std::time::Duration;
5253

5354
/// Block the current thread.
@@ -237,37 +238,56 @@ impl Inner {
237238
fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
238239
// If currently notified, then we skip sleeping. This is checked outside
239240
// of the lock to avoid acquiring a mutex if not necessary.
240-
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
241-
NOTIFY => return Ok(()),
242-
IDLE => {},
243-
_ => unreachable!(),
241+
if self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst).is_ok() {
242+
return Ok(());
243+
}
244+
245+
// If the duration is zero, then there is no need to actually block
246+
if let Some(ref dur) = timeout {
247+
if *dur == Duration::from_millis(0) {
248+
return Ok(());
249+
}
244250
}
245251

246252
// The state is currently idle, so obtain the lock and then try to
247253
// transition to a sleeping state.
248254
let mut m = self.mutex.lock().unwrap();
249255

250256
// Transition to sleeping
251-
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
252-
NOTIFY => {
257+
match self.state.compare_exchange(IDLE, SLEEP, SeqCst, SeqCst) {
258+
Ok(_) => {}
259+
Err(NOTIFY) => {
253260
// Notified before we could sleep, consume the notification and
254261
// exit
255-
self.state.store(IDLE, Ordering::SeqCst);
262+
self.state.store(IDLE, SeqCst);
256263
return Ok(());
257264
}
258-
IDLE => {},
259265
_ => unreachable!(),
260266
}
261267

262-
m = match timeout {
263-
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
264-
None => self.condvar.wait(m).unwrap(),
268+
match timeout {
269+
Some(timeout) => {
270+
m = self.condvar.wait_timeout(m, timeout).unwrap().0;
271+
272+
// Transition back to idle. If the state has transitioned to `NOTIFY`,
273+
// this will consume that notification.
274+
match self.state.swap(IDLE, SeqCst) {
275+
NOTIFY => {}, // Got a notification
276+
SLEEP => {}, // No notification, timed out
277+
_ => unreachable!(),
278+
}
279+
}
280+
None => {
281+
loop {
282+
m = self.condvar.wait(m).unwrap();
283+
match self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst) {
284+
Ok(_) => break, // Got a notification
285+
Err(_) => {} // Spurious wakeup, go back to sleep
286+
}
287+
}
288+
}
265289
};
266290

267-
// Transition back to idle. If the state has transitioned to `NOTIFY`,
268-
// this will consume that notification
269-
self.state.store(IDLE, Ordering::SeqCst);
270-
271291
// Explicitly drop the mutex guard. There is no real point in doing it
272292
// except that I find it helpful to make it explicit where we want the
273293
// mutex to unlock.
@@ -277,26 +297,30 @@ impl Inner {
277297
}
278298

279299
fn unpark(&self) {
280-
// First, try transitioning from IDLE -> NOTIFY, this does not require a
281-
// lock.
282-
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
283-
IDLE | NOTIFY => return,
284-
SLEEP => {}
285-
_ => unreachable!(),
286-
}
287-
288-
// The other half is sleeping, this requires a lock
289-
let _m = self.mutex.lock().unwrap();
300+
loop {
301+
// First, try transitioning from IDLE -> NOTIFY, this does not require a
302+
// lock.
303+
match self.state.compare_exchange(IDLE, NOTIFY, SeqCst, SeqCst) {
304+
Ok(_) => return, // No one was waiting
305+
Err(NOTIFY) => return, // Already unparked
306+
Err(SLEEP) => {} // Gotta wake up
307+
_ => unreachable!(),
308+
}
290309

291-
// Transition to NOTIFY
292-
match self.state.swap(NOTIFY, Ordering::SeqCst) {
293-
SLEEP => {}
294-
NOTIFY => return,
295-
IDLE => return,
296-
_ => unreachable!(),
310+
// The other half is sleeping, this requires a lock
311+
let _m = self.mutex.lock().unwrap();
312+
313+
// Transition to NOTIFY
314+
match self.state.compare_exchange(SLEEP, NOTIFY, SeqCst, SeqCst) {
315+
Ok(_) => {
316+
// Wakeup the sleeper
317+
self.condvar.notify_one();
318+
return;
319+
}
320+
Err(NOTIFY) => return, // A different thread unparked
321+
Err(IDLE) => {} // Parked thread went away, try again
322+
_ => unreachable!(),
323+
}
297324
}
298-
299-
// Wakeup the sleeper
300-
self.condvar.notify_one();
301325
}
302326
}

tokio-threadpool/src/park/default_park.rs

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,8 @@ impl Inner {
9999
fn park(&self, timeout: Option<Duration>) {
100100
// If currently notified, then we skip sleeping. This is checked outside
101101
// of the lock to avoid acquiring a mutex if not necessary.
102-
match self.state.compare_and_swap(NOTIFY, IDLE, SeqCst) {
103-
NOTIFY => return,
104-
IDLE => {},
105-
_ => unreachable!(),
102+
if self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst).is_ok() {
103+
return;
106104
}
107105

108106
// If the duration is zero, then there is no need to actually block
@@ -117,54 +115,72 @@ impl Inner {
117115
let mut m = self.mutex.lock().unwrap();
118116

119117
// Transition to sleeping
120-
match self.state.compare_and_swap(IDLE, SLEEP, SeqCst) {
121-
NOTIFY => {
118+
match self.state.compare_exchange(IDLE, SLEEP, SeqCst, SeqCst) {
119+
Ok(_) => {}
120+
Err(NOTIFY) => {
122121
// Notified before we could sleep, consume the notification and
123122
// exit
124123
self.state.store(IDLE, SeqCst);
125124
return;
126125
}
127-
IDLE => {},
128126
_ => unreachable!(),
129127
}
130128

131-
m = match timeout {
132-
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
133-
None => self.condvar.wait(m).unwrap(),
129+
match timeout {
130+
Some(timeout) => {
131+
m = self.condvar.wait_timeout(m, timeout).unwrap().0;
132+
133+
// Transition back to idle. If the state has transitioned to `NOTIFY`,
134+
// this will consume that notification.
135+
match self.state.swap(IDLE, SeqCst) {
136+
NOTIFY => {}, // Got a notification
137+
SLEEP => {}, // No notification, timed out
138+
_ => unreachable!(),
139+
}
140+
}
141+
None => {
142+
loop {
143+
m = self.condvar.wait(m).unwrap();
144+
match self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst) {
145+
Ok(_) => break, // Got a notification
146+
Err(_) => {} // Spurious wakeup, go back to sleep
147+
}
148+
}
149+
}
134150
};
135151

136-
// Transition back to idle. If the state has transitioned to `NOTIFY`,
137-
// this will consume that notification.
138-
self.state.store(IDLE, SeqCst);
139-
140152
// Explicitly drop the mutex guard. There is no real point in doing it
141153
// except that I find it helpful to make it explicit where we want the
142154
// mutex to unlock.
143155
drop(m);
144156
}
145157

146158
fn unpark(&self) {
147-
// First, try transitioning from IDLE -> NOTIFY, this does not require a
148-
// lock.
149-
match self.state.compare_and_swap(IDLE, NOTIFY, SeqCst) {
150-
IDLE | NOTIFY => return,
151-
SLEEP => {}
152-
_ => unreachable!(),
153-
}
154-
155-
// The other half is sleeping, this requires a lock
156-
let _m = self.mutex.lock().unwrap();
159+
loop {
160+
// First, try transitioning from IDLE -> NOTIFY, this does not require a
161+
// lock.
162+
match self.state.compare_exchange(IDLE, NOTIFY, SeqCst, SeqCst) {
163+
Ok(_) => return, // No one was waiting
164+
Err(NOTIFY) => return, // Already unparked
165+
Err(SLEEP) => {} // Gotta wake up
166+
_ => unreachable!(),
167+
}
157168

158-
// Transition to NOTIFY
159-
match self.state.swap(NOTIFY, SeqCst) {
160-
SLEEP => {}
161-
NOTIFY => return,
162-
IDLE => return,
163-
_ => unreachable!(),
169+
// The other half is sleeping, this requires a lock
170+
let _m = self.mutex.lock().unwrap();
171+
172+
// Transition to NOTIFY
173+
match self.state.compare_exchange(SLEEP, NOTIFY, SeqCst, SeqCst) {
174+
Ok(_) => {
175+
// Wakeup the sleeper
176+
self.condvar.notify_one();
177+
return;
178+
}
179+
Err(NOTIFY) => return, // A different thread unparked
180+
Err(IDLE) => {} // Parked thread went away, try again
181+
_ => unreachable!(),
182+
}
164183
}
165-
166-
// Wakeup the sleeper
167-
self.condvar.notify_one();
168184
}
169185
}
170186

0 commit comments

Comments
 (0)