Skip to content

Commit 2e23587

Browse files
committed
streams: add stream.pipe
pipe is similar to pipeline however it supports stream composition.
1 parent 4e17ffc commit 2e23587

File tree

1 file changed

+57
-0
lines changed

1 file changed

+57
-0
lines changed

lib/internal/streams/pipe.js

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
2+
'use strict';
3+
4+
const pipeline = require('internal/streams/pipeline');
5+
const Duplex = require('internal/streams/duplex');
6+
7+
module.exports = function pipe(...streams) {
8+
let cb;
9+
let ret;
10+
11+
const r = pipeline(streams, function(err) {
12+
if (cb) {
13+
cb(err);
14+
} else {
15+
ret.destroy(err);
16+
}
17+
});
18+
const w = streams[0];
19+
20+
ret = new Duplex({
21+
writable: !!w?.writable,
22+
readable: !!r?.readable,
23+
objectMode: streams[0].readableObjectMode,
24+
highWaterMark: 1
25+
});
26+
27+
if (ret.writable) {
28+
ret._write = function(chunk, encoding, callback) {
29+
w.write(chunk, encoding, callback);
30+
};
31+
32+
ret._final = function(chunk, encoding, callback) {
33+
w.end(chunk, encoding, callback);
34+
};
35+
}
36+
37+
if (ret.readable) {
38+
ret._read = function() {
39+
r.resume();
40+
};
41+
42+
r
43+
.on('data', function(buf) {
44+
if (!ret.push(buf)) {
45+
this.pause();
46+
}
47+
})
48+
.on('end', function() {
49+
ret.push(null);
50+
});
51+
}
52+
53+
ret._destroy = function(err, callback) {
54+
cb = callback;
55+
streams[0].destroy(err);
56+
};
57+
}

0 commit comments

Comments
 (0)