Skip to content

Commit 6e779c1

Browse files
authored
Replace once_cell with async-lock (#30)
1 parent 8d8e258 commit 6e779c1

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ exclude = ["/.*"]
1616

1717
[dependencies]
1818
async-channel = "1.4.0"
19+
async-lock = "2.6.0"
1920
async-task = "4.0.2"
2021
atomic-waker = "1.0.0"
2122
fastrand = "1.3.4"
2223
futures-lite = "1.11.0"
23-
once_cell = "1.4.1"

src/lib.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ use std::thread;
9393
use std::time::Duration;
9494

9595
use async_channel::{bounded, Receiver};
96+
use async_lock::OnceCell;
9697
use async_task::Runnable;
9798
use atomic_waker::AtomicWaker;
9899
use futures_lite::{future, prelude::*, ready};
99-
use once_cell::sync::Lazy;
100100

101101
#[doc(no_inline)]
102102
pub use async_task::Task;
@@ -113,17 +113,6 @@ const MAX_MAX_THREADS: usize = 10000;
113113
/// Env variable that allows to override default value for max threads.
114114
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
115115

116-
/// Lazily initialized global executor.
117-
static EXECUTOR: Lazy<Executor> = Lazy::new(|| Executor {
118-
inner: Mutex::new(Inner {
119-
idle_count: 0,
120-
thread_count: 0,
121-
queue: VecDeque::new(),
122-
}),
123-
cvar: Condvar::new(),
124-
thread_limit: Executor::max_threads(),
125-
});
126-
127116
/// The blocking executor.
128117
struct Executor {
129118
/// Inner state of the executor.
@@ -162,11 +151,31 @@ impl Executor {
162151
Err(_) => DEFAULT_MAX_THREADS,
163152
}
164153
}
154+
165155
/// Spawns a future onto this executor.
166156
///
167157
/// Returns a [`Task`] handle for the spawned task.
168158
fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
169-
let (runnable, task) = async_task::spawn(future, |r| EXECUTOR.schedule(r));
159+
static EXECUTOR: OnceCell<Executor> = OnceCell::new();
160+
161+
let (runnable, task) = async_task::spawn(future, |r| {
162+
// Initialize the executor if we haven't already.
163+
let executor = EXECUTOR.get_or_init_blocking(|| {
164+
let thread_limit = Self::max_threads();
165+
Executor {
166+
inner: Mutex::new(Inner {
167+
idle_count: 0,
168+
thread_count: 0,
169+
queue: VecDeque::new(),
170+
}),
171+
cvar: Condvar::new(),
172+
thread_limit,
173+
}
174+
});
175+
176+
// Schedule the task on our executor.
177+
executor.schedule(r)
178+
});
170179
runnable.schedule();
171180
task
172181
}
@@ -223,8 +232,7 @@ impl Executor {
223232
fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
224233
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
225234
// already, then be aggressive: wake all idle threads and spawn one more thread.
226-
while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < EXECUTOR.thread_limit
227-
{
235+
while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < self.thread_limit {
228236
// The new thread starts in idle state.
229237
inner.idle_count += 1;
230238
inner.thread_count += 1;

0 commit comments

Comments
 (0)