Skip to content

Commit 492bb00

Browse files
committed
streams: use Array for Readable buffer
1 parent 25576b5 commit 492bb00

File tree

3 files changed

+110
-24
lines changed

3 files changed

+110
-24
lines changed

benchmark/streams/readable-bigread.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function main({ n }) {
1515

1616
bench.start();
1717
for (let k = 0; k < n; ++k) {
18-
for (let i = 0; i < 1e4; ++i)
18+
for (let i = 0; i < 1e3; ++i)
1919
s.push(b);
2020
while (s.read(128));
2121
}

benchmark/streams/readable-readall.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function main({ n }) {
1515

1616
bench.start();
1717
for (let k = 0; k < n; ++k) {
18-
for (let i = 0; i < 1e4; ++i)
18+
for (let i = 0; i < 1e3; ++i)
1919
s.push(b);
2020
while (s.read());
2121
}

lib/internal/streams/readable.js

Lines changed: 108 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const {
7373
const { validateObject } = require('internal/validators');
7474

7575
const kState = Symbol('kState');
76+
const FastBuffer = Buffer[Symbol.species];
7677

7778
const { StringDecoder } = require('string_decoder');
7879
const from = require('internal/streams/from');
@@ -278,7 +279,8 @@ function ReadableState(options, stream, isDuplex) {
278279
// A linked list is used to store data chunks instead of an array because the
279280
// linked list can remove elements from the beginning faster than
280281
// array.shift().
281-
this.buffer = new BufferList();
282+
this.buffer = [];
283+
this.bufferIndex = 0;
282284
this.length = 0;
283285
this.pipes = [];
284286

@@ -546,10 +548,15 @@ function addChunk(stream, state, chunk, addToFront) {
546548
} else {
547549
// Update the buffer info.
548550
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
549-
if (addToFront)
550-
state.buffer.unshift(chunk);
551-
else
551+
if (addToFront) {
552+
if (state.bufferIndex > 0) {
553+
state.buffer[--state.bufferIndex] = chunk;
554+
} else {
555+
state.buffer.unshift(chunk); // Slow path
556+
}
557+
} else {
552558
state.buffer.push(chunk);
559+
}
553560

554561
if ((state[kState] & kNeedReadable) !== 0)
555562
emitReadable(stream);
@@ -564,21 +571,24 @@ Readable.prototype.isPaused = function() {
564571

565572
// Backwards compatibility.
566573
Readable.prototype.setEncoding = function(enc) {
574+
const state = this._readableState;
575+
567576
const decoder = new StringDecoder(enc);
568-
this._readableState.decoder = decoder;
577+
state.decoder = decoder;
569578
// If setEncoding(null), decoder.encoding equals utf8.
570-
this._readableState.encoding = this._readableState.decoder.encoding;
579+
state.encoding = state.decoder.encoding;
571580

572-
const buffer = this._readableState.buffer;
573581
// Iterate over current buffer to convert already stored Buffers:
574582
let content = '';
575-
for (const data of buffer) {
583+
for (const data of state.buffer.slice(state.bufferIndex)) {
576584
content += decoder.write(data);
577585
}
578-
buffer.clear();
586+
state.buffer.length = 0;
587+
state.bufferIndex = 0;
588+
579589
if (content !== '')
580590
buffer.push(content);
581-
this._readableState.length = content.length;
591+
state.length = content.length;
582592
return this;
583593
};
584594

@@ -611,7 +621,7 @@ function howMuchToRead(n, state) {
611621
if (NumberIsNaN(n)) {
612622
// Only flow one buffer at a time.
613623
if ((state[kState] & kFlowing) !== 0 && state.length)
614-
return state.buffer.first().length;
624+
return state.buffer[state.bufferIndex].length;
615625
return state.length;
616626
}
617627
if (n <= state.length)
@@ -1550,20 +1560,96 @@ function fromList(n, state) {
15501560
return null;
15511561

15521562
let ret;
1553-
if (state.objectMode)
1554-
ret = state.buffer.shift();
1555-
else if (!n || n >= state.length) {
1563+
if ((state[kState] & kObjectMode) !== 0) {
1564+
ret = state.buffer[state.bufferIndex++];
1565+
} else if (!n || n >= state.length) {
15561566
// Read it all, truncate the list.
1557-
if (state.decoder)
1558-
ret = state.buffer.join('');
1559-
else if (state.buffer.length === 1)
1560-
ret = state.buffer.first();
1561-
else
1562-
ret = state.buffer.concat(state.length);
1563-
state.buffer.clear();
1567+
if ((state[kState] & kDecoder) !== 0) {
1568+
ret = ''
1569+
for (let n = state.bufferIndex; n < state.buffer.length; n++) {
1570+
ret += state.buffer[n];
1571+
}
1572+
} else if (state.buffer.length - state.bufferIndex === 0) {
1573+
ret = Buffer.alloc(0)
1574+
} else if (state.buffer.length - state.bufferIndex === 1) {
1575+
ret = state.buffer[state.bufferIndex];
1576+
} else {
1577+
ret = Buffer.allocUnsafe(n >>> 0);
1578+
let i = 0;
1579+
for (let n = state.bufferIndex; n < state.buffer.length; n++) {
1580+
const data = state.buffer[n];
1581+
ret.set(data, i);
1582+
i += data.length;
1583+
}
1584+
}
1585+
state.buffer.length = 0;
1586+
state.bufferIndex = 0;
15641587
} else {
15651588
// read part of list.
1566-
ret = state.buffer.consume(n, state.decoder);
1589+
1590+
const buf = state.buffer;
1591+
const len = buf.length;
1592+
1593+
let idx = state.bufferIndex;
1594+
1595+
if (n < buf[idx].length) {
1596+
// `slice` is the same for buffers and strings.
1597+
ret = buf[idx].slice(0, n);
1598+
buf[idx] = buf[idx].slice(n);
1599+
} else if (n === data.length) {
1600+
// First chunk is a perfect match.
1601+
ret = buf[idx++];
1602+
} else if ((state[kState] & kDecoder) !== 0) {
1603+
ret = '';
1604+
while (idx < state.buffer.length) {
1605+
const str = buf[idx];
1606+
if (n > str.length) {
1607+
ret += str;
1608+
n -= str.length;
1609+
idx++;
1610+
} else {
1611+
if (n === buf.length) {
1612+
ret += str;
1613+
idx++;
1614+
} else {
1615+
ret += str.slice(0, n);
1616+
buf[idx] = str.slice(n);
1617+
}
1618+
break;
1619+
}
1620+
}
1621+
} else {
1622+
ret = Buffer.allocUnsafe(n);
1623+
1624+
const retLen = n;
1625+
while (idx < len) {
1626+
const data = buf[idx];
1627+
if (n > data.length) {
1628+
ret.set(data, retLen - n);
1629+
n -= data.length;
1630+
idx++;
1631+
} else {
1632+
if (n === data.length) {
1633+
ret.set(data, retLen - n);
1634+
idx++;
1635+
} else {
1636+
ret.set(new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
1637+
buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n);
1638+
}
1639+
break;
1640+
}
1641+
}
1642+
}
1643+
1644+
if (idx === buf.length) {
1645+
state.buffer.length = 0;
1646+
state.bufferIndex = 0
1647+
} else if (idx > 1024) {
1648+
state.buffer.splice(0, idx);
1649+
state.bufferIndex = 0;
1650+
} else {
1651+
state.bufferIndex = idx;
1652+
}
15671653
}
15681654

15691655
return ret;

0 commit comments

Comments
 (0)