Skip to content

Commit 46c36a9

Browse files
committed
New zero alloc, no_std compatible implementation
This implementation relies on the fact that the Thread is cloneable and layout-compatible with a pointer, thus one may just build a waker out of the thread without unnecessary wrapping behind Arcs.
1 parent 767e6db commit 46c36a9

File tree

2 files changed

+166
-68
lines changed

2 files changed

+166
-68
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ license = "Apache-2.0/MIT"
1515
readme = "README.md"
1616

1717
[features]
18+
default = ["std"]
1819
macro = ["pollster-macro"]
20+
std = []
1921

2022
[dependencies]
2123
pollster-macro = { version = "0.1", path = "macro", optional = true }

src/lib.rs

Lines changed: 164 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
#![doc = include_str!("../README.md")]
22
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
33

4-
use std::{
4+
#![cfg_attr(not(feature = "std"), no_std)]
5+
6+
use core::{
57
future::Future,
6-
sync::{Arc, Condvar, Mutex},
7-
task::{Context, Poll, Wake, Waker},
8+
mem,
9+
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
810
};
911

12+
#[cfg(feature = "std")]
13+
use std::thread::{self, Thread};
14+
1015
#[cfg(feature = "macro")]
1116
pub use pollster_macro::{main, test};
1217

@@ -24,108 +29,199 @@ pub trait FutureExt: Future {
2429
/// let result = my_fut.block_on();
2530
/// ```
2631
fn block_on(self) -> Self::Output where Self: Sized { block_on(self) }
32+
33+
/// Block the thread until the future is ready with custom thread parking implementation.
34+
///
35+
/// This allows one to use custom thread parking mechanisms in `no_std` environments.
36+
///
37+
/// # Example
38+
///
39+
/// ```
40+
/// use pollster::FutureExt as _;
41+
/// use std::thread::Thread;
42+
///
43+
/// let my_fut = async {};
44+
///
45+
/// let result = my_fut.block_on_t::<Thread>();
46+
/// ```
47+
fn block_on_t<T: Parkable>(self) -> Self::Output where Self: Sized { block_on_t::<T, Self>(self) }
2748
}
2849

2950
impl<F: Future> FutureExt for F {}
3051

31-
enum SignalState {
32-
Empty,
33-
Waiting,
34-
Notified,
52+
/// Parkable handle.
53+
///
54+
/// This handle allows a thread to potentially be efficiently blocked. This is used in the polling
55+
/// implementation to wait for wakeups.
56+
///
57+
/// The interface models that of `std::thread`, and many functions, such as
58+
/// [`current`](Parkable::current), [`park`](Parkable::park), and [`unpark`](Parkable::unpark)
59+
/// map to `std::thread` equivalents.
60+
pub trait Parkable: Sized + Clone {
61+
/// Get handle to current thread.
62+
fn current() -> Self;
63+
64+
/// Park the current thread.
65+
fn park();
66+
67+
/// Unpark specified thread.
68+
fn unpark(&self);
69+
70+
/// Convert self into opaque pointer.
71+
///
72+
/// This requires `Self` to either be layout compatible with `*const ()` or heap allocated upon
73+
/// switch.
74+
fn into_opaque(self) -> *const ();
75+
76+
/// Convert opaque pointer into `Self`.
77+
///
78+
/// # Safety
79+
///
80+
/// This function is safe if the `data` argument is a valid park handle created by
81+
/// `Self::into_opaque`.
82+
unsafe fn from_opaque(data: *const ()) -> Self;
83+
84+
/// Create a waker out of `self`
85+
///
86+
/// This function will clone self and build a `Waker` object.
87+
fn waker(&self) -> Waker {
88+
let data = self.clone().into_opaque();
89+
// SAFETY: `RawWaker` created by `raw_waker` builds a waker object out of the raw data and
90+
// vtable methods of this type which we assume are correct.
91+
unsafe {
92+
Waker::from_raw(raw_waker::<Self>(data))
93+
}
94+
}
3595
}
3696

37-
struct Signal {
38-
state: Mutex<SignalState>,
39-
cond: Condvar,
97+
#[cfg(feature = "std")]
98+
pub type DefaultHandle = Thread;
99+
#[cfg(not(feature = "std"))]
100+
pub type DefaultHandle = *const ();
101+
102+
fn raw_waker<T: Parkable>(data: *const ()) -> RawWaker {
103+
RawWaker::new(
104+
data,
105+
&RawWakerVTable::new(
106+
clone_waker::<T>,
107+
wake::<T>,
108+
wake_by_ref::<T>,
109+
drop_waker::<T>,
110+
),
111+
)
40112
}
41113

42-
impl Signal {
43-
fn new() -> Self {
44-
Self {
45-
state: Mutex::new(SignalState::Empty),
46-
cond: Condvar::new(),
47-
}
114+
unsafe fn clone_waker<T: Parkable>(data: *const ()) -> RawWaker {
115+
let waker = T::from_opaque(data);
116+
mem::forget(waker.clone());
117+
mem::forget(waker);
118+
raw_waker::<T>(data)
119+
}
120+
121+
unsafe fn wake<T: Parkable>(data: *const ()) {
122+
let waker = T::from_opaque(data);
123+
waker.unpark();
124+
}
125+
126+
unsafe fn wake_by_ref<T: Parkable>(data: *const ()) {
127+
let waker = T::from_opaque(data);
128+
waker.unpark();
129+
mem::forget(waker);
130+
}
131+
132+
unsafe fn drop_waker<T: Parkable>(data: *const ()) {
133+
let _ = T::from_opaque(data);
134+
}
135+
136+
#[cfg(feature = "std")]
137+
impl Parkable for Thread {
138+
fn current() -> Self {
139+
thread::current()
48140
}
49141

50-
fn wait(&self) {
51-
let mut state = self.state.lock().unwrap();
52-
match *state {
53-
// Notify() was called before we got here, consume it here without waiting and return immediately.
54-
SignalState::Notified => *state = SignalState::Empty,
55-
// This should not be possible because our signal is created within a function and never handed out to any
56-
// other threads. If this is the case, we have a serious problem so we panic immediately to avoid anything
57-
// more problematic happening.
58-
SignalState::Waiting => {
59-
unreachable!("Multiple threads waiting on the same signal: Open a bug report!");
60-
}
61-
SignalState::Empty => {
62-
// Nothing has happened yet, and we're the only thread waiting (as should be the case!). Set the state
63-
// accordingly and begin polling the condvar in a loop until it's no longer telling us to wait. The
64-
// loop prevents incorrect spurious wakeups.
65-
*state = SignalState::Waiting;
66-
while let SignalState::Waiting = *state {
67-
state = self.cond.wait(state).unwrap();
68-
}
69-
}
70-
}
142+
fn park() {
143+
thread::park();
71144
}
72145

73-
fn notify(&self) {
74-
let mut state = self.state.lock().unwrap();
75-
match *state {
76-
// The signal was already notified, no need to do anything because the thread will be waking up anyway
77-
SignalState::Notified => {}
78-
// The signal wasn't notified but a thread isn't waiting on it, so we can avoid doing unnecessary work by
79-
// skipping the condvar and leaving behind a message telling the thread that a notification has already
80-
// occurred should it come along in the future.
81-
SignalState::Empty => *state = SignalState::Notified,
82-
// The signal wasn't notified and there's a waiting thread. Reset the signal so it can be wait()'ed on again
83-
// and wake up the thread. Because there should only be a single thread waiting, `notify_all` would also be
84-
// valid.
85-
SignalState::Waiting => {
86-
*state = SignalState::Empty;
87-
self.cond.notify_one();
88-
}
89-
}
146+
fn unpark(&self) {
147+
Thread::unpark(self);
148+
}
149+
150+
fn into_opaque(self) -> *const () {
151+
// SAFETY: `Thread` internal layout is an Arc to inner type, which is represented as a
152+
// single pointer. The only thing we do with the pointer is transmute it back to
153+
// ThreadWaker in the waker functions. If for whatever reason Thread layout will change to
154+
// contain multiple fields, this will still be safe, because the compiler will simply
155+
// refuse to compile the program.
156+
unsafe { mem::transmute::<_, *const ()>(self) }
157+
}
158+
159+
unsafe fn from_opaque(data: *const ()) -> Self {
160+
mem::transmute(data)
90161
}
91162
}
92163

93-
impl Wake for Signal {
94-
fn wake(self: Arc<Self>) {
95-
self.notify();
164+
impl Parkable for *const () {
165+
fn current() -> Self {
166+
core::ptr::null()
167+
}
168+
169+
fn park() {
170+
core::hint::spin_loop()
171+
}
172+
173+
fn unpark(&self) {}
174+
175+
fn into_opaque(self) -> *const () {
176+
self
177+
}
178+
179+
unsafe fn from_opaque(data: *const ()) -> Self {
180+
data
96181
}
97182
}
98183

99-
/// Block the thread until the future is ready.
184+
/// Block the thread until the future is ready with custom parking implementation.
185+
///
186+
/// This allows one to use custom thread parking mechanisms in `no_std` environments.
100187
///
101188
/// # Example
102189
///
103190
/// ```
191+
/// use std::thread::Thread;
192+
///
104193
/// let my_fut = async {};
105-
/// let result = pollster::block_on(my_fut);
194+
/// let result = pollster::block_on_t::<Thread, _>(my_fut);
106195
/// ```
107-
pub fn block_on<F: Future>(mut fut: F) -> F::Output {
196+
pub fn block_on_t<T: Parkable, F: Future>(mut fut: F) -> F::Output {
108197
// Pin the future so that it can be polled.
109198
// SAFETY: We shadow `fut` so that it cannot be used again. The future is now pinned to the stack and will not be
110199
// moved until the end of this scope. This is, incidentally, exactly what the `pin_mut!` macro from `pin_utils`
111200
// does.
112201
let mut fut = unsafe { std::pin::Pin::new_unchecked(&mut fut) };
113202

114-
// Signal used to wake up the thread for polling as the future moves to completion. We need to use an `Arc`
115-
// because, although the lifetime of `fut` is limited to this function, the underlying IO abstraction might keep
116-
// the signal alive for far longer. `Arc` is a thread-safe way to allow this to happen.
117-
// TODO: Investigate ways to reuse this `Arc<Signal>`... perhaps via a `static`?
118-
let signal = Arc::new(Signal::new());
203+
let handle = T::current();
119204

120-
// Create a context that will be passed to the future.
121-
let waker = Waker::from(Arc::clone(&signal));
205+
let waker: Waker = handle.waker();
122206
let mut context = Context::from_waker(&waker);
123207

124208
// Poll the future to completion
125209
loop {
126210
match fut.as_mut().poll(&mut context) {
127-
Poll::Pending => signal.wait(),
211+
Poll::Pending => T::park(),
128212
Poll::Ready(item) => break item,
129213
}
130214
}
131215
}
216+
217+
/// Block the thread until the future is ready.
218+
///
219+
/// # Example
220+
///
221+
/// ```
222+
/// let my_fut = async {};
223+
/// let result = pollster::block_on(my_fut);
224+
/// ```
225+
pub fn block_on<F: Future>(fut: F) -> F::Output {
226+
return block_on_t::<DefaultHandle, _>(fut);
227+
}

0 commit comments

Comments
 (0)