diff --git a/src/server.rs b/src/server.rs index c154a5a..8a2147d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -37,7 +37,7 @@ pub enum Operation { pub struct SidekiqServer<'a> { redispool: RedisPool, - threadpool: ThreadPool, + threadpool: Option, pub namespace: String, job_handlers: BTreeMap>, middlewares: Vec>, @@ -48,6 +48,7 @@ pub struct SidekiqServer<'a> { signal_chan: Receiver, worker_info: BTreeMap, // busy? concurrency: usize, + pub stack_size: Option, pub force_quite_timeout: usize, } @@ -57,7 +58,7 @@ impl<'a> SidekiqServer<'a> { pub fn new(redis: &str, concurrency: usize) -> Result { let signal_chan = signal_listen(&[SIGINT, SIGUSR1])?; let now = Utc::now(); - let pool = r2d2::Pool::builder() + let redispool = r2d2::Pool::builder() .max_size(concurrency as u32 + 3) .build(redis::Client::open(redis)?)?; @@ -65,8 +66,8 @@ impl<'a> SidekiqServer<'a> { let identity: Vec = iter::repeat(()).map(|()| rng.sample(distributions::Alphanumeric)).take(12).collect(); Ok(SidekiqServer { - redispool: pool, - threadpool: ThreadPool::with_name("worker".into(), concurrency), + redispool, + threadpool: None, namespace: String::new(), job_handlers: BTreeMap::new(), queues: vec![], @@ -75,6 +76,7 @@ impl<'a> SidekiqServer<'a> { worker_info: BTreeMap::new(), concurrency, signal_chan, + stack_size: None, force_quite_timeout: 10, middlewares: vec![], rs: String::from_utf8_lossy(&identity).to_string(), @@ -96,6 +98,14 @@ impl<'a> SidekiqServer<'a> { } pub fn start(&mut self) { + let mut threadpool = threadpool::Builder::new() + .thread_name("worker".to_string()) + .num_threads(self.concurrency); + if let Some(size) = self.stack_size { + threadpool = threadpool.thread_stack_size(size); + } + self.threadpool = Some(threadpool.build()); + info!("sidekiq is running..."); if self.queues.len() == 0 { error!("queue is empty, exiting"); @@ -140,7 +150,7 @@ impl<'a> SidekiqServer<'a> { if let Ok(Err(e)) = sig.map(|s| self.deal_signal(s)) { error!("error when dealing signal: '{}'", e); } - let worker_count = self.threadpool.active_count(); + let worker_count = self.threadpool.as_ref().unwrap().active_count(); // relaunch workers if they died unexpectly if worker_count < self.concurrency { warn!("worker down, restarting"); @@ -179,7 +189,7 @@ impl<'a> SidekiqServer<'a> { self.middlewares.iter_mut().map(|v| v.cloned()).collect(), self.namespace.clone()); self.worker_info.insert(worker.id.clone(), false); - self.threadpool.execute(move || worker.work()); + self.threadpool.as_ref().unwrap().execute(move || worker.work()); } fn inform_termination(&self, tox: Sender) {