From 8025a2bd7e9a66b1d4b7402fbc55d6152eb41fd6 Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Tue, 20 Jun 2023 22:35:52 -0400 Subject: [PATCH] Support response aborting --- .../server/lib/render-server-standalone.ts | 14 +++++++ packages/next/src/server/lib/start-server.ts | 15 +++++++ packages/next/src/server/next-server.ts | 42 ++++++++++--------- 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/packages/next/src/server/lib/render-server-standalone.ts b/packages/next/src/server/lib/render-server-standalone.ts index 32b66022dd2d9..6a15dd23879ef 100644 --- a/packages/next/src/server/lib/render-server-standalone.ts +++ b/packages/next/src/server/lib/render-server-standalone.ts @@ -105,6 +105,20 @@ export const createServerHandler = async ({ return } const proxyServer = getProxyServer(req.url || '/') + + // http-proxy does not properly detect a client disconnect in newer + // versions of Node.js. This is caused because it only listens for the + // `aborted` event on the our request object, but it also fully reads and + // closes the request object. Node **will not** fire `aborted` when the + // request is already closed. Listening for `close` on our response object + // will detect the disconnect, and we can abort the proxy's connection. + proxyServer.on('proxyReq', (proxyReq) => { + res.on('close', () => proxyReq.destroy()) + }) + proxyServer.on('proxyRes', (proxyRes) => { + res.on('close', () => proxyRes.destroy()) + }) + proxyServer.web(req, res) proxyServer.on('error', (err) => { res.statusCode = 500 diff --git a/packages/next/src/server/lib/start-server.ts b/packages/next/src/server/lib/start-server.ts index 2e7b95d018bc9..2414639f3fee1 100644 --- a/packages/next/src/server/lib/start-server.ts +++ b/packages/next/src/server/lib/start-server.ts @@ -284,6 +284,21 @@ export async function startServer({ return } const proxyServer = getProxyServer(req.url || '/') + + // http-proxy does not properly detect a client disconnect in newer + // versions of Node.js. This is caused because it only listens for the + // `aborted` event on the our request object, but it also fully reads + // and closes the request object. Node **will not** fire `aborted` when + // the request is already closed. Listening for `close` on our response + // object will detect the disconnect, and we can abort the proxy's + // connection. + proxyServer.on('proxyReq', (proxyReq) => { + res.on('close', () => proxyReq.destroy()) + }) + proxyServer.on('proxyRes', (proxyRes) => { + res.on('close', () => proxyRes.destroy()) + }) + proxyServer.web(req, res) } upgradeHandler = async (req, socket, head) => { diff --git a/packages/next/src/server/next-server.ts b/packages/next/src/server/next-server.ts index 9a4d8d2d16646..f285c59676f53 100644 --- a/packages/next/src/server/next-server.ts +++ b/packages/next/src/server/next-server.ts @@ -986,13 +986,12 @@ export default class NextNodeServer extends BaseServer { ) } - protected streamResponseChunk(res: NodeNextResponse, chunk: any) { - res.originalResponse.write(chunk) - + private streamResponseChunk(res: ServerResponse, chunk: any) { + res.write(chunk) // When both compression and streaming are enabled, we need to explicitly // flush the response to avoid it being buffered by gzip. - if (this.compression && 'flush' in res.originalResponse) { - ;(res.originalResponse as any).flush() + if (this.compression && 'flush' in res) { + ;(res as any).flush() } } @@ -1525,10 +1524,12 @@ export default class NextNodeServer extends BaseServer { res.statusCode = invokeRes.statusCode res.statusMessage = invokeRes.statusMessage + const { originalResponse } = res as NodeNextResponse for await (const chunk of invokeRes) { - this.streamResponseChunk(res as NodeNextResponse, chunk) + if (originalResponse.closed) break + this.streamResponseChunk(originalResponse, chunk) } - ;(res as NodeNextResponse).originalResponse.end() + res.send() return { finished: true, } @@ -2505,9 +2506,12 @@ export default class NextNodeServer extends BaseServer { } } res.statusCode = result.response.status + + const { originalResponse } = res as NodeNextResponse for await (const chunk of result.response.body || ([] as any)) { - this.streamResponseChunk(res as NodeNextResponse, chunk) + if (originalResponse.closed) break + this.streamResponseChunk(originalResponse, chunk) } res.send() return { @@ -2678,17 +2682,14 @@ export default class NextNodeServer extends BaseServer { if (result.response.headers.has('x-middleware-refresh')) { res.statusCode = result.response.status - if ((result.response as any).invokeRes) { - for await (const chunk of (result.response as any).invokeRes) { - this.streamResponseChunk(res as NodeNextResponse, chunk) - } - ;(res as NodeNextResponse).originalResponse.end() - } else { - for await (const chunk of result.response.body || ([] as any)) { - this.streamResponseChunk(res as NodeNextResponse, chunk) - } - res.send() + const { originalResponse } = res as NodeNextResponse + const body = + (result.response as any).invokeRes || result.response.body || [] + for await (const chunk of body) { + if (originalResponse.closed) break + this.streamResponseChunk(originalResponse, chunk) } + res.send() return { finished: true, } @@ -2864,22 +2865,23 @@ export default class NextNodeServer extends BaseServer { } }) + const nodeResStream = (params.res as NodeNextResponse).originalResponse if (result.response.body) { // TODO(gal): not sure that we always need to stream - const nodeResStream = (params.res as NodeNextResponse).originalResponse const { consumeUint8ArrayReadableStream } = require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime') try { for await (const chunk of consumeUint8ArrayReadableStream( result.response.body )) { + if (nodeResStream.closed) break nodeResStream.write(chunk) } } finally { nodeResStream.end() } } else { - ;(params.res as NodeNextResponse).originalResponse.end() + nodeResStream.end() } return result