Skip to content

Commit 01d0351

Browse files
committed
streams: add signal support to pipeline generators
Generators in pipeline must be able to be aborted or pipeline can deadlock.
1 parent 0536be2 commit 01d0351

File tree

5 files changed

+102
-31
lines changed

5 files changed

+102
-31
lines changed

doc/api/stream.md

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,16 +1812,14 @@ const { pipeline } = require('stream/promises');
18121812

18131813
async function run() {
18141814
const ac = new AbortController();
1815-
const options = {
1816-
signal: ac.signal,
1817-
};
1815+
const signal = ac.signal;
18181816

18191817
setTimeout(() => ac.abort(), 1);
18201818
await pipeline(
18211819
fs.createReadStream('archive.tar'),
18221820
zlib.createGzip(),
18231821
fs.createWriteStream('archive.tar.gz'),
1824-
options,
1822+
{ signal },
18251823
);
18261824
}
18271825

@@ -1837,10 +1835,10 @@ const fs = require('fs');
18371835
async function run() {
18381836
await pipeline(
18391837
fs.createReadStream('lowercase.txt'),
1840-
async function* (source) {
1838+
async function* (source, signal) {
18411839
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
18421840
for await (const chunk of source) {
1843-
yield chunk.toUpperCase();
1841+
yield await processChunk(chunk, { signal });
18441842
}
18451843
},
18461844
fs.createWriteStream('uppercase.txt')
@@ -1851,6 +1849,28 @@ async function run() {
18511849
run().catch(console.error);
18521850
```
18531851

1852+
Remember to handle the `signal` argument passed into the async generator.
1853+
Especially in the case where the async generator is the source for the
1854+
pipeline (i.e. first argument) or the pipeline will never complete.
1855+
1856+
```js
1857+
const { pipeline } = require('stream/promises');
1858+
const fs = require('fs');
1859+
1860+
async function run() {
1861+
await pipeline(
1862+
async function * (signal) {
1863+
await someLongRunningfn({ signal });
1864+
yield 'asd';
1865+
},
1866+
fs.createWriteStream('uppercase.txt')
1867+
);
1868+
console.log('Pipeline succeeded.');
1869+
}
1870+
1871+
run().catch(console.error);
1872+
```
1873+
18541874
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
18551875
* `Readable` streams which have emitted `'end'` or `'close'`.
18561876
* `Writable` streams which have emitted `'finish'` or `'close'`.
@@ -3139,13 +3159,20 @@ the `Readable.from()` utility method:
31393159
```js
31403160
const { Readable } = require('stream');
31413161

3162+
const ac = new AbortController();
3163+
const signal = ac.signal;
3164+
31423165
async function * generate() {
31433166
yield 'a';
3167+
await someLongRunningFn({ signal });
31443168
yield 'b';
31453169
yield 'c';
31463170
}
31473171

31483172
const readable = Readable.from(generate());
3173+
readable.on('close', () => {
3174+
ac.abort();
3175+
});
31493176

31503177
readable.on('data', (chunk) => {
31513178
console.log(chunk);
@@ -3165,21 +3192,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');
31653192

31663193
const writable = fs.createWriteStream('./file');
31673194

3195+
const ac = new AbortController();
3196+
const signal = ac.signal;
3197+
3198+
const iterator = createIterator({ signal });
3199+
31683200
// Callback Pattern
31693201
pipeline(iterator, writable, (err, value) => {
31703202
if (err) {
31713203
console.error(err);
31723204
} else {
31733205
console.log(value, 'value returned');
31743206
}
3207+
}).on('close', () => {
3208+
ac.abort();
31753209
});
31763210

31773211
// Promise Pattern
31783212
pipelinePromise(iterator, writable)
31793213
.then((value) => {
31803214
console.log(value, 'value returned');
31813215
})
3182-
.catch(console.error);
3216+
.catch((err) => {
3217+
console.error(err);
3218+
ac.abort();
3219+
});
31833220
```
31843221

31853222
<!--type=misc-->

lib/internal/streams/pipeline.js

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ const {
2222
ERR_STREAM_DESTROYED,
2323
ERR_STREAM_PREMATURE_CLOSE,
2424
},
25+
AbortError,
2526
} = require('internal/errors');
2627

27-
const { validateCallback } = require('internal/validators');
28+
const {
29+
validateCallback,
30+
validateAbortSignal
31+
} = require('internal/validators');
2832

2933
function noop() {}
3034

@@ -34,6 +38,7 @@ const {
3438
isStream,
3539
} = require('internal/streams/utils');
3640
const assert = require('internal/assert');
41+
const { AbortController } = require('internal/abort_controller');
3742

3843
let PassThrough;
3944
let Readable;
@@ -176,19 +181,37 @@ function pipeline(...streams) {
176181
streams = streams[0];
177182
}
178183

184+
return pipelineImpl(streams, callback);
185+
}
186+
187+
function pipelineImpl(streams, callback, opts) {
179188
if (streams.length < 2) {
180189
throw new ERR_MISSING_ARGS('streams');
181190
}
182191

192+
const ac = new AbortController();
193+
const signal = ac.signal;
194+
const outerSignal = opts?.signal;
195+
196+
validateAbortSignal(outerSignal, 'options.signal');
197+
198+
function abort() {
199+
finishImpl(new AbortError());
200+
}
201+
202+
outerSignal?.addEventListener('abort', abort);
203+
183204
let error;
184205
let value;
185206
const destroys = [];
186207

187208
let finishCount = 0;
188209

189210
function finish(err) {
190-
const final = --finishCount === 0;
211+
finishImpl(err, --finishCount === 0);
212+
}
191213

214+
function finishImpl(err, final) {
192215
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
193216
error = err;
194217
}
@@ -201,6 +224,9 @@ function pipeline(...streams) {
201224
destroys.shift()(error);
202225
}
203226

227+
outerSignal?.removeEventListener('abort', abort);
228+
ac.abort();
229+
204230
if (final) {
205231
callback(error, value);
206232
}
@@ -219,7 +245,7 @@ function pipeline(...streams) {
219245

220246
if (i === 0) {
221247
if (typeof stream === 'function') {
222-
ret = stream();
248+
ret = stream(signal);
223249
if (!isIterable(ret)) {
224250
throw new ERR_INVALID_RETURN_VALUE(
225251
'Iterable, AsyncIterable or Stream', 'source', ret);
@@ -233,7 +259,7 @@ function pipeline(...streams) {
233259
}
234260
} else if (typeof stream === 'function') {
235261
ret = makeAsyncIterable(ret);
236-
ret = stream(ret);
262+
ret = stream(ret, signal);
237263

238264
if (reading) {
239265
if (!isIterable(ret, true)) {
@@ -303,10 +329,11 @@ function pipeline(...streams) {
303329
}
304330
}
305331

306-
// TODO(ronag): Consider returning a Duplex proxy if the first argument
307-
// is a writable. Would improve composability.
308-
// See, https://github.com/nodejs/node/issues/32020
332+
if (signal?.aborted || outerSignal?.aborted) {
333+
process.nextTick(abort);
334+
}
335+
309336
return ret;
310337
}
311338

312-
module.exports = pipeline;
339+
module.exports = { pipelineImpl, pipeline };

lib/stream.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const {
2929
promisify: { custom: customPromisify },
3030
} = require('internal/util');
3131

32-
const pipeline = require('internal/streams/pipeline');
32+
const { pipeline } = require('internal/streams/pipeline');
3333
const { destroyer } = require('internal/streams/destroy');
3434
const eos = require('internal/streams/end-of-stream');
3535
const internalBuffer = require('internal/buffer');

lib/stream/promises.js

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,12 @@ const {
55
Promise,
66
} = primordials;
77

8-
const {
9-
addAbortSignalNoValidate,
10-
} = require('internal/streams/add-abort-signal');
11-
12-
const {
13-
validateAbortSignal,
14-
} = require('internal/validators');
15-
168
const {
179
isIterable,
1810
isStream,
1911
} = require('internal/streams/utils');
2012

21-
const pl = require('internal/streams/pipeline');
13+
const { pipelineImpl: pl } = require('internal/streams/pipeline');
2214
const eos = require('internal/streams/end-of-stream');
2315

2416
function pipeline(...streams) {
@@ -29,19 +21,15 @@ function pipeline(...streams) {
2921
!isStream(lastArg) && !isIterable(lastArg)) {
3022
const options = ArrayPrototypePop(streams);
3123
signal = options.signal;
32-
validateAbortSignal(signal, 'options.signal');
3324
}
3425

35-
const pipe = pl(...streams, (err, value) => {
26+
pl(streams, (err, value) => {
3627
if (err) {
3728
reject(err);
3829
} else {
3930
resolve(value);
4031
}
41-
});
42-
if (signal) {
43-
addAbortSignalNoValidate(signal, pipe);
44-
}
32+
}, { signal });
4533
});
4634
}
4735

test/parallel/test-stream-pipeline.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ const {
1111
Duplex,
1212
addAbortSignal,
1313
} = require('stream');
14+
const pipelinep = require('stream/promises').pipeline;
1415
const assert = require('assert');
1516
const http = require('http');
1617
const { promisify } = require('util');
1718
const net = require('net');
19+
const tsp = require('timers/promises');
1820

1921
{
2022
let finished = false;
@@ -1420,3 +1422,20 @@ const net = require('net');
14201422

14211423
writableLike.emit('close');
14221424
}
1425+
1426+
{
1427+
const ac = new AbortController();
1428+
const signal = ac.signal;
1429+
pipelinep(
1430+
async function * (signal) {
1431+
await tsp.setTimeout(1e6, signal);
1432+
},
1433+
async function(source) {
1434+
1435+
},
1436+
{ signal }
1437+
).catch(common.mustCall((err) => {
1438+
assert.strictEqual(err.name, 'AbortError');
1439+
}));
1440+
ac.abort();
1441+
}

0 commit comments

Comments
 (0)