Skip to content

Commit 683c368

Browse files
authored
fix: readable body (#2642)
1 parent e2652b7 commit 683c368

File tree

2 files changed

+76
-21
lines changed

2 files changed

+76
-21
lines changed

lib/api/readable.js

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ module.exports = class BodyReadable extends Readable {
9494
}
9595

9696
push (chunk) {
97-
if (this[kConsume] && chunk !== null && this.readableLength === 0) {
97+
if (this[kConsume] && chunk !== null) {
9898
consumePush(this[kConsume], chunk)
9999
return this[kReading] ? super.push(chunk) : true
100100
}
@@ -215,26 +215,28 @@ async function consume (stream, type) {
215215
reject(rState.errored ?? new TypeError('unusable'))
216216
}
217217
} else {
218-
stream[kConsume] = {
219-
type,
220-
stream,
221-
resolve,
222-
reject,
223-
length: 0,
224-
body: []
225-
}
218+
queueMicrotask(() => {
219+
stream[kConsume] = {
220+
type,
221+
stream,
222+
resolve,
223+
reject,
224+
length: 0,
225+
body: []
226+
}
226227

227-
stream
228-
.on('error', function (err) {
229-
consumeFinish(this[kConsume], err)
230-
})
231-
.on('close', function () {
232-
if (this[kConsume].body !== null) {
233-
consumeFinish(this[kConsume], new RequestAbortedError())
234-
}
235-
})
228+
stream
229+
.on('error', function (err) {
230+
consumeFinish(this[kConsume], err)
231+
})
232+
.on('close', function () {
233+
if (this[kConsume].body !== null) {
234+
consumeFinish(this[kConsume], new RequestAbortedError())
235+
}
236+
})
236237

237-
queueMicrotask(() => consumeStart(stream[kConsume]))
238+
consumeStart(stream[kConsume])
239+
})
238240
}
239241
})
240242
}
@@ -246,8 +248,16 @@ function consumeStart (consume) {
246248

247249
const { _readableState: state } = consume.stream
248250

249-
for (const chunk of state.buffer) {
250-
consumePush(consume, chunk)
251+
if (state.bufferIndex) {
252+
const start = state.bufferIndex
253+
const end = state.buffer.length
254+
for (let n = start; n < end; n++) {
255+
consumePush(consume, state.buffer[n])
256+
}
257+
} else {
258+
for (const chunk of state.buffer) {
259+
consumePush(consume, chunk)
260+
}
251261
}
252262

253263
if (state.endEmitted) {

test/node-test/large-body.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
'use strict'
2+
3+
const { test } = require('node:test')
4+
const { createServer } = require('http')
5+
const { request } = require('../../')
6+
const { strictEqual } = require('node:assert')
7+
8+
test('socket should not be reused unless body is consumed', async (t) => {
9+
const LARGE_BODY = 'x'.repeat(10000000)
10+
11+
const server = createServer((req, res) => {
12+
if (req.url === '/foo') {
13+
res.end(LARGE_BODY)
14+
return
15+
}
16+
if (req.url === '/bar') {
17+
res.end('bar')
18+
return
19+
}
20+
throw new Error('Unexpected request url: ' + req.url)
21+
})
22+
23+
await new Promise((resolve) => { server.listen(0, resolve) })
24+
t.after(() => { server.close() })
25+
26+
// Works fine
27+
// const fooRes = await request('http://localhost:3000/foo')
28+
// const fooBody = await fooRes.body.text()
29+
30+
// const barRes = await request('http://localhost:3000/bar')
31+
// await barRes.body.text()
32+
33+
const port = server.address().port
34+
35+
// Fails with:
36+
const fooRes = await request(`http://localhost:${port}/foo`)
37+
const barRes = await request(`http://localhost:${port}/bar`)
38+
39+
const fooBody = await fooRes.body.text()
40+
await barRes.body.text()
41+
42+
strictEqual(fooRes.headers['content-length'], String(LARGE_BODY.length))
43+
strictEqual(fooBody.length, LARGE_BODY.length)
44+
strictEqual(fooBody, LARGE_BODY)
45+
})

0 commit comments

Comments
 (0)