Skip to content

Commit 983d859

Browse files
committed
New no_std compatible implementation
This implementation adds generic `Parkable` trait that can be implemented on any type. This enables to opt out from the default `Parkable` implementation on `Thread` with `default-features = false`.
1 parent 767e6db commit 983d859

File tree

2 files changed

+197
-69
lines changed

2 files changed

+197
-69
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ license = "Apache-2.0/MIT"
1515
readme = "README.md"
1616

1717
[features]
18+
default = ["std"]
1819
macro = ["pollster-macro"]
20+
std = []
1921

2022
[dependencies]
2123
pollster-macro = { version = "0.1", path = "macro", optional = true }

src/lib.rs

Lines changed: 195 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
#![doc = include_str!("../README.md")]
22
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
3+
#![cfg_attr(not(feature = "std"), no_std)]
34

4-
use std::{
5+
use core::{
56
future::Future,
6-
sync::{Arc, Condvar, Mutex},
7-
task::{Context, Poll, Wake, Waker},
7+
mem,
8+
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
9+
};
10+
11+
#[cfg(feature = "std")]
12+
use std::{
13+
sync::Arc,
14+
thread::{self, Thread},
815
};
916

1017
#[cfg(feature = "macro")]
@@ -23,109 +30,228 @@ pub trait FutureExt: Future {
2330
///
2431
/// let result = my_fut.block_on();
2532
/// ```
26-
fn block_on(self) -> Self::Output where Self: Sized { block_on(self) }
33+
fn block_on(self) -> Self::Output
34+
where
35+
Self: Sized,
36+
{
37+
block_on(self)
38+
}
39+
40+
/// Block the thread until the future is ready with custom thread parking implementation.
41+
///
42+
/// This allows one to use custom thread parking mechanisms in `no_std` environments.
43+
///
44+
/// # Example
45+
///
46+
/// ```
47+
/// use pollster::FutureExt as _;
48+
///
49+
/// let my_fut = async {};
50+
///
51+
/// let result = my_fut.block_on_t::<()>();
52+
/// ```
53+
fn block_on_t<T: Parkable>(self) -> Self::Output
54+
where
55+
Self: Sized,
56+
{
57+
block_on_t::<T, Self>(self)
58+
}
2759
}
2860

2961
impl<F: Future> FutureExt for F {}
3062

31-
enum SignalState {
32-
Empty,
33-
Waiting,
34-
Notified,
63+
/// Parkable handle.
64+
///
65+
/// This handle allows a thread to potentially be efficiently blocked. This is used in the polling
66+
/// implementation to wait for wakeups.
67+
///
68+
/// The interface models that of `std::thread`, and many functions, such as
69+
/// [`current`](Parkable::current), [`park`](Parkable::park), and [`unpark`](Parkable::unpark)
70+
/// map to `std::thread` equivalents.
71+
pub trait Parkable: Sized + Clone {
72+
/// Get handle to current thread.
73+
fn current() -> Self;
74+
75+
/// Park the current thread.
76+
fn park();
77+
78+
/// Unpark specified thread.
79+
fn unpark(&self);
80+
81+
/// Convert self into opaque pointer.
82+
///
83+
/// This requires `Self` to either be layout compatible with `*const ()` or heap allocated upon
84+
/// switch.
85+
fn into_opaque(self) -> *const ();
86+
87+
/// Convert opaque pointer into `Self`.
88+
///
89+
/// # Safety
90+
///
91+
/// This function is safe if the `data` argument is a valid park handle created by
92+
/// `Self::into_opaque`.
93+
unsafe fn from_opaque(data: *const ()) -> Self;
94+
95+
/// Create a waker out of `self`
96+
///
97+
/// This function will clone self and build a `Waker` object.
98+
fn waker(&self) -> Waker {
99+
let data = self.clone().into_opaque();
100+
// SAFETY: `RawWaker` created by `raw_waker` builds a waker object out of the raw data and
101+
// vtable methods of this type which we assume are correct.
102+
unsafe { Waker::from_raw(raw_waker::<Self>(data)) }
103+
}
35104
}
36105

37-
struct Signal {
38-
state: Mutex<SignalState>,
39-
cond: Condvar,
106+
#[cfg(all(feature = "std", not(pollster_unstable)))]
107+
pub type DefaultHandle = Arc<Thread>;
108+
#[cfg(all(feature = "std", pollster_unstable))]
109+
pub type DefaultHandle = Thread;
110+
#[cfg(not(feature = "std"))]
111+
pub type DefaultHandle = ();
112+
113+
fn raw_waker<T: Parkable>(data: *const ()) -> RawWaker {
114+
RawWaker::new(
115+
data,
116+
&RawWakerVTable::new(
117+
clone_waker::<T>,
118+
wake::<T>,
119+
wake_by_ref::<T>,
120+
drop_waker::<T>,
121+
),
122+
)
40123
}
41124

42-
impl Signal {
43-
fn new() -> Self {
44-
Self {
45-
state: Mutex::new(SignalState::Empty),
46-
cond: Condvar::new(),
47-
}
125+
unsafe fn clone_waker<T: Parkable>(data: *const ()) -> RawWaker {
126+
let waker = T::from_opaque(data);
127+
mem::forget(waker.clone());
128+
mem::forget(waker);
129+
raw_waker::<T>(data)
130+
}
131+
132+
unsafe fn wake<T: Parkable>(data: *const ()) {
133+
let waker = T::from_opaque(data);
134+
waker.unpark();
135+
}
136+
137+
unsafe fn wake_by_ref<T: Parkable>(data: *const ()) {
138+
let waker = T::from_opaque(data);
139+
waker.unpark();
140+
mem::forget(waker);
141+
}
142+
143+
unsafe fn drop_waker<T: Parkable>(data: *const ()) {
144+
let _ = T::from_opaque(data);
145+
}
146+
147+
#[cfg(feature = "std")]
148+
impl Parkable for Arc<Thread> {
149+
fn current() -> Self {
150+
thread::current().into()
48151
}
49152

50-
fn wait(&self) {
51-
let mut state = self.state.lock().unwrap();
52-
match *state {
53-
// Notify() was called before we got here, consume it here without waiting and return immediately.
54-
SignalState::Notified => *state = SignalState::Empty,
55-
// This should not be possible because our signal is created within a function and never handed out to any
56-
// other threads. If this is the case, we have a serious problem so we panic immediately to avoid anything
57-
// more problematic happening.
58-
SignalState::Waiting => {
59-
unreachable!("Multiple threads waiting on the same signal: Open a bug report!");
60-
}
61-
SignalState::Empty => {
62-
// Nothing has happened yet, and we're the only thread waiting (as should be the case!). Set the state
63-
// accordingly and begin polling the condvar in a loop until it's no longer telling us to wait. The
64-
// loop prevents incorrect spurious wakeups.
65-
*state = SignalState::Waiting;
66-
while let SignalState::Waiting = *state {
67-
state = self.cond.wait(state).unwrap();
68-
}
69-
}
70-
}
153+
fn park() {
154+
thread::park();
71155
}
72156

73-
fn notify(&self) {
74-
let mut state = self.state.lock().unwrap();
75-
match *state {
76-
// The signal was already notified, no need to do anything because the thread will be waking up anyway
77-
SignalState::Notified => {}
78-
// The signal wasn't notified but a thread isn't waiting on it, so we can avoid doing unnecessary work by
79-
// skipping the condvar and leaving behind a message telling the thread that a notification has already
80-
// occurred should it come along in the future.
81-
SignalState::Empty => *state = SignalState::Notified,
82-
// The signal wasn't notified and there's a waiting thread. Reset the signal so it can be wait()'ed on again
83-
// and wake up the thread. Because there should only be a single thread waiting, `notify_all` would also be
84-
// valid.
85-
SignalState::Waiting => {
86-
*state = SignalState::Empty;
87-
self.cond.notify_one();
88-
}
89-
}
157+
fn unpark(&self) {
158+
Thread::unpark(&**self);
159+
}
160+
161+
fn into_opaque(self) -> *const () {
162+
Arc::into_raw(self) as *const ()
163+
}
164+
165+
unsafe fn from_opaque(data: *const ()) -> Self {
166+
Arc::from_raw(data as *const Thread)
90167
}
91168
}
92169

93-
impl Wake for Signal {
94-
fn wake(self: Arc<Self>) {
95-
self.notify();
170+
#[cfg(all(feature = "std", pollster_unstable))]
171+
impl Parkable for Thread {
172+
fn current() -> Self {
173+
thread::current().into()
174+
}
175+
176+
fn park() {
177+
thread::park();
178+
}
179+
180+
fn unpark(&self) {
181+
Thread::unpark(self);
182+
}
183+
184+
fn into_opaque(self) -> *const () {
185+
// SAFETY: `Thread` internal layout is an Arc to inner type, which is represented as a
186+
// single pointer. The only thing we do with the pointer is transmute it back to
187+
// ThreadWaker in the waker functions. If for whatever reason Thread layout will change to
188+
// contain multiple fields, this may still be okay, because the compiler will simply
189+
// refuse to compile the program.
190+
unsafe { mem::transmute::<_, *const ()>(self) }
191+
}
192+
193+
unsafe fn from_opaque(data: *const ()) -> Self {
194+
mem::transmute(data)
96195
}
97196
}
98197

99-
/// Block the thread until the future is ready.
198+
impl Parkable for () {
199+
fn current() -> Self {}
200+
201+
fn park() {
202+
core::hint::spin_loop()
203+
}
204+
205+
fn unpark(&self) {}
206+
207+
fn into_opaque(self) -> *const () {
208+
&()
209+
}
210+
211+
unsafe fn from_opaque(_: *const ()) -> Self {
212+
()
213+
}
214+
}
215+
216+
/// Block the thread until the future is ready with custom parking implementation.
217+
///
218+
/// This allows one to use custom thread parking mechanisms in `no_std` environments.
100219
///
101220
/// # Example
102221
///
103222
/// ```
104223
/// let my_fut = async {};
105-
/// let result = pollster::block_on(my_fut);
224+
/// let result = pollster::block_on_t::<(), _>(my_fut);
106225
/// ```
107-
pub fn block_on<F: Future>(mut fut: F) -> F::Output {
226+
pub fn block_on_t<T: Parkable, F: Future>(mut fut: F) -> F::Output {
108227
// Pin the future so that it can be polled.
109228
// SAFETY: We shadow `fut` so that it cannot be used again. The future is now pinned to the stack and will not be
110229
// moved until the end of this scope. This is, incidentally, exactly what the `pin_mut!` macro from `pin_utils`
111230
// does.
112231
let mut fut = unsafe { std::pin::Pin::new_unchecked(&mut fut) };
113232

114-
// Signal used to wake up the thread for polling as the future moves to completion. We need to use an `Arc`
115-
// because, although the lifetime of `fut` is limited to this function, the underlying IO abstraction might keep
116-
// the signal alive for far longer. `Arc` is a thread-safe way to allow this to happen.
117-
// TODO: Investigate ways to reuse this `Arc<Signal>`... perhaps via a `static`?
118-
let signal = Arc::new(Signal::new());
233+
let handle = T::current();
119234

120-
// Create a context that will be passed to the future.
121-
let waker = Waker::from(Arc::clone(&signal));
235+
let waker: Waker = handle.waker();
122236
let mut context = Context::from_waker(&waker);
123237

124238
// Poll the future to completion
125239
loop {
126240
match fut.as_mut().poll(&mut context) {
127-
Poll::Pending => signal.wait(),
241+
Poll::Pending => T::park(),
128242
Poll::Ready(item) => break item,
129243
}
130244
}
131245
}
246+
247+
/// Block the thread until the future is ready.
248+
///
249+
/// # Example
250+
///
251+
/// ```
252+
/// let my_fut = async {};
253+
/// let result = pollster::block_on(my_fut);
254+
/// ```
255+
pub fn block_on<F: Future>(fut: F) -> F::Output {
256+
return block_on_t::<DefaultHandle, _>(fut);
257+
}

0 commit comments

Comments
 (0)