Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/core/derive/src/tc_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ fn parse_knobs(mut input: ItemFn, config: FinalConfig) -> TokenStream {

let mut tc_builder = quote_spanned! {last_stmt_start_span=>
#crate_path::TaskCenterBuilder::default()
.ingress_runtime_handle(rt.handle().clone())
.default_runtime_handle(rt.handle().clone())
};
let mut rt = match config.flavor {
Expand Down
19 changes: 6 additions & 13 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ impl TaskCenter {
Self::with_current(|tc| tc.spawn_child(kind, name, future))
}

/// An unmanaged task is one that is not automatically cancelled by the task center on
/// shutdown. Moreover, its task ID is not registered with the task center, so the task
/// cannot be "taken" by calling [`TaskCenter::take_task`].
#[track_caller]
pub fn spawn_unmanaged<F, T>(
kind: TaskKind,
Expand Down Expand Up @@ -285,16 +288,13 @@ struct TaskCenterInner {
#[allow(dead_code)]
pause_time: bool,
default_runtime_handle: tokio::runtime::Handle,
ingress_runtime_handle: tokio::runtime::Handle,
managed_runtimes: Mutex<HashMap<SharedString, OwnedRuntimeHandle>>,
start_time: Instant,
/// We hold on to the owned Runtime to ensure it's dropped when task center is dropped. If this
/// is None, it means that it's the responsibility of the Handle owner to correctly drop
/// tokio's runtime after dropping the task center.
#[allow(dead_code)]
default_runtime: Option<tokio::runtime::Runtime>,
#[allow(dead_code)]
ingress_runtime: Option<tokio::runtime::Runtime>,
global_cancel_token: CancellationToken,
shutdown_requested: AtomicBool,
current_exit_code: AtomicI32,
Expand All @@ -308,9 +308,7 @@ struct TaskCenterInner {
impl TaskCenterInner {
fn new(
default_runtime_handle: tokio::runtime::Handle,
ingress_runtime_handle: tokio::runtime::Handle,
default_runtime: Option<tokio::runtime::Runtime>,
ingress_runtime: Option<tokio::runtime::Runtime>,
// used in tests to start all runtimes with clock paused. Note that this only impacts
// partition processor runtimes
pause_time: bool,
Expand All @@ -328,8 +326,6 @@ impl TaskCenterInner {
start_time: Instant::now(),
default_runtime_handle,
default_runtime,
ingress_runtime_handle,
ingress_runtime,
global_cancel_token: CancellationToken::new(),
shutdown_requested: AtomicBool::new(false),
current_exit_code: AtomicI32::new(0),
Expand Down Expand Up @@ -706,7 +702,6 @@ impl TaskCenterInner {
};

dump("default", &self.default_runtime_handle).await;
dump("ingress", &self.ingress_runtime_handle).await;
for (name, handle) in managed_runtimes {
dump(&name, &handle).await;
}
Expand Down Expand Up @@ -777,7 +772,6 @@ impl TaskCenterInner {
let runtime = match kind.runtime() {
crate::AsyncRuntime::Inherit => &tokio::runtime::Handle::current(),
crate::AsyncRuntime::Default => &self.default_runtime_handle,
crate::AsyncRuntime::Ingress => &self.ingress_runtime_handle,
};
let inner_handle = tokio_task
.spawn_on(fut, runtime)
Expand Down Expand Up @@ -880,10 +874,10 @@ impl TaskCenterInner {
self.cancel_tasks(Some(TaskKind::ClusterController), None)
.await;
// stop accepting ingress
self.cancel_tasks(Some(TaskKind::IngressServer), None).await;
// PPM will shutdown running processors
self.cancel_tasks(Some(TaskKind::PartitionProcessorManager), None)
self.cancel_tasks(Some(TaskKind::HttpIngressRole), None)
.await;
// Worker will shutdown running processors
self.cancel_tasks(Some(TaskKind::WorkerRole), None).await;
self.initiate_managed_runtimes_shutdown();
// Ask bifrost to shutdown providers and loglets
self.cancel_tasks(Some(TaskKind::BifrostWatchdog), None)
Expand Down Expand Up @@ -1130,7 +1124,6 @@ mod tests {
let tc = TaskCenterBuilder::default()
.options(common_opts)
.default_runtime_handle(tokio::runtime::Handle::current())
.ingress_runtime_handle(tokio::runtime::Handle::current())
.build()?
.into_handle();
let start = tokio::time::Instant::now();
Expand Down
24 changes: 0 additions & 24 deletions crates/core/src/task_center/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ pub enum TaskCenterBuildError {
pub struct TaskCenterBuilder {
/// Handle of the runtime that is handed to the task center as its default runtime.
default_runtime_handle: Option<tokio::runtime::Handle>,
/// Default runtime owned by the builder, if any; it is passed on to the task center
/// together with the handle.
default_runtime: Option<tokio::runtime::Runtime>,
/// Handle of the ingress runtime. When it is still unset at build time, a runtime named
/// "ingress" is created and its handle is stored here.
ingress_runtime_handle: Option<tokio::runtime::Handle>,
/// Ingress runtime owned by the builder. Reset to `None` when only a handle is supplied
/// through `ingress_runtime_handle()`.
ingress_runtime: Option<tokio::runtime::Runtime>,
/// Options supplied through `options()`.
options: Option<CommonOptions>,
/// Flag forwarded unchanged to the task center when it is built.
pause_time: bool,
}
Expand All @@ -43,12 +41,6 @@ impl TaskCenterBuilder {
self
}

pub fn ingress_runtime_handle(mut self, handle: tokio::runtime::Handle) -> Self {
self.ingress_runtime_handle = Some(handle);
self.ingress_runtime = None;
self
}

pub fn options(mut self, options: CommonOptions) -> Self {
self.options = Some(options);
self
Expand All @@ -60,20 +52,13 @@ impl TaskCenterBuilder {
self
}

pub fn ingress_runtime(mut self, runtime: tokio::runtime::Runtime) -> Self {
self.ingress_runtime_handle = Some(runtime.handle().clone());
self.ingress_runtime = Some(runtime);
self
}

pub fn pause_time(mut self, pause_time: bool) -> Self {
self.pause_time = pause_time;
self
}

/// Builder preset for tests: both the ingress and the default runtime handle point at the
/// current tokio runtime, and `pause_time` is enabled.
///
/// # Panics
///
/// Panics when called outside of a tokio runtime context, because
/// `tokio::runtime::Handle::current()` does.
pub fn default_for_tests() -> Self {
    let builder = Self::default();
    let builder = builder.ingress_runtime_handle(tokio::runtime::Handle::current());
    let builder = builder.default_runtime_handle(tokio::runtime::Handle::current());
    builder.pause_time(true)
}
Expand All @@ -87,21 +72,12 @@ impl TaskCenterBuilder {
self.default_runtime = Some(default_runtime);
}

if self.ingress_runtime_handle.is_none() {
let mut ingress_runtime_builder = tokio_builder("ingress", &options);
let ingress_runtime = ingress_runtime_builder.build()?;
self.ingress_runtime_handle = Some(ingress_runtime.handle().clone());
self.ingress_runtime = Some(ingress_runtime);
}

if cfg!(any(test, feature = "test-util")) {
eprintln!("!!!! Running with test-util enabled !!!!");
}
Ok(OwnedHandle::new(TaskCenterInner::new(
self.default_runtime_handle.unwrap(),
self.ingress_runtime_handle.unwrap(),
self.default_runtime,
self.ingress_runtime,
self.pause_time,
)))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/task_center/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ impl Handle {
self.inner.spawn_child(kind, name, future)
}

/// An unmanaged task is one that is not automatically cancelled by the task center on
/// shutdown. Moreover, its task ID is not registered with the task center, so the task
/// cannot be "taken" by calling [`TaskCenter::take_task`].
pub fn spawn_unmanaged<F, T>(
&self,
kind: TaskKind,
Expand Down
7 changes: 0 additions & 7 deletions crates/core/src/task_center/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use super::Handle;
pub trait TaskCenterMonitoring {
fn default_runtime_metrics(&self) -> RuntimeMetrics;

fn ingress_runtime_metrics(&self) -> RuntimeMetrics;

fn managed_runtime_metrics(&self) -> Vec<(SharedString, RuntimeMetrics)>;

/// How long has the task-center been running?
Expand All @@ -36,10 +34,6 @@ impl TaskCenterMonitoring for Handle {
self.inner.default_runtime_handle.metrics()
}

/// Returns the tokio runtime metrics obtained from the ingress runtime handle.
fn ingress_runtime_metrics(&self) -> RuntimeMetrics {
self.inner.ingress_runtime_handle.metrics()
}

fn managed_runtime_metrics(&self) -> Vec<(SharedString, RuntimeMetrics)> {
let guard = self.inner.managed_runtimes.lock();
guard
Expand All @@ -56,7 +50,6 @@ impl TaskCenterMonitoring for Handle {
/// Submit telemetry for all runtimes to metrics recorder
fn submit_metrics(&self) {
submit_runtime_metrics("default", self.default_runtime_metrics());
submit_runtime_metrics("ingress", self.ingress_runtime_metrics());

// Partition processor runtimes
let processor_runtimes = self.managed_runtime_metrics();
Expand Down
56 changes: 55 additions & 1 deletion crates/core/src/task_center/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::task::{Poll, ready};

use futures::FutureExt;
use parking_lot::Mutex;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::{CancellationToken, DropGuard};

use restate_types::SharedString;
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -123,6 +123,14 @@ pub struct TaskHandle<T> {
}

impl<T> TaskHandle<T> {
/// Returns a [`TaskGuard`] guard that will automatically cancel the task when dropped.
pub fn into_guard(self) -> TaskGuard<T> {
TaskGuard {
drop_guard: self.cancellation_token.drop_guard(),
inner_handle: self.inner_handle,
}
}

/// Abort the task immediately. This will abort the task at the next yielding point. If the
/// task is running a blocking call, it'll not be aborted until it can yield to the runtime.
pub fn abort(&self) {
Expand All @@ -145,6 +153,12 @@ impl<T> TaskHandle<T> {
pub fn is_finished(&self) -> bool {
self.inner_handle.is_finished()
}

/// Cancels the task and waits for it to complete gracefully.
///
/// Consumes the handle: cancellation is first requested through `cancel()`, then the handle
/// itself is awaited and whatever it resolves to is returned unchanged.
pub async fn cancel_and_wait(self) -> Result<T, ShutdownError> {
self.cancel();
self.await
}
}

impl<T> std::future::Future for TaskHandle<T> {
Expand All @@ -157,3 +171,43 @@ impl<T> std::future::Future for TaskHandle<T> {
}
}
}

/// Like [`TaskHandle`] but with a guard that will automatically cancel() the task when dropped.
///
/// You can convert this back to a handle by calling [`TaskGuard::into_handle`], which will
/// disarm the guard. Additionally, you can call [`TaskGuard::cancel_and_wait`] to cancel the
/// task and wait for it to complete.
///
/// Like [`TaskHandle`], awaiting this future waits for the task to complete.
#[must_use = "task is cancelled when guard is dropped"]
pub struct TaskGuard<T> {
/// Drop guard over the task's cancellation token; dropping it cancels the token.
pub(crate) drop_guard: DropGuard,
/// Join handle of the underlying tokio task.
pub(crate) inner_handle: tokio::task::JoinHandle<T>,
}

impl<T> TaskGuard<T> {
    /// Turns the guard back into a plain [`TaskHandle`].
    ///
    /// The guard is disarmed in the process, so dropping the returned handle does not
    /// cancel the task.
    pub fn into_handle(self) -> TaskHandle<T> {
        let Self {
            drop_guard,
            inner_handle,
        } = self;
        let cancellation_token = drop_guard.disarm();
        TaskHandle {
            cancellation_token,
            inner_handle,
        }
    }

    /// Requests cancellation of the task and waits until it has completed.
    ///
    /// Delegates to [`TaskHandle::cancel_and_wait`] after disarming the guard.
    pub async fn cancel_and_wait(self) -> Result<T, ShutdownError> {
        self.into_handle().cancel_and_wait().await
    }
}

impl<T> std::future::Future for TaskGuard<T> {
    type Output = Result<T, ShutdownError>;

    /// Polls the underlying join handle. A successful join yields the task's value; any
    /// join error is reported as [`ShutdownError`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let joined = ready!(self.inner_handle.poll_unpin(cx));
        Poll::Ready(joined.map_err(|_| ShutdownError))
    }
}
10 changes: 4 additions & 6 deletions crates/core/src/task_center/task_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ pub enum TaskKind {
H2ClientStream,
/// A type for ingress until we start enforcing timeouts for inflight requests. This enables us
/// to shut down cleanly without waiting indefinitely.
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
IngressServer,
#[strum(props(OnCancel = "abort"))]
HttpIngressRole,
WorkerRole,
RoleRunner,
/// Cluster controller is the first thing that gets stopped when the server is shut down
ClusterController,
#[strum(props(runtime = "default"))]
FailureDetector,
SystemService,
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
#[strum(props(OnCancel = "abort"))]
Ingress,
/// Kafka ingestion related task
Kafka,
Expand Down Expand Up @@ -182,7 +183,6 @@ impl TaskKind {
match self.get_str("runtime").unwrap_or("inherit") {
"inherit" => AsyncRuntime::Inherit,
"default" => AsyncRuntime::Default,
"ingress" => AsyncRuntime::Ingress,
_ => panic!("Invalid runtime for task kind: {self}"),
}
}
Expand All @@ -198,6 +198,4 @@ pub enum AsyncRuntime {
Inherit,
/// Run on the default runtime
Default,
/// Run on ingress runtime
Ingress,
}
16 changes: 5 additions & 11 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,7 @@ where
ChannelStatusReader(self.status_tx.clone())
}

pub async fn run(
self,
mut updateable_options: impl LiveLoad<Live = InvokerOptions>,
) -> anyhow::Result<()> {
pub async fn run(self, mut updateable_options: impl LiveLoad<Live = InvokerOptions>) {
debug!("Starting the invoker");
let Service {
tmp_dir,
Expand Down Expand Up @@ -302,7 +299,6 @@ where

// Wait for all the tasks to shutdown
service.invocation_tasks.shutdown().await;
Ok(())
}
}

Expand Down Expand Up @@ -1657,12 +1653,13 @@ mod tests {

let mut handle = service.handle();

let invoker_task_id = TaskCenter::spawn(
let invoker_task = TaskCenter::spawn_unmanaged(
TaskKind::SystemService,
"invoker",
service.run(Constant::new(invoker_options)),
)
.unwrap();
.unwrap()
.into_guard();

let partition_leader_epoch = (PartitionId::from(0), LeaderEpoch::INITIAL);
let invocation_target = InvocationTarget::mock_service();
Expand Down Expand Up @@ -1695,10 +1692,7 @@ mod tests {
// the invocation and we won't see a result for the invocation (failure because the deployment cannot be resolved).
check!(let Some(_) = output_rx.recv().await);

TaskCenter::cancel_task(invoker_task_id)
.unwrap()
.await
.unwrap();
invoker_task.cancel_and_wait().await.unwrap();
}

#[test(restate_core::test)]
Expand Down
11 changes: 6 additions & 5 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,7 @@ impl Node {
Some(
WorkerRole::create(
tc.health().worker_status(),
metadata.clone(),
PartitionRouting::new(replica_set_states.clone(), tc.clone()),
replica_set_states.clone(),
updateable_config.clone(),
&mut router_builder,
networking.clone(),
bifrost_svc.handle(),
Expand Down Expand Up @@ -507,11 +504,15 @@ impl Node {
}

if let Some(ingress_role) = self.ingress_role {
TaskCenter::spawn(TaskKind::IngressServer, "ingress-http", ingress_role.run())?;
TaskCenter::spawn(
TaskKind::HttpIngressRole,
"ingress-http",
ingress_role.run(),
)?;
}

if let Some(worker_role) = self.worker_role {
TaskCenter::spawn(TaskKind::SystemBoot, "worker-init", worker_role.start())?;
worker_role.start()?;
}

if let Some(admin_role) = self.admin_role {
Expand Down
Loading
Loading