Skip to content

Commit d0c981c

Browse files
committed
Revert "Revert "Fix stream cancellation in RenderResult.pipe() and sendResponse()" (#52277)"
This reverts commit 5046ee1.
1 parent 0084166 commit d0c981c

File tree

14 files changed

+333
-21
lines changed

14 files changed

+333
-21
lines changed

packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,20 @@ async function render(request: NextRequest, event: NextFetchEvent) {
8080
response.headers.append('Vary', RSC_VARY_HEADER)
8181

8282
const writer = tranform.writable.getWriter()
83-
result.pipe({
83+
const target = {
8484
write: (chunk: Uint8Array) => writer.write(chunk),
8585
end: () => writer.close(),
8686
destroy: (reason?: Error) => writer.abort(reason),
87-
})
87+
closed: false,
88+
}
89+
const onClose = () => {
90+
target.closed = true
91+
}
92+
// No, this cannot be replaced with `finally`, because early cancelling
93+
// the stream will create a rejected promise, and finally will create an
94+
// unhandled rejection.
95+
writer.closed.then(onClose, onClose)
96+
result.pipe(target)
8897

8998
return response
9099
}

packages/next/src/server/render-result.ts

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export interface PipeTarget {
1616
end: () => unknown
1717
flush?: () => unknown
1818
destroy: (err?: Error) => unknown
19+
get closed(): boolean
1920
}
2021

2122
export default class RenderResult {
@@ -111,31 +112,25 @@ export default class RenderResult {
111112
: () => {}
112113
const reader = this.response.getReader()
113114

114-
let shouldFatalError = false
115115
try {
116-
let result = await reader.read()
117-
if (!result.done) {
118-
// As we're going to write to the response, we should destroy the
119-
// response if an error occurs.
120-
shouldFatalError = true
121-
}
116+
while (true) {
117+
const result = await reader.read()
118+
119+
if (res.closed || result.done) {
120+
break
121+
}
122122

123-
while (!result.done) {
124123
// Write the data to the response.
125124
res.write(result.value)
126125

127126
// Flush it to the client (if it supports flushing).
128127
flush()
129-
130-
// Read the next chunk.
131-
result = await reader.read()
132128
}
133129

134130
// We're done writing to the response, so we can end it.
135131
res.end()
136132
} catch (err) {
137-
// If we've written to the response, we should destroy it.
138-
if (shouldFatalError) {
133+
if (!res.closed) {
139134
res.destroy(err as any)
140135
}
141136

packages/next/src/server/send-response.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export async function sendResponse(
4949
const iterator = consumeUint8ArrayReadableStream(response.body)
5050
try {
5151
for await (const chunk of iterator) {
52+
if (originalResponse.closed) break
5253
originalResponse.write(chunk)
5354
}
5455
} finally {

packages/next/src/server/stream-utils/node-web-streams-helper.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ export const streamToBufferedResult = async (
2525
},
2626
end() {},
2727
destroy() {},
28+
closed: false,
2829
}
29-
await renderResult.pipe(writable as any)
30+
await renderResult.pipe(writable)
3031
return renderChunks.join('')
3132
}
3233

packages/next/src/server/web-server.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -417,14 +417,20 @@ export default class NextWebServer extends BaseServer<WebServerOptions> {
417417

418418
if (options.result.isDynamic) {
419419
const writer = res.transformStream.writable.getWriter()
420-
options.result.pipe({
420+
const target = {
421421
write: (chunk: Uint8Array) => writer.write(chunk),
422422
end: () => writer.close(),
423423
destroy: (err: Error) => writer.abort(err),
424-
cork: () => {},
425-
uncork: () => {},
426-
// Not implemented: on/removeListener
427-
} as any)
424+
closed: false,
425+
} as any
426+
const onClose = () => {
427+
target.closed = true
428+
}
429+
// No, this cannot be replaced with `finally`, because early cancelling
430+
// the stream will create a rejected promise, and finally will create an
431+
// unhandled rejection.
432+
writer.closed.then(onClose, onClose)
433+
options.result.pipe(target)
428434
} else {
429435
const payload = await options.result.toUnchunkedString()
430436
res.setHeader('Content-Length', String(byteLength(payload)))
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const runtime = 'edge'
4+
5+
let streamable
6+
let requestAborted = false
7+
8+
export function GET(req: Request): Response {
9+
// The 2nd request should render the stats. We don't use a query param
10+
// because edge rendering will create a different bundle for that.
11+
if (streamable) {
12+
return new Response(
13+
JSON.stringify({
14+
requestAborted,
15+
i: streamable.i,
16+
streamCleanedUp: streamable.streamCleanedUp,
17+
})
18+
)
19+
}
20+
21+
streamable = Streamable()
22+
req.signal.onabort = () => {
23+
requestAborted = true
24+
}
25+
return new Response(streamable.stream)
26+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const runtime = 'nodejs'
4+
// Next thinks it can statically compile this route, which breaks the test.
5+
export const dynamic = 'force-dynamic'
6+
7+
let streamable
8+
let requestAborted = false
9+
10+
export function GET(req: Request): Response {
11+
// The 2nd request should render the stats. We don't use a query param
12+
// because edge rendering will create a different bundle for that.
13+
if (streamable) {
14+
return new Response(
15+
JSON.stringify({
16+
requestAborted,
17+
i: streamable.i,
18+
streamCleanedUp: streamable.streamCleanedUp,
19+
})
20+
)
21+
}
22+
23+
streamable = Streamable()
24+
req.signal.onabort = () => {
25+
requestAborted = true
26+
}
27+
return new Response(streamable.stream)
28+
}

test/e2e/cancel-request/middleware.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Streamable } from './streamable'
2+
3+
export const config = {
4+
matcher: '/middleware',
5+
}
6+
7+
let streamable
8+
let requestAborted = false
9+
10+
export default function handler(req: Request): Response {
11+
// The 2nd request should render the stats. We don't use a query param
12+
// because edge rendering will create a different bundle for that.
13+
if (streamable) {
14+
return new Response(
15+
JSON.stringify({
16+
requestAborted,
17+
i: streamable.i,
18+
streamCleanedUp: streamable.streamCleanedUp,
19+
})
20+
)
21+
}
22+
23+
streamable = Streamable()
24+
req.signal.onabort = () => {
25+
requestAborted = true
26+
}
27+
return new Response(streamable.stream)
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Streamable } from '../../streamable'
2+
3+
export const config = {
4+
runtime: 'edge',
5+
}
6+
7+
let streamable
8+
let requestAborted = false
9+
10+
export default function handler(req: Request): Response {
11+
// The 2nd request should render the stats. We don't use a query param
12+
// because edge rendering will create a different bundle for that.
13+
if (streamable) {
14+
return new Response(
15+
JSON.stringify({
16+
requestAborted,
17+
i: streamable.i,
18+
streamCleanedUp: streamable.streamCleanedUp,
19+
})
20+
)
21+
}
22+
23+
streamable = Streamable()
24+
req.signal.onabort = () => {
25+
requestAborted = true
26+
}
27+
return new Response(streamable.stream)
28+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { IncomingMessage, ServerResponse } from 'http'
2+
import { pipeline } from 'stream'
3+
import { Readable } from '../../readable'
4+
5+
export const config = {
6+
runtime: 'nodejs',
7+
}
8+
9+
let readable
10+
let requestAborted = false
11+
12+
export default function handler(
13+
_req: IncomingMessage,
14+
res: ServerResponse
15+
): void {
16+
// The 2nd request should render the stats. We don't use a query param
17+
// because edge rendering will create a different bundle for that.
18+
if (readable) {
19+
res.end(
20+
JSON.stringify({
21+
requestAborted,
22+
i: readable.i,
23+
streamCleanedUp: readable.streamCleanedUp,
24+
})
25+
)
26+
return
27+
}
28+
29+
readable = Readable()
30+
res.on('close', () => {
31+
requestAborted = true
32+
})
33+
pipeline(readable.stream, res, () => {
34+
res.end()
35+
})
36+
}

0 commit comments

Comments
 (0)