
sharded dirs and interactive flush #164

Merged 5 commits on Mar 24, 2017

5 changes: 5 additions & 0 deletions README.md
@@ -154,6 +154,8 @@ Nodes.

In the second argument of the importer constructor you can specify the following options:

* `wrap` (boolean, defaults to `false`): if true, a wrapping node will be created
* `shardSplitThreshold` (positive integer, defaults to 1000): the number of directory entries above which we decide to use a sharding directory builder (instead of the default flat one)
* `chunker` (string, defaults to `"fixed"`): the chunking strategy. Now only supports `"fixed"`
* `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
* `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size for the `fixed` chunker.
@@ -164,6 +166,9 @@ In the second argument of the importer constructor you can specify the following
* `maxChildrenPerNode` (positive integer, defaults to `174`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies
* `layerRepeat` (positive integer, defaults to 4): (only applicable to the `trickle` DAG builder strategy). The maximum repetition of parent nodes for each layer of the tree.
* `reduceSingleLeafToSelf` (boolean, defaults to `false`): optimization that, when reducing a set containing a single leaf node, reduces it to that node itself.
* `dirBuilder` (object): the options for the directory builder
* `hamt` (object): the options for the HAMT sharded directory builder
* `bits` (positive integer, defaults to `5`): the number of bits at each bucket of the HAMT (a short usage sketch follows this list)
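
A minimal usage sketch for these options (assumptions: the importer constructor is exposed as `require('ipfs-unixfs-engine').importer`, and `ipldResolver` is an already constructed `ipld-resolver` instance):

```js
const importer = require('ipfs-unixfs-engine').importer // assumed export name
const pull = require('pull-stream')

// `ipldResolver` is assumed to be an ipld-resolver instance created elsewhere
pull(
  pull.values([{
    path: 'big-dir/hello.txt',
    content: pull.values([Buffer.from('hello world')])
  }]),
  importer(ipldResolver, {
    shardSplitThreshold: 1000, // switch to a HAMT sharded directory above 1000 entries
    dirBuilder: {
      hamt: {
        bits: 5 // 2^5 = 32 buckets per HAMT node
      }
    }
  }),
  pull.collect((err, nodes) => {
    // `nodes` holds the file and directory entries that were created
  })
)
```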

### Example Exporter

12 changes: 8 additions & 4 deletions package.json
@@ -48,7 +48,8 @@
"pre-commit": "^1.2.2",
"pull-generate": "^2.2.0",
"pull-zip": "^2.0.1",
"rimraf": "^2.6.1"
"rimraf": "^2.6.1",
"split": "^1.0.0"
},
"dependencies": {
"async": "^2.1.5",
@@ -58,8 +59,10 @@
"ipld-dag-pb": "^0.11.0",
"ipld-resolver": "^0.11.0",
"is-ipfs": "^0.3.0",
"left-pad": "^1.1.3",
"lodash": "^4.17.4",
"multihashes": "^0.4.4",
"multihashes": "^0.4.5",
"multihashing-async": "^0.4.0",
"pull-batch": "^1.0.0",
"pull-block": "^1.1.0",
"pull-cat": "^1.1.11",
@@ -69,7 +72,8 @@
"pull-pushable": "^2.0.1",
"pull-stream": "^3.5.0",
"pull-traverse": "^1.0.3",
"pull-write": "^1.1.1"
"pull-write": "^1.1.1",
"sparse-array": "^1.3.1"
},
"contributors": [
"David Dias <[email protected]>",
@@ -83,4 +87,4 @@
"jbenet <[email protected]>",
"nginnever <[email protected]>"
]
}
}
4 changes: 1 addition & 3 deletions src/builder/builder.js
@@ -22,7 +22,7 @@ const defaultOptions = {
module.exports = function (createChunker, ipldResolver, createReducer, _options) {
const options = extend({}, defaultOptions, _options)

return function (source, files) {
return function (source) {
return function (items, cb) {
parallel(items.map((item) => (cb) => {
if (!item.content) {
@@ -33,7 +33,6 @@ module.exports = function (createChunker, ipldResolver, createReducer, _options)
}
if (node) {
source.push(node)
files.push(node)
}
cb()
})
@@ -46,7 +45,6 @@ module.exports = function (createChunker, ipldResolver, createReducer, _options)
}
if (node) {
source.push(node)
files.push(node)
}
cb()
})
15 changes: 3 additions & 12 deletions src/builder/create-build-stream.js
@@ -3,23 +3,14 @@
const pullPushable = require('pull-pushable')
const pullWrite = require('pull-write')

module.exports = function createBuildStream (createStrategy, ipldResolver, flushTree, options) {
const files = []

module.exports = function createBuildStream (createStrategy, ipldResolver, options) {
const source = pullPushable()

const sink = pullWrite(
createStrategy(source, files),
createStrategy(source),
null,
options.highWaterMark,
(err) => {
if (err) {
source.end(err)
return // early
}

flushTree(files, ipldResolver, source, source.end)
}
(err) => source.end(err)
)

return {
5 changes: 2 additions & 3 deletions src/builder/index.js
@@ -16,10 +16,9 @@ const defaultOptions = {
reduceSingleLeafToSelf: false
}

module.exports = function (Chunker, ipldResolver, flushTree, _options) {
module.exports = function (Chunker, ipldResolver, _options) {
assert(Chunker, 'Missing chunker creator function')
assert(ipldResolver, 'Missing IPLD Resolver')
assert(flushTree, 'Missing flushTree argument')

const options = Object.assign({}, defaultOptions, _options)

@@ -29,5 +28,5 @@ module.exports = function (Chunker, ipldResolver, flushTree, _options) {

const createStrategy = Builder(Chunker, ipldResolver, reducer, options)

return createBuildStream(createStrategy, ipldResolver, flushTree, options)
return createBuildStream(createStrategy, ipldResolver, options)
}
11 changes: 11 additions & 0 deletions src/exporter/clean-multihash.js
@@ -0,0 +1,11 @@
'use strict'

const mh = require('multihashes')

module.exports = (multihash) => {
if (Buffer.isBuffer(multihash)) {
return mh.toB58String(multihash)
}

return multihash
}
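
A small usage sketch of the helper above (the base58 string is just an example multihash):

```js
const mh = require('multihashes')
const cleanMultihash = require('./clean-multihash')

const b58 = 'QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG' // example hash
const buf = mh.fromB58String(b58)

console.log(cleanMultihash(buf)) // the same base58 string as `b58`
console.log(cleanMultihash(b58)) // strings pass through unchanged
```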
36 changes: 36 additions & 0 deletions src/exporter/dir-flat.js
@@ -0,0 +1,36 @@
'use strict'

const path = require('path')
const pull = require('pull-stream')
const paramap = require('pull-paramap')
const CID = require('cids')
const cat = require('pull-cat')

// Logic to export a unixfs directory.
module.exports = dirExporter

function dirExporter (node, name, ipldResolver, resolve, parent) {
const dir = {
path: name,
hash: node.multihash
}

return cat([
pull.values([dir]),
pull(
pull.values(node.links),
pull.map((link) => ({
path: path.join(name, link.name),
hash: link.multihash
})),
paramap((item, cb) => ipldResolver.get(new CID(item.hash), (err, n) => {
if (err) {
return cb(err)
}

cb(null, resolve(n.value, item.path, ipldResolver, name, parent))
})),
pull.flatten()
)
])
}
48 changes: 48 additions & 0 deletions src/exporter/dir-hamt-sharded.js
@@ -0,0 +1,48 @@
'use strict'

const path = require('path')
const pull = require('pull-stream')
const paramap = require('pull-paramap')
const CID = require('cids')
const cat = require('pull-cat')
const cleanHash = require('./clean-multihash')

// Logic to export a unixfs directory.
module.exports = shardedDirExporter

function shardedDirExporter (node, name, ipldResolver, resolve, parent) {
let dir
if (!parent || parent.path !== name) {
dir = [{
path: name,
hash: cleanHash(node.multihash)
}]
}

return cat([
pull.values(dir),
pull(
pull.values(node.links),
pull.map((link) => {
// remove the link prefix (2 chars for the bucket index)
let p = link.name.substring(2)
// another sharded dir or file?
p = p ? path.join(name, p) : name

return {
name: link.name,
path: p,
hash: link.multihash
}
}),
paramap((item, cb) => ipldResolver.get(new CID(item.hash), (err, n) => {
if (err) {
return cb(err)
}

cb(null, resolve(n.value, item.path, ipldResolver, (dir && dir[0]) || parent))
})),
pull.flatten()
)
])
}
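
The two-character prefix stripped above is the bucket index a link occupies inside its HAMT node; an empty remainder means the link points at a nested shard of the same logical directory. A small sketch of that mapping, using hypothetical link names:

```js
const path = require('path')

// mirrors the link-name handling in shardedDirExporter above
function linkToPath (linkName, dirName) {
  const rest = linkName.substring(2) // drop the 2-char bucket index, e.g. 'F0'
  // an empty remainder means the link is a sub-shard of the same directory
  return rest ? path.join(dirName, rest) : dirName
}

console.log(linkToPath('F0cat.jpg', 'pics')) // 'pics/cat.jpg'
console.log(linkToPath('F0', 'pics'))        // 'pics'
```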
43 changes: 4 additions & 39 deletions src/exporter/index.js
@@ -1,58 +1,23 @@
'use strict'

const traverse = require('pull-traverse')
const pull = require('pull-stream')
const CID = require('cids')
const isIPFS = require('is-ipfs')

const util = require('./../util')
const switchType = util.switchType
const cleanMultihash = util.cleanMultihash
const resolve = require('./resolve').resolve
const cleanMultihash = require('./clean-multihash')

const dirExporter = require('./dir')
const fileExporter = require('./file')

module.exports = (hash, ipldResolver, options) => {
module.exports = (hash, ipldResolver) => {
if (!isIPFS.multihash(hash)) {
return pull.error(new Error('not valid multihash'))
}

hash = cleanMultihash(hash)
options = options || {}

function visitor (item) {
if (!item.hash) {
// having no hash means that this visitor got a file object
// which needs no further resolving.
// No further resolving means that the visitor does not
// need to do anyting else, so he's returning
// an empty stream

// TODO: perhaps change the pull streams construct.
// Instead of traversing with a visitor, consider recursing.
return pull.empty()
}
return pull(
ipldResolver.getStream(new CID(item.hash)),
pull.map((result) => result.value),
pull.map((node) => switchType(
node,
() => dirExporter(node, item.path, ipldResolver),
() => fileExporter(node, item.path, ipldResolver)
)),
pull.flatten()
)
}

// Traverse the DAG
return pull(
ipldResolver.getStream(new CID(hash)),
pull.map((result) => result.value),
pull.map((node) => switchType(
node,
() => traverse.widthFirst({path: hash, hash}, visitor),
() => fileExporter(node, hash, ipldResolver)
)),
pull.map((node) => resolve(node, hash, ipldResolver)),
pull.flatten()
)
}
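
A minimal sketch of driving the refactored exporter (assumptions: it is exposed as `require('ipfs-unixfs-engine').exporter`, `hash` is a base58 multihash string, and `ipldResolver` is an ipld-resolver instance):

```js
const exporter = require('ipfs-unixfs-engine').exporter // assumed export name
const pull = require('pull-stream')

pull(
  exporter(hash, ipldResolver),
  pull.collect((err, entries) => {
    if (err) { throw err }
    // every entry carries a `path`; file entries also expose their content
    entries.forEach((entry) => console.log(entry.path))
  })
)
```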
30 changes: 30 additions & 0 deletions src/exporter/resolve.js
@@ -0,0 +1,30 @@
'use strict'

const UnixFS = require('ipfs-unixfs')
const pull = require('pull-stream')

const resolvers = {
directory: require('./dir-flat'),
'hamt-sharded-directory': require('./dir-hamt-sharded'),
file: require('./file')
}

module.exports = Object.assign({
resolve: resolve,
typeOf: typeOf
}, resolvers)

function resolve (node, name, ipldResolver, parentNode) {
const type = typeOf(node)
const resolver = resolvers[type]
if (!resolver) {
return pull.error(new Error('Unknown node type ' + type))
}
let stream = resolver(node, name, ipldResolver, resolve, parentNode)
return stream
}

function typeOf (node) {
const data = UnixFS.unmarshal(node.data)
return data.type
}
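
The dispatch above keys on the unixfs `type` carried in each node's protobuf data. A small sketch of reading that type with `ipfs-unixfs`, the same way `typeOf` does (the node here is constructed purely for illustration):

```js
const UnixFS = require('ipfs-unixfs')

// build a plain unixfs directory node and read its type back
const dir = new UnixFS('directory')
const marshalled = dir.marshal()

console.log(UnixFS.unmarshal(marshalled).type) // 'directory'
```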