From f1ca67ea3a4ad4fe3f27869abc6cb3ed8c6deb32 Mon Sep 17 00:00:00 2001 From: Derek Chiang Date: Sun, 9 Feb 2014 18:22:14 -0800 Subject: [PATCH 1/4] Implement exponential backoff --- src/libgreen/lib.rs | 2 -- src/libgreen/sched.rs | 27 +++++++++++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 8758eb1179ef9..0e968e8da6de6 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -368,7 +368,6 @@ impl SchedPool { pool.sleepers.clone(), pool.task_state.clone()); pool.handles.push(sched.make_handle()); - let sched = sched; pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); } @@ -430,7 +429,6 @@ impl SchedPool { self.task_state.clone()); let ret = sched.make_handle(); self.handles.push(sched.make_handle()); - let sched = sched; self.threads.push(Thread::start(proc() { sched.bootstrap() })); return ret; diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index ad32ba7ba6d1c..3982838544cc1 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -9,6 +9,8 @@ // except according to those terms. use std::cast; +use std::libc::funcs::posix88::unistd; +use std::num; use std::rand::{XorShiftRng, Rng, Rand}; use std::rt::local::Local; use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop}; @@ -26,6 +28,11 @@ use stack::StackPool; use task::{TypeSched, GreenTask, HomeSched, AnySched}; use msgq = message_queue; +/// By default, a scheduler tries to back off three times before it +/// goes to sleep. +/// TODO: Make this value configurable. +static EXPONENTIAL_BACKOFF_FACTOR: uint = 3; + /// A scheduler is responsible for coordinating the execution of Tasks /// on a single thread. The scheduler runs inside a slightly modified /// Rust Task. When not running this task is stored in the scheduler @@ -63,6 +70,8 @@ pub struct Scheduler { /// A flag to indicate we've received the shutdown message and should /// no longer try to go to sleep, but exit instead. no_sleep: bool, + /// We only go to sleep when backoff_counter hits 0. + backoff_counter: uint, stack_pool: StackPool, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. @@ -151,6 +160,7 @@ impl Scheduler { message_producer: producer, sleepy: false, no_sleep: false, + backoff_counter: 0, event_loop: event_loop, work_queue: work_queue, work_queues: work_queues, @@ -325,12 +335,17 @@ impl Scheduler { // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. if !sched.sleepy && !sched.no_sleep { - rtdebug!("scheduler has no work to do, going to sleep"); - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - // Since we are sleeping, deactivate the idle callback. - sched.idle_callback.get_mut_ref().pause(); + if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR { + sched.backoff_counter = 0; + rtdebug!("scheduler has no work to do, going to sleep"); + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + // Since we are sleeping, deactivate the idle callback. + sched.idle_callback.get_mut_ref().pause(); + } + unsafe { unistd::usleep(num::pow(2, sched.backoff_counter) as u32); } + sched.backoff_counter += 1; } else { rtdebug!("not sleeping, already doing so or no_sleep set"); // We may not be sleeping, but we still need to deactivate From bd7d10ff7d795f4f9f7e73c30d415e7ad2e33688 Mon Sep 17 00:00:00 2001 From: Derek Chiang Date: Thu, 13 Feb 2014 21:05:03 -0500 Subject: [PATCH 2/4] Getting rid of the use of the global sleeper list. This commit is NOT production ready at all and will eventually be squashed. --- src/libgreen/lib.rs | 34 +++++++++++------- src/libgreen/sched.rs | 80 ++++++++++++++++++++++--------------------- 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 0e968e8da6de6..df5c45cccdb22 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -187,7 +187,6 @@ use std::vec; use std::sync::arc::UnsafeArc; use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor}; -use sleeper_list::SleeperList; use stack::StackPool; use task::GreenTask; @@ -303,7 +302,6 @@ pub struct SchedPool { priv next_friend: uint, priv stack_pool: StackPool, priv deque_pool: deque::BufferPool<~task::GreenTask>, - priv sleepers: SleeperList, priv factory: fn() -> ~rtio::EventLoop, priv task_state: TaskState, priv tasks_done: Port<()>, @@ -340,7 +338,6 @@ impl SchedPool { handles: ~[], stealers: ~[], id: unsafe { POOL_ID.fetch_add(1, SeqCst) }, - sleepers: SleeperList::new(), stack_pool: StackPool::new(), deque_pool: deque::BufferPool::new(), next_friend: 0, @@ -358,15 +355,24 @@ impl SchedPool { // Now that we've got all our work queues, create one scheduler per // queue, spawn the scheduler into a thread, and be sure to keep a // handle to the scheduler and the thread to keep them alive. + let mut scheds = ~[]; for worker in workers.move_iter() { + scheds.push(~Scheduler::new(pool.id, + (pool.factory)(), + worker, + pool.stealers.clone(), + pool.task_state.clone())); + } + // Assign left and right neighbors to each scheduler + for i in range(0, scheds.len()) { + let left = if i > 0 { i - 1 } else { scheds.len() - 1 }; + let right = if (i + 1) < scheds.len() { i + 1 } else { 0 }; + scheds[i].left_sched = Some(scheds[left].make_handle()); + scheds[i].right_sched = Some(scheds[right].make_handle()); + } + for sched in scheds.move_iter() { + let mut sched = sched; rtdebug!("inserting a regular scheduler"); - - let mut sched = ~Scheduler::new(pool.id, - (pool.factory)(), - worker, - pool.stealers.clone(), - pool.sleepers.clone(), - pool.task_state.clone()); pool.handles.push(sched.make_handle()); pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); } @@ -425,8 +431,8 @@ impl SchedPool { (self.factory)(), worker, self.stealers.clone(), - self.sleepers.clone(), self.task_state.clone()); + // TODO(derekchiang): setup the scheduler's neighbors let ret = sched.make_handle(); self.handles.push(sched.make_handle()); self.threads.push(Thread::start(proc() { sched.bootstrap() })); @@ -445,7 +451,7 @@ impl SchedPool { /// native tasks or extern pools will not be waited on pub fn shutdown(mut self) { self.stealers = ~[]; - + rterrln!("BP1"); // Wait for everyone to exit. We may have reached a 0-task count // multiple times in the past, meaning there could be several buffered // messages on the `tasks_done` port. We're guaranteed that after *some* @@ -454,14 +460,16 @@ impl SchedPool { while self.task_state.active() { self.tasks_done.recv(); } - + rterrln!("BP2"); // Now that everyone's gone, tell everything to shut down. for mut handle in replace(&mut self.handles, ~[]).move_iter() { handle.send(Shutdown); } + rterrln!("BP3"); for thread in replace(&mut self.threads, ~[]).move_iter() { thread.join(); } + rterrln!("BP4"); } } diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 3982838544cc1..e3cbda6093898 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -11,11 +11,13 @@ use std::cast; use std::libc::funcs::posix88::unistd; use std::num; +use std::ptr; use std::rand::{XorShiftRng, Rng, Rand}; use std::rt::local::Local; use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop}; use std::rt::task::BlockedTask; use std::rt::task::Task; +use std::sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst}; use std::sync::deque; use std::unstable::mutex::NativeMutex; use std::unstable::raw; @@ -23,7 +25,6 @@ use std::unstable::raw; use TaskState; use context::Context; use coroutine::Coroutine; -use sleeper_list::SleeperList; use stack::StackPool; use task::{TypeSched, GreenTask, HomeSched, AnySched}; use msgq = message_queue; @@ -57,21 +58,21 @@ pub struct Scheduler { message_queue: msgq::Consumer, /// Producer used to clone sched handles from message_producer: msgq::Producer, - /// A shared list of sleeping schedulers. We'll use this to wake - /// up schedulers when pushing work onto the work queue. - sleeper_list: SleeperList, /// Indicates that we have previously pushed a handle onto the /// SleeperList but have not yet received the Wake message. /// Being `true` does not necessarily mean that the scheduler is /// not active since there are multiple event sources that may /// wake the scheduler. It just prevents the scheduler from pushing /// multiple handles onto the sleeper list. - sleepy: bool, + sleepy: AtomicBool, /// A flag to indicate we've received the shutdown message and should /// no longer try to go to sleep, but exit instead. no_sleep: bool, /// We only go to sleep when backoff_counter hits 0. backoff_counter: uint, + /// A scheduler only ever tries to wake up its two neighboring neighbors + left_sched: Option, + right_sched: Option, stack_pool: StackPool, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. @@ -110,6 +111,20 @@ pub struct Scheduler { event_loop: ~EventLoop, } +macro_rules! wakeup_if_sleepy( + ($expr:expr) => { + unsafe { + match $expr { + Some(ref mut sched) if (*sched.sleepy).load(SeqCst) => { + sched.send(Wake); + true + }, + _ => false + } + } + } +) + /// An indication of how hard to work on a given operation, the difference /// mainly being whether memory is synchronized or not #[deriving(Eq)] @@ -133,12 +148,11 @@ impl Scheduler { event_loop: ~EventLoop, work_queue: deque::Worker<~GreenTask>, work_queues: ~[deque::Stealer<~GreenTask>], - sleeper_list: SleeperList, state: TaskState) -> Scheduler { Scheduler::new_special(pool_id, event_loop, work_queue, work_queues, - sleeper_list, true, None, state) + true, None, state) } @@ -146,7 +160,6 @@ impl Scheduler { event_loop: ~EventLoop, work_queue: deque::Worker<~GreenTask>, work_queues: ~[deque::Stealer<~GreenTask>], - sleeper_list: SleeperList, run_anything: bool, friend: Option, state: TaskState) @@ -155,12 +168,13 @@ impl Scheduler { let (consumer, producer) = msgq::queue(); let mut sched = Scheduler { pool_id: pool_id, - sleeper_list: sleeper_list, message_queue: consumer, message_producer: producer, - sleepy: false, + sleepy: INIT_ATOMIC_BOOL, no_sleep: false, backoff_counter: 0, + left_sched: None, + right_sched: None, event_loop: event_loop, work_queue: work_queue, work_queues: work_queues, @@ -334,20 +348,18 @@ impl Scheduler { // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - if !sched.sleepy && !sched.no_sleep { + if !sched.no_sleep && !sched.sleepy.load(SeqCst) { if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR { sched.backoff_counter = 0; - rtdebug!("scheduler has no work to do, going to sleep"); - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); + rterrln!("scheduler has no work to do, going to sleep"); + sched.sleepy.store(true, SeqCst); // Since we are sleeping, deactivate the idle callback. sched.idle_callback.get_mut_ref().pause(); } unsafe { unistd::usleep(num::pow(2, sched.backoff_counter) as u32); } sched.backoff_counter += 1; } else { - rtdebug!("not sleeping, already doing so or no_sleep set"); + rterrln!("not sleeping, already doing so or no_sleep set"); // We may not be sleeping, but we still need to deactivate // the idle callback. sched.idle_callback.get_mut_ref().pause(); @@ -412,29 +424,22 @@ impl Scheduler { (sched, task, true) } Some(Wake) => { - self.sleepy = false; + self.sleepy.store(false, SeqCst); (self, stask, true) } Some(Shutdown) => { rtdebug!("shutting down"); - if self.sleepy { - // There may be an outstanding handle on the - // sleeper list. Pop them all to make sure that's - // not the case. - loop { - match self.sleeper_list.pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake); - } - None => break - } - } + rterrln!("receiving shutdown"); + if self.sleepy.load(SeqCst) { + wakeup_if_sleepy!(self.left_sched); + wakeup_if_sleepy!(self.right_sched); + self.left_sched = None; + self.right_sched = None; } // No more sleeping. After there are no outstanding // event loop references we will shut down. self.no_sleep = true; - self.sleepy = false; + self.sleepy.store(false, SeqCst); (self, stask, true) } Some(NewNeighbor(neighbor)) => { @@ -601,14 +606,9 @@ impl Scheduler { // We've made work available. Notify a // sleeping scheduler. - - match self.sleeper_list.casual_pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake) - } - None => { (/* pass */) } - }; + if !wakeup_if_sleepy!(self.left_sched) { + wakeup_if_sleepy!(self.right_sched); + } } // * Core Context Switching Functions @@ -879,6 +879,7 @@ impl Scheduler { return SchedHandle { remote: remote, + sleepy: ptr::to_unsafe_ptr(&(self.sleepy)), queue: self.message_producer.clone(), sched_id: self.sched_id() } @@ -902,6 +903,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, priv queue: msgq::Producer, + priv sleepy: *AtomicBool, sched_id: uint } From bf237014dbf1ce998b7c62ca2a526437222c9a7c Mon Sep 17 00:00:00 2001 From: Derek Chiang Date: Thu, 13 Feb 2014 22:29:45 -0500 Subject: [PATCH 3/4] Fix some issues --- src/libgreen/lib.rs | 22 ++++++----------- src/libgreen/sched.rs | 57 +++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 43 deletions(-) diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index df5c45cccdb22..1fe84275f83a9 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -355,14 +355,13 @@ impl SchedPool { // Now that we've got all our work queues, create one scheduler per // queue, spawn the scheduler into a thread, and be sure to keep a // handle to the scheduler and the thread to keep them alive. - let mut scheds = ~[]; - for worker in workers.move_iter() { - scheds.push(~Scheduler::new(pool.id, - (pool.factory)(), - worker, - pool.stealers.clone(), - pool.task_state.clone())); - } + let mut scheds = workers.move_iter() + .map(|worker| ~Scheduler::new(pool.id, + (pool.factory)(), + worker, + pool.stealers.clone(), + pool.task_state.clone())) + .to_owned_vec(); // Assign left and right neighbors to each scheduler for i in range(0, scheds.len()) { let left = if i > 0 { i - 1 } else { scheds.len() - 1 }; @@ -370,8 +369,7 @@ impl SchedPool { scheds[i].left_sched = Some(scheds[left].make_handle()); scheds[i].right_sched = Some(scheds[right].make_handle()); } - for sched in scheds.move_iter() { - let mut sched = sched; + for mut sched in scheds.move_iter() { rtdebug!("inserting a regular scheduler"); pool.handles.push(sched.make_handle()); pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); @@ -451,7 +449,6 @@ impl SchedPool { /// native tasks or extern pools will not be waited on pub fn shutdown(mut self) { self.stealers = ~[]; - rterrln!("BP1"); // Wait for everyone to exit. We may have reached a 0-task count // multiple times in the past, meaning there could be several buffered // messages on the `tasks_done` port. We're guaranteed that after *some* @@ -460,16 +457,13 @@ impl SchedPool { while self.task_state.active() { self.tasks_done.recv(); } - rterrln!("BP2"); // Now that everyone's gone, tell everything to shut down. for mut handle in replace(&mut self.handles, ~[]).move_iter() { handle.send(Shutdown); } - rterrln!("BP3"); for thread in replace(&mut self.threads, ~[]).move_iter() { thread.join(); } - rterrln!("BP4"); } } diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index e3cbda6093898..93fe412e89551 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -10,14 +10,13 @@ use std::cast; use std::libc::funcs::posix88::unistd; -use std::num; use std::ptr; use std::rand::{XorShiftRng, Rng, Rand}; use std::rt::local::Local; use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop}; use std::rt::task::BlockedTask; use std::rt::task::Task; -use std::sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst}; +use std::sync::atomics::{AtomicBool, SeqCst}; use std::sync::deque; use std::unstable::mutex::NativeMutex; use std::unstable::raw; @@ -68,7 +67,7 @@ pub struct Scheduler { /// A flag to indicate we've received the shutdown message and should /// no longer try to go to sleep, but exit instead. no_sleep: bool, - /// We only go to sleep when backoff_counter hits 0. + /// We only go to sleep when backoff_counter hits EXPONENTIAL_BACKOFF_FACTOR. backoff_counter: uint, /// A scheduler only ever tries to wake up its two neighboring neighbors left_sched: Option, @@ -111,20 +110,6 @@ pub struct Scheduler { event_loop: ~EventLoop, } -macro_rules! wakeup_if_sleepy( - ($expr:expr) => { - unsafe { - match $expr { - Some(ref mut sched) if (*sched.sleepy).load(SeqCst) => { - sched.send(Wake); - true - }, - _ => false - } - } - } -) - /// An indication of how hard to work on a given operation, the difference /// mainly being whether memory is synchronized or not #[deriving(Eq)] @@ -170,7 +155,7 @@ impl Scheduler { pool_id: pool_id, message_queue: consumer, message_producer: producer, - sleepy: INIT_ATOMIC_BOOL, + sleepy: AtomicBool::new(false), no_sleep: false, backoff_counter: 0, left_sched: None, @@ -351,15 +336,16 @@ impl Scheduler { if !sched.no_sleep && !sched.sleepy.load(SeqCst) { if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR { sched.backoff_counter = 0; - rterrln!("scheduler has no work to do, going to sleep"); + rtdebug!("scheduler has no work to do, going to sleep"); sched.sleepy.store(true, SeqCst); // Since we are sleeping, deactivate the idle callback. sched.idle_callback.get_mut_ref().pause(); + } else { + unsafe { unistd::usleep((1 << sched.backoff_counter) * 1000u as u32); } + sched.backoff_counter += 1; } - unsafe { unistd::usleep(num::pow(2, sched.backoff_counter) as u32); } - sched.backoff_counter += 1; } else { - rterrln!("not sleeping, already doing so or no_sleep set"); + rtdebug!("not sleeping, already doing so or no_sleep set"); // We may not be sleeping, but we still need to deactivate // the idle callback. sched.idle_callback.get_mut_ref().pause(); @@ -429,12 +415,15 @@ impl Scheduler { } Some(Shutdown) => { rtdebug!("shutting down"); - rterrln!("receiving shutdown"); if self.sleepy.load(SeqCst) { - wakeup_if_sleepy!(self.left_sched); - wakeup_if_sleepy!(self.right_sched); - self.left_sched = None; - self.right_sched = None; + match self.left_sched.take() { + Some(mut sched) => sched.wakeup_if_sleepy(), + None => () + }; + match self.right_sched.take() { + Some(mut sched) => sched.wakeup_if_sleepy(), + None => () + }; } // No more sleeping. After there are no outstanding // event loop references we will shut down. @@ -606,8 +595,8 @@ impl Scheduler { // We've made work available. Notify a // sleeping scheduler. - if !wakeup_if_sleepy!(self.left_sched) { - wakeup_if_sleepy!(self.right_sched); + if !self.left_sched.mutate(|mut sched| { sched.wakeup_if_sleepy(); sched }) { + self.right_sched.mutate(|mut sched| { sched.wakeup_if_sleepy(); sched }); } } @@ -903,6 +892,8 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, priv queue: msgq::Producer, + // Under the current design, a scheduler always outlives the handles + // pointing to it, so it's safe to use an unsafe pointer here priv sleepy: *AtomicBool, sched_id: uint } @@ -912,6 +903,14 @@ impl SchedHandle { self.queue.push(msg); self.remote.fire(); } + + fn wakeup_if_sleepy(&mut self) { + unsafe { + if (*self.sleepy).load(SeqCst) { + self.send(Wake); + } + } + } } struct SchedRunner; From 1c5852c7b33d05fee8ab0acb831b5f39bfb9a532 Mon Sep 17 00:00:00 2001 From: Derek Chiang Date: Tue, 18 Feb 2014 23:43:18 -0500 Subject: [PATCH 4/4] Use UnsafeArc instead of unsafe pointers directly; prevent sleeping when there is active IO --- src/libgreen/sched.rs | 78 +++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 93fe412e89551..cf4be1742a8e8 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -10,12 +10,12 @@ use std::cast; use std::libc::funcs::posix88::unistd; -use std::ptr; use std::rand::{XorShiftRng, Rng, Rand}; use std::rt::local::Local; use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop}; use std::rt::task::BlockedTask; use std::rt::task::Task; +use std::sync::arc::UnsafeArc; use std::sync::atomics::{AtomicBool, SeqCst}; use std::sync::deque; use std::unstable::mutex::NativeMutex; @@ -63,7 +63,7 @@ pub struct Scheduler { /// not active since there are multiple event sources that may /// wake the scheduler. It just prevents the scheduler from pushing /// multiple handles onto the sleeper list. - sleepy: AtomicBool, + sleepy: UnsafeArc, /// A flag to indicate we've received the shutdown message and should /// no longer try to go to sleep, but exit instead. no_sleep: bool, @@ -155,7 +155,7 @@ impl Scheduler { pool_id: pool_id, message_queue: consumer, message_producer: producer, - sleepy: AtomicBool::new(false), + sleepy: UnsafeArc::new(AtomicBool::new(false)), no_sleep: false, backoff_counter: 0, left_sched: None, @@ -333,22 +333,24 @@ impl Scheduler { // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - if !sched.no_sleep && !sched.sleepy.load(SeqCst) { - if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR { - sched.backoff_counter = 0; - rtdebug!("scheduler has no work to do, going to sleep"); - sched.sleepy.store(true, SeqCst); - // Since we are sleeping, deactivate the idle callback. - sched.idle_callback.get_mut_ref().pause(); + unsafe { + if !sched.no_sleep && !(*sched.sleepy.get()).load(SeqCst) { + if sched.backoff_counter == EXPONENTIAL_BACKOFF_FACTOR { + sched.backoff_counter = 0; + rtdebug!("scheduler has no work to do, going to sleep"); + (*sched.sleepy.get()).store(true, SeqCst); + // Since we are sleeping, deactivate the idle callback. + sched.idle_callback.get_mut_ref().pause(); + } else if !sched.event_loop.has_active_io() { + unistd::usleep((1 << sched.backoff_counter) * 1000u as u32); + sched.backoff_counter += 1; + } } else { - unsafe { unistd::usleep((1 << sched.backoff_counter) * 1000u as u32); } - sched.backoff_counter += 1; + rtdebug!("not sleeping, already doing so or no_sleep set"); + // We may not be sleeping, but we still need to deactivate + // the idle callback. + sched.idle_callback.get_mut_ref().pause(); } - } else { - rtdebug!("not sleeping, already doing so or no_sleep set"); - // We may not be sleeping, but we still need to deactivate - // the idle callback. - sched.idle_callback.get_mut_ref().pause(); } // Finished a cycle without using the Scheduler. Place it back @@ -410,26 +412,30 @@ impl Scheduler { (sched, task, true) } Some(Wake) => { - self.sleepy.store(false, SeqCst); - (self, stask, true) + unsafe { + (*self.sleepy.get()).store(false, SeqCst); + (self, stask, true) + } } Some(Shutdown) => { rtdebug!("shutting down"); - if self.sleepy.load(SeqCst) { - match self.left_sched.take() { - Some(mut sched) => sched.wakeup_if_sleepy(), - None => () - }; - match self.right_sched.take() { - Some(mut sched) => sched.wakeup_if_sleepy(), - None => () - }; + unsafe { + if (*self.sleepy.get()).load(SeqCst) { + match self.left_sched.take() { + Some(mut sched) => sched.wakeup_if_sleepy(), + None => () + }; + match self.right_sched.take() { + Some(mut sched) => sched.wakeup_if_sleepy(), + None => () + }; + } + // No more sleeping. After there are no outstanding + // event loop references we will shut down. + self.no_sleep = true; + (*self.sleepy.get()).store(false, SeqCst); + (self, stask, true) } - // No more sleeping. After there are no outstanding - // event loop references we will shut down. - self.no_sleep = true; - self.sleepy.store(false, SeqCst); - (self, stask, true) } Some(NewNeighbor(neighbor)) => { self.work_queues.push(neighbor); @@ -868,7 +874,7 @@ impl Scheduler { return SchedHandle { remote: remote, - sleepy: ptr::to_unsafe_ptr(&(self.sleepy)), + sleepy: self.sleepy.clone(), queue: self.message_producer.clone(), sched_id: self.sched_id() } @@ -894,7 +900,7 @@ pub struct SchedHandle { priv queue: msgq::Producer, // Under the current design, a scheduler always outlives the handles // pointing to it, so it's safe to use an unsafe pointer here - priv sleepy: *AtomicBool, + priv sleepy: UnsafeArc, sched_id: uint } @@ -906,7 +912,7 @@ impl SchedHandle { fn wakeup_if_sleepy(&mut self) { unsafe { - if (*self.sleepy).load(SeqCst) { + if (*self.sleepy.get()).load(SeqCst) { self.send(Wake); } }