Skip to content

Commit f3537a0

Browse files
authored
Turbopack: Change run once signature to avoid exposing TaskId (#83844)
### What? change signature of run_once to make the behavior more clear simplify stuff
1 parent c0a3cfb commit f3537a0

File tree

10 files changed

+130
-145
lines changed

10 files changed

+130
-145
lines changed

crates/napi/src/next_api/project.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ use tracing::Instrument;
3232
use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt};
3333
use turbo_rcstr::{RcStr, rcstr};
3434
use turbo_tasks::{
35-
Completion, Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef,
36-
ResolvedVc, TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc,
37-
get_effects,
35+
Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, ResolvedVc,
36+
TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, get_effects,
3837
message_queue::{CompilationEvent, Severity, TimingEvent},
3938
trace::TraceRawVcs,
4039
};
@@ -465,13 +464,18 @@ pub fn project_new(
465464
.or_else(|e| turbopack_ctx.throw_turbopack_internal_result(&e))
466465
.await?;
467466

468-
turbo_tasks.spawn_once_task({
467+
turbo_tasks.start_once_process({
469468
let tt = turbo_tasks.clone();
470-
async move {
471-
benchmark_file_io(tt, container.project().node_root().owned().await?)
472-
.await
473-
.inspect_err(|err| tracing::warn!(%err, "failed to benchmark file IO"))
474-
}
469+
Box::pin(async move {
470+
let future = async move {
471+
benchmark_file_io(tt, container.project().node_root().owned().await?).await
472+
};
473+
if let Err(err) = future.await {
474+
// TODO Not ideal to print directly to stdout.
475+
// We should use a compilation event instead to report async errors.
476+
println!("Failed to benchmark file IO: {err}");
477+
}
478+
})
475479
});
476480

477481
Ok(External::new(ProjectInstance {
@@ -519,10 +523,7 @@ impl CompilationEvent for SlowFilesystemEvent {
519523
/// - https://x.com/jarredsumner/status/1637549427677364224
520524
/// - https://github.com/oven-sh/bun/blob/06a9aa80c38b08b3148bfeabe560/src/install/install.zig#L3038
521525
#[tracing::instrument(skip(turbo_tasks))]
522-
async fn benchmark_file_io(
523-
turbo_tasks: NextTurboTasks,
524-
directory: FileSystemPath,
525-
) -> Result<Vc<Completion>> {
526+
async fn benchmark_file_io(turbo_tasks: NextTurboTasks, directory: FileSystemPath) -> Result<()> {
526527
// try to get the real file path on disk so that we can use it with tokio
527528
let fs = ResolvedVc::try_downcast_type::<DiskFileSystem>(directory.fs)
528529
.context(anyhow!(
@@ -567,7 +568,7 @@ async fn benchmark_file_io(
567568
}));
568569
}
569570

570-
Ok(Completion::new())
571+
Ok(())
571572
}
572573

573574
#[napi]

turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{Completion, ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{Completion, TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn scope_stress(c: &mut Criterion) {
@@ -47,12 +47,11 @@ pub fn scope_stress(c: &mut Criterion) {
4747
.map(|(a, b)| {
4848
let tt = &tt;
4949
async move {
50-
let task = tt.spawn_once_task(async move {
50+
tt.run_once(async move {
5151
rectangle(a, b).strongly_consistent().await?;
52-
Ok::<Vc<()>, _>(Default::default())
53-
});
54-
tt.wait_task_completion(task, ReadConsistency::Eventual)
55-
.await
52+
Ok(())
53+
})
54+
.await
5655
}
5756
})
5857
.try_join()

turbopack/crates/turbo-tasks-backend/benches/stress.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn fibonacci(c: &mut Criterion) {
@@ -37,18 +37,16 @@ pub fn fibonacci(c: &mut Criterion) {
3737
noop_backing_storage(),
3838
));
3939
async move {
40-
let task = tt.spawn_once_task(async move {
40+
tt.run_once(async move {
4141
// Number of tasks:
4242
// 1 root task
4343
// size >= 1 => + fib(0) = 1
4444
// size >= 2 => + fib(1) = 2
4545
(0..size).map(|i| fib(i, i)).try_join().await?;
46-
Ok::<Vc<()>, _>(Default::default())
47-
});
48-
tt.wait_task_completion(task, ReadConsistency::Eventual)
49-
.await
50-
.unwrap();
51-
tt
46+
Ok(())
47+
})
48+
.await
49+
.unwrap();
5250
}
5351
})
5452
});

turbopack/crates/turbo-tasks-testing/src/lib.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
future::Future,
88
mem::replace,
99
panic::AssertUnwindSafe,
10+
pin::Pin,
1011
sync::{Arc, Mutex, Weak},
1112
};
1213

@@ -120,22 +121,30 @@ impl TurboTasksCallApi for VcStorage {
120121
fn run_once(
121122
&self,
122123
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
123-
) -> TaskId {
124+
) -> Pin<
125+
Box<
126+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
127+
>,
128+
> {
124129
unreachable!()
125130
}
126131

127132
fn run_once_with_reason(
128133
&self,
129134
_reason: StaticOrArc<dyn InvalidationReason>,
130135
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
131-
) -> TaskId {
136+
) -> Pin<
137+
Box<
138+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
139+
>,
140+
> {
132141
unreachable!()
133142
}
134143

135-
fn run_once_process(
144+
fn start_once_process(
136145
&self,
137-
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
138-
) -> TaskId {
146+
_future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
147+
) {
139148
unreachable!()
140149
}
141150
}

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,13 @@ pub trait TurboTasksCallApi: Sync + Send {
7777
fn run_once(
7878
&self,
7979
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80-
) -> TaskId;
80+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
8181
fn run_once_with_reason(
8282
&self,
8383
reason: StaticOrArc<dyn InvalidationReason>,
8484
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
85-
) -> TaskId;
86-
fn run_once_process(
87-
&self,
88-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89-
) -> TaskId;
85+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
86+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
9087
}
9188

9289
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
@@ -493,7 +490,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
493490
/// Creates a new root task, that is only executed once.
494491
/// Dependencies will not invalidate the task.
495492
#[track_caller]
496-
pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
493+
fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
497494
where
498495
T: ?Sized,
499496
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
@@ -533,6 +530,21 @@ impl<B: Backend + 'static> TurboTasks<B> {
533530
Ok(rx.await?)
534531
}
535532

533+
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
534+
let this = self.pin();
535+
tokio::spawn(async move {
536+
this.pin()
537+
.run_once(async move {
538+
this.finish_foreground_job();
539+
future.await;
540+
this.begin_foreground_job();
541+
Ok(())
542+
})
543+
.await
544+
.unwrap()
545+
});
546+
}
547+
536548
pub(crate) fn native_call(
537549
&self,
538550
native_fn: &'static NativeFunction,
@@ -1109,41 +1121,28 @@ impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
11091121
fn run_once(
11101122
&self,
11111123
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1112-
) -> TaskId {
1113-
self.spawn_once_task(async move {
1114-
future.await?;
1115-
Ok(Completion::new())
1116-
})
1124+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1125+
let this = self.pin();
1126+
Box::pin(async move { this.run_once(future).await })
11171127
}
11181128

11191129
#[track_caller]
11201130
fn run_once_with_reason(
11211131
&self,
11221132
reason: StaticOrArc<dyn InvalidationReason>,
11231133
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1124-
) -> TaskId {
1134+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
11251135
{
11261136
let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
11271137
reason_set.insert(reason);
11281138
}
1129-
self.spawn_once_task(async move {
1130-
future.await?;
1131-
Ok(Completion::new())
1132-
})
1139+
let this = self.pin();
1140+
Box::pin(async move { this.run_once(future).await })
11331141
}
11341142

11351143
#[track_caller]
1136-
fn run_once_process(
1137-
&self,
1138-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1139-
) -> TaskId {
1140-
let this = self.pin();
1141-
self.spawn_once_task(async move {
1142-
this.finish_foreground_job();
1143-
future.await?;
1144-
this.begin_foreground_job();
1145-
Ok(Completion::new())
1146-
})
1144+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1145+
self.start_once_process(future)
11471146
}
11481147
}
11491148

@@ -1428,18 +1427,13 @@ pub async fn run_once<T: Send + 'static>(
14281427
) -> Result<T> {
14291428
let (tx, rx) = tokio::sync::oneshot::channel();
14301429

1431-
let task_id = tt.run_once(Box::pin(async move {
1430+
tt.run_once(Box::pin(async move {
14321431
let result = future.await?;
14331432
tx.send(result)
14341433
.map_err(|_| anyhow!("unable to send result"))?;
14351434
Ok(())
1436-
}));
1437-
1438-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1439-
// track a dependency
1440-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1441-
let raw_future = raw_result.into_read().untracked();
1442-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1435+
}))
1436+
.await?;
14431437

14441438
Ok(rx.await?)
14451439
}
@@ -1451,21 +1445,16 @@ pub async fn run_once_with_reason<T: Send + 'static>(
14511445
) -> Result<T> {
14521446
let (tx, rx) = tokio::sync::oneshot::channel();
14531447

1454-
let task_id = tt.run_once_with_reason(
1448+
tt.run_once_with_reason(
14551449
(Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
14561450
Box::pin(async move {
14571451
let result = future.await?;
14581452
tx.send(result)
14591453
.map_err(|_| anyhow!("unable to send result"))?;
14601454
Ok(())
14611455
}),
1462-
);
1463-
1464-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1465-
// track a dependency
1466-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1467-
let raw_future = raw_result.into_read().untracked();
1468-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1456+
)
1457+
.await?;
14691458

14701459
Ok(rx.await?)
14711460
}

0 commit comments

Comments
 (0)