Skip to content

Commit dbe645f

Browse files
committed
http2: fix condition where data is lost
PR-URL: #18895 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]>
1 parent fcebb16 commit dbe645f

File tree

3 files changed

+146
-14
lines changed

3 files changed

+146
-14
lines changed

lib/internal/http2/core.js

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,23 @@ function onStreamClose(code) {
307307

308308
if (state.fd !== undefined)
309309
tryClose(state.fd);
310-
stream.push(null);
311-
stream[kMaybeDestroy](null, code);
310+
311+
// Defer destroy we actually emit end.
312+
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
313+
// If errored or ended, we can destroy immediately.
314+
stream[kMaybeDestroy](null, code);
315+
} else {
316+
// Wait for end to destroy.
317+
stream.on('end', stream[kMaybeDestroy]);
318+
// Push a null so the stream can end whenever the client consumes
319+
// it completely.
320+
stream.push(null);
321+
322+
// Same as net.
323+
if (stream.readableLength === 0) {
324+
stream.read(0);
325+
}
326+
}
312327
}
313328

314329
// Receives a chunk of data for a given stream and forwards it on
@@ -326,11 +341,19 @@ function onStreamRead(nread, buf) {
326341
}
327342
return;
328343
}
344+
329345
// Last chunk was received. End the readable side.
330346
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
331347
`${sessionName(stream[kSession][kType])}]: ending readable.`);
332-
stream.push(null);
333-
stream[kMaybeDestroy]();
348+
349+
// defer this until we actually emit end
350+
if (stream._readableState.endEmitted) {
351+
stream[kMaybeDestroy]();
352+
} else {
353+
stream.on('end', stream[kMaybeDestroy]);
354+
stream.push(null);
355+
stream.read(0);
356+
}
334357
}
335358

336359
// Called when the remote peer settings have been updated.
@@ -1833,21 +1856,25 @@ class Http2Stream extends Duplex {
18331856
session[kMaybeDestroy]();
18341857
process.nextTick(emit, this, 'close', code);
18351858
callback(err);
1836-
}
18371859

1860+
}
18381861
// The Http2Stream can be destroyed if it has closed and if the readable
18391862
// side has received the final chunk.
18401863
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
1841-
if (error == null) {
1842-
if (code === NGHTTP2_NO_ERROR &&
1843-
(!this._readableState.ended ||
1844-
!this._writableState.ended ||
1845-
this._writableState.pendingcb > 0 ||
1846-
!this.closed)) {
1847-
return;
1848-
}
1864+
if (error || code !== NGHTTP2_NO_ERROR) {
1865+
this.destroy(error);
1866+
return;
1867+
}
1868+
1869+
// TODO(mcollina): remove usage of _*State properties
1870+
if (this._readableState.ended &&
1871+
this._writableState.ended &&
1872+
this._writableState.pendingcb === 0 &&
1873+
this.closed) {
1874+
this.destroy();
1875+
// This should return, but eslint complains.
1876+
// return
18491877
}
1850-
this.destroy(error);
18511878
}
18521879
}
18531880

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
const { Readable } = require('stream');
9+
10+
const server = http2.createServer(common.mustCall((req, res) => {
11+
res.setHeader('content-type', 'text/html');
12+
const input = new Readable({
13+
read() {
14+
this.push('test');
15+
this.push(null);
16+
}
17+
});
18+
input.pipe(res);
19+
}));
20+
21+
server.listen(0, common.mustCall(() => {
22+
const port = server.address().port;
23+
const client = http2.connect(`http://localhost:${port}`);
24+
25+
const req = client.request();
26+
27+
req.on('response', common.mustCall((headers) => {
28+
assert.strictEqual(headers[':status'], 200);
29+
assert.strictEqual(headers['content-type'], 'text/html');
30+
}));
31+
32+
let data = '';
33+
34+
const notCallClose = common.mustNotCall();
35+
36+
setTimeout(() => {
37+
req.setEncoding('utf8');
38+
req.removeListener('close', notCallClose);
39+
req.on('close', common.mustCall(() => {
40+
server.close();
41+
client.close();
42+
}));
43+
req.on('data', common.mustCallAtLeast((d) => data += d));
44+
req.on('end', common.mustCall(() => {
45+
assert.strictEqual(data, 'test');
46+
}));
47+
}, common.platformTimeout(100));
48+
49+
req.on('close', notCallClose);
50+
}));
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
const { Readable } = require('stream');
9+
10+
const server = http2.createServer();
11+
server.on('stream', common.mustCall((stream) => {
12+
stream.respond({
13+
':status': 200,
14+
'content-type': 'text/html'
15+
});
16+
const input = new Readable({
17+
read() {
18+
this.push('test');
19+
this.push(null);
20+
}
21+
});
22+
input.pipe(stream);
23+
}));
24+
25+
26+
server.listen(0, common.mustCall(() => {
27+
const port = server.address().port;
28+
const client = http2.connect(`http://localhost:${port}`);
29+
30+
const req = client.request();
31+
32+
req.on('response', common.mustCall((headers) => {
33+
assert.strictEqual(headers[':status'], 200);
34+
assert.strictEqual(headers['content-type'], 'text/html');
35+
}));
36+
37+
let data = '';
38+
39+
const notCallClose = common.mustNotCall();
40+
41+
setTimeout(() => {
42+
req.setEncoding('utf8');
43+
req.removeListener('close', notCallClose);
44+
req.on('close', common.mustCall(() => {
45+
server.close();
46+
client.close();
47+
}));
48+
req.on('data', common.mustCallAtLeast((d) => data += d));
49+
req.on('end', common.mustCall(() => {
50+
assert.strictEqual(data, 'test');
51+
}));
52+
}, common.platformTimeout(100));
53+
54+
req.on('close', notCallClose);
55+
}));

0 commit comments

Comments
 (0)