Skip to content

Commit 0f94c6b

Browse files
committed
Revert "stream: simpler and faster Readable async iterator"
This reverts commit 4bb4007.
1 parent 2d52bdf commit 0f94c6b

File tree

7 files changed

+269
-121
lines changed

7 files changed

+269
-121
lines changed

benchmark/streams/readable-async-iterator.js

Lines changed: 0 additions & 38 deletions
This file was deleted.

lib/_stream_readable.js

Lines changed: 5 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ const {
2727
NumberIsNaN,
2828
ObjectDefineProperties,
2929
ObjectSetPrototypeOf,
30-
Promise,
3130
Set,
3231
SymbolAsyncIterator,
3332
Symbol
@@ -60,11 +59,11 @@ const kPaused = Symbol('kPaused');
6059

6160
// Lazy loaded to improve the startup performance.
6261
let StringDecoder;
62+
let createReadableStreamAsyncIterator;
6363
let from;
6464

6565
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
6666
ObjectSetPrototypeOf(Readable, Stream);
67-
function nop() {}
6867

6968
const { errorOrDestroy } = destroyImpl;
7069

@@ -1076,68 +1075,13 @@ Readable.prototype.wrap = function(stream) {
10761075
};
10771076

10781077
Readable.prototype[SymbolAsyncIterator] = function() {
1079-
let stream = this;
1080-
1081-
if (typeof stream.read !== 'function') {
1082-
// v1 stream
1083-
const src = stream;
1084-
stream = new Readable({
1085-
objectMode: true,
1086-
destroy(err, callback) {
1087-
destroyImpl.destroyer(src, err);
1088-
callback();
1089-
}
1090-
}).wrap(src);
1078+
if (createReadableStreamAsyncIterator === undefined) {
1079+
createReadableStreamAsyncIterator =
1080+
require('internal/streams/async_iterator');
10911081
}
1092-
1093-
const iter = createAsyncIterator(stream);
1094-
iter.stream = stream;
1095-
return iter;
1082+
return createReadableStreamAsyncIterator(this);
10961083
};
10971084

1098-
async function* createAsyncIterator(stream) {
1099-
let callback = nop;
1100-
1101-
function next(resolve) {
1102-
if (this === stream) {
1103-
callback();
1104-
callback = nop;
1105-
} else {
1106-
callback = resolve;
1107-
}
1108-
}
1109-
1110-
stream
1111-
.on('readable', next)
1112-
.on('error', next)
1113-
.on('end', next)
1114-
.on('close', next);
1115-
1116-
try {
1117-
const state = stream._readableState;
1118-
while (true) {
1119-
const chunk = stream.read();
1120-
if (chunk !== null) {
1121-
yield chunk;
1122-
} else if (state.errored) {
1123-
throw state.errored;
1124-
} else if (state.ended) {
1125-
break;
1126-
} else if (state.closed) {
1127-
// TODO(ronag): ERR_PREMATURE_CLOSE?
1128-
break;
1129-
} else {
1130-
await new Promise(next);
1131-
}
1132-
}
1133-
} catch (err) {
1134-
destroyImpl.destroyer(stream, err);
1135-
throw err;
1136-
} finally {
1137-
destroyImpl.destroyer(stream, null);
1138-
}
1139-
}
1140-
11411085
// Making it explicit these properties are not enumerable
11421086
// because otherwise some prototype manipulation in
11431087
// userland will fail.
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
'use strict';
2+
3+
const {
4+
ObjectCreate,
5+
ObjectGetPrototypeOf,
6+
ObjectSetPrototypeOf,
7+
Promise,
8+
PromiseReject,
9+
PromiseResolve,
10+
Symbol,
11+
} = primordials;
12+
13+
const finished = require('internal/streams/end-of-stream');
14+
const destroyImpl = require('internal/streams/destroy');
15+
16+
const kLastResolve = Symbol('lastResolve');
17+
const kLastReject = Symbol('lastReject');
18+
const kError = Symbol('error');
19+
const kEnded = Symbol('ended');
20+
const kLastPromise = Symbol('lastPromise');
21+
const kHandlePromise = Symbol('handlePromise');
22+
const kStream = Symbol('stream');
23+
24+
let Readable;
25+
26+
function createIterResult(value, done) {
27+
return { value, done };
28+
}
29+
30+
function readAndResolve(iter) {
31+
const resolve = iter[kLastResolve];
32+
if (resolve !== null) {
33+
const data = iter[kStream].read();
34+
// We defer if data is null. We can be expecting either 'end' or 'error'.
35+
if (data !== null) {
36+
iter[kLastPromise] = null;
37+
iter[kLastResolve] = null;
38+
iter[kLastReject] = null;
39+
resolve(createIterResult(data, false));
40+
}
41+
}
42+
}
43+
44+
function onReadable(iter) {
45+
// We wait for the next tick, because it might
46+
// emit an error with `process.nextTick()`.
47+
process.nextTick(readAndResolve, iter);
48+
}
49+
50+
function wrapForNext(lastPromise, iter) {
51+
return (resolve, reject) => {
52+
lastPromise.then(() => {
53+
if (iter[kEnded]) {
54+
resolve(createIterResult(undefined, true));
55+
return;
56+
}
57+
58+
iter[kHandlePromise](resolve, reject);
59+
}, reject);
60+
};
61+
}
62+
63+
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
64+
ObjectGetPrototypeOf(async function* () {}).prototype);
65+
66+
function finish(self, err) {
67+
return new Promise((resolve, reject) => {
68+
const stream = self[kStream];
69+
70+
finished(stream, (err) => {
71+
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
72+
reject(err);
73+
} else {
74+
resolve(createIterResult(undefined, true));
75+
}
76+
});
77+
destroyImpl.destroyer(stream, err);
78+
});
79+
}
80+
81+
const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
82+
get stream() {
83+
return this[kStream];
84+
},
85+
86+
next() {
87+
// If we have detected an error in the meanwhile
88+
// reject straight away.
89+
const error = this[kError];
90+
if (error !== null) {
91+
return PromiseReject(error);
92+
}
93+
94+
if (this[kEnded]) {
95+
return PromiseResolve(createIterResult(undefined, true));
96+
}
97+
98+
if (this[kStream].destroyed) {
99+
return new Promise((resolve, reject) => {
100+
if (this[kError]) {
101+
reject(this[kError]);
102+
} else if (this[kEnded]) {
103+
resolve(createIterResult(undefined, true));
104+
} else {
105+
finished(this[kStream], (err) => {
106+
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
107+
reject(err);
108+
} else {
109+
resolve(createIterResult(undefined, true));
110+
}
111+
});
112+
}
113+
});
114+
}
115+
116+
// If we have multiple next() calls we will wait for the previous Promise to
117+
// finish. This logic is optimized to support for await loops, where next()
118+
// is only called once at a time.
119+
const lastPromise = this[kLastPromise];
120+
let promise;
121+
122+
if (lastPromise) {
123+
promise = new Promise(wrapForNext(lastPromise, this));
124+
} else {
125+
// Fast path needed to support multiple this.push()
126+
// without triggering the next() queue.
127+
const data = this[kStream].read();
128+
if (data !== null) {
129+
return PromiseResolve(createIterResult(data, false));
130+
}
131+
132+
promise = new Promise(this[kHandlePromise]);
133+
}
134+
135+
this[kLastPromise] = promise;
136+
137+
return promise;
138+
},
139+
140+
return() {
141+
return finish(this);
142+
},
143+
144+
throw(err) {
145+
return finish(this, err);
146+
},
147+
}, AsyncIteratorPrototype);
148+
149+
const createReadableStreamAsyncIterator = (stream) => {
150+
if (typeof stream.read !== 'function') {
151+
// v1 stream
152+
153+
if (!Readable) {
154+
Readable = require('_stream_readable');
155+
}
156+
157+
const src = stream;
158+
stream = new Readable({ objectMode: true }).wrap(src);
159+
finished(stream, (err) => destroyImpl.destroyer(src, err));
160+
}
161+
162+
const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
163+
[kStream]: { value: stream, writable: true },
164+
[kLastResolve]: { value: null, writable: true },
165+
[kLastReject]: { value: null, writable: true },
166+
[kError]: { value: null, writable: true },
167+
[kEnded]: {
168+
value: stream.readableEnded || stream._readableState.endEmitted,
169+
writable: true
170+
},
171+
// The function passed to new Promise is cached so we avoid allocating a new
172+
// closure at every run.
173+
[kHandlePromise]: {
174+
value: (resolve, reject) => {
175+
const data = iterator[kStream].read();
176+
if (data) {
177+
iterator[kLastPromise] = null;
178+
iterator[kLastResolve] = null;
179+
iterator[kLastReject] = null;
180+
resolve(createIterResult(data, false));
181+
} else {
182+
iterator[kLastResolve] = resolve;
183+
iterator[kLastReject] = reject;
184+
}
185+
},
186+
writable: true,
187+
},
188+
});
189+
iterator[kLastPromise] = null;
190+
191+
finished(stream, { writable: false }, (err) => {
192+
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
193+
const reject = iterator[kLastReject];
194+
// Reject if we are waiting for data in the Promise returned by next() and
195+
// store the error.
196+
if (reject !== null) {
197+
iterator[kLastPromise] = null;
198+
iterator[kLastResolve] = null;
199+
iterator[kLastReject] = null;
200+
reject(err);
201+
}
202+
iterator[kError] = err;
203+
return;
204+
}
205+
206+
const resolve = iterator[kLastResolve];
207+
if (resolve !== null) {
208+
iterator[kLastPromise] = null;
209+
iterator[kLastResolve] = null;
210+
iterator[kLastReject] = null;
211+
resolve(createIterResult(undefined, true));
212+
}
213+
iterator[kEnded] = true;
214+
});
215+
216+
stream.on('readable', onReadable.bind(null, iterator));
217+
218+
return iterator;
219+
};
220+
221+
module.exports = createReadableStreamAsyncIterator;

lib/internal/streams/pipeline.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const {
2323

2424
let EE;
2525
let PassThrough;
26-
let Readable;
26+
let createReadableStreamAsyncIterator;
2727

2828
function destroyer(stream, reading, writing, callback) {
2929
callback = once(callback);
@@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
113113
}
114114

115115
async function* fromReadable(val) {
116-
if (!Readable) {
117-
Readable = require('_stream_readable');
116+
if (!createReadableStreamAsyncIterator) {
117+
createReadableStreamAsyncIterator =
118+
require('internal/streams/async_iterator');
118119
}
119-
120-
yield* Readable.prototype[SymbolAsyncIterator].call(val);
120+
yield* createReadableStreamAsyncIterator(val);
121121
}
122122

123123
async function pump(iterable, writable, finish) {

node.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@
222222
'lib/internal/worker/js_transferable.js',
223223
'lib/internal/watchdog.js',
224224
'lib/internal/streams/lazy_transform.js',
225+
'lib/internal/streams/async_iterator.js',
225226
'lib/internal/streams/buffer_list.js',
226227
'lib/internal/streams/duplexpair.js',
227228
'lib/internal/streams/from.js',

test/parallel/test-readline-async-iterators-destroy.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ async function testMutualDestroy() {
7575
break;
7676
}
7777
assert.deepStrictEqual(iteratedLines, expectedLines);
78-
break;
7978
}
8079

8180
assert.deepStrictEqual(iteratedLines, expectedLines);

0 commit comments

Comments
 (0)