Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2437,6 +2437,9 @@ added:
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* `destroyStream` {boolean} When set to `false`, the stream will not be
closed after take is finished unless the stream had an error.
**Default:** `true`.
* Returns: {Readable} a stream with `limit` chunks taken.

This method returns a new stream with the first `limit` chunks.
Expand All @@ -2447,6 +2450,26 @@ import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

Using the `destroyStream: false` option prevents `take` from closing the stream,
so that the remaining stream data can be consumed later on

```mjs
import fs from 'node:fs';
import { Readable } from 'node:stream';

const csvParsedStream = fs
.createReadStream('file.csv')
.compose(myAwesomeParseCSV());

const [columns] = await csvParsedStream
.take(1)
.toArray();

const parsed = await csvParsedStream
.map((row) => parseRowByColumns(row, columns))
.toArray();
```

##### `readable.asIndexedPairs([options])`

<!-- YAML
Expand Down
29 changes: 20 additions & 9 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const {
validateAbortSignal,
validateInteger,
validateObject,
validateBoolean,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
Expand Down Expand Up @@ -397,24 +398,34 @@ function take(number, options = undefined) {
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
if (options?.destroyStream != null) {
validateBoolean(options.destroyStream, 'options.destroyStream');
}
Comment on lines +401 to +403
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe another name would be better? Not sure what though... @benjamingr


number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
try {
for await (const val of this.iterator({ destroyOnReturn: options?.destroyStream ?? true })) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;
}
// Don't get another item from iterator in case we reached the end
if (number <= 0) {
return;
}
}
if (number-- > 0) {
yield val;
} catch (e) {
if (!this.destroyed) {
this.destroy(e);
}

// Don't get another item from iterator in case we reached the end
if (number <= 0) {
return;
}
throw e;
}
}.call(this);
}
Expand Down
49 changes: 49 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,52 @@ const naturals = () => from(async function*() {
throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
(async () => {
const streamShouldCloseWithoutOption = from([1, 2, 3, 4, 5]);

// Close stream by default
await streamShouldCloseWithoutOption.take(2).toArray();
strictEqual(streamShouldCloseWithoutOption.destroyed, true);
})().then(common.mustCall());
}

{
(async () => {
const streamShouldCloseWithOption = from([1, 2, 3, 4, 5]);

await streamShouldCloseWithOption.take(2, { destroyStream: true }).toArray();
strictEqual(streamShouldCloseWithOption.destroyed, true);
})().then(common.mustCall());
}

{
(async () => {
const streamShouldNotClose = from([1, 2, 3, 4, 5]);

// Do not close stream
await streamShouldNotClose.take(2, { destroyStream: false }).toArray();
strictEqual(streamShouldNotClose.destroyed, false);

deepStrictEqual(await streamShouldNotClose.toArray(), [3, 4, 5]);
strictEqual(streamShouldNotClose.destroyed, true);
})().then(common.mustCall());
}

{
const errorToThrow = new Error('should close');

const streamShouldNotClose = from((function *() {
yield 1;
throw errorToThrow;
})());

streamShouldNotClose.take(3, { destroyStream: false })
.toArray()
.then(common.mustNotCall())
.catch(common.mustCall((error) => {
strictEqual(streamShouldNotClose.destroyed, true);
strictEqual(error, errorToThrow);
}));
}