From ccc94a986729a43cbcfbb64954affd3448d71685 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 21 Aug 2018 20:05:12 +0200 Subject: [PATCH 1/7] stream: add auto-destroy mode --- doc/api/stream.md | 4 ++ lib/_stream_readable.js | 39 +++++++++++++--- lib/_stream_writable.js | 41 ++++++++++++++--- test/parallel/test-stream-auto-destroy.js | 55 +++++++++++++++++++++++ 4 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 test/parallel/test-stream-auto-destroy.js diff --git a/doc/api/stream.md b/doc/api/stream.md index fb2f2da28d1a87..6b7d833992c480 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1521,6 +1521,8 @@ changes: [`stream._destroy()`][writable-_destroy] method. * `final` {Function} Implementation for the [`stream._final()`][stream-_final] method. + * `autoDestroy` {boolean} Whether this stream should automatically call + .destroy() on itself after ending. ```js const { Writable } = require('stream'); @@ -1770,6 +1772,8 @@ constructor and implement the `readable._read()` method. method. * `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy] method. + * `autoDestroy` {boolean} Whether this stream should automatically call + .destroy() on itself after ending. ```js const { Readable } = require('stream'); diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 488d10a10b5bbd..8a7f8094f3fbe8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) { // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; + // Should .destroy() be called after 'end' (and potentially 'finish') + this.autoDestroy = !!options.autoDestroy; + // has it been destroyed this.destroyed = false; @@ -235,7 +238,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (!skipChunkCheck) er = chunkInvalid(state, chunk); if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !state.objectMode && @@ -245,11 +248,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (addToFront) { if (state.endEmitted) - stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else addChunk(stream, state, chunk, true); } else if (state.ended) { - stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF()); + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); } else if (state.destroyed) { return false; } else { @@ -581,7 +584,7 @@ function maybeReadMore_(stream, state) { // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. Readable.prototype._read = function(n) { - this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()')); + errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); }; Readable.prototype.pipe = function(dest, pipeOpts) { @@ -687,7 +690,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { unpipe(); dest.removeListener('error', onerror); if (EE.listenerCount(dest, 'error') === 0) - dest.emit('error', er); + errorOrDestroy(dest, er); } // Make sure our error handler is attached before userland ones. @@ -1084,6 +1087,28 @@ function endReadable(stream) { } } +function writableAutoDestroy(wState) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well + return !wState || (wState.autoDestroy && wState.finished); +} + +function errorOrDestroy(stream, err) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + + const rState = stream._readableState; + const wState = stream._writableState; + + if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) + stream.destroy(err); + else + stream.emit('error', err); +} + function endReadableNT(state, stream) { debug('endReadableNT', state.endEmitted, state.length); @@ -1092,5 +1117,9 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.readable = false; stream.emit('end'); + + if (state.autoDestroy && writableAutoDestroy(stream._writableState)) { + stream.destroy(); + } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 3bad957912b323..b4471973c21e40 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -147,6 +147,9 @@ function WritableState(options, stream, isDuplex) { // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; + // Should .destroy() be called after 'finish' (and potentially 'end') + this.autoDestroy = !!options.autoDestroy; + // count buffered requests this.bufferedRequestCount = 0; @@ -235,14 +238,14 @@ function Writable(options) { // Otherwise people can pipe Writable streams, which is just wrong. Writable.prototype.pipe = function() { - this.emit('error', new ERR_STREAM_CANNOT_PIPE()); + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; function writeAfterEnd(stream, cb) { var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb - stream.emit('error', er); + errorOrDestroy(stream, er); process.nextTick(cb, er); } @@ -258,7 +261,7 @@ function validChunk(stream, state, chunk, cb) { er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); } if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); process.nextTick(cb, er); return false; } @@ -422,13 +425,13 @@ function onwriteError(stream, state, sync, er, cb) { // after error process.nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); } else { // the caller expect this to happen before if // it is async cb(er); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); // this can emit finish, but finish must // always follow error finishMaybe(stream, state); @@ -612,7 +615,7 @@ function callFinal(stream, state) { stream._final((err) => { state.pendingcb--; if (err) { - stream.emit('error', err); + errorOrDestroy(stream, err); } state.prefinished = true; stream.emit('prefinish'); @@ -632,6 +635,28 @@ function prefinish(stream, state) { } } +function readableAutoDestroy(rState) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well + return !rState || (rState.autoDestroy && rState.endEmitted); +} + +function errorOrDestroy(stream, err) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + + const rState = stream._readableState; + const wState = stream._writableState; + + if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) + stream.destroy(err); + else + stream.emit('error', err); +} + function finishMaybe(stream, state) { var need = needFinish(state); if (need) { @@ -639,6 +664,10 @@ function finishMaybe(stream, state) { if (state.pendingcb === 0) { state.finished = true; stream.emit('finish'); + + if (state.autoDestroy && readableAutoDestroy(stream._readableState)) { + stream.destroy(); + } } } return need; diff --git a/test/parallel/test-stream-auto-destroy.js b/test/parallel/test-stream-auto-destroy.js new file mode 100644 index 00000000000000..e4ef5c149ce969 --- /dev/null +++ b/test/parallel/test-stream-auto-destroy.js @@ -0,0 +1,55 @@ +'use strict'; +const common = require('../common'); +const stream = require('stream'); + +{ + const r = new stream.Readable({ + autoDestroy: true, + read() { + this.push('hello'); + this.push('world'); + this.push(null); + }, + destroy: common.mustCall((err, cb) => cb()) + }); + + r.resume(); + r.on('end', common.mustCall()); + r.on('close', common.mustCall()); +} + +{ + const w = new stream.Writable({ + autoDestroy: true, + write(data, enc, cb) { + cb(null); + }, + destroy: common.mustCall((err, cb) => cb()) + }); + + w.write('hello'); + w.write('world'); + w.end(); + + w.on('finish', common.mustCall()); + w.on('close', common.mustCall()); +} + +{ + const t = new stream.Transform({ + autoDestroy: true, + transform(data, enc, cb) { + cb(null, data); + }, + destroy: common.mustCall((err, cb) => cb()) + }); + + t.write('hello'); + t.write('world'); + t.end(); + + t.resume(); + t.on('end', common.mustCall()); + t.on('finish', common.mustCall()); + t.on('close', common.mustCall()); +} From 38077e66d9229649b6a1873fa6bf27b0f063099b Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Fri, 26 Oct 2018 16:22:33 +0200 Subject: [PATCH 2/7] inline duplex autodestroy logic --- lib/_stream_readable.js | 13 ++++++------- lib/_stream_writable.js | 15 +++++++-------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8a7f8094f3fbe8..e74f14aa5b89b1 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1087,12 +1087,6 @@ function endReadable(stream) { } } -function writableAutoDestroy(wState) { - // In case of duplex streams we need a way to detect - // if the writable side is ready for autoDestroy as well - return !wState || (wState.autoDestroy && wState.finished); -} - function errorOrDestroy(stream, err) { // We have tests that rely on errors being emitted // in the same tick, so changing this is semver major. @@ -1119,7 +1113,12 @@ function endReadableNT(state, stream) { stream.emit('end'); if (state.autoDestroy && writableAutoDestroy(stream._writableState)) { - stream.destroy(); + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well + const wState = stream._writableState; + if (!wState || (wState.autoDestroy && wState.finished)) { + stream.destroy(); + } } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index b4471973c21e40..48592d3f6c8c9d 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -635,12 +635,6 @@ function prefinish(stream, state) { } } -function readableAutoDestroy(rState) { - // In case of duplex streams we need a way to detect - // if the readable side is ready for autoDestroy as well - return !rState || (rState.autoDestroy && rState.endEmitted); -} - function errorOrDestroy(stream, err) { // We have tests that rely on errors being emitted // in the same tick, so changing this is semver major. @@ -665,8 +659,13 @@ function finishMaybe(stream, state) { state.finished = true; stream.emit('finish'); - if (state.autoDestroy && readableAutoDestroy(stream._readableState)) { - stream.destroy(); + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well + const rState = stream._readableState; + if (!rState || (rState.autoDestroy && rState.endEmitted)) { + stream.destroy(); + } } } } From 294e10c0a6594423d11bcd5efb7b30286644607d Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Fri, 26 Oct 2018 16:27:01 +0200 Subject: [PATCH 3/7] move errorOrDestroy to destroyImpl --- lib/_stream_readable.js | 19 ++----------------- lib/_stream_writable.js | 18 ++---------------- lib/internal/streams/destroy.js | 20 +++++++++++++++++++- 3 files changed, 23 insertions(+), 34 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index e74f14aa5b89b1..2a2122e0e553cd 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator; util.inherits(Readable, Stream); +const { errorOrDestroy } = destroyImpl; const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { @@ -1087,22 +1088,6 @@ function endReadable(stream) { } } -function errorOrDestroy(stream, err) { - // We have tests that rely on errors being emitted - // in the same tick, so changing this is semver major. - // For now when you opt-in to autoDestroy we allow - // the error to be emitted nextTick. In a future - // semver major update we should change the default to this. - - const rState = stream._readableState; - const wState = stream._writableState; - - if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) - stream.destroy(err); - else - stream.emit('error', err); -} - function endReadableNT(state, stream) { debug('endReadableNT', state.endEmitted, state.length); @@ -1112,7 +1097,7 @@ function endReadableNT(state, stream) { stream.readable = false; stream.emit('end'); - if (state.autoDestroy && writableAutoDestroy(stream._writableState)) { + if (state.autoDestroy) { // In case of duplex streams we need a way to detect // if the writable side is ready for autoDestroy as well const wState = stream._writableState; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 48592d3f6c8c9d..160179cd0e84fa 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -45,6 +45,8 @@ const { ERR_UNKNOWN_ENCODING } = require('internal/errors').codes; +const { errorOrDestroy } = destroyImpl; + util.inherits(Writable, Stream); function nop() {} @@ -635,22 +637,6 @@ function prefinish(stream, state) { } } -function errorOrDestroy(stream, err) { - // We have tests that rely on errors being emitted - // in the same tick, so changing this is semver major. - // For now when you opt-in to autoDestroy we allow - // the error to be emitted nextTick. In a future - // semver major update we should change the default to this. - - const rState = stream._readableState; - const wState = stream._writableState; - - if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) - stream.destroy(err); - else - stream.emit('error', err); -} - function finishMaybe(stream, state) { var need = needFinish(state); if (need) { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3a0383cc3cea70..ce9d2545e45022 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -82,7 +82,25 @@ function emitErrorNT(self, err) { self.emit('error', err); } +function errorOrDestroy(stream, err) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + + const rState = stream._readableState; + const wState = stream._writableState; + + if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) + stream.destroy(err); + else + stream.emit('error', err); +} + + module.exports = { destroy, - undestroy + undestroy, + errorOrDestroy }; From a6eb82edc9655b6f3b20e7b580248e30fb44c234 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Fri, 26 Oct 2018 18:05:40 +0200 Subject: [PATCH 4/7] test ordering of events --- test/parallel/test-stream-auto-destroy.js | 43 +++++++++++++++++++---- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/test/parallel/test-stream-auto-destroy.js b/test/parallel/test-stream-auto-destroy.js index e4ef5c149ce969..7bce8a56368313 100644 --- a/test/parallel/test-stream-auto-destroy.js +++ b/test/parallel/test-stream-auto-destroy.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); const stream = require('stream'); +const assert = require('assert'); { const r = new stream.Readable({ @@ -13,9 +14,17 @@ const stream = require('stream'); destroy: common.mustCall((err, cb) => cb()) }); + let ended = false; + r.resume(); - r.on('end', common.mustCall()); - r.on('close', common.mustCall()); + + r.on('end', common.mustCall(() => { + ended = true; + })); + + r.on('close', common.mustCall(() => { + assert(ended); + })); } { @@ -27,12 +36,19 @@ const stream = require('stream'); destroy: common.mustCall((err, cb) => cb()) }); + let finished = false; + w.write('hello'); w.write('world'); w.end(); - w.on('finish', common.mustCall()); - w.on('close', common.mustCall()); + w.on('finish', common.mustCall(() => { + finished = true; + })); + + w.on('close', common.mustCall(() => { + assert(finished); + })); } { @@ -44,12 +60,25 @@ const stream = require('stream'); destroy: common.mustCall((err, cb) => cb()) }); + let ended = false; + let finished = false; + t.write('hello'); t.write('world'); t.end(); t.resume(); - t.on('end', common.mustCall()); - t.on('finish', common.mustCall()); - t.on('close', common.mustCall()); + + t.on('end', common.mustCall(() => { + ended = true; + })); + + t.on('finish', common.mustCall(() => { + finished = true; + })); + + t.on('close', common.mustCall(() => { + assert(ended); + assert(finished); + })); } From c40028a7a8c979a54b910b05859d17d8a312e614 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Oct 2018 11:49:57 +0100 Subject: [PATCH 5/7] fix docs --- doc/api/stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 6b7d833992c480..07f093d8aba118 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1522,7 +1522,7 @@ changes: * `final` {Function} Implementation for the [`stream._final()`][stream-_final] method. * `autoDestroy` {boolean} Whether this stream should automatically call - .destroy() on itself after ending. + `.destroy()` on itself after ending. **Default:** `false`. ```js const { Writable } = require('stream'); @@ -1773,7 +1773,7 @@ constructor and implement the `readable._read()` method. * `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy] method. * `autoDestroy` {boolean} Whether this stream should automatically call - .destroy() on itself after ending. + `.destroy()` on itself after ending. **Default:** `false`. ```js const { Readable } = require('stream'); From 7eb4e6409228cabaf45cbcb103e741fd95d82fd4 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Oct 2018 11:58:19 +0100 Subject: [PATCH 6/7] add to changes: section --- doc/api/stream.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 07f093d8aba118..99cf3c62bf844b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1493,6 +1493,11 @@ changes: pr-url: https://github.com/nodejs/node/pull/18438 description: > Add `emitClose` option to specify if `'close'` is emitted on destroy + - version: TBD + pr-url: https://github.com/nodejs/node/pull/22795 + description: > + Add `autoDestroy` option to automatically `destroy()` the stream + when it emits `'finish'` or errors --> * `options` {Object} @@ -1758,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])` constructor and implement the `readable._read()` method. #### new stream.Readable([options]) + * `options` {Object} * `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store From 713a808bff40699aeb309cae1405f1fdf8eeb784 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Oct 2018 13:21:24 +0100 Subject: [PATCH 7/7] TDB -> REPLACEME --- doc/api/stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 99cf3c62bf844b..f2e1c2a0c4ff9e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1493,7 +1493,7 @@ changes: pr-url: https://github.com/nodejs/node/pull/18438 description: > Add `emitClose` option to specify if `'close'` is emitted on destroy - - version: TBD + - version: REPLACEME pr-url: https://github.com/nodejs/node/pull/22795 description: > Add `autoDestroy` option to automatically `destroy()` the stream @@ -1765,7 +1765,7 @@ constructor and implement the `readable._read()` method. #### new stream.Readable([options])