diff --git a/README.md b/README.md
index bc499217..95cc04d0 100644
--- a/README.md
+++ b/README.md
@@ -154,6 +154,8 @@ Nodes.
 
 In the second argument of the importer constructor you can specify the following options:
 
+* `wrap` (boolean, defaults to false): if true, a wrapping directory node will be created at the root
+* `shardSplitThreshold` (positive integer, defaults to 1000): the number of directory entries above which we decide to use a sharding directory builder (instead of the default flat one)
 * `chunker` (string, defaults to `"fixed"`): the chunking strategy. Now only supports `"fixed"`
 * `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
   * `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size for the `fixed` chunker.
@@ -164,6 +166,9 @@ In the second argument of the importer constructor you can specify the following
 * `maxChildrenPerNode` (positive integer, defaults to `174`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies
 * `layerRepeat` (positive integer, defaults to 4): (only applicable to the `trickle` DAG builder strategy). The maximum repetition of parent nodes for each layer of the tree.
 * `reduceSingleLeafToSelf` (boolean, defaults to `false`): optimization for, when reducing a set of nodes with one node, reduce it to that node.
+* `dirBuilder` (object): the options for the directory builder
+  * `hamt` (object): the options for the HAMT sharded directory builder
+    * `bits` (positive integer, defaults to `8`): the number of bits at each bucket of the HAMT
 
 ### Example Exporter
 
diff --git a/package.json b/package.json
index 03551e3c..fe764806 100644
--- a/package.json
+++ b/package.json
@@ -48,7 +48,8 @@
     "pre-commit": "^1.2.2",
     "pull-generate": "^2.2.0",
     "pull-zip": "^2.0.1",
-    "rimraf": "^2.6.1"
+    "rimraf": "^2.6.1",
+    "split": "^1.0.0"
   },
   "dependencies": {
     "async": "^2.1.5",
@@ -58,8 +59,10 @@
     "ipld-dag-pb": "^0.11.0",
     "ipld-resolver": "^0.11.0",
     "is-ipfs": "^0.3.0",
+    "left-pad": "^1.1.3",
     "lodash": "^4.17.4",
-    "multihashes": "^0.4.4",
+    "multihashes": "^0.4.5",
+    "multihashing-async": "^0.4.0",
     "pull-batch": "^1.0.0",
     "pull-block": "^1.1.0",
     "pull-cat": "^1.1.11",
@@ -69,7 +72,8 @@
     "pull-pushable": "^2.0.1",
     "pull-stream": "^3.5.0",
     "pull-traverse": "^1.0.3",
-    "pull-write": "^1.1.1"
+    "pull-write": "^1.1.1",
+    "sparse-array": "^1.3.1"
   },
   "contributors": [
     "David Dias ",
@@ -83,4 +87,4 @@
     "jbenet ",
     "nginnever "
   ]
-}
\ No newline at end of file
+}
diff --git a/src/builder/builder.js b/src/builder/builder.js
index 71e32e32..4569210a 100644
--- a/src/builder/builder.js
+++ b/src/builder/builder.js
@@ -22,7 +22,7 @@ const defaultOptions = {
 module.exports = function (createChunker, ipldResolver, createReducer, _options) {
   const options = extend({}, defaultOptions, _options)
 
-  return function (source, files) {
+  return function (source) {
     return function (items, cb) {
       parallel(items.map((item) => (cb) => {
         if (!item.content) {
@@ -33,7 +33,6 @@ module.exports = function (createChunker, ipldResolver, createReducer, _options)
             }
             if (node) {
               source.push(node)
-              files.push(node)
             }
             cb()
           })
@@ -46,7 +45,6 @@ module.exports = function (createChunker, ipldResolver, createReducer, _options)
           }
          if (node) {
            source.push(node)
-            files.push(node)
          }
          cb()
        })
diff --git a/src/builder/create-build-stream.js b/src/builder/create-build-stream.js
index b9e696db..9579363c 100644
--- a/src/builder/create-build-stream.js
+++ b/src/builder/create-build-stream.js
@@ -3,23 +3,14 @@ const
pullPushable = require('pull-pushable') const pullWrite = require('pull-write') -module.exports = function createBuildStream (createStrategy, ipldResolver, flushTree, options) { - const files = [] - +module.exports = function createBuildStream (createStrategy, ipldResolver, options) { const source = pullPushable() const sink = pullWrite( - createStrategy(source, files), + createStrategy(source), null, options.highWaterMark, - (err) => { - if (err) { - source.end(err) - return // early - } - - flushTree(files, ipldResolver, source, source.end) - } + (err) => source.end(err) ) return { diff --git a/src/builder/index.js b/src/builder/index.js index 0a769979..f539c592 100644 --- a/src/builder/index.js +++ b/src/builder/index.js @@ -16,10 +16,9 @@ const defaultOptions = { reduceSingleLeafToSelf: false } -module.exports = function (Chunker, ipldResolver, flushTree, _options) { +module.exports = function (Chunker, ipldResolver, _options) { assert(Chunker, 'Missing chunker creator function') assert(ipldResolver, 'Missing IPLD Resolver') - assert(flushTree, 'Missing flushTree argument') const options = Object.assign({}, defaultOptions, _options) @@ -29,5 +28,5 @@ module.exports = function (Chunker, ipldResolver, flushTree, _options) { const createStrategy = Builder(Chunker, ipldResolver, reducer, options) - return createBuildStream(createStrategy, ipldResolver, flushTree, options) + return createBuildStream(createStrategy, ipldResolver, options) } diff --git a/src/exporter/clean-multihash.js b/src/exporter/clean-multihash.js new file mode 100644 index 00000000..8c46972d --- /dev/null +++ b/src/exporter/clean-multihash.js @@ -0,0 +1,11 @@ +'use strict' + +const mh = require('multihashes') + +module.exports = (multihash) => { + if (Buffer.isBuffer(multihash)) { + return mh.toB58String(multihash) + } + + return multihash +} diff --git a/src/exporter/dir-flat.js b/src/exporter/dir-flat.js new file mode 100644 index 00000000..b19d2dbd --- /dev/null +++ b/src/exporter/dir-flat.js @@ -0,0 +1,36 @@ +'use strict' + +const path = require('path') +const pull = require('pull-stream') +const paramap = require('pull-paramap') +const CID = require('cids') +const cat = require('pull-cat') + +// Logic to export a unixfs directory. +module.exports = dirExporter + +function dirExporter (node, name, ipldResolver, resolve, parent) { + const dir = { + path: name, + hash: node.multihash + } + + return cat([ + pull.values([dir]), + pull( + pull.values(node.links), + pull.map((link) => ({ + path: path.join(name, link.name), + hash: link.multihash + })), + paramap((item, cb) => ipldResolver.get(new CID(item.hash), (err, n) => { + if (err) { + return cb(err) + } + + cb(null, resolve(n.value, item.path, ipldResolver, name, parent)) + })), + pull.flatten() + ) + ]) +} diff --git a/src/exporter/dir-hamt-sharded.js b/src/exporter/dir-hamt-sharded.js new file mode 100644 index 00000000..01171bfb --- /dev/null +++ b/src/exporter/dir-hamt-sharded.js @@ -0,0 +1,48 @@ +'use strict' + +const path = require('path') +const pull = require('pull-stream') +const paramap = require('pull-paramap') +const CID = require('cids') +const cat = require('pull-cat') +const cleanHash = require('./clean-multihash') + +// Logic to export a unixfs directory. 
+module.exports = shardedDirExporter + +function shardedDirExporter (node, name, ipldResolver, resolve, parent) { + let dir + if (!parent || parent.path !== name) { + dir = [{ + path: name, + hash: cleanHash(node.multihash) + }] + } + + return cat([ + pull.values(dir), + pull( + pull.values(node.links), + pull.map((link) => { + // remove the link prefix (2 chars for the bucket index) + let p = link.name.substring(2) + // another sharded dir or file? + p = p ? path.join(name, p) : name + + return { + name: link.name, + path: p, + hash: link.multihash + } + }), + paramap((item, cb) => ipldResolver.get(new CID(item.hash), (err, n) => { + if (err) { + return cb(err) + } + + cb(null, resolve(n.value, item.path, ipldResolver, (dir && dir[0]) || parent)) + })), + pull.flatten() + ) + ]) +} diff --git a/src/exporter/index.js b/src/exporter/index.js index 33167047..f8f91e07 100644 --- a/src/exporter/index.js +++ b/src/exporter/index.js @@ -1,58 +1,23 @@ 'use strict' -const traverse = require('pull-traverse') const pull = require('pull-stream') const CID = require('cids') const isIPFS = require('is-ipfs') -const util = require('./../util') -const switchType = util.switchType -const cleanMultihash = util.cleanMultihash +const resolve = require('./resolve').resolve +const cleanMultihash = require('./clean-multihash') -const dirExporter = require('./dir') -const fileExporter = require('./file') - -module.exports = (hash, ipldResolver, options) => { +module.exports = (hash, ipldResolver) => { if (!isIPFS.multihash(hash)) { return pull.error(new Error('not valid multihash')) } hash = cleanMultihash(hash) - options = options || {} - - function visitor (item) { - if (!item.hash) { - // having no hash means that this visitor got a file object - // which needs no further resolving. - // No further resolving means that the visitor does not - // need to do anyting else, so he's returning - // an empty stream - - // TODO: perhaps change the pull streams construct. - // Instead of traversing with a visitor, consider recursing. 
- return pull.empty() - } - return pull( - ipldResolver.getStream(new CID(item.hash)), - pull.map((result) => result.value), - pull.map((node) => switchType( - node, - () => dirExporter(node, item.path, ipldResolver), - () => fileExporter(node, item.path, ipldResolver) - )), - pull.flatten() - ) - } - // Traverse the DAG return pull( ipldResolver.getStream(new CID(hash)), pull.map((result) => result.value), - pull.map((node) => switchType( - node, - () => traverse.widthFirst({path: hash, hash}, visitor), - () => fileExporter(node, hash, ipldResolver) - )), + pull.map((node) => resolve(node, hash, ipldResolver)), pull.flatten() ) } diff --git a/src/exporter/resolve.js b/src/exporter/resolve.js new file mode 100644 index 00000000..53259a9a --- /dev/null +++ b/src/exporter/resolve.js @@ -0,0 +1,30 @@ +'use strict' + +const UnixFS = require('ipfs-unixfs') +const pull = require('pull-stream') + +const resolvers = { + directory: require('./dir-flat'), + 'hamt-sharded-directory': require('./dir-hamt-sharded'), + file: require('./file') +} + +module.exports = Object.assign({ + resolve: resolve, + typeOf: typeOf +}, resolvers) + +function resolve (node, name, ipldResolver, parentNode) { + const type = typeOf(node) + const resolver = resolvers[type] + if (!resolver) { + return pull.error(new Error('Unkown node type ' + type)) + } + let stream = resolver(node, name, ipldResolver, resolve, parentNode) + return stream +} + +function typeOf (node) { + const data = UnixFS.unmarshal(node.data) + return data.type +} diff --git a/src/hamt/bucket.js b/src/hamt/bucket.js new file mode 100644 index 00000000..f15a95aa --- /dev/null +++ b/src/hamt/bucket.js @@ -0,0 +1,289 @@ +'use strict' + +const SparseArray = require('sparse-array') +const map = require('async/map') +const eachSeries = require('async/eachSeries') +const wrapHash = require('./consumable-hash') + +const defaultOptions = { + bits: 8 +} + +// TODO: make HAMT a generic NPM package + +class Bucket { + constructor (options, parent, posAtParent) { + this._options = Object.assign({}, defaultOptions, options) + this._popCount = 0 + this._parent = parent + this._posAtParent = posAtParent + + if (!this._options.hashFn) { + throw new Error('please define an options.hashFn') + } + + // make sure we only wrap options.hashFn once in the whole tree + if (!this._options.hash) { + this._options.hash = wrapHash(this._options.hashFn) + } + this._children = new SparseArray() + } + + static isBucket (o) { + return o instanceof Bucket + } + + put (key, value, callback) { + this._findNewBucketAndPos(key, (err, place) => { + if (err) { + callback(err) + return // early + } + + place.bucket._putAt(place, key, value) + callback() + }) + } + + get (key, callback) { + this._findChild(key, (err, child) => { + if (err) { + callback(err) + } else { + callback(null, child && child.value) + } + }) + } + + del (key, callback) { + this._findPlace(key, (err, place) => { + if (err) { + callback(err) + return // early + } + const child = place.bucket._at(place.pos) + if (child && child.key === key) { + place.bucket._delAt(place.pos) + } + callback(null) + }) + } + + leafCount () { + this._children.reduce((acc, child) => { + if (child instanceof Bucket) { + return acc + child.leafCount() + } + return acc + 1 + }, 0) + } + + childrenCount () { + return this._children.length + } + + onlyChild (callback) { + process.nextTick(() => callback(null, this._children.get(0))) + } + + eachLeafSeries (iterator, callback) { + eachSeries( + this._children.compactArray(), + (child, cb) => { + if 
(child instanceof Bucket) { + child.eachLeafSeries(iterator, cb) + } else { + iterator(child.key, child.value, cb) + } + }, + callback) + } + + serialize (map, reduce) { + // serialize to a custom non-sparse representation + return reduce(this._children.reduce((acc, child, index) => { + if (child) { + if (child instanceof Bucket) { + acc.push(child.serialize(map, reduce)) + } else { + acc.push(map(child, index)) + } + } + return acc + }, [])) + } + + asyncTransform (asyncMap, asyncReduce, callback) { + asyncTransformBucket(this, asyncMap, asyncReduce, callback) + } + + toJSON () { + return this.serialize(mapNode, reduceNodes) + } + + prettyPrint () { + return JSON.stringify(this.toJSON(), null, ' ') + } + + tableSize () { + return Math.pow(2, this._options.bits) + } + + _findChild (key, callback) { + this._findPlace(key, (err, result) => { + if (err) { + callback(err) + return // early + } + + const child = result.bucket._at(result.pos) + if (child && child.key === key) { + callback(null, child) + } else { + callback(null, undefined) + } + }) + } + + _findPlace (key, callback) { + const hashValue = this._options.hash(key) + hashValue.take(this._options.bits, (err, index) => { + if (err) { + callback(err) + return // early + } + + const child = this._children.get(index) + if (child instanceof Bucket) { + child._findPlace(hashValue, callback) + } else { + const place = { + bucket: this, + pos: index, + hash: hashValue + } + callback(null, place) + } + }) + } + + _findNewBucketAndPos (key, callback) { + this._findPlace(key, (err, place) => { + if (err) { + callback(err) + return // early + } + const child = place.bucket._at(place.pos) + if (child && child.key !== key) { + // conflict + + const bucket = new Bucket(this._options, place.bucket, place.pos) + place.bucket._putObjectAt(place.pos, bucket) + + // put the previous value + bucket._findPlace(child.hash, (err, newPlace) => { + if (err) { + callback(err) + return // early + } + + newPlace.bucket._putAt(newPlace, child.key, child.value) + bucket._findNewBucketAndPos(place.hash, callback) + }) + } else { + // no conflict, we found the place + callback(null, place) + } + }) + } + + _putAt (place, key, value) { + this._putObjectAt(place.pos, { + key: key, + value: value, + hash: place.hash + }) + } + + _putObjectAt (pos, object) { + if (!this._children.get(pos)) { + this._popCount++ + } + this._children.set(pos, object) + } + + _delAt (pos) { + if (this._children.get(pos)) { + this._popCount-- + } + this._children.unset(pos) + this._level() + } + + _level () { + if (this._parent && this._popCount <= 1) { + if (this._popCount === 1) { + // remove myself from parent, replacing me with my only child + const onlyChild = this._children.find(exists) + if (!(onlyChild instanceof Bucket)) { + const hash = onlyChild.hash + hash.untake(this._options.bits) + const place = { + pos: this._posAtParent, + hash: hash + } + this._parent._putAt(place, onlyChild.key, onlyChild.value) + } + } else { + this._parent._delAt(this._posAtParent) + } + } + } + + _at (index) { + return this._children.get(index) + } +} + +function exists (o) { + return Boolean(o) +} + +function mapNode (node, index) { + return node.key +} + +function reduceNodes (nodes) { + return nodes +} + +function asyncTransformBucket (bucket, asyncMap, asyncReduce, callback) { + map( + bucket._children.compactArray(), + (child, callback) => { + if (child instanceof Bucket) { + asyncTransformBucket(child, asyncMap, asyncReduce, callback) + } else { + asyncMap(child, (err, mappedChildren) => { + if 
(err) { + callback(err) + } else { + callback(null, { + bitField: bucket._children.bitField(), + children: mappedChildren + }) + } + }) + } + }, + (err, mappedChildren) => { + if (err) { + callback(err) + } else { + asyncReduce(mappedChildren, callback) + } + } + ) +} + +module.exports = Bucket diff --git a/src/hamt/consumable-buffer.js b/src/hamt/consumable-buffer.js new file mode 100644 index 00000000..69f6a140 --- /dev/null +++ b/src/hamt/consumable-buffer.js @@ -0,0 +1,82 @@ +'use strict' + +const START_MASKS = [ + 0b11111111, + 0b11111110, + 0b11111100, + 0b11111000, + 0b11110000, + 0b11100000, + 0b11000000, + 0b10000000 +] + +const STOP_MASKS = [ + 0b00000001, + 0b00000011, + 0b00000111, + 0b00001111, + 0b00011111, + 0b00111111, + 0b01111111, + 0b11111111 +] + +module.exports = class ConsumableBuffer { + constructor (value) { + this._value = value + this._currentBytePos = value.length - 1 + this._currentBitPos = 7 + } + + availableBits () { + return this._currentBitPos + 1 + this._currentBytePos * 8 + } + + totalBits () { + return this._value.length * 8 + } + + take (bits) { + let pendingBits = bits + let result = 0 + while (pendingBits && this._haveBits()) { + const byte = this._value[this._currentBytePos] + const availableBits = this._currentBitPos + 1 + const taking = Math.min(availableBits, pendingBits) + const value = byteBitsToInt(byte, availableBits - taking, taking) + result = (result << taking) + value + + pendingBits -= taking + + this._currentBitPos -= taking + if (this._currentBitPos < 0) { + this._currentBitPos = 7 + this._currentBytePos -- + } + } + + return result + } + + untake (bits) { + this._currentBitPos += bits + while (this._currentBitPos > 7) { + this._currentBitPos -= 8 + this._currentBytePos += 1 + } + } + + _haveBits () { + return this._currentBytePos >= 0 + } +} + +function byteBitsToInt (byte, start, length) { + const mask = maskFor(start, length) + return (byte & mask) >>> start +} + +function maskFor (start, length) { + return START_MASKS[start] & STOP_MASKS[Math.min(length + start - 1, 7)] +} diff --git a/src/hamt/consumable-hash.js b/src/hamt/consumable-hash.js new file mode 100644 index 00000000..6fb40aa0 --- /dev/null +++ b/src/hamt/consumable-hash.js @@ -0,0 +1,103 @@ +'use strict' + +const whilst = require('async/whilst') +const ConsumableBuffer = require('./consumable-buffer') + +module.exports = function wrapHash (hashFn) { + return function hashing (value) { + if (value instanceof InfiniteHash) { + // already a hash. 
return it + return value + } else { + return new InfiniteHash(value, hashFn) + } + } +} + +class InfiniteHash { + constructor (value, hashFn) { + if ((typeof value) !== 'string' && !Buffer.isBuffer(value)) { + throw new Error('can only hash strings or buffers') + } + this._value = value + this._hashFn = hashFn + this._depth = -1 + this._availableBits = 0 + this._currentBufferIndex = 0 + this._buffers = [] + } + + take (bits, callback) { + let pendingBits = bits + whilst( + () => this._availableBits < pendingBits, + (callback) => { + this._produceMoreBits(callback) + }, + (err) => { + if (err) { + callback(err) + return // early + } + + let result = 0 + + // TODO: this is sync, no need to use whilst + whilst( + () => pendingBits > 0, + (callback) => { + const hash = this._buffers[this._currentBufferIndex] + const available = Math.min(hash.availableBits(), pendingBits) + const took = hash.take(available) + result = (result << available) + took + pendingBits -= available + this._availableBits -= available + if (hash.availableBits() === 0) { + this._currentBufferIndex++ + } + callback() + }, + (err) => { + if (err) { + callback(err) + return // early + } + + process.nextTick(() => callback(null, result)) + } + ) + } + ) + } + + untake (bits) { + let pendingBits = bits + while (pendingBits > 0) { + const hash = this._buffers[this._currentBufferIndex] + const availableForUntake = Math.min(hash.totalBits() - hash.availableBits(), pendingBits) + hash.untake(availableForUntake) + pendingBits -= availableForUntake + this._availableBits += availableForUntake + if (this._currentBufferIndex > 0 && hash.totalBits() === hash.availableBits()) { + this._depth-- + this._currentBufferIndex-- + } + } + } + + _produceMoreBits (callback) { + this._depth++ + const value = this._depth ? 
this._value + this._depth : this._value + this._hashFn(value, (err, hashValue) => { + if (err) { + callback(err) + return // early + } + + const buffer = new ConsumableBuffer(hashValue) + this._buffers.push(buffer) + this._availableBits += buffer.availableBits() + callback() + }) + } +} diff --git a/src/hamt/index.js b/src/hamt/index.js new file mode 100644 index 00000000..bdb5c46f --- /dev/null +++ b/src/hamt/index.js @@ -0,0 +1,9 @@ +'use strict' + +const Bucket = require('./bucket') + +module.exports = function createHAMT (options) { + return new Bucket(options) +} + +module.exports.isBucket = Bucket.isBucket diff --git a/src/importer/dir-flat.js b/src/importer/dir-flat.js new file mode 100644 index 00000000..a8d625b7 --- /dev/null +++ b/src/importer/dir-flat.js @@ -0,0 +1,89 @@ +'use strict' + +const asyncEachSeries = require('async/eachSeries') +const waterfall = require('async/waterfall') +const CID = require('cids') +const dagPB = require('ipld-dag-pb') +const UnixFS = require('ipfs-unixfs') +const DAGLink = dagPB.DAGLink +const DAGNode = dagPB.DAGNode + +class DirFlat { + constructor (props) { + this._children = {} + Object.assign(this, props) + } + + put (name, value, callback) { + this.multihash = undefined + this.size = undefined + this._children[name] = value + process.nextTick(callback) + } + + get (name, callback) { + process.nextTick(() => callback(null, this._children[name])) + } + + childCount () { + return Object.keys(this._children).length + } + + directChildrenCount () { + return this.childCount() + } + + onlyChild (callback) { + process.nextTick(() => callback(null, this._children[Object.keys(this._children)[0]])) + } + + eachChildSeries (iterator, callback) { + asyncEachSeries( + Object.keys(this._children), + (key, callback) => { + iterator(key, this._children[key], callback) + }, + callback + ) + } + + flush (path, ipldResolver, source, callback) { + const links = Object.keys(this._children) + .map((key) => { + const child = this._children[key] + return new DAGLink(key, child.size, child.multihash) + }) + + const dir = new UnixFS('directory') + waterfall( + [ + (callback) => DAGNode.create(dir.marshal(), links, callback), + (node, callback) => { + ipldResolver.put( + node, + { + cid: new CID(node.multihash) + }, + (err) => callback(err, node)) + }, + (node, callback) => { + this.multihash = node.multihash + this.size = node.size + const pushable = { + path: path, + multihash: node.multihash, + size: node.size + } + source.push(pushable) + callback(null, node) + } + ], + callback) + } +} + +module.exports = createDirFlat + +function createDirFlat (props) { + return new DirFlat(props) +} diff --git a/src/importer/dir-sharded.js b/src/importer/dir-sharded.js new file mode 100644 index 00000000..dc474dc6 --- /dev/null +++ b/src/importer/dir-sharded.js @@ -0,0 +1,170 @@ +'use strict' + +const leftPad = require('left-pad') +const whilst = require('async/whilst') +const waterfall = require('async/waterfall') +const CID = require('cids') +const dagPB = require('ipld-dag-pb') +const UnixFS = require('ipfs-unixfs') +const DAGLink = dagPB.DAGLink +const DAGNode = dagPB.DAGNode +const multihashing = require('multihashing-async') + +const Bucket = require('../hamt') + +const hashFn = function (value, callback) { + multihashing(value, 'murmur3-128', (err, hash) => { + if (err) { + callback(err) + } else { + // Multihashing inserts preamble of 2 bytes. Remove it. 
+ // Also, murmur3 outputs 128 bit but, accidently, IPFS Go's + // implementation only uses the first 64, so we must do the same + // for parity.. + const justHash = hash.slice(2, 10) + const length = justHash.length + const result = new Buffer(length) + // TODO: invert buffer because that's how Go impl does it + for (let i = 0; i < length; i++) { + result[length - i - 1] = justHash[i] + } + callback(null, result) + } + }) +} +hashFn.code = 0x22 // TODO: get this from multihashing-async? + +const defaultOptions = { + hashFn: hashFn +} + +class DirSharded { + constructor (props, _options) { + const options = Object.assign({}, defaultOptions, _options) + this._options = options + this._bucket = Bucket(options) + Object.assign(this, props) + } + + put (name, value, callback) { + this._bucket.put(name, value, callback) + } + + get (name, callback) { + this._bucket.get(name, callback) + } + + childCount () { + return this._bucket.leafCount() + } + + directChildrenCount () { + return this._bucket.childrenCount() + } + + onlyChild (callback) { + this._bucket.onlyChild(callback) + } + + eachChildSeries (iterator, callback) { + this._bucket.eachLeafSeries(iterator, callback) + } + + flush (path, ipldResolver, source, callback) { + flush(this._options, this._bucket, path, ipldResolver, source, (err, node) => { + if (err) { + callback(err) + } else { + this.multihash = node.multihash + this.size = node.size + } + callback(null, node) + }) + } +} + +module.exports = createDirSharded + +function createDirSharded (props) { + return new DirSharded(props) +} + +function flush (options, bucket, path, ipldResolver, source, callback) { + const children = bucket._children // TODO: intromission + let index = 0 + const links = [] + whilst( + () => index < children.length, + (callback) => { + const child = children.get(index) + if (child) { + collectChild(child, index, (err) => { + index++ + callback(err) + }) + } else { + index++ + callback() + } + }, + (err) => { + if (err) { + callback(err) + return // early + } + haveLinks(links) + } + ) + + function collectChild (child, index, callback) { + const labelPrefix = leftPad(index.toString(16).toUpperCase(), 2, '0') + if (Bucket.isBucket(child)) { + flush(options, child, path, ipldResolver, null, (err, node) => { + if (err) { + callback(err) + return // early + } + links.push(new DAGLink(labelPrefix, node.size, node.multihash)) + callback() + }) + } else { + const value = child.value + const label = labelPrefix + child.key + links.push(new DAGLink(label, value.size, value.multihash)) + callback() + } + } + + function haveLinks (links) { + // go-ipfs uses little endian, that's why we have to + // reverse the bit field before storing it + const data = new Buffer(children.bitField().reverse()) + const dir = new UnixFS('hamt-sharded-directory', data) + dir.fanout = bucket.tableSize() + dir.hashType = options.hashFn.code + waterfall( + [ + (callback) => DAGNode.create(dir.marshal(), links, callback), + (node, callback) => { + ipldResolver.put( + node, + { + cid: new CID(node.multihash) + }, + (err) => callback(err, node)) + }, + (node, callback) => { + const pushable = { + path: path, + multihash: node.multihash, + size: node.size + } + if (source) { + source.push(pushable) + } + callback(null, node) + } + ], + callback) + } +} diff --git a/src/importer/flat-to-shard.js b/src/importer/flat-to-shard.js new file mode 100644 index 00000000..1b525e98 --- /dev/null +++ b/src/importer/flat-to-shard.js @@ -0,0 +1,74 @@ +'use strict' + +const waterfall = 
require('async/waterfall') +const DirSharded = require('./dir-sharded') + +module.exports = flatToShard + +function flatToShard (child, dir, threshold, callback) { + maybeFlatToShardOne(dir, threshold, (err, newDir) => { + if (err) { + callback(err) + return // early + } + + const parent = newDir.parent + if (parent) { + waterfall([ + (callback) => { + if (newDir !== dir) { + if (child) { + child.parent = newDir + } + parent.put(newDir.parentKey, newDir, callback) + } else { + callback() + } + }, + (callback) => { + if (parent) { + flatToShard(newDir, parent, threshold, callback) + } else { + callback(null, newDir) + } + } + ], callback) + } else { + // no parent, we're done climbing tree + callback(null, newDir) + } + }) +} + +function maybeFlatToShardOne (dir, threshold, callback) { + if (dir.flat && dir.directChildrenCount() >= threshold) { + definitelyShardOne(dir, callback) + } else { + callback(null, dir) + } +} + +function definitelyShardOne (oldDir, callback) { + const newDir = DirSharded({ + root: oldDir.root, + dir: true, + parent: oldDir.parent, + parentKey: oldDir.parentKey, + path: oldDir.path, + dirty: oldDir.dirty, + flat: false + }) + + oldDir.eachChildSeries( + (key, value, callback) => { + newDir.put(key, value, callback) + }, + (err) => { + if (err) { + callback(err) + } else { + callback(err, newDir) + } + } + ) +} diff --git a/src/importer/index.js b/src/importer/index.js index 03c07e63..8ae157a7 100644 --- a/src/importer/index.js +++ b/src/importer/index.js @@ -1,8 +1,13 @@ 'use strict' +const pause = require('pull-pause') +const pull = require('pull-stream') +const writable = require('pull-write') +const pushable = require('pull-pushable') const assert = require('assert') -const createAndStoreTree = require('./flush-tree') -const Builder = require('../builder') +const setImmediate = require('async/setImmediate') +const DAGBuilder = require('../builder') +const createTreeBuilder = require('./tree-builder') const chunkers = { fixed: require('../chunker/fixed-size') @@ -16,5 +21,78 @@ module.exports = function (ipldResolver, _options) { const options = Object.assign({}, defaultOptions, _options) const Chunker = chunkers[options.chunker] assert(Chunker, 'Unknkown chunker named ' + options.chunker) - return Builder(Chunker, ipldResolver, createAndStoreTree, options) + + let pending = 0 + const waitingPending = [] + + const entry = { + sink: writable( + (nodes, callback) => { + pending += nodes.length + nodes.forEach((node) => entry.source.push(node)) + setImmediate(callback) + }, + null, + 1, + (err) => entry.source.end(err) + ), + source: pushable() + } + + const dagStream = DAGBuilder(Chunker, ipldResolver, options) + + const treeBuilder = createTreeBuilder(ipldResolver, options) + const treeBuilderStream = treeBuilder.stream() + const pausable = pause(() => {}) + + // TODO: transform this entry -> pausable -> -> exit + // into a generic NPM package named something like pull-pause-and-drain + + pull( + entry, + pausable, + dagStream, + pull.map((node) => { + pending-- + if (!pending) { + process.nextTick(() => { + while (waitingPending.length) { + waitingPending.shift()() + } + }) + } + return node + }), + treeBuilderStream + ) + + return { + sink: entry.sink, + source: treeBuilderStream.source, + flush: flush + } + + function flush (callback) { + pausable.pause() + + // wait until all the files entered were + // transformed into DAG nodes + if (!pending) { + proceed() + } else { + waitingPending.push(proceed) + } + + function proceed () { + treeBuilder.flush((err, 
hash) => { + if (err) { + treeBuilderStream.source.end(err) + callback(err) + return + } + pausable.resume() + callback(null, hash) + }) + } + } } diff --git a/src/importer/tree-builder.js b/src/importer/tree-builder.js new file mode 100644 index 00000000..039dcb83 --- /dev/null +++ b/src/importer/tree-builder.js @@ -0,0 +1,217 @@ +'use strict' + +const eachSeries = require('async/eachSeries') +const eachOfSeries = require('async/eachOfSeries') +const waterfall = require('async/waterfall') +const createQueue = require('async/queue') +const writable = require('pull-write') +const pushable = require('pull-pushable') +const DirFlat = require('./dir-flat') +const flatToShard = require('./flat-to-shard') + +module.exports = createTreeBuilder + +const defaultOptions = { + wrap: false, + shardSplitThreshold: 1000 +} + +function createTreeBuilder (ipldResolver, _options) { + const options = Object.assign({}, defaultOptions, _options) + + const queue = createQueue(consumeQueue, 1) + + // returned stream + let stream = createStream() + + // root node + let tree = DirFlat({ + path: '', + root: true, + dir: true, + dirty: false, + flat: true + }) + + return { + flush: flushRoot, + stream: getStream + } + + function consumeQueue (action, callback) { + const args = action.args.concat(function () { + action.cb.apply(null, arguments) + callback() + }) + action.fn.apply(null, args) + } + + function getStream () { + return stream + } + + function createStream () { + const sink = writable(write, null, 1, ended) + const source = pushable() + + return { + sink: sink, + source: source + } + + function write (elems, callback) { + eachSeries( + elems, + (elem, callback) => { + queue.push({ + fn: addToTree, + args: [elem], + cb: (err) => { + if (err) { + callback(err) + } else { + source.push(elem) + callback() + } + } + }) + }, + callback + ) + } + + function ended (err) { + flushRoot((flushErr) => { + source.end(flushErr || err) + }) + } + } + + // ---- Add to tree + + function addToTree (elem, callback) { + const pathElems = elem.path.split('/').filter(notEmpty) + let parent = tree + const lastIndex = pathElems.length - 1 + + let currentPath = '' + eachOfSeries(pathElems, (pathElem, index, callback) => { + if (currentPath) { + currentPath += '/' + } + currentPath += pathElem + + const last = (index === lastIndex) + parent.dirty = true + parent.multihash = null + parent.size = null + + if (last) { + waterfall([ + (callback) => parent.put(pathElem, elem, callback), + (callback) => flatToShard(null, parent, options.shardSplitThreshold, callback), + (newRoot, callback) => { + tree = newRoot + callback() + } + ], callback) + } else { + parent.get(pathElem, (err, treeNode) => { + if (err) { + callback(err) + return // early + } + let dir = treeNode + if (!dir) { + dir = DirFlat({ + dir: true, + parent: parent, + parentKey: pathElem, + path: currentPath, + dirty: true, + flat: true + }) + } + const parentDir = parent + parent = dir + parentDir.put(pathElem, dir, callback) + }) + } + }, callback) + } + + // ---- Flush + + function flushRoot (callback) { + queue.push({ + fn: flush, + args: ['', tree], + cb: (err, node) => { + if (err) { + callback(err) + } else { + callback(null, node && node.multihash) + } + } + }) + } + + function flush (path, tree, callback) { + if (tree.dir) { + if (tree.root && tree.childCount() > 1 && !options.wrap) { + callback(new Error('detected more than one root')) + return // early + } + tree.eachChildSeries( + (key, child, callback) => { + flush(path ? 
(path + '/' + key) : key, child, callback) + }, + (err) => { + if (err) { + callback(err) + return // early + } + flushDir(path, tree, callback) + }) + } else { + // leaf node, nothing to do here + process.nextTick(callback) + } + } + + function flushDir (path, tree, callback) { + // don't create a wrapping node unless the user explicitely said so + if (tree.root && !options.wrap) { + tree.onlyChild((err, onlyChild) => { + if (err) { + callback(err) + return // early + } + + callback(null, onlyChild) + }) + + return // early + } + + if (!tree.dirty) { + callback(null, tree.multihash) + return // early + } + + // don't flush directory unless it's been modified + + tree.dirty = false + tree.flush(path, ipldResolver, stream.source, (err, node) => { + if (err) { + callback(err) + } else { + callback(null, node) + } + }) + } +} + +function notEmpty (str) { + return Boolean(str) +} diff --git a/src/util.js b/src/util.js deleted file mode 100644 index 2742316d..00000000 --- a/src/util.js +++ /dev/null @@ -1,28 +0,0 @@ -'use strict' - -const UnixFS = require('ipfs-unixfs') -const pull = require('pull-stream') -const mh = require('multihashes') - -exports.switchType = (node, dirHandler, fileHandler) => { - const data = UnixFS.unmarshal(node.data) - const type = data.type - - if (type === 'directory') { - return dirHandler() - } - - if (type === 'file') { - return fileHandler() - } - - return pull.error(new Error('Unkown node type')) -} - -exports.cleanMultihash = (multihash) => { - if (Buffer.isBuffer(multihash)) { - return mh.toB58String(multihash) - } - - return multihash -} diff --git a/test/browser.js b/test/browser.js index 911df173..9f319aae 100644 --- a/test/browser.js +++ b/test/browser.js @@ -42,8 +42,13 @@ describe('IPFS data importing tests on the Browser', function () { // relies on data in the repo // require('./test-exporter')(repo) + require('./test-consumable-buffer') + require('./test-consumable-hash') + require('./test-hamt') require('./test-importer')(repo) + require('./test-importer-flush')(repo) require('./test-import-export')(repo) require('./test-hash-parity-with-go-ipfs')(repo) require('./test-nested-dir-import-export')(repo) + require('./test-dirbuilder-sharding')(repo) }) diff --git a/test/node.js b/test/node.js index abbd4ca9..fce307cb 100644 --- a/test/node.js +++ b/test/node.js @@ -40,9 +40,14 @@ describe('IPFS UnixFS Engine', () => { require('./test-balanced-builder') require('./test-trickle-builder') require('./test-fixed-size-chunker') + require('./test-consumable-buffer') + require('./test-consumable-hash') + require('./test-hamt') require('./test-exporter')(repo) require('./test-importer')(repo) + require('./test-importer-flush')(repo) require('./test-import-export')(repo) require('./test-hash-parity-with-go-ipfs')(repo) require('./test-nested-dir-import-export')(repo) + require('./test-dirbuilder-sharding')(repo) }) diff --git a/test/test-consumable-buffer.js b/test/test-consumable-buffer.js new file mode 100644 index 00000000..d2eb1d60 --- /dev/null +++ b/test/test-consumable-buffer.js @@ -0,0 +1,103 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect + +const ConsumableBuffer = require('../src/hamt/consumable-buffer') + +describe('consumable buffer', () => { + let buf + + it('can create an empty one', () => { + buf = new ConsumableBuffer([]) + }) + + it('from which I can take nothing', () => { + expect(buf.take(0)).to.be.eql(0) + }) + + it('from which I can keep on taking', () => { + expect(buf.take(100)).to.be.eql(0) + 
expect(buf.take(1000)).to.be.eql(0) + }) + + it('can create one with one zeroed byte', () => { + buf = new ConsumableBuffer([0]) + }) + + it('from which I can take nothing', () => { + expect(buf.take(0)).to.be.eql(0) + }) + + it('from which I can keep on taking', () => { + expect(buf.take(100)).to.be.eql(0) + expect(buf.take(1000)).to.be.eql(0) + }) + + it('can create one with one byte with ones', () => { + buf = new ConsumableBuffer([0b11111111]) + }) + + it('from which I can take nothing', () => { + expect(buf.take(0)).to.be.eql(0) + }) + + it('from which I can take one bit at a time', () => { + for (let i = 0; i < 8; i++) { + expect(buf.take(1)).to.be.eql(1) + } + }) + + it('should be exhausted', () => { + expect(buf.take(1)).to.be.eql(0) + }) + + it('from which I can keep on taking', () => { + expect(buf.take(100)).to.be.eql(0) + expect(buf.take(1000)).to.be.eql(0) + }) + + it('can create one with 3 full bytes', () => { + buf = new ConsumableBuffer([0xff, 0xff, 0xff]) + }) + + it('from which I can take nothing', () => { + expect(buf.take(0)).to.be.eql(0) + }) + + it('from which I can take one bit at a time', () => { + for (let i = 0; i < 24; i++) { + expect(buf.take(1)).to.be.eql(1) + } + }) + + it('should be exhausted', () => { + expect(buf.take(1)).to.be.eql(0) + }) + + it('can create one with 3 full bytes', () => { + buf = new ConsumableBuffer([0xff, 0xff, 0xff]) + }) + + it('from which I can take 2 bits at a time', () => { + for (let i = 0; i < 12; i++) { + expect(buf.take(2)).to.be.eql(3) + } + }) + + it('should be exhausted', () => { + expect(buf.take(1)).to.be.eql(0) + }) + + it('can create one with 3 full bytes', () => { + buf = new ConsumableBuffer([0xff, 0xff, 0xff]) + }) + + it('from which I can take every bit', () => { + expect(buf.take(24)).to.be.eql(0b111111111111111111111111) + }) + + it('should be exhausted', () => { + expect(buf.take(1)).to.be.eql(0) + }) +}) diff --git a/test/test-consumable-hash.js b/test/test-consumable-hash.js new file mode 100644 index 00000000..8eed3cb8 --- /dev/null +++ b/test/test-consumable-hash.js @@ -0,0 +1,83 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const crypto = require('crypto') +const whilst = require('async/whilst') + +const ConsumableHash = require('../src/hamt/consumable-hash') + +describe('consumable hash', () => { + let hash, h + const maxIter = 100 + const values = [] + + it('can create a hashing function', () => { + hash = ConsumableHash(hashFn) + }) + + it('can take a 0 length value', (callback) => { + hash('some value').take(0, (err, result) => { + expect(err).to.not.exist() + expect(result).to.be.eql(0) + callback() + }) + }) + + it('can take a 10 bit value', (callback) => { + hash('some value').take(10, (err, result) => { + expect(err).to.not.exist() + expect(result).to.be.eql(110) + callback() + }) + }) + + it('can keep on taking a 10 bit value', (callback) => { + h = hash('some value') + let iter = maxIter + whilst( + () => iter > 0, + (callback) => { + h.take(10, (err, result) => { + expect(err).to.not.exist() + values.push(result) + expect(result).to.be.below(1024) + expect(result).to.be.above(0) + iter-- + callback() + }) + }, + callback + ) + }) + + it('can untake all', () => { + h.untake(10 * maxIter) + }) + + it('keeps taking the same values after untaking all', (callback) => { + let iter = maxIter + whilst( + () => iter > 0, + (callback) => { + h.take(10, (err, result) => { + expect(err).to.not.exist() + values.push(result) 
+ expect(result).to.be.eql(values.shift()) + iter-- + callback() + }) + }, + callback + ) + }) +}) + +function hashFn (value, callback) { + callback(null, crypto + .createHash('sha256') + .update(value) + .digest()) +} diff --git a/test/test-dirbuilder-sharding.js b/test/test-dirbuilder-sharding.js new file mode 100644 index 00000000..c7ea7741 --- /dev/null +++ b/test/test-dirbuilder-sharding.js @@ -0,0 +1,339 @@ +/* eslint-env mocha */ +'use strict' + +const importer = require('./../src').importer +const exporter = require('./../src').exporter + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const mh = require('multihashes') +const BlockService = require('ipfs-block-service') +const IPLDResolver = require('ipld-resolver') +const pull = require('pull-stream') +const pushable = require('pull-pushable') +const whilst = require('async/whilst') +const setImmediate = require('async/setImmediate') +const leftPad = require('left-pad') + +module.exports = (repo) => { + describe('dirbuilder sharding', () => { + let ipldResolver + + before(() => { + const bs = new BlockService(repo) + ipldResolver = new IPLDResolver(bs) + }) + + describe('basic dirbuilder', () => { + let nonShardedHash, shardedHash + + it('yields a non-sharded dir', (done) => { + const options = { + shardSplitThreshold: Infinity // never shard + } + + pull( + pull.values([ + { + path: 'a/b', + content: pull.values([new Buffer('i have the best bytes')]) + } + ]), + importer(ipldResolver, options), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + expect(nodes.length).to.be.eql(2) + expect(nodes[0].path).to.be.eql('a/b') + expect(nodes[1].path).to.be.eql('a') + nonShardedHash = nodes[1].multihash + expect(nonShardedHash).to.exist() + done() + }) + ) + }) + + it('yields a sharded dir', (done) => { + const options = { + shardSplitThreshold: 0 // always shard + } + + pull( + pull.values([ + { + path: 'a/b', + content: pull.values([new Buffer('i have the best bytes')]) + } + ]), + importer(ipldResolver, options), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + expect(nodes.length).to.be.eql(2) + expect(nodes[0].path).to.be.eql('a/b') + expect(nodes[1].path).to.be.eql('a') + shardedHash = nodes[1].multihash + // hashes are different + expect(shardedHash).to.not.equal(nonShardedHash) + done() + }) + ) + }) + + it('exporting unsharded hash results in the correct files', (done) => { + pull( + exporter(nonShardedHash, ipldResolver), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + expect(nodes.length).to.be.eql(2) + const expectedHash = mh.toB58String(nonShardedHash) + expect(nodes[0].path).to.be.eql(expectedHash) + expect(mh.toB58String(nodes[0].hash)).to.be.eql(expectedHash) + expect(nodes[1].path).to.be.eql(expectedHash + '/b') + expect(nodes[1].size).to.be.eql(21) + pull( + nodes[1].content, + pull.collect(collected) + ) + }) + ) + + function collected (err, content) { + expect(err).to.not.exist() + expect(content.length).to.be.eql(1) + expect(content[0].toString()).to.be.eql('i have the best bytes') + done() + } + }) + + it('exporting sharded hash results in the correct files', (done) => { + pull( + exporter(shardedHash, ipldResolver), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + expect(nodes.length).to.be.eql(2) + const expectedHash = mh.toB58String(shardedHash) + expect(nodes[0].path).to.be.eql(expectedHash) + expect(nodes[0].hash).to.be.eql(expectedHash) + expect(nodes[1].path).to.be.eql(expectedHash + '/b') + 
expect(nodes[1].size).to.be.eql(21) + pull( + nodes[1].content, + pull.collect(collected) + ) + }) + ) + + function collected (err, content) { + expect(err).to.not.exist() + expect(content.length).to.be.eql(1) + expect(content[0].toString()).to.be.eql('i have the best bytes') + done() + } + }) + }) + + describe('big dir', () => { + const maxDirs = 2000 + let rootHash + + it('imports a big dir', (done) => { + const push = pushable() + pull( + push, + importer(ipldResolver), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + expect(nodes.length).to.be.eql(maxDirs + 1) + const last = nodes[nodes.length - 1] + expect(last.path).to.be.eql('big') + rootHash = last.multihash + done() + }) + ) + + let pending = maxDirs + let i = 0 + + whilst( + () => pending, + (callback) => { + pending-- + i++ + const pushable = { + path: 'big/' + leftPad(i.toString(), 4, '0'), + content: pull.values([new Buffer(i.toString())]) + } + push.push(pushable) + setImmediate(callback) + }, + (err) => { + expect(err).to.not.exist() + push.end() + } + ) + }) + + it('exports a big dir', (done) => { + const contentEntries = [] + const entries = {} + pull( + exporter(rootHash, ipldResolver), + pull.asyncMap((node, callback) => { + if (node.content) { + pull( + node.content, + pull.collect(collected) + ) + } else { + entries[node.path] = node + callback() + } + + function collected (err, content) { + expect(err).to.not.exist() + entries[node.path] = { content: content.toString() } + callback(null, node) + } + }), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + const paths = Object.keys(entries).sort() + expect(paths.length).to.be.eql(2001) + paths.forEach(eachPath) + done() + }) + ) + + function eachPath (path, index) { + if (!index) { + // first dir + expect(path).to.be.eql(mh.toB58String(rootHash)) + const entry = entries[path] + expect(entry).to.exist() + expect(entry.content).to.not.exist() + return + } + // dir entries + const content = entries[path] && entries[path].content + if (content) { + expect(content).to.be.eql(index.toString()) + contentEntries.push(path) + } + } + }) + }) + + describe('big nested dir', () => { + const maxDirs = 2000 + const maxDepth = 3 + let rootHash + + it('imports a big dir', (done) => { + const push = pushable() + pull( + push, + importer(ipldResolver), + pull.collect((err, nodes) => { + expect(err).to.not.exist() + const last = nodes[nodes.length - 1] + expect(last.path).to.be.eql('big') + rootHash = last.multihash + done() + }) + ) + + let pending = maxDirs + let pendingDepth = maxDepth + let i = 0 + let depth = 1 + + whilst( + () => pendingDepth && pending, + (callback) => { + i++ + const dir = [] + for (let d = 0; d < depth; d++) { + dir.push('big') + } + const pushed = { + path: dir.concat(leftPad(i.toString(), 4, '0')).join('/'), + content: pull.values([new Buffer(i.toString())]) + } + push.push(pushed) + pending-- + if (!pending) { + pendingDepth-- + pending = maxDirs + i = 0 + depth++ + } + setImmediate(callback) + }, + (err) => { + expect(err).to.not.exist() + push.end() + } + ) + }) + + it('exports a big dir', (done) => { + const entries = {} + pull( + exporter(rootHash, ipldResolver), + pull.asyncMap((node, callback) => { + if (node.content) { + pull( + node.content, + pull.collect(collected) + ) + } else { + entries[node.path] = node + callback() + } + + function collected (err, content) { + expect(err).to.not.exist() + entries[node.path] = { content: content.toString() } + callback(null, node) + } + }), + pull.collect(collected) + ) + + function 
collected (err, nodes) { + expect(err).to.not.exist() + const paths = Object.keys(entries).sort() + expect(paths.length).to.be.eql(maxDepth * maxDirs + maxDepth) + let index = 0 + let depth = 1 + paths.forEach(eachPath) + done() + + function eachPath (path) { + if (!index) { + // first dir + if (depth === 1) { + expect(path).to.be.eql(mh.toB58String(rootHash)) + } + const entry = entries[path] + expect(entry).to.exist() + expect(entry.content).to.not.exist() + } else { + // dir entries + const pathElements = path.split('/') + expect(pathElements.length).to.be.eql(depth + 1) + const lastElement = pathElements[pathElements.length - 1] + expect(lastElement).to.be.eql(leftPad(index.toString(), 4, '0')) + expect(entries[path].content).to.be.eql(index.toString()) + } + index++ + if (index > maxDirs) { + index = 0 + depth++ + } + } + } + }) + }) + }) +} diff --git a/test/test-hamt.js b/test/test-hamt.js new file mode 100644 index 00000000..4ae49ed2 --- /dev/null +++ b/test/test-hamt.js @@ -0,0 +1,190 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const crypto = require('crypto') +const each = require('async/each') +const eachSeries = require('async/eachSeries') + +const HAMT = require('../src/hamt') + +const hashFn = function (value, callback) { + callback(null, crypto + .createHash('sha256') + .update(value) + .digest()) +} + +const options = { + hashFn: hashFn +} + +describe('HAMT', () => { + describe('basic', () => { + let bucket + it('can create an empty one', () => { + bucket = HAMT(options) + }) + + it('get unknown key returns undefined', (callback) => { + bucket.get('unknown', (err, result) => { + expect(err).to.not.exist() + expect(result).to.be.undefined() + callback() + }) + }) + + it('can put a value', (callback) => { + bucket.put('key', 'value', callback) + }) + + it('can get that value', (callback) => { + bucket.get('key', (err, result) => { + expect(err).to.not.exist() + expect(result).to.be.eql('value') + callback() + }) + }) + + it('can override a value', (callback) => { + bucket.put('key', 'a different value', callback) + }) + + it('can get that value', (callback) => { + bucket.get('key', (err, result) => { + expect(err).to.not.exist() + expect(result).to.be.eql('a different value') + callback() + }) + }) + + it('can remove a non existing value', (callback) => { + bucket.del('a key which does not exist', callback) + }) + + it('can remove an existing value', (callback) => { + bucket.del('key', callback) + }) + + it('get deleted key returns undefined', (callback) => { + bucket.get('key', (err, result) => { + expect(err).to.not.exist() + expect(result).to.be.undefined() + callback() + }) + }) + }) + + describe('many keys', () => { + let bucket, keys, masterHead + + it('can create an empty one', () => { + bucket = HAMT(options) + }) + + it('accepts putting many keys', (callback) => { + const max = 400 + keys = new Array(max) + for (let i = 1; i <= max; i++) { + keys[i - 1] = i.toString() + } + + each(keys, (key, callback) => bucket.put(key, key, callback), callback) + }) + + it('can remove all the keys and still find remaining', (callback) => { + masterHead = keys.pop() + iterate() + + function iterate () { + const head = keys.shift() + if (!head) { + callback() + return // early + } + + bucket.get(head, (err, value) => { + expect(err).to.not.exist() + expect(value).to.be.eql(head) + bucket.del(head, afterDel) + }) + + function afterDel (err) { + expect(err).to.not.exist() + bucket.get(head, 
afterGet) + } + + function afterGet (err, value) { + expect(err).to.not.exist() + expect(value).to.be.undefined() + + each( + keys, + onEachKey, + reiterate + ) + } + } + + function onEachKey (key, callback) { + bucket.get(key, (err, value) => { + expect(err).to.not.exist() + expect(value).to.be.eql(key) + callback() + }) + } + + function reiterate (err) { + expect(err).to.not.exist() + // break from stack on next iteration + process.nextTick(iterate) + } + }) + + it('collapsed all the buckets', () => { + expect(bucket.toJSON()).to.be.eql([masterHead]) + }) + + it('can still find sole head', (callback) => { + bucket.get(masterHead, (err, value) => { + expect(err).to.not.exist() + expect(value).to.be.eql(masterHead) + callback() + }) + }) + }) + + describe('exhausting hash', () => { + let bucket + + before(() => { + bucket = HAMT({ + hashFn: smallHashFn, + bits: 2 + }) + }) + + it('iterates', (callback) => { + const size = 300 + const keys = Array(size) + for (let i = 0; i < size; i++) { + keys[i] = i.toString() + } + + eachSeries(keys, (key, callback) => bucket.put(key, key, callback), (err) => { + expect(err).to.not.exist() + callback() + }) + }) + + function smallHashFn (value, callback) { + callback(null, crypto + .createHash('sha256') + .update(value) + .digest() + .slice(0, 2)) // just return the 2 first bytes of the hash + } + }) +}) diff --git a/test/test-importer-flush.js b/test/test-importer-flush.js new file mode 100644 index 00000000..87b6fc72 --- /dev/null +++ b/test/test-importer-flush.js @@ -0,0 +1,198 @@ +/* eslint-env mocha */ +'use strict' + +const createImporter = require('./../src').importer + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const BlockService = require('ipfs-block-service') +const IPLDResolver = require('ipld-resolver') +const pull = require('pull-stream') +const pushable = require('pull-pushable') + +module.exports = (repo) => { + describe('importer flush', () => { + let ipldResolver + + before(() => { + const bs = new BlockService(repo) + ipldResolver = new IPLDResolver(bs) + }) + + it('can push a single root file and flush yields no dirs', (done) => { + const source = pushable() + const importer = createImporter(ipldResolver) + pull( + source, + importer, + pull.map(node => { + expect(node.path).to.be.eql('a') + return node + }), + pull.collect((err, files) => { + expect(err).to.not.exist() + expect(files.length).to.be.eql(1) + done() + }) + ) + + source.push({ + path: 'a', + content: pull.values([new Buffer('hey')]) + }) + + importer.flush((err, hash) => { + expect(err).to.not.exist() + expect(Buffer.isBuffer(hash)).to.be.true() + source.end() + }) + }) + + it('can push a nested file and flush yields parent dir', (done) => { + const source = pushable() + const importer = createImporter(ipldResolver) + let count = 0 + pull( + source, + importer, + pull.map(function (node) { + count++ + if (count === 1) { + expect(node.path).to.be.eql('b/c') + } else if (count === 2) { + expect(node.path).to.be.eql('b') + } + return node + }), + pull.collect((err, files) => { + expect(err).to.not.exist() + expect(count).to.be.eql(2) + done() + }) + ) + + source.push({ + path: 'b/c', + content: pull.values([new Buffer('hey')]) + }) + + importer.flush((err, hash) => { + expect(err).to.not.exist() + expect(Buffer.isBuffer(hash)).to.be.true() + source.end() + }) + }) + + it('can flush many times, always coherent', (done) => { + const maxDepth = 4 + const maxEntriesPerDir = 3 + + let count = 0 + const tree = { children: {}, path: 
'', depth: 0, yielded: true } + let currentDir = tree + + const source = pushable() + const importer = createImporter(ipldResolver) + + pull( + source, + importer, + pull.map((node) => { + count++ + markDirAsYielded(node) + return node + }), + pull.collect((err, files) => { + expect(err).to.not.exist() + expect(count).to.be.eql(2) + done() + }) + ) + + pushAndFlush() + + function pushAndFlush () { + const childCount = Object.keys(currentDir.children).length + const newDirName = childCount.toString() + const dirPath = currentDir.path + (currentDir.depth > 0 ? '/' : '') + newDirName + const newDir = { + children: {}, + path: dirPath, + depth: currentDir.depth + 1, + yielded: false, + parent: currentDir + } + currentDir.children[newDirName] = newDir + markAncestorsAsDirty(currentDir) + + const filePath = dirPath + '/filename' + const file = { + path: filePath, + content: pull.values([new Buffer('file with path ' + filePath)]) + } + source.push(file) + if (currentDir.depth === 0 || childCount + 1 === maxEntriesPerDir) { + currentDir = newDir + } + importer.flush((err, hash) => { + expect(err).to.not.exist() + expect(Buffer.isBuffer(hash)).to.be.true() + testAllYielded(tree) + if (currentDir.depth < maxDepth) { + pushAndFlush() + } else { + expect(count).to.be.eql(38) + done() + } + }) + } + + function markDirAsYielded (node) { + const dir = findDir(tree, node.path) + if (node.path === dir.path) { + expect(dir.yielded).to.be.false() + dir.yielded = true + } + } + + function findDir (tree, path) { + const pathElems = path.split('/').filter(notEmpty) + const child = tree.children[pathElems.shift()] + if (!child) { + return tree + } + if (pathElems.length) { + return findDir(child, pathElems.join('/')) + } else { + return child + } + } + + function testAllYielded (tree) { + if (tree.depth) { + expect(tree.yielded).to.be.true() + } + const childrenNames = Object.keys(tree.children) + childrenNames.forEach((childName) => { + const child = tree.children[childName] + testAllYielded(child) + }) + } + + function markAncestorsAsDirty (dir) { + dir.yielded = false + while (dir) { + dir = dir.parent + if (dir) { + dir.yielded = false + } + } + } + }) + }) +} + +function notEmpty (str) { + return Boolean(str) +} diff --git a/test/test-importer.js b/test/test-importer.js index 0926fb03..4fedead2 100644 --- a/test/test-importer.js +++ b/test/test-importer.js @@ -156,6 +156,7 @@ module.exports = (repo) => { }, strategyOverrides[strategy]) const expected = extend({}, defaultResults, strategies[strategy]) + describe(strategy + ' importer', () => { let ipldResolver
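
For reference, a minimal usage sketch of the importer and exporter as exercised by the tests in this change. The `ipfs-unixfs-engine` require name and the pre-initialized `repo` argument (an `ipfs-repo` instance) are assumptions taken from the package and test setup; the option values shown are the documented defaults.

```js
'use strict'

const BlockService = require('ipfs-block-service')
const IPLDResolver = require('ipld-resolver')
const pull = require('pull-stream')
const unixfs = require('ipfs-unixfs-engine') // assumption: published package name; tests require './src'

// `repo` is assumed to be an already-initialized ipfs-repo instance,
// exactly as in the test setup above.
function importAndExport (repo) {
  const ipldResolver = new IPLDResolver(new BlockService(repo))

  // The importer is a pull-stream duplex; `shardSplitThreshold` controls when
  // a directory switches from the flat builder to the HAMT sharded builder.
  const importer = unixfs.importer(ipldResolver, {
    wrap: false,               // create a wrapping directory node at the root
    shardSplitThreshold: 1000  // directory entries before sharding kicks in
  })

  pull(
    pull.values([{
      path: 'a/b',
      content: pull.values([new Buffer('i have the best bytes')])
    }]),
    importer,
    pull.collect((err, nodes) => {
      if (err) { throw err }
      // one entry per file and directory created, each with
      // { path, multihash, size }; the last entry is the root ('a' here)
      const rootHash = nodes[nodes.length - 1].multihash

      // The exporter walks the DAG back out: every entry has a `path`, and
      // file entries additionally expose `content` as a pull-stream source.
      pull(
        unixfs.exporter(rootHash, ipldResolver),
        pull.collect((err, entries) => {
          if (err) { throw err }
          // entries[0] is the root directory, entries[1] is the file under it
        })
      )
    })
  )

  // While a long-running import is still in flight, importer.flush(callback)
  // flushes the tree built so far and yields the multihash of the current
  // root, as exercised by test-importer-flush above.
}
```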