From 42cfb803f8cd45cf8b57f5e56a52eccffe25ef46 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 29 Nov 2019 02:04:46 +0100 Subject: [PATCH 1/2] stream: improve performance for sync write finishes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve performance and reduce memory usage when a writable stream is written to with the same callback (which is the most common case) and when the write operation finishes synchronously (which is also often the case). confidence improvement accuracy (*) (**) (***) streams/writable-manywrites.js sync='no' n=2000000 0.99 % ±3.20% ±4.28% ±5.61% streams/writable-manywrites.js sync='yes' n=2000000 *** 710.69 % ±19.65% ±26.47% ±35.09% Refs: https://github.com/nodejs/node/issues/18013 Refs: https://github.com/nodejs/node/issues/18367 --- benchmark/streams/writable-manywrites.js | 11 ++++-- lib/_stream_writable.js | 34 ++++++++++++++++--- .../test-stream-writable-samecb-singletick.js | 30 ++++++++++++++++ 3 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 test/parallel/test-stream-writable-samecb-singletick.js diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index 6fcb07e849d615..0ed38d0357a438 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -4,14 +4,19 @@ const common = require('../common'); const Writable = require('stream').Writable; const bench = common.createBenchmark(main, { - n: [2e6] + n: [2e6], + sync: ['yes', 'no'] }); -function main({ n }) { +function main({ n, sync }) { const b = Buffer.allocUnsafe(1024); const s = new Writable(); + sync = sync === 'yes'; s._write = function(chunk, encoding, cb) { - cb(); + if (sync) + cb(); + else + process.nextTick(cb); }; bench.start(); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index be8332055002d4..05c1a6193e67a6 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -142,6 +142,10 @@ function WritableState(options, stream, isDuplex) { // The amount that is being written when _write is called. this.writelen = 0; + // Storage for data passed to the afterWrite() callback in case of + // synchronous _write() completion. + this.afterWriteTickInfo = null; + this.bufferedRequest = null; this.lastBufferedRequest = null; @@ -498,22 +502,42 @@ function onwrite(stream, er) { } if (sync) { - process.nextTick(afterWrite, stream, state, cb); + // It is a common case that the callback passed to .write() is always + // the same. In that case, we do not schedule a new nextTick(), but rather + // just increase a counter, to improve performance and avoid memory + // allocations. + if (state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { count: 1, cb, stream, state }; + process.nextTick(afterWriteTick, state.afterWriteTickInfo); + } } else { - afterWrite(stream, state, cb); + afterWrite(stream, state, 1, cb); } } } -function afterWrite(stream, state, cb) { +function afterWriteTick({ stream, state, count, cb }) { + return afterWrite(stream, state, count, cb); +} + +function afterWrite(stream, state, count, cb) { + state.afterWriteTickInfo = null; + const needDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain; if (needDrain) { state.needDrain = false; stream.emit('drain'); } - state.pendingcb--; - cb(); + + while (count-- > 0) { + state.pendingcb--; + cb(); + } + finishMaybe(stream, state); } diff --git a/test/parallel/test-stream-writable-samecb-singletick.js b/test/parallel/test-stream-writable-samecb-singletick.js new file mode 100644 index 00000000000000..e7dfa648797821 --- /dev/null +++ b/test/parallel/test-stream-writable-samecb-singletick.js @@ -0,0 +1,30 @@ +'use strict'; +const common = require('../common'); +const { Console } = require('console'); +const { Writable } = require('stream'); +const async_hooks = require('async_hooks'); + +// Make sure that repeated calls to console.log(), and by extension +// stream.write() for the underlying stream, allocate exactly 1 tick object. +// At the time of writing, that is enough to ensure a flat memory profile +// from repeated console.log() calls, rather than having callbacks pile up +// over time, assuming that data can be written synchronously. +// Refs: https://github.com/nodejs/node/issues/18013 +// Refs: https://github.com/nodejs/node/issues/18367 + +const checkTickCreated = common.mustCall(); + +async_hooks.createHook({ + init(id, type, triggerId, resoure) { + if (type === 'TickObject') checkTickCreated(); + } +}).enable(); + +const console = new Console(new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + cb(); + }, 100) +})); + +for (let i = 0; i < 100; i++) + console.log(i); From 302ec92055eebc84295c101889a287ed19e7cd71 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 29 Nov 2019 16:35:10 +0100 Subject: [PATCH 2/2] fixup! stream: improve performance for sync write finishes --- lib/_stream_writable.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 05c1a6193e67a6..31efa54d200760 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -520,12 +520,11 @@ function onwrite(stream, er) { } function afterWriteTick({ stream, state, count, cb }) { + state.afterWriteTickInfo = null; return afterWrite(stream, state, count, cb); } function afterWrite(stream, state, count, cb) { - state.afterWriteTickInfo = null; - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain; if (needDrain) {