From b5acb248d066a8f22ae2470d41f5fd6ff7117132 Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Tue, 24 May 2016 11:46:46 -0700
Subject: [PATCH 1/6] Rename "stream" to "content" in tuples.

---
 README.md             |  6 +++---
 src/exporter.js       |  6 +++---
 src/importer.js       |  4 ++--
 test/test-exporter.js |  8 ++++----
 test/test-importer.js | 18 +++++++++---------
 5 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/README.md b/README.md
index 6a1ced84..79badbfc 100644
--- a/README.md
+++ b/README.md
@@ -47,8 +47,8 @@ const res = []
 
 const rs = fs.createReadStream(file)
 const rs2 = fs.createReadStream(file2)
 
-const input = {path: /tmp/foo/bar, stream: rs}
-const input2 = {path: /tmp/foo/quxx, stream: rs2}
+const input = {path: '/tmp/foo/bar', content: rs}
+const input2 = {path: '/tmp/foo/quxx', content: rs2}
 
 // Listen for the data event from the importer stream
@@ -138,7 +138,7 @@ exportEvent.on('data', (result) => {
 const Importer = require('ipfs-unixfs-engine').exporter
 ```
 
-The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.
+The exporter is a readable stream in object mode that returns an object ```{ content: stream, path: 'path' }``` by the multihash of the file from the dag service.
 
 ## install
diff --git a/src/exporter.js b/src/exporter.js
index dacff549..3bfaaab0 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -42,7 +42,7 @@ function Exporter (hash, dagService, options) {
         rs.push(unmarshaledData.data)
         rs.push(null)
       }
-      this.push({ stream: rs, path: name })
+      this.push({ content: rs, path: name })
       callback()
       return
     } else {
@@ -75,7 +75,7 @@ function Exporter (hash, dagService, options) {
           return
         })
       }
-      this.push({ stream: rs, path: name })
+      this.push({ content: rs, path: name })
       callback()
       return
     }
@@ -97,7 +97,7 @@ function Exporter (hash, dagService, options) {
         rs.push(node.data)
         rs.push(null)
       }
-      this.push({stream: null, path: name})
+      this.push({content: null, path: name})
       callback()
       return
     } else {
diff --git a/src/importer.js b/src/importer.js
index a5506607..cda4e34e 100644
--- a/src/importer.js
+++ b/src/importer.js
@@ -36,7 +36,7 @@ function Importer (dagService, options) {
   this._write = (fl, enc, next) => {
     this.read()
     counter++
-    if (!fl.stream) {
+    if (!fl.content) {
      // 1. create the empty dir dag node
      // 2. write it to the dag store
      // 3. add to the files array {path: <>, hash: <>}
@@ -64,7 +64,7 @@ function Importer (dagService, options) {
    }
 
    const leaves = []
-    fl.stream
+    fl.content
      .pipe(fsc(CHUNK_SIZE))
      .pipe(through2((chunk, enc, cb) => {
        // 1. create the unixfs merkledag node
diff --git a/test/test-exporter.js b/test/test-exporter.js
index 91346eaf..8e4f79db 100644
--- a/test/test-exporter.js
+++ b/test/test-exporter.js
@@ -33,7 +33,7 @@ module.exports = function (repo) {
       expect(err).to.not.exist
       const testExport = exporter(hash, ds)
       testExport.on('data', (file) => {
-        file.stream.pipe(bl((err, bldata) => {
+        file.content.pipe(bl((err, bldata) => {
          expect(err).to.not.exist
          expect(bldata).to.deep.equal(unmarsh.data)
          done()
@@ -48,7 +48,7 @@ module.exports = function (repo) {
      const ds = new DAGService(bs)
      const testExport = exporter(hash, ds)
      testExport.on('data', (file) => {
-      file.stream.pipe(bl((err, bldata) => {
+      file.content.pipe(bl((err, bldata) => {
        expect(bldata).to.deep.equal(bigFile)
        expect(err).to.not.exist
        done()
@@ -63,7 +63,7 @@ module.exports = function (repo) {
      const testExport = exporter(hash, ds)
      testExport.on('data', (file) => {
        expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
-      file.stream.pipe(bl((err, bldata) => {
+      file.content.pipe(bl((err, bldata) => {
        expect(err).to.not.exist
        done()
      }))
@@ -94,7 +94,7 @@ module.exports = function (repo) {
      const ds = new DAGService(bs)
      const testExport = exporter(hash, ds)
      testExport.on('data', (dir) => {
-      expect(dir.stream).to.equal(null)
+      expect(dir.content).to.equal(null)
      done()
    })
  })
diff --git a/test/test-importer.js b/test/test-importer.js
index edf77f03..09acaf44 100644
--- a/test/test-importer.js
+++ b/test/test-importer.js
@@ -38,7 +38,7 @@ module.exports = function (repo) {
        expect(bs58.encode(obj.multihash)).to.equal('QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8')
        expect(obj.size).to.equal(211)
      })
-     i.write({path: '200Bytes.txt', stream: r})
+     i.write({path: '200Bytes.txt', content: r})
      i.end()
      i.on('end', () => {
        done()
@@ -69,7 +69,7 @@ module.exports = function (repo) {
      i.on('end', () => {
        done()
      })
-     i.write({path: 'foo/bar/200Bytes.txt', stream: r})
+     i.write({path: 'foo/bar/200Bytes.txt', content: r})
      i.end()
    })
 
@@ -85,7 +85,7 @@ module.exports = function (repo) {
      i.on('end', () => {
        done()
      })
-     i.write({path: '1.2MiB.txt', stream: r})
+     i.write({path: '1.2MiB.txt', content: r})
      i.end()
    })
 
@@ -106,7 +106,7 @@ module.exports = function (repo) {
      i.on('end', () => {
        done()
      })
-     i.write({path: 'foo-big/1.2MiB.txt', stream: r})
+     i.write({path: 'foo-big/1.2MiB.txt', content: r})
      i.end()
    })
 
@@ -156,8 +156,8 @@ module.exports = function (repo) {
      i.on('end', () => {
        done()
      })
-     i.write({path: 'pim/200Bytes.txt', stream: r1})
-     i.write({path: 'pim/1.2MiB.txt', stream: r2})
+     i.write({path: 'pim/200Bytes.txt', content: r1})
+     i.write({path: 'pim/1.2MiB.txt', content: r2})
      i.end()
    })
 
@@ -195,9 +195,9 @@ module.exports = function (repo) {
      i.on('end', () => {
        done()
      })
-     i.write({path: 'pam/pum/200Bytes.txt', stream: r1})
-     i.write({path: 'pam/pum/1.2MiB.txt', stream: r2})
-     i.write({path: 'pam/1.2MiB.txt', stream: r3})
+     i.write({path: 'pam/pum/200Bytes.txt', content: r1})
+     i.write({path: 'pam/pum/1.2MiB.txt', content: r2})
+     i.write({path: 'pam/1.2MiB.txt', content: r3})
      i.end()
    })
  })

From b177a08d2d3aae053c84a211f42f29fcecf19110 Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Wed, 25 May 2016 10:21:35 -0700
Subject: [PATCH 2/6] Support content as a Buffer.
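
Buffers are converted to a Readable stream with streamifier and then take the
existing chunking path; anything that is neither a Buffer nor a Readable
stream is rejected with an 'error' event. A rough usage sketch, not part of
the diff below ('ds' is a DAG Service instance as in the tests; the file
names are the test fixtures):

```js
const fs = require('fs')
const Importer = require('ipfs-unixfs-engine').importer

const i = new Importer(ds) // ds: a DAG Service instance

// A Readable stream, as before:
i.write({path: '200Bytes.txt', content: fs.createReadStream('200Bytes.txt')})
// ...or, with this patch, a Buffer:
i.write({path: '1.2MiB.txt', content: fs.readFileSync('1.2MiB.txt')})
// Anything else (e.g. a plain string) emits an 'error' event:
i.on('error', (err) => console.error(err.message))
i.end()
```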
---
 package.json          |  2 ++
 src/importer.js       | 14 ++++++++++++++
 test/test-importer.js | 26 ++++++++++++++++++++++++++
 3 files changed, 42 insertions(+)

diff --git a/package.json b/package.json
index 1d3587e9..834bca3f 100644
--- a/package.json
+++ b/package.json
@@ -56,8 +56,10 @@
     "debug": "^2.2.0",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
+    "isstream": "^0.1.2",
     "readable-stream": "^1.1.13",
     "run-series": "^1.1.4",
+    "streamifier": "^0.1.1",
     "through2": "^2.0.0"
   },
   "contributors": [
diff --git a/src/importer.js b/src/importer.js
index cda4e34e..5eaa01e0 100644
--- a/src/importer.js
+++ b/src/importer.js
@@ -10,6 +10,8 @@
 const UnixFS = require('ipfs-unixfs')
 const util = require('util')
 const bs58 = require('bs58')
 const Duplex = require('readable-stream').Duplex
+const isStream = require('isstream')
+const streamifier = require('streamifier')
 
 exports = module.exports = Importer
 
@@ -63,6 +65,18 @@ function Importer (dagService, options) {
      return
    }
 
+    // Convert a buffer to a readable stream
+    if (Buffer.isBuffer(fl.content)) {
+      const r = streamifier.createReadStream(fl.content)
+      fl.content = r
+    }
+
+    // Bail if 'content' is not readable
+    if (!isStream.isReadable(fl.content)) {
+      this.emit('error', new Error('"content" is neither a Buffer nor a Readable stream'))
+      return
+    }
+
    const leaves = []
    fl.content
      .pipe(fsc(CHUNK_SIZE))
diff --git a/test/test-importer.js b/test/test-importer.js
index 09acaf44..fea1c465 100644
--- a/test/test-importer.js
+++ b/test/test-importer.js
@@ -29,6 +29,17 @@ module.exports = function (repo) {
      done()
    })
 
+    it('bad input', (done) => {
+      const r = 'banana'
+      const i = new Importer(ds)
+      i.on('error', (err) => {
+        expect(err).to.exist
+        done()
+      })
+      i.write({path: '200Bytes.txt', content: r})
+      i.end()
+    })
+
    it('small file (smaller than a chunk)', (done) => {
      const buffered = smallFile
      const r = streamifier.createReadStream(buffered)
@@ -45,6 +56,21 @@ module.exports = function (repo) {
      })
    })
 
+    it('small file as buffer (smaller than a chunk)', (done) => {
+      const buffered = smallFile
+      const i = new Importer(ds)
+      i.on('data', (obj) => {
+        expect(obj.path).to.equal('200Bytes.txt')
+        expect(bs58.encode(obj.multihash)).to.equal('QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8')
+        expect(obj.size).to.equal(211)
+      })
+      i.write({path: '200Bytes.txt', content: buffered})
+      i.end()
+      i.on('end', () => {
+        done()
+      })
+    })
+
    it('small file (smaller than a chunk) inside a dir', (done) => {

From 462d2f4a312273199d25639fb65aaace6324e9cb Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Wed, 25 May 2016 16:33:56 -0700
Subject: [PATCH 3/6] End import stream after all writes.

Previously it would be possible for certain writes to the DAG Service to
occur *after* the stream had ended, meaning the Importer would return
objects that did not yet exist in the DAG Service. This change defers
stream termination until after those writes have occurred, to preserve the
powerful invariant of "element emission => written to dag service".
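
The mechanism is a counter of in-flight dagService.add calls: every write
increments it, each element is pushed from its own write callback, and the
stream is ended only once the counter drains back to zero. A self-contained
sketch of that pattern, not the exact diff below (the async store is stubbed
with setTimeout):

```js
// Queue all writes synchronously, emit each element only from its write
// callback, and end the stream once nothing is left in flight.
let pendingWrites = 0

function storeAndEmit (node, push, endStream) {
  pendingWrites++
  setTimeout(() => {     // stand-in for dagService.add(node, callback)
    pendingWrites--
    push(node)           // safe: this node's write has completed
    if (pendingWrites <= 0) {
      endStream()        // nothing is in flight any more
    }
  }, 10)
}

storeAndEmit('dir-a', console.log, () => console.log('stream ended'))
storeAndEmit('dir-b', console.log, () => console.log('stream ended'))
// prints: dir-a, dir-b, then "stream ended" exactly once
```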
---
 src/importer.js | 35 ++++++++++++++++++++++++-----------
 1 file changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/importer.js b/src/importer.js
index 5eaa01e0..141d8fb0 100644
--- a/src/importer.js
+++ b/src/importer.js
@@ -238,13 +238,15 @@ function Importer (dagService, options) {
    // If the value is not an object
    // add as a link to the dirNode
 
-    function traverse (tree, base) {
+    let pendingWrites = 0
+
+    function traverse (tree, base, done) {
      const keys = Object.keys(tree)
      let tmpTree = tree
      keys.map((key) => {
        if (typeof tmpTree[key] === 'object' &&
            !Buffer.isBuffer(tmpTree[key])) {
-          tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key)
+          tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key, done)
        }
      })
 
@@ -264,9 +266,24 @@ function Importer (dagService, options) {
      })
 
      n.data = d.marshal()
+
+      pendingWrites++
      dagService.add(n, (err) => {
+        pendingWrites--
        if (err) {
          this.push({error: 'failed to store dirNode'})
+        } else if (base) {
+          const el = {
+            path: base,
+            multihash: n.multihash(),
+            yes: 'no',
+            size: n.size()
+          }
+          this.push(el)
+        }
+
+        if (pendingWrites <= 0) {
+          done()
        }
      })
 
@@ -274,18 +291,14 @@ function Importer (dagService, options) {
      if (!base) {
        return
      }
 
-      const el = {
-        path: base,
-        multihash: n.multihash(),
-        size: n.size()
-      }
-      this.push(el)
-
      mhIndex[bs58.encode(n.multihash())] = { size: n.size() }
      return n.multihash()
    }
-    /* const rootHash = */ traverse.call(this, fileTree)
-    this.push(null)
+
+    let self = this
+    /* const rootHash = */ traverse.call(this, fileTree, null, function () {
+      self.push(null)
+    })
  }
 }

From 96a8aa2d2a9a34c6fff520d6b528c99c8b9eddc8 Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Thu, 26 May 2016 09:39:23 -0700
Subject: [PATCH 4/6] README <3

---
 README.md | 49 ++++++++++++++++++++++++++++++-------------------
 1 file changed, 30 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 79badbfc..82ecad12 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 IPFS unixFS Engine
 ===================
 
-> Import data into an IPFS DAG Service.
+> Import & Export data to/from an [IPFS DAG Service][]
 
 [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
 [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
@@ -74,24 +74,24 @@ When run, the stat of DAG Node is outputted for each file on data event until the
 ```
 { multihash: ,
-  Size: 39243,
+  size: 39243,
   path: '/tmp/foo/bar' }
 
 { multihash: ,
-  Size: 59843,
+  size: 59843,
   path: '/tmp/foo/quxx' }
 
 { multihash: ,
-  Size: 93242,
+  size: 93242,
   path: '/tmp/foo' }
 
 { multihash: ,
-  Size: 94234,
+  size: 94234,
   path: '/tmp' }
 ```
 
-## API
+## Importer API
 
 ```js
 const Importer = require('ipfs-unixfs-engine').importer
 ```
@@ -99,16 +99,22 @@
 
 ### const add = new Importer(dag)
 
-The importer is a duplex stream in object mode that writes inputs of tuples
-of path and readable streams of data. You can stream an array of files to the
-importer, just call the 'end' function to signal that you are done inputting file/s.
-Listen to the 'data' for the returned informtion 'multihash, size and path' for
-each file added. Listen to the 'end' event from the stream to know when the
-importer has finished importing files. Input file paths with directory structure
-will preserve the hierarchy in the dag node.
+The importer is an object Transform stream that accepts objects of the form
+
+```js
+{
+  path: 'a name',
+  content: (Buffer or Readable stream)
+}
+```
+
+The stream will output IPFS DAG Node stats for the nodes it has added to the
+DAG Service. When stats on a node are emitted they are guaranteed to have
+been written into the DAG Service's storage mechanism.
+
+The input's file paths and directory structure will be preserved in the DAG
+Nodes.
 
-Uses the [DAG Service](https://github.com/vijayee/js-ipfs-merkle-dag/) instance
-`dagService`.
 
 ## Example Exporter
@@ -133,15 +139,17 @@ exportEvent.on('data', (result) => {
 }
 ```
 
-##API
+## Exporter: API
 
 ```js
 const Importer = require('ipfs-unixfs-engine').exporter
 ```
 
-The exporter is a readable stream in object mode that returns an object ```{ content: stream, path: 'path' }``` by the multihash of the file from the dag service.
+The exporter is a readable stream in object mode that returns an object ```{
+content: stream, path: 'path' }``` by the multihash of the file from the dag
+service.
 
-## install
+## Install
 
 With [npm](https://npmjs.org/) installed, run
@@ -149,7 +157,10 @@
 
 ```
 $ npm install ipfs-unixfs-engine
 ```
 
-## license
+## License
 
 ISC
+
+
+[IPFS DAG Service]: https://github.com/vijayee/js-ipfs-merkle-dag/

From f65abd6e403df75de1adaf353256b2ac5ef71531 Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Thu, 26 May 2016 09:42:59 -0700
Subject: [PATCH 5/6] more readme tweaks; squash me

---
 README.md    | 16 ++++++++++++----
 package.json |  4 ++--
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 82ecad12..d56c27b1 100644
--- a/README.md
+++ b/README.md
@@ -141,12 +141,20 @@ exportEvent.on('data', (result) => {
 ## Exporter: API
 
 ```js
-const Importer = require('ipfs-unixfs-engine').exporter
+const Exporter = require('ipfs-unixfs-engine').exporter
 ```
 
-The exporter is a readable stream in object mode that returns an object ```{
-content: stream, path: 'path' }``` by the multihash of the file from the dag
-service.
+The exporter is a readable stream in object mode that outputs objects of the
+form
+
+```js
+{
+  path: 'a name',
+  content: (Buffer or Readable stream)
+}
+```
+
+by the multihash of the file from the DAG Service.
 
 ## Install
 
diff --git a/package.json b/package.json
index 834bca3f..1f6445b9 100644
--- a/package.json
+++ b/package.json
@@ -2,7 +2,7 @@
   "name": "ipfs-unixfs-engine",
   "version": "0.8.0",
   "description": "JavaScript implementation of the unixfs Engine used by IPFS",
-  "main": "lib/index.js",
+  "main": "src/index.js",
   "jsnext:main": "src/index.js",
   "scripts": {
     "lint": "aegir-lint",
@@ -70,4 +70,4 @@
     "greenkeeperio-bot ",
     "nginnever "
   ]
-}
\ No newline at end of file
+}

From 3f07067145506d1d62eea3e1bd8f6894ab2fe0a9 Mon Sep 17 00:00:00 2001
From: Stephen Whitmore
Date: Thu, 26 May 2016 09:56:34 -0700
Subject: [PATCH 6/6] Rename "base" to "path" in traverse.
---
 src/importer.js | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/importer.js b/src/importer.js
index 141d8fb0..aede8925 100644
--- a/src/importer.js
+++ b/src/importer.js
@@ -240,13 +240,13 @@ function Importer (dagService, options) {
 
    let pendingWrites = 0
 
-    function traverse (tree, base, done) {
+    function traverse (tree, path, done) {
      const keys = Object.keys(tree)
      let tmpTree = tree
      keys.map((key) => {
        if (typeof tmpTree[key] === 'object' &&
            !Buffer.isBuffer(tmpTree[key])) {
-          tmpTree[key] = traverse.call(this, tmpTree[key], base ? base + '/' + key : key, done)
+          tmpTree[key] = traverse.call(this, tmpTree[key], path ? path + '/' + key : key, done)
        }
      })
 
@@ -272,9 +272,9 @@ function Importer (dagService, options) {
        pendingWrites--
        if (err) {
          this.push({error: 'failed to store dirNode'})
-        } else if (base) {
+        } else if (path) {
          const el = {
-            path: base,
+            path: path,
            multihash: n.multihash(),
            yes: 'no',
            size: n.size()
          }
          this.push(el)
        }
 
        if (pendingWrites <= 0) {
          done()
        }
      })
 
@@ -287,7 +287,7 @@ function Importer (dagService, options) {
 
-      if (!base) {
+      if (!path) {
        return
      }
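
Series note: taken together, the six patches leave the engine exchanging
{path, content} tuples in both directions. A rough round-trip sketch against
this branch ('ds' is a DAG Service instance as in the tests; the file name
and content are illustrative):

```js
const unixfsEngine = require('ipfs-unixfs-engine')
const bs58 = require('bs58')

const i = new unixfsEngine.importer(ds)
i.on('data', (stat) => {
  // By patch 3, stat's node is already stored in the DAG Service here.
  const e = unixfsEngine.exporter(bs58.encode(stat.multihash), ds)
  e.on('data', (file) => {
    // file is a {path, content} tuple (patches 1, 4, 5);
    // file.content is null for directories.
  })
})
// A Buffer is accepted directly since patch 2:
i.write({path: 'hello.txt', content: new Buffer('hello world')})
i.end()
```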