
[Flight] Use cacheController instead of abortListeners for Streams #33633


Merged · 8 commits · Jun 25, 2025
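
In outline, the change replaces the Request's hand-rolled `abortListeners: Set<(reason: mixed) => void>` with listeners registered on `request.cacheController.signal` (a standard AbortSignal), and replaces each stream's `aborted` closure flag with a check of its task's status. Below is a minimal TypeScript sketch of the adopted wiring; the `cacheController` field and `abortStream` name mirror the diff, while the rest is illustrative, not the actual React source.

```ts
// Sketch only: signal-based abort wiring in the style of this diff.
type Request = {cacheController: AbortController};

function serializeStream(
  request: Request,
  reader: ReadableStreamDefaultReader<Uint8Array>,
): void {
  function abortStream(): void {
    const signal = request.cacheController.signal;
    signal.removeEventListener('abort', abortStream);
    // The reason travels on the signal itself; the handler takes no argument.
    reader.cancel(signal.reason).catch(() => {});
  }
  request.cacheController.signal.addEventListener('abort', abortStream);
}
```

One consequence visible throughout the diff: abort handlers no longer accept a `reason` parameter, since the reason is read from `signal.reason` at the moment the handler fires.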
packages/react-server/src/ReactFlightServer.js (152 changes: 59 additions & 93 deletions)
@@ -433,7 +433,6 @@ export type Request = {
nextChunkId: number,
pendingChunks: number,
hints: Hints,
abortListeners: Set<(reason: mixed) => void>,
abortableTasks: Set<Task>,
pingedTasks: Array<Task>,
completedImportChunks: Array<Chunk>,
@@ -547,7 +546,6 @@ function RequestInstance(
this.nextChunkId = 0;
this.pendingChunks = 0;
this.hints = hints;
this.abortListeners = new Set();
this.abortableTasks = abortSet;
this.pingedTasks = pingedTasks;
this.completedImportChunks = ([]: Array<Chunk>);
@@ -839,13 +837,11 @@ function serializeThenable(
if (request.status === ABORTING) {
// We can no longer accept any resolved values
request.abortableTasks.delete(newTask);
newTask.status = ABORTED;
if (enableHalt && request.type === PRERENDER) {
request.pendingChunks--;
haltTask(newTask, request);
} else {
const errorId: number = (request.fatalError: any);
const model = stringify(serializeByValueID(errorId));
emitModelChunk(request, newTask.id, model);
abortTask(newTask, request, errorId);
}
return newTask.id;
}
@@ -936,29 +932,26 @@ function serializeReadableStream(
__DEV__ ? task.debugStack : null,
__DEV__ ? task.debugTask : null,
);
request.abortableTasks.delete(streamTask);

request.pendingChunks++; // The task represents the Start row. This adds a Stop row.

// The task represents the Stop row. This adds a Start row.
request.pendingChunks++;
const startStreamRow =
streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n';
request.completedRegularChunks.push(stringToChunk(startStreamRow));

// There's a race condition between when the stream is aborted and when the promise
// resolves so we track whether we already aborted it to avoid writing twice.
let aborted = false;
function progress(entry: {done: boolean, value: ReactClientValue, ...}) {
if (aborted) {
if (streamTask.status !== PENDING) {
return;
}

if (entry.done) {
streamTask.status = COMPLETED;
const endStreamRow = streamTask.id.toString(16) + ':C\n';
request.completedRegularChunks.push(stringToChunk(endStreamRow));
request.abortableTasks.delete(streamTask);
request.cacheController.signal.removeEventListener('abort', abortStream);
enqueueFlush(request);
request.abortListeners.delete(abortStream);
callOnAllReadyIfReady(request);
aborted = true;
} else {
try {
streamTask.model = entry.value;
@@ -972,34 +965,36 @@
}
}
function error(reason: mixed) {
if (aborted) {
if (streamTask.status !== PENDING) {
return;
}
aborted = true;
request.abortListeners.delete(abortStream);
request.cacheController.signal.removeEventListener('abort', abortStream);
erroredTask(request, streamTask, reason);
enqueueFlush(request);

// $FlowFixMe should be able to pass mixed
reader.cancel(reason).then(error, error);
}
function abortStream(reason: mixed) {
if (aborted) {
function abortStream() {
if (streamTask.status !== PENDING) {
return;
}
aborted = true;
request.abortListeners.delete(abortStream);
const signal = request.cacheController.signal;
signal.removeEventListener('abort', abortStream);
const reason = signal.reason;
if (enableHalt && request.type === PRERENDER) {
request.pendingChunks--;
haltTask(streamTask, request);
request.abortableTasks.delete(streamTask);
} else {
// TODO: Make this use abortTask() instead.
erroredTask(request, streamTask, reason);
enqueueFlush(request);
}
// $FlowFixMe should be able to pass mixed
reader.cancel(reason).then(error, error);
}

request.abortListeners.add(abortStream);
request.cacheController.signal.addEventListener('abort', abortStream);
reader.read().then(progress, error);
return serializeByValueID(streamTask.id);
}
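
Note how the `let aborted = false` flag and its race-condition comment disappear above: every callback now bails once the task has left the PENDING state. Here is a runnable sketch of that guard, with illustrative status values rather than the source's numeric constants.

```ts
// Sketch: one status field replaces the old shadow `aborted` boolean.
type Task = {status: 'pending' | 'completed' | 'errored'};

function settleOnce(task: Task, settle: () => void): void {
  if (task.status !== 'pending') {
    return; // already completed, errored, or aborted elsewhere
  }
  settle(); // the settle path updates task.status itself
}

// Usage: a late callback after completion becomes a no-op.
const task: Task = {status: 'pending'};
settleOnce(task, () => {
  task.status = 'completed';
});
settleOnce(task, () => {
  throw new Error('never runs');
});
```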
@@ -1028,10 +1023,9 @@ function serializeAsyncIterable(
__DEV__ ? task.debugStack : null,
__DEV__ ? task.debugTask : null,
);
request.abortableTasks.delete(streamTask);

request.pendingChunks++; // The task represents the Start row. This adds a Stop row.

// The task represents the Stop row. This adds a Start row.
request.pendingChunks++;
const startStreamRow =
streamTask.id.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n';
request.completedRegularChunks.push(stringToChunk(startStreamRow));
@@ -1043,19 +1037,17 @@
}
}

// There's a race condition between when the stream is aborted and when the promise
// resolves so we track whether we already aborted it to avoid writing twice.
let aborted = false;
function progress(
entry:
| {done: false, +value: ReactClientValue, ...}
| {done: true, +value: ReactClientValue, ...},
) {
if (aborted) {
if (streamTask.status !== PENDING) {
return;
}

if (entry.done) {
streamTask.status = COMPLETED;
let endStreamRow;
if (entry.value === undefined) {
endStreamRow = streamTask.id.toString(16) + ':C\n';
@@ -1075,10 +1067,13 @@
}
}
request.completedRegularChunks.push(stringToChunk(endStreamRow));
request.abortableTasks.delete(streamTask);
request.cacheController.signal.removeEventListener(
'abort',
abortIterable,
);
enqueueFlush(request);
request.abortListeners.delete(abortIterable);
callOnAllReadyIfReady(request);
aborted = true;
} else {
try {
streamTask.model = entry.value;
@@ -1097,11 +1092,10 @@
}
}
function error(reason: mixed) {
if (aborted) {
if (streamTask.status !== PENDING) {
return;
}
aborted = true;
request.abortListeners.delete(abortIterable);
request.cacheController.signal.removeEventListener('abort', abortIterable);
erroredTask(request, streamTask, reason);
enqueueFlush(request);
if (typeof (iterator: any).throw === 'function') {
@@ -1110,16 +1104,19 @@
iterator.throw(reason).then(error, error);
}
}
function abortIterable(reason: mixed) {
if (aborted) {
function abortIterable() {
if (streamTask.status !== PENDING) {
return;
}
aborted = true;
request.abortListeners.delete(abortIterable);
const signal = request.cacheController.signal;
signal.removeEventListener('abort', abortIterable);
const reason = signal.reason;
if (enableHalt && request.type === PRERENDER) {
request.pendingChunks--;
haltTask(streamTask, request);
request.abortableTasks.delete(streamTask);
} else {
erroredTask(request, streamTask, reason);
// TODO: Make this use abortTask() instead.
erroredTask(request, streamTask, signal.reason);
enqueueFlush(request);
}
if (typeof (iterator: any).throw === 'function') {
@@ -1128,7 +1125,7 @@
iterator.throw(reason).then(error, error);
}
}
request.abortListeners.add(abortIterable);
request.cacheController.signal.addEventListener('abort', abortIterable);
if (__DEV__) {
callIteratorInDEV(iterator, progress, error);
} else {
@@ -2675,16 +2672,14 @@ function serializeBlob(request: Request, blob: Blob): string {

const reader = blob.stream().getReader();

let aborted = false;
function progress(
entry: {done: false, value: Uint8Array} | {done: true, value: void},
): Promise<void> | void {
if (aborted) {
if (newTask.status !== PENDING) {
return;
}
if (entry.done) {
request.abortListeners.delete(abortBlob);
aborted = true;
request.cacheController.signal.removeEventListener('abort', abortBlob);
pingTask(request, newTask);
return;
}
@@ -2694,33 +2689,34 @@ function serializeBlob(request: Request, blob: Blob): string {
return reader.read().then(progress).catch(error);
}
function error(reason: mixed) {
if (aborted) {
if (newTask.status !== PENDING) {
return;
}
aborted = true;
request.abortListeners.delete(abortBlob);
request.cacheController.signal.removeEventListener('abort', abortBlob);
erroredTask(request, newTask, reason);
enqueueFlush(request);
// $FlowFixMe should be able to pass mixed
reader.cancel(reason).then(error, error);
}
function abortBlob(reason: mixed) {
if (aborted) {
function abortBlob() {
if (newTask.status !== PENDING) {
return;
}
aborted = true;
request.abortListeners.delete(abortBlob);
const signal = request.cacheController.signal;
signal.removeEventListener('abort', abortBlob);
const reason = signal.reason;
if (enableHalt && request.type === PRERENDER) {
request.pendingChunks--;
haltTask(newTask, request);
} else {
// TODO: Make this use abortTask() instead.
erroredTask(request, newTask, reason);
enqueueFlush(request);
}
// $FlowFixMe should be able to pass mixed
reader.cancel(reason).then(error, error);
}

request.abortListeners.add(abortBlob);
request.cacheController.signal.addEventListener('abort', abortBlob);

// $FlowFixMe[incompatible-call]
reader.read().then(progress).catch(error);
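
The same shape applies to Blobs: read until done, and let an abort cancel the reader. A self-contained sketch using only standard platform APIs follows; `readBlobChunks` itself is a hypothetical helper, not part of the diff.

```ts
// Hypothetical helper: read a Blob's chunks, honoring an AbortSignal.
async function readBlobChunks(
  blob: Blob,
  signal: AbortSignal,
): Promise<Uint8Array[]> {
  const reader = blob.stream().getReader();
  const onAbort = () => {
    // Cancelling makes the pending read() resolve as done.
    reader.cancel(signal.reason).catch(() => {});
  };
  signal.addEventListener('abort', onAbort);
  try {
    const chunks: Uint8Array[] = [];
    while (true) {
      const {done, value} = await reader.read();
      if (done) return chunks;
      chunks.push(value);
    }
  } finally {
    signal.removeEventListener('abort', onAbort);
  }
}
```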
@@ -5005,16 +5001,15 @@ function retryTask(request: Request, task: Task): void {
} catch (thrownValue) {
if (request.status === ABORTING) {
request.abortableTasks.delete(task);
task.status = ABORTED;
task.status = PENDING;
if (enableHalt && request.type === PRERENDER) {
// When aborting a prerender with halt semantics we don't emit
// anything into the slot for a task that aborts; it remains unresolved
request.pendingChunks--;
haltTask(task, request);
} else {
// Otherwise we emit an error chunk into the task slot.
const errorId: number = (request.fatalError: any);
const model = stringify(serializeByValueID(errorId));
emitModelChunk(request, task.id, model);
abortTask(task, request, errorId);
}
return;
}
@@ -5257,8 +5252,9 @@ function enqueueFlush(request: Request): void {
}

function callOnAllReadyIfReady(request: Request): void {
if (request.abortableTasks.size === 0 && request.abortListeners.size === 0) {
Review comment (collaborator, author): We used to need to check the listener count because it's what kept the stream alive while we were waiting on entries from the stream. Now I just let the task stay open and let the task be abortable, which just logs an error for the stream.

request.onAllReady();
if (request.abortableTasks.size === 0) {
const onAllReady = request.onAllReady;
onAllReady();
}
}
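
Per the author's note above, a pending stream now holds the request open through its entry in `abortableTasks`, so the set's size alone decides readiness. A runnable sketch of that invariant, with illustrative shapes:

```ts
// Sketch: readiness keyed off a single task set.
type Task = {id: number};
type Req = {abortableTasks: Set<Task>; onAllReady: () => void};

function callOnAllReadyIfReady(req: Req): void {
  if (req.abortableTasks.size === 0) {
    req.onAllReady();
  }
}

function completeTask(req: Req, task: Task): void {
  req.abortableTasks.delete(task);
  callOnAllReadyIfReady(req); // fires only when the last task settles
}

// Usage: onAllReady waits for both tasks, including a stream's task.
const req: Req = {
  abortableTasks: new Set<Task>(),
  onAllReady: () => console.log('all ready'),
};
const render: Task = {id: 1};
const stream: Task = {id: 2};
req.abortableTasks.add(render).add(stream);
completeTask(req, render); // not yet: the stream is still pending
completeTask(req, stream); // logs 'all ready'
```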

@@ -5294,6 +5290,7 @@ export function abort(request: Request, reason: mixed): void {
if (request.status <= OPEN) {
request.status = ABORTING;
request.cacheController.abort(reason);
callOnAllReadyIfReady(request);
}
const abortableTasks = request.abortableTasks;
if (abortableTasks.size > 0) {
@@ -5345,37 +5342,6 @@ export function abort(request: Request, reason: mixed): void {
callOnAllReadyIfReady(request);
}
}
const abortListeners = request.abortListeners;
if (abortListeners.size > 0) {
let error;
if (
enablePostpone &&
typeof reason === 'object' &&
reason !== null &&
(reason: any).$$typeof === REACT_POSTPONE_TYPE
) {
// We aborted with a Postpone but since we're passing this to an
// external handler, passing this object would leak it outside React.
// We create an alternative reason for it instead.
error = new Error('The render was aborted due to being postponed.');
} else {
error =
reason === undefined
? new Error(
'The render was aborted by the server without a reason.',
)
: typeof reason === 'object' &&
reason !== null &&
typeof reason.then === 'function'
? new Error(
'The render was aborted by the server with a promise.',
)
: reason;
}
abortListeners.forEach(callback => callback(error));
abortListeners.clear();
callOnAllReadyIfReady(request);
}
if (request.destination !== null) {
flushCompletedChunks(request, request.destination);
}
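
The deleted block above walked `abortListeners` by hand, first normalizing postpone, `undefined`, and thenable reasons into proper Errors before calling each listener. With the controller, a single `abort(reason)` call notifies every registered listener; a minimal sketch, illustrative rather than the React source:

```ts
// Sketch: AbortController.abort() replaces the manual listener fan-out.
const controller = new AbortController();

controller.signal.addEventListener('abort', () => {
  // Each stream's handler reads the reason off the signal.
  console.log('aborted:', controller.signal.reason);
});

controller.abort(new Error('The render was aborted.'));
// Listeners run synchronously during abort(), each exactly once;
// listeners added after the signal is already aborted never fire.
```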