Skip to content

Commit 7cafd5f

Browse files
committed
stream: fix finished w/ 'close' before 'end'
Emitting 'close' before 'end' on a Readable should result in a premature close error. PR-URL: #31545 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent 3ec4b21 commit 7cafd5f

File tree

4 files changed

+62
-13
lines changed

4 files changed

+62
-13
lines changed

lib/internal/streams/end-of-stream.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ function isWritableFinished(stream) {
3434

3535
function nop() {}
3636

37+
function isReadableEnded(stream) {
38+
if (stream.readableEnded) return true;
39+
const rState = stream._readableState;
40+
if (!rState || rState.errored) return false;
41+
return rState.endEmitted || (rState.ended && rState.length === 0);
42+
}
43+
3744
function eos(stream, opts, callback) {
3845
if (arguments.length === 2) {
3946
callback = opts;
@@ -84,7 +91,7 @@ function eos(stream, opts, callback) {
8491
const onclose = () => {
8592
let err;
8693
if (readable && !readableEnded) {
87-
if (!rState || !rState.ended)
94+
if (!isReadableEnded(stream))
8895
err = new ERR_STREAM_PREMATURE_CLOSE();
8996
return callback.call(stream, err);
9097
}

lib/internal/streams/pipeline.js

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,37 @@ let PassThrough;
2626
let createReadableStreamAsyncIterator;
2727

2828
function destroyer(stream, reading, writing, callback) {
29-
callback = once(callback);
30-
let destroyed = false;
29+
const _destroy = once((err) => {
30+
destroyImpl.destroyer(stream, err);
31+
callback(err);
32+
});
3133

3234
if (eos === undefined) eos = require('internal/streams/end-of-stream');
3335
eos(stream, { readable: reading, writable: writing }, (err) => {
34-
if (destroyed) return;
35-
destroyed = true;
36-
destroyImpl.destroyer(stream, err);
37-
callback(err);
36+
const rState = stream._readableState;
37+
if (
38+
err &&
39+
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
40+
reading &&
41+
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
42+
) {
43+
// Some readable streams will emit 'close' before 'end'. However, since
44+
// this is on the readable side 'end' should still be emitted if the
45+
// stream has been ended and no error emitted. This should be allowed in
46+
// favor of backwards compatibility. Since the stream is piped to a
47+
// destination this should not result in any observable difference.
48+
// We don't need to check if this is a writable premature close since
49+
// eos will only fail with premature close on the reading side for
50+
// duplex streams.
51+
stream
52+
.once('end', _destroy)
53+
.once('error', _destroy);
54+
} else {
55+
_destroy(err);
56+
}
3857
});
3958

40-
return (err) => {
41-
if (destroyed) return;
42-
destroyed = true;
43-
destroyImpl.destroyer(stream, err);
44-
callback(err || new ERR_STREAM_DESTROYED('pipe'));
45-
};
59+
return (err) => _destroy(err || new ERR_STREAM_DESTROYED('pipe'));
4660
}
4761

4862
function popCallback(streams) {

test/parallel/test-stream-finished.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,3 +342,13 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
342342
d._writableState.errored = true;
343343
d.emit('close');
344344
}
345+
346+
{
347+
const r = new Readable();
348+
finished(r, common.mustCall((err) => {
349+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
350+
}));
351+
r.push('asd');
352+
r.push(null);
353+
r.destroy();
354+
}

test/parallel/test-stream-pipeline.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,3 +920,21 @@ const { promisify } = require('util');
920920
}));
921921
src.end();
922922
}
923+
924+
{
925+
// Make sure 'close' before 'end' finishes without error
926+
// if readable has received eof.
927+
// Ref: https://github.com/nodejs/node/issues/29699
928+
const r = new Readable();
929+
const w = new Writable({
930+
write(chunk, encoding, cb) {
931+
cb();
932+
}
933+
});
934+
pipeline(r, w, (err) => {
935+
assert.strictEqual(err, undefined);
936+
});
937+
r.push('asd');
938+
r.push(null);
939+
r.emit('close');
940+
}

0 commit comments

Comments
 (0)