Skip to content

Commit da18ea6

Browse files
Alan Shaw, achingbrain, rvagg
authored
feat: add a shardFanoutBits option to the importer (#355)
Adds a `shardFanoutBits` option to the importer to allow configuring the number of bytes used for the HAMT prefix, also a test. Use the UnixFS `fanout` value to slice off the correct number of characters from `Name` in HAMTs. Fixes exporting the HAMT from https://github.com/ipld/ipld/blob/master/specs/transport/trustless-pathing/fixtures/unixfs_20m_variety.car (which has a small fanout to force collisions). refs ipld/ipld#296 (comment) --------- Co-authored-by: Alex Potsides <[email protected]> Co-authored-by: Rod Vagg <[email protected]>
1 parent beeb0a7 commit da18ea6

File tree

8 files changed

+142
-22
lines changed

8 files changed

+142
-22
lines changed

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import { decode, type PBNode } from '@ipld/dag-pb'
2+
import errCode from 'err-code'
3+
import { UnixFS } from 'ipfs-unixfs'
24
import map from 'it-map'
35
import parallel from 'it-parallel'
46
import { pipe } from 'it-pipe'
@@ -20,11 +22,28 @@ const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path,
2022
async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: ReadableStorage, options: ExporterOptions): UnixfsV1DirectoryContent {
2123
const links = node.Links
2224

25+
if (node.Data == null) {
26+
throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS')
27+
}
28+
29+
let dir: UnixFS
30+
try {
31+
dir = UnixFS.unmarshal(node.Data)
32+
} catch (err: any) {
33+
throw errCode(err, 'ERR_NOT_UNIXFS')
34+
}
35+
36+
if (dir.fanout == null) {
37+
throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS')
38+
}
39+
40+
const padLength = (dir.fanout - 1n).toString(16).length
41+
2342
const results = pipe(
2443
links,
2544
source => map(source, link => {
2645
return async () => {
27-
const name = link.Name != null ? link.Name.substring(2) : null
46+
const name = link.Name != null ? link.Name.substring(padLength) : null
2847

2948
if (name != null && name !== '') {
3049
const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)

packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { decode, type PBLink, type PBNode } from '@ipld/dag-pb'
22
import { murmur3128 } from '@multiformats/murmur3'
3+
import errCode from 'err-code'
34
import { Bucket, type BucketPosition, createHAMT } from 'hamt-sharding'
5+
import { UnixFS } from 'ipfs-unixfs'
46
import type { ExporterOptions, ShardTraversalContext, ReadableStorage } from '../index.js'
57
import type { CID } from 'multiformats/cid'
68

@@ -16,13 +18,14 @@ const hashFn = async function (buf: Uint8Array): Promise<Uint8Array> {
1618
}
1719

1820
const addLinksToHamtBucket = async (links: PBLink[], bucket: Bucket<boolean>, rootBucket: Bucket<boolean>): Promise<void> => {
21+
const padLength = (bucket.tableSize() - 1).toString(16).length
1922
await Promise.all(
2023
links.map(async link => {
2124
if (link.Name == null) {
2225
// TODO(@rvagg): what do? this is technically possible
2326
throw new Error('Unexpected Link without a Name')
2427
}
25-
if (link.Name.length === 2) {
28+
if (link.Name.length === padLength) {
2629
const pos = parseInt(link.Name, 16)
2730

2831
bucket._putObjectAt(pos, new Bucket({
@@ -37,12 +40,12 @@ const addLinksToHamtBucket = async (links: PBLink[], bucket: Bucket<boolean>, ro
3740
)
3841
}
3942

40-
const toPrefix = (position: number): string => {
43+
const toPrefix = (position: number, padLength: number): string => {
4144
return position
4245
.toString(16)
4346
.toUpperCase()
44-
.padStart(2, '0')
45-
.substring(0, 2)
47+
.padStart(padLength, '0')
48+
.substring(0, padLength)
4649
}
4750

4851
const toBucketPath = (position: BucketPosition<boolean>): Array<Bucket<boolean>> => {
@@ -62,8 +65,27 @@ const toBucketPath = (position: BucketPosition<boolean>): Array<Bucket<boolean>>
6265

6366
const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStorage, context?: ShardTraversalContext, options?: ExporterOptions): Promise<CID | undefined> => {
6467
if (context == null) {
68+
if (node.Data == null) {
69+
throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS')
70+
}
71+
72+
let dir: UnixFS
73+
try {
74+
dir = UnixFS.unmarshal(node.Data)
75+
} catch (err: any) {
76+
throw errCode(err, 'ERR_NOT_UNIXFS')
77+
}
78+
79+
if (dir.type !== 'hamt-sharded-directory') {
80+
throw errCode(new Error('not a HAMT'), 'ERR_NOT_UNIXFS')
81+
}
82+
if (dir.fanout == null) {
83+
throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS')
84+
}
85+
6586
const rootBucket = createHAMT<boolean>({
66-
hashFn
87+
hashFn,
88+
bits: Math.log2(Number(dir.fanout))
6789
})
6890

6991
context = {
@@ -73,25 +95,27 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor
7395
}
7496
}
7597

98+
const padLength = (context.lastBucket.tableSize() - 1).toString(16).length
99+
76100
await addLinksToHamtBucket(node.Links, context.lastBucket, context.rootBucket)
77101

78102
const position = await context.rootBucket._findNewBucketAndPos(name)
79-
let prefix = toPrefix(position.pos)
103+
let prefix = toPrefix(position.pos, padLength)
80104
const bucketPath = toBucketPath(position)
81105

82106
if (bucketPath.length > context.hamtDepth) {
83107
context.lastBucket = bucketPath[context.hamtDepth]
84108

85-
prefix = toPrefix(context.lastBucket._posAtParent)
109+
prefix = toPrefix(context.lastBucket._posAtParent, padLength)
86110
}
87111

88112
const link = node.Links.find(link => {
89113
if (link.Name == null) {
90114
return false
91115
}
92116

93-
const entryPrefix = link.Name.substring(0, 2)
94-
const entryName = link.Name.substring(2)
117+
const entryPrefix = link.Name.substring(0, padLength)
118+
const entryName = link.Name.substring(padLength)
95119

96120
if (entryPrefix !== prefix) {
97121
// not the entry or subshard we're looking for
@@ -110,7 +134,7 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor
110134
return
111135
}
112136

113-
if (link.Name != null && link.Name.substring(2) === name) {
137+
if (link.Name != null && link.Name.substring(padLength) === name) {
114138
return link.Hash
115139
}
116140

packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import * as dagPb from '@ipld/dag-pb'
44
import { expect } from 'aegir/chai'
55
import { MemoryBlockstore } from 'blockstore-core'
66
import { UnixFS } from 'ipfs-unixfs'
7-
import { importer } from 'ipfs-unixfs-importer'
7+
import { importer, type ImportCandidate } from 'ipfs-unixfs-importer'
88
import all from 'it-all'
99
import randomBytes from 'it-buffer-stream'
1010
import last from 'it-last'
@@ -241,7 +241,7 @@ describe('exporter sharded', function () {
241241
await block.put(nodeBlockCid, nodeBlockBuf)
242242

243243
const shardNodeBuf = dagPb.encode({
244-
Data: new UnixFS({ type: 'hamt-sharded-directory' }).marshal(),
244+
Data: new UnixFS({ type: 'hamt-sharded-directory', fanout: 2n ** 8n }).marshal(),
245245
Links: [{
246246
Name: '75normal-dir',
247247
Tsize: nodeBlockBuf.length,
@@ -255,4 +255,64 @@ describe('exporter sharded', function () {
255255

256256
expect(exported.name).to.deep.equal('file-1')
257257
})
258+
259+
it('exports a shard with a different fanout size', async () => {
260+
const files: ImportCandidate[] = [{
261+
path: '/baz.txt',
262+
content: Uint8Array.from([0, 1, 2, 3, 4])
263+
}, {
264+
path: '/foo.txt',
265+
content: Uint8Array.from([0, 1, 2, 3, 4])
266+
}, {
267+
path: '/bar.txt',
268+
content: Uint8Array.from([0, 1, 2, 3, 4])
269+
}]
270+
271+
const result = await last(importer(files, block, {
272+
shardSplitThresholdBytes: 0,
273+
shardFanoutBits: 4, // 2**4 = 16 children max
274+
wrapWithDirectory: true
275+
}))
276+
277+
if (result == null) {
278+
throw new Error('Import failed')
279+
}
280+
281+
const { cid } = result
282+
const dir = await exporter(cid, block)
283+
284+
expect(dir).to.have.nested.property('unixfs.fanout', 16n)
285+
286+
const contents = await all(dir.content())
287+
288+
expect(contents.map(entry => ({
289+
path: `/${entry.name}`,
290+
content: entry.node
291+
})))
292+
.to.deep.equal(files)
293+
})
294+
295+
it('walks path of a HAMT with a different fanout size', async () => {
296+
const files: ImportCandidate[] = [{
297+
path: '/foo/bar/baz.txt',
298+
content: Uint8Array.from([0, 1, 2, 3, 4])
299+
}]
300+
301+
const result = await last(importer(files, block, {
302+
shardSplitThresholdBytes: 0,
303+
shardFanoutBits: 4, // 2**4 = 16 children max
304+
wrapWithDirectory: true
305+
}))
306+
307+
if (result == null) {
308+
throw new Error('Import failed')
309+
}
310+
311+
const { cid } = result
312+
const file = await last(walkPath(`${cid}/foo/bar/baz.txt`, block))
313+
expect([{
314+
path: file?.path.replace(`${cid}`, ''),
315+
content: file?.node
316+
}]).to.deep.equal(files)
317+
})
258318
})

packages/ipfs-unixfs-importer/src/dir-sharded.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@ async function hamtHashFn (buf: Uint8Array): Promise<Uint8Array> {
1818
}
1919

2020
const HAMT_HASH_CODE = BigInt(0x22)
21+
const DEFAULT_FANOUT_BITS = 8
22+
23+
export interface DirShardedOptions extends PersistOptions {
24+
shardFanoutBits: number
25+
}
2126

2227
class DirSharded extends Dir {
2328
private readonly _bucket: Bucket<InProgressImportResult | Dir>
2429

25-
constructor (props: DirProps, options: PersistOptions) {
30+
constructor (props: DirProps, options: DirShardedOptions) {
2631
super(props, options)
2732

2833
this._bucket = createHAMT({
2934
hashFn: hamtHashFn,
30-
bits: 8
35+
bits: options.shardFanoutBits ?? DEFAULT_FANOUT_BITS
3136
})
3237
}
3338

@@ -88,6 +93,7 @@ export default DirSharded
8893

8994
async function * flush (bucket: Bucket<Dir | InProgressImportResult>, blockstore: Blockstore, shardRoot: DirSharded | null, options: PersistOptions): AsyncIterable<ImportResult> {
9095
const children = bucket._children
96+
const padLength = (bucket.tableSize() - 1).toString(16).length
9197
const links: PBLink[] = []
9298
let childrenSize = 0n
9399

@@ -98,7 +104,7 @@ async function * flush (bucket: Bucket<Dir | InProgressImportResult>, blockstore
98104
continue
99105
}
100106

101-
const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
107+
const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0')
102108

103109
if (child instanceof Bucket) {
104110
let shard
@@ -191,6 +197,7 @@ function isDir (obj: any): obj is Dir {
191197

192198
function calculateSize (bucket: Bucket<any>, shardRoot: DirSharded | null, options: PersistOptions): number {
193199
const children = bucket._children
200+
const padLength = (bucket.tableSize() - 1).toString(16).length
194201
const links: PBLink[] = []
195202

196203
for (let i = 0; i < children.length; i++) {
@@ -200,7 +207,7 @@ function calculateSize (bucket: Bucket<any>, shardRoot: DirSharded | null, optio
200207
continue
201208
}
202209

203-
const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
210+
const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0')
204211

205212
if (child instanceof Bucket) {
206213
const size = calculateSize(child, null, options)

packages/ipfs-unixfs-importer/src/flat-to-shard.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { DirFlat } from './dir-flat.js'
2-
import DirSharded from './dir-sharded.js'
2+
import DirSharded, { type DirShardedOptions } from './dir-sharded.js'
33
import type { Dir } from './dir.js'
4-
import type { PersistOptions } from './utils/persist.js'
54

6-
export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: PersistOptions): Promise<DirSharded> {
5+
export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: DirShardedOptions): Promise<DirSharded> {
76
let newDir = dir as DirSharded
87

98
if (dir instanceof DirFlat && dir.estimateNodeSize() > threshold) {
@@ -31,7 +30,7 @@ export async function flatToShard (child: Dir | null, dir: Dir, threshold: numbe
3130
return newDir
3231
}
3332

34-
async function convertToShard (oldDir: DirFlat, options: PersistOptions): Promise<DirSharded> {
33+
async function convertToShard (oldDir: DirFlat, options: DirShardedOptions): Promise<DirSharded> {
3534
const newDir = new DirSharded({
3635
root: oldDir.root,
3736
dir: true,

packages/ipfs-unixfs-importer/src/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ export interface ImporterOptions extends ProgressOptions<ImporterProgressEvents>
123123
*/
124124
shardSplitThresholdBytes?: number
125125

126+
/**
127+
* The number of bits of a hash digest used at each level of sharding to
128+
* the child index. 2**shardFanoutBits will dictate the maximum number of
129+
* children for any shard in the HAMT. Default: 8
130+
*/
131+
shardFanoutBits?: number
132+
126133
/**
127134
* How many files to import concurrently. For large numbers of small files this
128135
* should be high (e.g. 50). Default: 10
@@ -241,6 +248,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri
241248

242249
const wrapWithDirectory = options.wrapWithDirectory ?? false
243250
const shardSplitThresholdBytes = options.shardSplitThresholdBytes ?? 262144
251+
const shardFanoutBits = options.shardFanoutBits ?? 8
244252
const cidVersion = options.cidVersion ?? 1
245253
const rawLeaves = options.rawLeaves ?? true
246254
const leafType = options.leafType ?? 'file'
@@ -269,6 +277,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri
269277
const buildTree: TreeBuilder = options.treeBuilder ?? defaultTreeBuilder({
270278
wrapWithDirectory,
271279
shardSplitThresholdBytes,
280+
shardFanoutBits,
272281
cidVersion,
273282
onProgress: options.onProgress
274283
})

packages/ipfs-unixfs-importer/src/tree-builder.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { PersistOptions } from './utils/persist.js'
77

88
export interface AddToTreeOptions extends PersistOptions {
99
shardSplitThresholdBytes: number
10+
shardFanoutBits: number
1011
}
1112

1213
async function addToTree (elem: InProgressImportResult, tree: Dir, options: AddToTreeOptions): Promise<Dir> {

packages/ipfs-unixfs/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ class UnixFS {
5252
secs: message.mtime.Seconds ?? 0n,
5353
nsecs: message.mtime.FractionalNanoseconds
5454
}
55-
: undefined
55+
: undefined,
56+
fanout: message.fanout
5657
})
5758

5859
// make sure we honour the original mode

0 commit comments

Comments (0)