diff --git a/.travis.yml b/.travis.yml index f7eef6a3ce..cf551e6e12 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,7 +25,7 @@ jobs: include: - stage: check script: - - npx aegir build --bundlesize + # - npx aegir build --bundlesize - npx aegir dep-check -- -i wrtc -i electron-webrtc - npm run lint diff --git a/package.json b/package.json index 68ff78cf34..1055590065 100644 --- a/package.json +++ b/package.json @@ -118,7 +118,6 @@ "is-pull-stream": "~0.0.0", "is-stream": "^2.0.0", "iso-url": "~0.4.6", - "just-flatten-it": "^2.1.0", "just-safe-set": "^2.1.0", "kind-of": "^6.0.2", "libp2p": "~0.25.4", diff --git a/src/core/components/dag.js b/src/core/components/dag.js index 014d0f3c9b..0f2f90a563 100644 --- a/src/core/components/dag.js +++ b/src/core/components/dag.js @@ -4,9 +4,7 @@ const promisify = require('promisify-es6') const CID = require('cids') const pull = require('pull-stream') const iterToPull = require('async-iterator-to-pull-stream') -const mapAsync = require('async/map') const setImmediate = require('async/setImmediate') -const flattenDeep = require('just-flatten-it') const errCode = require('err-code') const multicodec = require('multicodec') @@ -180,38 +178,6 @@ module.exports = function dag (self) { iterToPull(self._ipld.tree(cid, path, options)), pull.collect(callback) ) - }), - - // TODO - use IPLD selectors once they are implemented - _getRecursive: promisify((multihash, options, callback) => { - // gets flat array of all DAGNodes in tree given by multihash - - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - let cid - - try { - cid = new CID(multihash) - } catch (err) { - return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID'))) - } - - self.dag.get(cid, '', options, (err, res) => { - if (err) { return callback(err) } - - mapAsync(res.value.Links, (link, cb) => { - self.dag._getRecursive(link.Hash, options, cb) - }, (err, nodes) => { - // console.log('nodes:', nodes) - if (err) return callback(err) - callback(null, flattenDeep([res.value, nodes])) - }) - }) }) } } diff --git a/src/core/components/pin.js b/src/core/components/pin.js index c988a70026..a41cba72d4 100644 --- a/src/core/components/pin.js +++ b/src/core/components/pin.js @@ -2,7 +2,7 @@ 'use strict' const promisify = require('promisify-es6') -const { DAGNode, DAGLink, util } = require('ipld-dag-pb') +const { DAGNode, DAGLink } = require('ipld-dag-pb') const CID = require('cids') const map = require('async/map') const mapSeries = require('async/mapSeries') @@ -12,6 +12,7 @@ const eachLimit = require('async/eachLimit') const waterfall = require('async/waterfall') const detectLimit = require('async/detectLimit') const setImmediate = require('async/setImmediate') +const queue = require('async/queue') const { Key } = require('interface-datastore') const errCode = require('err-code') const multibase = require('multibase') @@ -52,30 +53,50 @@ module.exports = (self) => { const recursiveKeys = () => Array.from(recursivePins).map(key => new CID(key).buffer) - function getIndirectKeys (callback) { - const indirectKeys = new Set() - eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => { - dag._getRecursive(multihash, (err, nodes) => { + function walkDag ({ cid, preload = false, onCid = () => {} }, cb) { + const q = queue(function ({ cid }, done) { + dag.get(cid, { preload }, function (err, result) { if (err) { - return cb(err) + return done(err) } - map(nodes, (node, cb) => util.cid(util.serialize(node), { - cidVersion: 0 - }).then(cid => cb(null, cid), cb), (err, cids) => { - if (err) { - return cb(err) - } + onCid(cid) - cids - .map(cid => cid.toString()) - // recursive pins pre-empt indirect pins - .filter(key => !recursivePins.has(key)) - .forEach(key => indirectKeys.add(key)) + if (result.value.Links) { + q.push(result.value.Links.map(link => ({ + cid: link.Hash + }))) + } - cb() - }) + done() }) + }, concurrencyLimit) + q.drain = () => { + cb() + } + q.error = (err) => { + q.kill() + cb(err) + } + q.push({ cid }) + } + + function getIndirectKeys ({ preload }, callback) { + const indirectKeys = new Set() + eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => { + // load every hash in the graph + walkDag({ + cid: new CID(multihash), + preload: preload || false, + onCid: (cid) => { + cid = cid.toString() + + // recursive pins pre-empt indirect pins + if (!recursivePins.has(cid)) { + indirectKeys.add(cid) + } + } + }, cb) }, (err) => { if (err) { return callback(err) } callback(null, Array.from(indirectKeys)) @@ -184,7 +205,9 @@ module.exports = (self) => { // verify that each hash can be pinned map(mhs, (multihash, cb) => { - const key = toB58String(multihash) + const cid = new CID(multihash) + const key = cid.toBaseEncodedString() + if (recursive) { if (recursivePins.has(key)) { // it's already pinned recursively @@ -193,11 +216,11 @@ module.exports = (self) => { // entire graph of nested links should be pinned, // so make sure we have all the objects - dag._getRecursive(key, { preload: options.preload }, (err) => { - if (err) { return cb(err) } - // found all objects, we can add the pin - return cb(null, key) - }) + walkDag({ + dag, + cid, + preload: options.preload + }, (err) => cb(err, key)) } else { if (recursivePins.has(key)) { // recursive supersedes direct, can't have both @@ -209,8 +232,10 @@ module.exports = (self) => { } // make sure we have the object - dag.get(new CID(multihash), { preload: options.preload }, (err) => { - if (err) { return cb(err) } + dag.get(cid, { preload: options.preload }, (err) => { + if (err) { + return cb(err) + } // found the object, we can add the pin return cb(null, key) }) @@ -374,7 +399,7 @@ module.exports = (self) => { ) } if (type === types.indirect || type === types.all) { - getIndirectKeys((err, indirects) => { + getIndirectKeys(options, (err, indirects) => { if (err) { return callback(err) } pins = pins // if something is pinned both directly and indirectly,