Skip to content

Commit a85335d

Browse files
authored
Merge pull request #1507 from Pauan/futures-0.3
Adding in Futures 0.3 support to wasm-bindgen-futures
2 parents d382ad7 + c00262f commit a85335d

File tree

3 files changed

+280
-0
lines changed

3 files changed

+280
-0
lines changed

crates/futures/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ edition = "2018"
1414
futures = "0.1.20"
1515
js-sys = { path = "../js-sys", version = '0.3.20' }
1616
wasm-bindgen = { path = "../..", version = '0.2.43' }
17+
futures-util-preview = { version = "0.3.0-alpha.15", optional = true }
18+
futures-channel-preview = { version = "0.3.0-alpha.15", optional = true }
19+
lazy_static = { version = "1.3.0", optional = true }
1720

1821
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
1922
wasm-bindgen-test = { path = '../test', version = '0.2.43' }
23+
24+
[features]
25+
futures_0_3 = ["futures-util-preview", "futures-channel-preview", "lazy_static"]

crates/futures/src/futures_0_3.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use std::fmt;
2+
use std::pin::Pin;
3+
use std::cell::{Cell, RefCell};
4+
use std::sync::Arc;
5+
use std::future::Future;
6+
use std::task::{Poll, Context};
7+
use std::collections::VecDeque;
8+
9+
use futures_util::task::ArcWake;
10+
use futures_util::future::FutureExt;
11+
use futures_channel::oneshot;
12+
13+
use lazy_static::lazy_static;
14+
15+
use js_sys::Promise;
16+
use wasm_bindgen::prelude::*;
17+
18+
/// A Rust `Future` backed by a JavaScript `Promise`.
19+
///
20+
/// This type is constructed with a JavaScript `Promise` object and translates
21+
/// it to a Rust `Future`. This type implements the `Future` trait from the
22+
/// `futures` crate and will either succeed or fail depending on what happens
23+
/// with the JavaScript `Promise`.
24+
///
25+
/// Currently this type is constructed with `JsFuture::from`.
26+
pub struct JsFuture {
27+
resolved: oneshot::Receiver<JsValue>,
28+
rejected: oneshot::Receiver<JsValue>,
29+
_cb_resolve: Closure<FnMut(JsValue)>,
30+
_cb_reject: Closure<FnMut(JsValue)>,
31+
}
32+
33+
impl fmt::Debug for JsFuture {
34+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35+
write!(f, "JsFuture {{ ... }}")
36+
}
37+
}
38+
39+
impl From<Promise> for JsFuture {
40+
fn from(js: Promise) -> JsFuture {
41+
// Use the `then` method to schedule two callbacks, one for the
42+
// resolved value and one for the rejected value. These two callbacks
43+
// will be connected to oneshot channels which feed back into our
44+
// future.
45+
//
46+
// This may not be the speediest option today but it should work!
47+
let (tx1, rx1) = oneshot::channel();
48+
49+
let cb_resolve = Closure::once(move |val| {
50+
tx1.send(val).unwrap_throw();
51+
});
52+
53+
let (tx2, rx2) = oneshot::channel();
54+
55+
let cb_reject = Closure::once(move |val| {
56+
tx2.send(val).unwrap_throw();
57+
});
58+
59+
js.then2(&cb_resolve, &cb_reject);
60+
61+
JsFuture {
62+
resolved: rx1,
63+
rejected: rx2,
64+
_cb_resolve: cb_resolve,
65+
_cb_reject: cb_reject,
66+
}
67+
}
68+
}
69+
70+
impl Future for JsFuture {
71+
type Output = Result<JsValue, JsValue>;
72+
73+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
74+
// Test if either our resolved or rejected side is finished yet.
75+
if let Poll::Ready(val) = self.resolved.poll_unpin(cx) {
76+
return Poll::Ready(Ok(val.unwrap_throw()));
77+
}
78+
79+
if let Poll::Ready(val) = self.rejected.poll_unpin(cx) {
80+
return Poll::Ready(Err(val.unwrap_throw()));
81+
}
82+
83+
Poll::Pending
84+
}
85+
}
86+
87+
/// Converts a Rust `Future` into a JavaScript `Promise`.
88+
///
89+
/// This function will take any future in Rust and schedule it to be executed,
90+
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
91+
/// to get plumbed into the rest of a system.
92+
///
93+
/// The `future` provided must adhere to `'static` because it'll be scheduled
94+
/// to run in the background and cannot contain any stack references. The
95+
/// returned `Promise` will be resolved or rejected when the future completes,
96+
/// depending on whether it finishes with `Ok` or `Err`.
97+
///
98+
/// # Panics
99+
///
100+
/// Note that in wasm panics are currently translated to aborts, but "abort" in
101+
/// this case means that a JavaScript exception is thrown. The wasm module is
102+
/// still usable (likely erroneously) after Rust panics.
103+
///
104+
/// If the `future` provided panics then the returned `Promise` **will not
105+
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
106+
/// limitation of wasm currently that's hoped to be fixed one day!
107+
pub fn future_to_promise<F>(future: F) -> Promise
108+
where
109+
F: Future<Output = Result<JsValue, JsValue>> + 'static,
110+
{
111+
let mut future = Some(future);
112+
113+
Promise::new(&mut |resolve, reject| {
114+
// TODO change Promise::new to be FnOnce
115+
spawn_local(future.take().unwrap_throw().map(move |val| {
116+
match val {
117+
Ok(val) => {
118+
resolve.call1(&JsValue::undefined(), &val).unwrap_throw();
119+
},
120+
Err(val) => {
121+
reject.call1(&JsValue::undefined(), &val).unwrap_throw();
122+
},
123+
}
124+
}));
125+
})
126+
}
127+
128+
/// Runs a Rust `Future` on a local task queue.
129+
///
130+
/// The `future` provided must adhere to `'static` because it'll be scheduled
131+
/// to run in the background and cannot contain any stack references.
132+
///
133+
/// # Panics
134+
///
135+
/// This function has the same panic behavior as `future_to_promise`.
136+
pub fn spawn_local<F>(future: F)
137+
where
138+
F: Future<Output = ()> + 'static,
139+
{
140+
struct Task {
141+
// This is an Option so that the Future can be immediately dropped when it's finished
142+
future: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 'static>>>>,
143+
144+
// This is used to ensure that the Task will only be queued once
145+
is_queued: Cell<bool>,
146+
}
147+
148+
impl Task {
149+
#[inline]
150+
fn new<F>(future: F) -> Arc<Self> where F: Future<Output = ()> + 'static {
151+
Arc::new(Self {
152+
future: RefCell::new(Some(Box::pin(future))),
153+
is_queued: Cell::new(false),
154+
})
155+
}
156+
}
157+
158+
impl ArcWake for Task {
159+
fn wake_by_ref(arc_self: &Arc<Self>) {
160+
// This ensures that it's only queued once
161+
if arc_self.is_queued.replace(true) {
162+
return;
163+
}
164+
165+
let mut lock = EXECUTOR.tasks.borrow_mut();
166+
167+
lock.push_back(arc_self.clone());
168+
169+
// The Task will be polled on the next microtask event tick
170+
EXECUTOR.next_tick.schedule();
171+
}
172+
}
173+
174+
175+
struct NextTick {
176+
is_spinning: Cell<bool>,
177+
promise: Promise,
178+
closure: Closure<dyn FnMut(JsValue)>,
179+
}
180+
181+
impl NextTick {
182+
#[inline]
183+
fn new<F>(mut f: F) -> Self where F: FnMut() + 'static {
184+
Self {
185+
is_spinning: Cell::new(false),
186+
promise: Promise::resolve(&JsValue::null()),
187+
closure: Closure::wrap(Box::new(move |_| {
188+
f();
189+
})),
190+
}
191+
}
192+
193+
fn schedule(&self) {
194+
// This ensures that it's only scheduled once
195+
if self.is_spinning.replace(true) {
196+
return;
197+
}
198+
199+
// TODO avoid creating a new Promise
200+
self.promise.then(&self.closure);
201+
}
202+
203+
fn done(&self) {
204+
self.is_spinning.set(false);
205+
}
206+
}
207+
208+
209+
struct Executor {
210+
// This is a queue of Tasks which will be polled in order
211+
tasks: RefCell<VecDeque<Arc<Task>>>,
212+
213+
// This is used to ensure that Tasks are polled on the next microtask event tick
214+
next_tick: NextTick,
215+
}
216+
217+
// TODO This is only safe because JS is currently single-threaded
218+
unsafe impl Send for Executor {}
219+
unsafe impl Sync for Executor {}
220+
221+
lazy_static! {
222+
static ref EXECUTOR: Executor = Executor {
223+
tasks: RefCell::new(VecDeque::new()),
224+
225+
// This closure will only be called on the next microtask event tick
226+
next_tick: NextTick::new(|| {
227+
let tasks = &EXECUTOR.tasks;
228+
229+
loop {
230+
let mut lock = tasks.borrow_mut();
231+
232+
match lock.pop_front() {
233+
Some(task) => {
234+
let mut future = task.future.borrow_mut();
235+
236+
let poll = {
237+
// This will only panic if the Future wakes up the Waker after returning Poll::Ready
238+
let mut future = future.as_mut().unwrap_throw();
239+
240+
// Clear `is_queued` flag so that it will re-queue if poll calls waker.wake()
241+
task.is_queued.set(false);
242+
243+
// This is necessary because the polled task might queue more tasks
244+
drop(lock);
245+
246+
// TODO is there some way of saving these so they don't need to be recreated all the time ?
247+
let waker = ArcWake::into_waker(task.clone());
248+
let cx = &mut Context::from_waker(&waker);
249+
Pin::new(&mut future).poll(cx)
250+
};
251+
252+
if let Poll::Ready(_) = poll {
253+
// Cleanup the Future immediately
254+
*future = None;
255+
}
256+
},
257+
None => {
258+
// All of the Tasks have been polled, so it's now possible to schedule the NextTick again
259+
EXECUTOR.next_tick.done();
260+
break;
261+
},
262+
}
263+
}
264+
}),
265+
};
266+
}
267+
268+
269+
ArcWake::wake_by_ref(&Task::new(future));
270+
}

crates/futures/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@
103103
104104
#![deny(missing_docs)]
105105

106+
#[cfg(feature = "futures_0_3")]
107+
/// Contains a Futures 0.3 implementation of this crate.
108+
pub mod futures_0_3;
109+
106110
use std::cell::{Cell, RefCell};
107111
use std::fmt;
108112
use std::rc::Rc;

0 commit comments

Comments
 (0)