1
1
use std:: fmt;
2
2
use std:: pin:: Pin ;
3
- use std:: sync :: { Arc , Mutex } ;
4
- use std:: sync:: atomic :: { AtomicBool , Ordering } ;
3
+ use std:: cell :: { Cell , RefCell } ;
4
+ use std:: sync:: Arc ;
5
5
use std:: future:: Future ;
6
6
use std:: task:: { Poll , Context } ;
7
7
use std:: collections:: VecDeque ;
@@ -138,44 +138,46 @@ where
138
138
F : Future < Output = ( ) > + ' static ,
139
139
{
140
140
struct Task {
141
- future : Mutex < Option < Pin < Box < dyn Future < Output = ( ) > + ' static > > > > ,
142
- is_queued : AtomicBool ,
141
+ // This is an Option so that the Future can be immediately dropped when it's finished
142
+ future : RefCell < Option < Pin < Box < dyn Future < Output = ( ) > + ' static > > > > ,
143
+ is_queued : Cell < bool > ,
143
144
}
144
145
145
146
impl Task {
146
147
#[ inline]
147
148
fn new < F > ( future : F ) -> Arc < Self > where F : Future < Output = ( ) > + ' static {
148
149
Arc :: new ( Self {
149
- future : Mutex :: new ( Some ( Box :: pin ( future) ) ) ,
150
- is_queued : AtomicBool :: new ( false ) ,
150
+ future : RefCell :: new ( Some ( Box :: pin ( future) ) ) ,
151
+ is_queued : Cell :: new ( false ) ,
151
152
} )
152
153
}
153
154
}
154
155
155
156
impl ArcWake for Task {
156
157
fn wake_by_ref ( arc_self : & Arc < Self > ) {
157
- // TODO can this be more relaxed ?
158
- if !arc_self . is_queued . swap ( true , Ordering :: SeqCst ) {
159
- let mut lock = EXECUTOR . tasks . lock ( ) . unwrap_throw ( ) ;
158
+ if arc_self . is_queued . replace ( true ) {
159
+ return ;
160
+ }
160
161
161
- lock . push_back ( arc_self . clone ( ) ) ;
162
+ let mut lock = EXECUTOR . tasks . borrow_mut ( ) ;
162
163
163
- EXECUTOR . next_tick . schedule ( ) ;
164
- }
164
+ lock. push_back ( arc_self. clone ( ) ) ;
165
+
166
+ EXECUTOR . next_tick . schedule ( ) ;
165
167
}
166
168
}
167
169
168
170
169
171
struct NextTick {
170
- is_spinning : AtomicBool ,
172
+ is_spinning : Cell < bool > ,
171
173
promise : Promise ,
172
174
closure : Closure < dyn FnMut ( JsValue ) > ,
173
175
}
174
176
175
177
impl NextTick {
176
178
fn new < F > ( mut f : F ) -> Self where F : FnMut ( ) + ' static {
177
179
Self {
178
- is_spinning : AtomicBool :: new ( false ) ,
180
+ is_spinning : Cell :: new ( false ) ,
179
181
promise : Promise :: resolve ( & JsValue :: null ( ) ) ,
180
182
closure : Closure :: wrap ( Box :: new ( move |_| {
181
183
f ( ) ;
@@ -184,22 +186,22 @@ where
184
186
}
185
187
186
188
fn schedule ( & self ) {
187
- // TODO can this be more relaxed ?
188
- if !self . is_spinning . swap ( true , Ordering :: SeqCst ) {
189
- // TODO avoid creating a new Promise
190
- self . promise . then ( & self . closure ) ;
189
+ if self . is_spinning . replace ( true ) {
190
+ return ;
191
191
}
192
+
193
+ // TODO avoid creating a new Promise
194
+ self . promise . then ( & self . closure ) ;
192
195
}
193
196
194
197
fn done ( & self ) {
195
- // TODO can this be more relaxed ?
196
- self . is_spinning . store ( false , Ordering :: SeqCst ) ;
198
+ self . is_spinning . set ( false ) ;
197
199
}
198
200
}
199
201
200
202
201
203
struct Executor {
202
- tasks : Mutex < VecDeque < Arc < Task > > > ,
204
+ tasks : RefCell < VecDeque < Arc < Task > > > ,
203
205
next_tick : NextTick ,
204
206
}
205
207
@@ -209,31 +211,33 @@ where
209
211
210
212
lazy_static ! {
211
213
static ref EXECUTOR : Executor = Executor {
212
- tasks: Mutex :: new( VecDeque :: new( ) ) ,
214
+ tasks: RefCell :: new( VecDeque :: new( ) ) ,
213
215
next_tick: NextTick :: new( || {
214
216
let tasks = & EXECUTOR . tasks;
215
217
216
218
loop {
217
- let mut lock = tasks. lock ( ) . unwrap_throw ( ) ;
219
+ let mut lock = tasks. borrow_mut ( ) ;
218
220
219
221
match lock. pop_front( ) {
220
222
Some ( task) => {
221
223
// This is necessary because the polled task might queue more tasks
222
224
drop( lock) ;
223
225
224
- let mut future = task. future. lock( ) . unwrap_throw( ) ;
226
+ let mut future = task. future. borrow_mut( ) ;
227
+
228
+ let poll = {
229
+ let mut future = future. as_mut( ) . unwrap_throw( ) ;
225
230
226
- let poll = future. as_mut( ) . map( |mut future| {
227
231
// Clear `is_queued` flag so that it will re-queue if poll calls waker.wake()
228
- task. is_queued. store ( false , Ordering :: SeqCst ) ;
232
+ task. is_queued. set ( false ) ;
229
233
230
234
// TODO is there some way of saving these so they don't need to be recreated all the time ?
231
235
let waker = ArcWake :: into_waker( task. clone( ) ) ;
232
236
let cx = & mut Context :: from_waker( & waker) ;
233
237
Pin :: new( & mut future) . poll( cx)
234
- } ) ;
238
+ } ;
235
239
236
- if let Some ( Poll :: Ready ( _) ) = poll {
240
+ if let Poll :: Ready ( _) = poll {
237
241
* future = None ;
238
242
}
239
243
} ,
0 commit comments