Skip to content

Commit e466e1a

Browse files
committed
moved threadsafe futures behind a flag
1 parent 2fdfe79 commit e466e1a

File tree

3 files changed

+156
-15
lines changed

3 files changed

+156
-15
lines changed

crates/futures/src/lib.rs

Lines changed: 137 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,36 @@ pub mod futures_0_3;
110110
use std::cell::{Cell, RefCell};
111111
use std::fmt;
112112
use std::rc::Rc;
113+
#[cfg(target_feature = "atomics")]
113114
use std::sync::atomic::{AtomicBool, Ordering};
114115
use std::sync::Arc;
116+
#[cfg(target_feature = "atomics")]
117+
use std::sync::Mutex;
115118

116119
use futures::executor::{self, Notify, Spawn};
117120
use futures::future;
118121
use futures::prelude::*;
119122
use futures::sync::oneshot;
120-
use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer};
123+
use js_sys::{Function, Promise};
124+
#[cfg(target_feature = "atomics")]
125+
use js_sys::{Atomics, Int32Array, SharedArrayBuffer, WebAssembly};
121126
use wasm_bindgen::prelude::*;
127+
#[cfg(target_feature = "atomics")]
128+
use wasm_bindgen::JsCast;
122129

130+
#[cfg(target_feature = "atomics")]
123131
mod polyfill;
124132

133+
macro_rules! console_log {
134+
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
135+
}
136+
137+
#[wasm_bindgen]
138+
extern "C" {
139+
#[wasm_bindgen(js_namespace = console)]
140+
fn log(s: &str);
141+
}
142+
125143
/// A Rust `Future` backed by a JavaScript `Promise`.
126144
///
127145
/// This type is constructed with a JavaScript `Promise` object and translates
@@ -255,7 +273,8 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
255273
resolve,
256274
reject,
257275
notified: Cell::new(State::Notified),
258-
waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)),
276+
#[cfg(target_feature = "atomics")]
277+
waker: Arc::new(Waker::new(vec![0; 4], false)),
259278
}));
260279
});
261280

@@ -275,6 +294,7 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
275294
resolve: Function,
276295
reject: Function,
277296

297+
#[cfg(target_feature = "atomics")]
278298
// Struct to wake a future
279299
waker: Arc<Waker>,
280300
}
@@ -307,38 +327,59 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
307327
Waiting(Arc<Package>),
308328
}
309329

330+
#[cfg(target_feature = "atomics")]
310331
struct Waker {
311-
buffer: SharedArrayBuffer,
332+
array: Vec<i32>,
312333
notified: AtomicBool,
313334
};
314335

336+
#[cfg(target_feature = "atomics")]
315337
impl Waker {
316-
fn new(buffer: SharedArrayBuffer, notified: bool) -> Self {
317-
Self {
318-
buffer,
338+
fn new(array: Vec<i32>, notified: bool) -> Self {
339+
Waker {
340+
array,
319341
notified: AtomicBool::new(notified),
320342
}
321343
}
322344
}
323345

346+
#[cfg(target_feature = "atomics")]
324347
impl Notify for Waker {
325348
fn notify(&self, id: usize) {
349+
console_log!("Waker notify");
326350
if !self.notified.swap(true, Ordering::SeqCst) {
327-
let array = Int32Array::new(&self.buffer);
351+
console_log!("Waker, inside if");
352+
let memory_buffer = wasm_bindgen::memory()
353+
.dyn_into::<WebAssembly::Memory>()
354+
.expect("Should cast a memory to WebAssembly::Memory")
355+
.buffer();
356+
357+
let array_location = self.array.as_ptr() as u32 / 4;
358+
let array = Int32Array::new(&memory_buffer)
359+
.subarray(array_location, array_location + self.array.len() as u32);
360+
328361
let _ = Atomics::notify(&array, id as u32);
329362
}
330363
}
331364
}
332365

366+
#[cfg(target_feature = "atomics")]
333367
fn poll_again(package: Arc<Package>, id: usize) {
368+
console_log!("poll_again called");
334369
let me = match package.notified.replace(State::Notified) {
335370
// we need to schedule polling to resume, so keep going
336-
State::Waiting(me) => me,
371+
State::Waiting(me) => {
372+
console_log!("poll_again Waiting");
373+
me
374+
},
337375

338376
// we were already notified, and were just notified again;
339377
// having now coalesced the notifications we return as it's
340378
// still someone else's job to process this
341-
State::Notified => return,
379+
State::Notified => {
380+
console_log!("poll_again Notified");
381+
return;
382+
},
342383

343384
// the future was previously being polled, and we've just
344385
// switched it to the "you're notified" state. We don't have
@@ -347,9 +388,21 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
347388
// continue polling. For us, though, there's nothing else to do,
348389
// so we bail out.
349390
// later see
350-
State::Polling => return,
391+
State::Polling => {
392+
console_log!("poll_again Polling");
393+
return;
394+
},
351395
};
352396

397+
let memory_buffer = wasm_bindgen::memory()
398+
.dyn_into::<WebAssembly::Memory>()
399+
.expect("Should cast a memory to WebAssembly::Memory")
400+
.buffer();
401+
402+
let array_location = package.waker.array.as_ptr() as u32 / 4;
403+
let array = Int32Array::new(&memory_buffer)
404+
.subarray(array_location, array_location + package.waker.array.len() as u32);
405+
353406
// Use `Promise.then` on a resolved promise to place our execution
354407
// onto the next turn of the microtask queue, enqueueing our poll
355408
// operation. We don't currently poll immediately as it turns out
@@ -358,19 +411,26 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
358411
//
359412
// Note that the `Rc`/`RefCell` trick here is basically to just
360413
// ensure that our `Closure` gets cleaned up appropriately.
361-
let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0)
414+
let promise = polyfill::wait_async(array, id as u32, 0)
362415
.expect("Should create a Promise");
363416
let slot = Rc::new(RefCell::new(None));
364417
let slot2 = slot.clone();
365418
let closure = Closure::wrap(Box::new(move |_| {
366419
let myself = slot2.borrow_mut().take();
367420
debug_assert!(myself.is_some());
368421
Package::poll(&me);
369-
}) as Box<FnMut(JsValue)>);
422+
}) as Box<dyn FnMut(JsValue)>);
370423
promise.then(&closure);
371424
*slot.borrow_mut() = Some(closure);
372425
}
373426

427+
// No shared memory right now, wasm is single threaded, no need to worry
428+
// about this!
429+
#[cfg(not(target_feature = "atomics"))]
430+
unsafe impl Send for Package {}
431+
#[cfg(not(target_feature = "atomics"))]
432+
unsafe impl Sync for Package {}
433+
374434
impl Package {
375435
// Move the future contained in `me` as far forward as we can. This will
376436
// do as much synchronous work as possible to complete the future,
@@ -386,7 +446,9 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
386446
match me.notified.replace(State::Polling) {
387447
// We received a notification while previously polling, or
388448
// this is the initial poll. We've got work to do below!
389-
State::Notified => {}
449+
State::Notified => {
450+
console_log!("Package::poll Notified");
451+
}
390452

391453
// We've gone through this loop once and no notification was
392454
// received while we were executing work. That means we got
@@ -396,15 +458,31 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
396458
// When the notification comes in it'll notify our task, see
397459
// our `Waiting` state, and resume the polling process
398460
State::Polling => {
461+
console_log!("Package::poll Polling");
462+
399463
me.notified.set(State::Waiting(me.clone()));
464+
465+
#[cfg(target_feature = "atomics")]
400466
poll_again(me.clone(), 0);
467+
401468
break;
402469
}
403470

404-
State::Waiting(_) => panic!("shouldn't see waiting state!"),
471+
State::Waiting(_) => {
472+
console_log!("Package::poll Waiting");
473+
474+
panic!("shouldn't see waiting state!")
475+
},
405476
}
406477

407-
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) {
478+
479+
#[cfg(target_feature = "atomics")]
480+
let waker = &me.waker;
481+
482+
#[cfg(not(target_feature = "atomics"))]
483+
let waker = me;
484+
485+
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(waker, 0) {
408486
// If the future is ready, immediately call the
409487
// resolve/reject callback and then return as we're done.
410488
Ok(Async::Ready(value)) => (value, &me.resolve),
@@ -420,6 +498,50 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
420498
}
421499
}
422500
}
501+
502+
#[cfg(not(target_feature = "atomics"))]
503+
impl Notify for Package {
504+
fn notify(&self, _id: usize) {
505+
console_log!("Package::notify Waiting");
506+
let me = match self.notified.replace(State::Notified) {
507+
// we need to schedule polling to resume, so keep going
508+
State::Waiting(me) => me,
509+
510+
// we were already notified, and were just notified again;
511+
// having now coalesced the notifications we return as it's
512+
// still someone else's job to process this
513+
State::Notified => return,
514+
515+
// the future was previously being polled, and we've just
516+
// switched it to the "you're notified" state. We don't have
517+
// access to the future as it's being polled, so the future
518+
// polling process later sees this notification and will
519+
// continue polling. For us, though, there's nothing else to do,
520+
// so we bail out.
521+
// later see
522+
State::Polling => return,
523+
};
524+
525+
// Use `Promise.then` on a resolved promise to place our execution
526+
// onto the next turn of the microtask queue, enqueueing our poll
527+
// operation. We don't currently poll immediately as it turns out
528+
// `futures` crate adapters aren't compatible with it and it also
529+
// helps avoid blowing the stack by accident.
530+
//
531+
// Note that the `Rc`/`RefCell` trick here is basically to just
532+
// ensure that our `Closure` gets cleaned up appropriately.
533+
let promise = Promise::resolve(&JsValue::undefined());
534+
let slot = Rc::new(RefCell::new(None));
535+
let slot2 = slot.clone();
536+
let closure = Closure::wrap(Box::new(move |_| {
537+
let myself = slot2.borrow_mut().take();
538+
debug_assert!(myself.is_some());
539+
Package::poll(&me);
540+
}) as Box<dyn FnMut(JsValue)>);
541+
promise.then(&closure);
542+
*slot.borrow_mut() = Some(closure);
543+
}
544+
}
423545
}
424546

425547
/// Converts a Rust `Future` on a local task queue.

crates/futures/src/polyfill.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,23 @@ use wasm_bindgen::prelude::*;
4747
use wasm_bindgen::JsCast;
4848
use web_sys::{MessageEvent, Worker};
4949

50+
macro_rules! console_log {
51+
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
52+
}
53+
54+
#[wasm_bindgen]
55+
extern "C" {
56+
#[wasm_bindgen(js_namespace = console)]
57+
fn log(s: &str);
58+
}
59+
5060
const HELPER_CODE: &'static str = "
5161
onmessage = function (ev) {
5262
try {
5363
switch (ev.data[0]) {
5464
case 'wait': {
5565
let [_, ia, index, value, timeout] = ev.data;
66+
console.log('wait event inside a worker');
5667
let result = Atomics.wait(ia, index, value, timeout);
5768
postMessage(['ok', result]);
5869
break;
@@ -115,16 +126,20 @@ pub fn wait_async_with_timeout(
115126
timeout: f64,
116127
) -> Result<Promise, JsValue> {
117128
if !indexed_array.buffer().has_type::<SharedArrayBuffer>() {
129+
console_log!("polyfill, not a SharedArrayBuffer");
118130
return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into());
119131
}
120132

121133
// Optimization, avoid the helper thread in this common case.
122134
if Atomics::load(&indexed_array, index)? != value {
135+
console_log!("polyfill, not-equal");
123136
return Ok(Promise::resolve(&JsString::from("not-equal")));
124137
}
125138

126139
// General case, we must wait.
127140

141+
console_log!("polyfill, general case");
142+
128143
Ok(Promise::new(
129144
&mut Box::new(move |resolve: Function, reject: Function| {
130145
let helper = alloc_helper();
@@ -161,6 +176,8 @@ pub fn wait_async_with_timeout(
161176
.borrow()
162177
.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
163178

179+
onmessage_callback.forget();
180+
164181
// It's possible to do better here if the ia is already known to the
165182
// helper. In that case we can communicate the other data through
166183
// shared memory and wake the agent. And it is possible to make ia

crates/futures/tests/tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ extern crate wasm_bindgen;
66
extern crate wasm_bindgen_futures;
77
extern crate wasm_bindgen_test;
88

9+
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
10+
911
use futures::unsync::oneshot;
1012
use futures::Future;
1113
use wasm_bindgen::prelude::*;

0 commit comments

Comments
 (0)