diff --git a/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts b/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts index fc38dddac0e1e..deaa19a0a2285 100644 --- a/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts +++ b/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts @@ -80,20 +80,11 @@ async function render(request: NextRequest, event: NextFetchEvent) { response.headers.append('Vary', RSC_VARY_HEADER) const writer = tranform.writable.getWriter() - const target = { + result.pipe({ write: (chunk: Uint8Array) => writer.write(chunk), end: () => writer.close(), destroy: (reason?: Error) => writer.abort(reason), - closed: false, - } - const onClose = () => { - target.closed = true - } - // No, this cannot be replaced with `finally`, because early cancelling - // the stream will create a rejected promise, and finally will create an - // unhandled rejection. - writer.closed.then(onClose, onClose) - result.pipe(target) + }) return response } diff --git a/packages/next/src/server/render-result.ts b/packages/next/src/server/render-result.ts index 53b01e9906c5e..6975732502bcd 100644 --- a/packages/next/src/server/render-result.ts +++ b/packages/next/src/server/render-result.ts @@ -16,7 +16,6 @@ export interface PipeTarget { end: () => unknown flush?: () => unknown destroy: (err?: Error) => unknown - get closed(): boolean } export default class RenderResult { @@ -112,25 +111,31 @@ export default class RenderResult { : () => {} const reader = this.response.getReader() + let shouldFatalError = false try { - while (true) { - const result = await reader.read() - - if (res.closed || result.done) { - break - } + let result = await reader.read() + if (!result.done) { + // As we're going to write to the response, we should destroy the + // response if an error occurs. + shouldFatalError = true + } + while (!result.done) { // Write the data to the response. res.write(result.value) // Flush it to the client (if it supports flushing). flush() + + // Read the next chunk. + result = await reader.read() } // We're done writing to the response, so we can end it. res.end() } catch (err) { - if (!res.closed) { + // If we've written to the response, we should destroy it. + if (shouldFatalError) { res.destroy(err as any) } diff --git a/packages/next/src/server/send-response.ts b/packages/next/src/server/send-response.ts index 1312885d72b13..a722f74bcfb49 100644 --- a/packages/next/src/server/send-response.ts +++ b/packages/next/src/server/send-response.ts @@ -49,7 +49,6 @@ export async function sendResponse( const iterator = consumeUint8ArrayReadableStream(response.body) try { for await (const chunk of iterator) { - if (originalResponse.closed) break originalResponse.write(chunk) } } finally { diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index 9c19e6c01ae3e..7f6312965902e 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -25,9 +25,8 @@ export const streamToBufferedResult = async ( }, end() {}, destroy() {}, - closed: false, } - await renderResult.pipe(writable) + await renderResult.pipe(writable as any) return renderChunks.join('') } diff --git a/packages/next/src/server/web-server.ts b/packages/next/src/server/web-server.ts index 1151a6be0615d..63004cd1f553e 100644 --- a/packages/next/src/server/web-server.ts +++ b/packages/next/src/server/web-server.ts @@ -421,20 +421,14 @@ export default class NextWebServer extends BaseServer { if (options.result.isDynamic) { const writer = res.transformStream.writable.getWriter() - const target = { + options.result.pipe({ write: (chunk: Uint8Array) => writer.write(chunk), end: () => writer.close(), destroy: (err: Error) => writer.abort(err), - closed: false, - } as any - const onClose = () => { - target.closed = true - } - // No, this cannot be replaced with `finally`, because early cancelling - // the stream will create a rejected promise, and finally will create an - // unhandled rejection. - writer.closed.then(onClose, onClose) - options.result.pipe(target) + cork: () => {}, + uncork: () => {}, + // Not implemented: on/removeListener + } as any) } else { const payload = await options.result.toUnchunkedString() res.setHeader('Content-Length', String(byteLength(payload))) diff --git a/test/e2e/cancel-request/app/edge-route/route.ts b/test/e2e/cancel-request/app/edge-route/route.ts deleted file mode 100644 index d1ee993c28131..0000000000000 --- a/test/e2e/cancel-request/app/edge-route/route.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Streamable } from '../../streamable' - -export const runtime = 'edge' - -let streamable -let requestAborted = false - -export function GET(req: Request): Response { - // The 2nd request should render the stats. We don't use a query param - // because edge rendering will create a different bundle for that. - if (streamable) { - return new Response( - JSON.stringify({ - requestAborted, - i: streamable.i, - streamCleanedUp: streamable.streamCleanedUp, - }) - ) - } - - streamable = Streamable() - req.signal.onabort = () => { - requestAborted = true - } - return new Response(streamable.stream) -} diff --git a/test/e2e/cancel-request/app/node-route/route.ts b/test/e2e/cancel-request/app/node-route/route.ts deleted file mode 100644 index 1159bd474c470..0000000000000 --- a/test/e2e/cancel-request/app/node-route/route.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { Streamable } from '../../streamable' - -export const runtime = 'nodejs' -// Next thinks it can statically compile this route, which breaks the test. -export const dynamic = 'force-dynamic' - -let streamable -let requestAborted = false - -export function GET(req: Request): Response { - // The 2nd request should render the stats. We don't use a query param - // because edge rendering will create a different bundle for that. - if (streamable) { - return new Response( - JSON.stringify({ - requestAborted, - i: streamable.i, - streamCleanedUp: streamable.streamCleanedUp, - }) - ) - } - - streamable = Streamable() - req.signal.onabort = () => { - requestAborted = true - } - return new Response(streamable.stream) -} diff --git a/test/e2e/cancel-request/middleware.ts b/test/e2e/cancel-request/middleware.ts deleted file mode 100644 index f5ec4e77dbd42..0000000000000 --- a/test/e2e/cancel-request/middleware.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { Streamable } from './streamable' - -export const config = { - matcher: '/middleware', -} - -let streamable -let requestAborted = false - -export default function handler(req: Request): Response { - // The 2nd request should render the stats. We don't use a query param - // because edge rendering will create a different bundle for that. - if (streamable) { - return new Response( - JSON.stringify({ - requestAborted, - i: streamable.i, - streamCleanedUp: streamable.streamCleanedUp, - }) - ) - } - - streamable = Streamable() - req.signal.onabort = () => { - requestAborted = true - } - return new Response(streamable.stream) -} diff --git a/test/e2e/cancel-request/pages/api/edge-api.ts b/test/e2e/cancel-request/pages/api/edge-api.ts deleted file mode 100644 index c586ce10862e2..0000000000000 --- a/test/e2e/cancel-request/pages/api/edge-api.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { Streamable } from '../../streamable' - -export const config = { - runtime: 'edge', -} - -let streamable -let requestAborted = false - -export default function handler(req: Request): Response { - // The 2nd request should render the stats. We don't use a query param - // because edge rendering will create a different bundle for that. - if (streamable) { - return new Response( - JSON.stringify({ - requestAborted, - i: streamable.i, - streamCleanedUp: streamable.streamCleanedUp, - }) - ) - } - - streamable = Streamable() - req.signal.onabort = () => { - requestAborted = true - } - return new Response(streamable.stream) -} diff --git a/test/e2e/cancel-request/pages/api/node-api.ts b/test/e2e/cancel-request/pages/api/node-api.ts deleted file mode 100644 index c81c7ac746491..0000000000000 --- a/test/e2e/cancel-request/pages/api/node-api.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { IncomingMessage, ServerResponse } from 'http' -import { pipeline } from 'stream' -import { Readable } from '../../readable' - -export const config = { - runtime: 'nodejs', -} - -let readable -let requestAborted = false - -export default function handler( - _req: IncomingMessage, - res: ServerResponse -): void { - // The 2nd request should render the stats. We don't use a query param - // because edge rendering will create a different bundle for that. - if (readable) { - res.end( - JSON.stringify({ - requestAborted, - i: readable.i, - streamCleanedUp: readable.streamCleanedUp, - }) - ) - return - } - - readable = Readable() - res.on('close', () => { - requestAborted = true - }) - pipeline(readable.stream, res, () => { - res.end() - }) -} diff --git a/test/e2e/cancel-request/readable.ts b/test/e2e/cancel-request/readable.ts deleted file mode 100644 index 8faea130df5c9..0000000000000 --- a/test/e2e/cancel-request/readable.ts +++ /dev/null @@ -1,22 +0,0 @@ -import * as stream from 'stream' -import { sleep } from './sleep' - -export function Readable() { - const encoder = new TextEncoder() - const readable = { - i: 0, - streamCleanedUp: false, - stream: new stream.Readable({ - async read() { - await sleep(100) - this.push(encoder.encode(String(readable.i++))) - - if (readable.i >= 25) this.push(null) - }, - destroy() { - readable.streamCleanedUp = true - }, - }), - } - return readable -} diff --git a/test/e2e/cancel-request/sleep.ts b/test/e2e/cancel-request/sleep.ts deleted file mode 100644 index 5b675e3b71433..0000000000000 --- a/test/e2e/cancel-request/sleep.ts +++ /dev/null @@ -1,3 +0,0 @@ -export function sleep(ms: number) { - return new Promise((res) => setTimeout(res, ms)) -} diff --git a/test/e2e/cancel-request/stream-cancel.test.ts b/test/e2e/cancel-request/stream-cancel.test.ts deleted file mode 100644 index e6c051390b230..0000000000000 --- a/test/e2e/cancel-request/stream-cancel.test.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { createNextDescribe } from 'e2e-utils' -import { sleep } from './sleep' -import { get } from 'http' - -createNextDescribe( - 'streaming responses cancel inner stream after disconnect', - { - files: __dirname, - }, - ({ next }) => { - type CancelState = { - requestAborted: boolean - streamCleanedUp: boolean - i: number - } - - function prime(url: string) { - return new Promise((resolve) => { - url = new URL(url, next.url).href - - // There's a bug in node-fetch v2 where aborting the fetch will never abort - // the connection, because the body is a transformed stream that doesn't - // close the connection stream. - // https://github.com/node-fetch/node-fetch/pull/670 - const req = get(url, async (res) => { - while (true) { - const value = res.read(1) - if (value) break - await sleep(5) - } - - res.destroy() - - // make sure the connection has finished - await sleep(100) - - resolve() - }) - req.end() - }) - } - - // The disconnect from our prime request to the server isn't instant, and - // there's no good signal on the client end for when it happens. So we just - // fetch multiple times waiting for it to happen. - async function getTillCancelled(url: string) { - while (true) { - const res = await next.fetch(url) - const json = (await res.json()) as CancelState - if (json.streamCleanedUp === true) { - return json - } - - await sleep(10) - } - } - - it('Midddleware cancels inner ReadableStream', async () => { - await prime('/middleware') - const json = await getTillCancelled('/middleware') - expect(json).toMatchObject({ - requestAborted: true, - streamCleanedUp: true, - i: (expect as any).toBeWithin(0, 5), - }) - }) - - it('App Route Handler Edge cancels inner ReadableStream', async () => { - await prime('/edge-route') - const json = await getTillCancelled('/edge-route') - expect(json).toMatchObject({ - requestAborted: true, - streamCleanedUp: true, - i: (expect as any).toBeWithin(0, 5), - }) - }) - - it('App Route Handler NodeJS cancels inner ReadableStream', async () => { - await prime('/node-route') - const json = await getTillCancelled('/node-route') - expect(json).toMatchObject({ - requestAborted: true, - streamCleanedUp: true, - i: (expect as any).toBeWithin(0, 5), - }) - }) - - it('Pages Api Route Edge cancels inner ReadableStream', async () => { - await prime('/api/edge-api') - const json = await getTillCancelled('/api/edge-api') - expect(json).toMatchObject({ - requestAborted: true, - streamCleanedUp: true, - i: (expect as any).toBeWithin(0, 5), - }) - }) - - it('Pages Api Route NodeJS cancels inner ReadableStream', async () => { - await prime('/api/node-api') - const json = await getTillCancelled('/api/node-api') - expect(json).toMatchObject({ - requestAborted: true, - streamCleanedUp: true, - i: (expect as any).toBeWithin(0, 5), - }) - }) - } -) diff --git a/test/e2e/cancel-request/streamable.ts b/test/e2e/cancel-request/streamable.ts deleted file mode 100644 index a540d09e3448e..0000000000000 --- a/test/e2e/cancel-request/streamable.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { sleep } from './sleep' - -export function Streamable() { - const encoder = new TextEncoder() - const streamable = { - i: 0, - streamCleanedUp: false, - stream: new ReadableStream({ - async pull(controller) { - await sleep(100) - controller.enqueue(encoder.encode(String(streamable.i++))) - - if (streamable.i >= 25) controller.close() - }, - cancel() { - streamable.streamCleanedUp = true - }, - }), - } - return streamable -}