Skip to content

Commit 1819b6f

Browse files
committed
streams: perf improvments for clearBuffer in Writable.
This commits removes a function allocation inside the clearBuffer function. Moreover, it adds counting of buffered chunks to instantiate a fixed-sized Array. The performance improvements are in the range 5-50%, depending on the usecase.
1 parent 671347c commit 1819b6f

File tree

1 file changed

+48
-11
lines changed

1 file changed

+48
-11
lines changed

lib/_stream_writable.js

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
module.exports = Writable;
88
Writable.WritableState = WritableState;
99

10+
const assert = require('assert');
1011
const util = require('util');
1112
const internalUtil = require('internal/util');
1213
const Stream = require('stream');
@@ -108,6 +109,31 @@ function WritableState(options, stream) {
108109

109110
// True if the error was already emitted and should not be thrown again
110111
this.errorEmitted = false;
112+
113+
// count buffered requests
114+
this.bufferedRequestCount = 0;
115+
116+
// the requests that needs to be called by uncork
117+
this.corkedCbs = null;
118+
119+
// call all the corked requests
120+
this.afterCorkedWrite = function afterCorkedWrite(err) {
121+
var state = stream._writableState;
122+
var entry = state.corkedCbs;
123+
var cbs = entry.cbs;
124+
125+
state.corkedCbs = entry.next;
126+
127+
for (var i = 0; i < cbs.length; i++) {
128+
state.pendingcb--;
129+
cbs[i](err);
130+
}
131+
};
132+
}
133+
134+
function CorkedCbs(cbs) {
135+
this.cbs = cbs;
136+
this.next = null;
111137
}
112138

113139
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
@@ -274,6 +300,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
274300
} else {
275301
state.bufferedRequest = state.lastBufferedRequest;
276302
}
303+
state.bufferedRequestCount++;
277304
} else {
278305
doWrite(stream, state, false, len, chunk, encoding, cb);
279306
}
@@ -362,27 +389,37 @@ function onwriteDrain(stream, state) {
362389
function clearBuffer(stream, state) {
363390
state.bufferProcessing = true;
364391
var entry = state.bufferedRequest;
392+
var bufferedRequestCount = state.bufferedRequestCount;
393+
394+
state.bufferedRequestCount = 0;
365395

366396
if (stream._writev && entry && entry.next) {
367397
// Fast case, write everything using _writev()
368-
var buffer = [];
369-
var cbs = [];
398+
var buffer = new Array(bufferedRequestCount);
399+
var cbs = new Array(bufferedRequestCount);
400+
var count = 0;
401+
370402
while (entry) {
371-
cbs.push(entry.callback);
372-
buffer.push(entry);
403+
cbs[count] = entry.callback;
404+
buffer[count] = entry;
405+
count++;
373406
entry = entry.next;
374407
}
375408

376409
// count the one we are adding, as well.
377-
// TODO(isaacs) clean this up
378410
state.pendingcb++;
379411
state.lastBufferedRequest = null;
380-
doWrite(stream, state, true, state.length, buffer, '', function(err) {
381-
for (var i = 0; i < cbs.length; i++) {
382-
state.pendingcb--;
383-
cbs[i](err);
384-
}
385-
});
412+
413+
if (state.corkedCbs) {
414+
// only two corkedCbs objects are supported
415+
assert(!state.corkedCbs.next);
416+
state.corkedCbs.next = new CorkedCbs(cbs);
417+
} else {
418+
state.corkedCbs = new CorkedCbs(cbs);
419+
}
420+
421+
doWrite(stream, state, true, state.length, buffer, '',
422+
state.afterCorkedWrite);
386423

387424
// Clear buffer
388425
} else {

0 commit comments

Comments
 (0)