fix: add tests for unbalanced dags and very deep dags #299

Merged
merged 5 commits on Mar 16, 2023

packages/ipfs-unixfs-exporter/package.json (4 changes: 3 additions & 1 deletion)
@@ -159,11 +159,13 @@
"blockstore-core": "^4.0.1",
"delay": "^5.0.0",
"ipfs-unixfs-importer": "^15.0.0",
"iso-random-stream": "^2.0.2",
"it-all": "^2.0.0",
"it-buffer-stream": "^3.0.0",
"it-first": "^2.0.0",
"merge-options": "^3.0.4",
"sinon": "^15.0.0"
"sinon": "^15.0.0",
"wherearewe": "^2.0.1"
},
"browser": {
"fs": false

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -11,7 +11,7 @@ import map from 'it-map'
 import PQueue from 'p-queue'
 import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, Blockstore } from '../../../index.js'

-async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, walkQueue: PQueue, options: ExporterOptions): Promise<void> {
+async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
   // a `raw` node
   if (node instanceof Uint8Array) {
     queue.push(extractDataFromBlock(node, streamPosition, start, end))

@@ -98,9 +98,23 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
       return
     }

-    void walkQueue.add(async () => {
-      await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options)
+    // create a queue for this child - we use a queue instead of recursion
+    // to avoid overflowing the stack
+    const childQueue = new PQueue({
+      concurrency: 1
+    })
+    // if any of the child jobs error, end the read queue with the error
+    childQueue.on('error', error => {
+      queue.end(error)
+    })
+
+    // if the job rejects, the 'error' event will be emitted on the child queue
+    void childQueue.add(async () => {
+      await walkDAG(blockstore, child, queue, blockStart, start, end, options)
     })
+
+    // wait for this child to complete before moving on to the next
+    await childQueue.onIdle()
   }
 }
 )

@@ -123,20 +137,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
     return
   }

-  // use a queue to walk the DAG instead of recursion to ensure very deep DAGs
-  // don't overflow the stack
-  const walkQueue = new PQueue({
-    concurrency: 1
-  })
   const queue = pushable()

-  void walkQueue.add(async () => {
-    await walkDAG(blockstore, node, queue, 0n, offset, offset + length, walkQueue, options)
-  })
-
-  walkQueue.on('error', error => {
-    queue.end(error)
-  })
+  void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
+    .catch(err => {
+      queue.end(err)
+    })

   let read = 0n

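
The change above swaps the single shared walk queue for one single-concurrency queue per child: each recursive step runs as a p-queue job started from a shallow stack, and each level of the real walker also awaits a blockstore read before descending, so very deep DAGs no longer exhaust the call stack. A minimal sketch of the same pattern under those assumptions; the Chain shape, fetchChild and visit are illustrative, not exporter API, and error handling is simplified to awaiting the job rather than wiring the 'error' event shown in the diff:

import PQueue from 'p-queue'

interface Chain {
  value: string
  child?: Chain
}

// stand-in for an async block fetch - in the exporter each level awaits
// a blockstore read, which yields to the event loop between levels
async function fetchChild (node: Chain): Promise<Chain | undefined> {
  return node.child
}

async function walk (node: Chain, visit: (value: string) => void): Promise<void> {
  visit(node.value)

  const child = await fetchChild(node)

  if (child == null) {
    return
  }

  // as in the diff: the recursive step becomes a job on a fresh
  // single-concurrency queue instead of a direct call
  const childQueue = new PQueue({ concurrency: 1 })

  // awaiting the job propagates failures to the caller; the diff instead
  // relies on the queue's 'error' event to end the output stream
  await childQueue.add(async () => {
    await walk(child, visit)
  })
}

// usage: a 100,000-deep chain walks to completion without a RangeError
let chain: Chain = { value: 'leaf' }

for (let i = 0; i < 100_000; i++) {
  chain = { value: `level-${i}`, child: chain }
}

await walk(chain, () => {})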

packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts (214 changes: 214 additions & 0 deletions)
@@ -0,0 +1,214 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import all from 'it-all'
import { MemoryBlockstore } from 'blockstore-core'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { exporter } from '../src/index.js'
import randomBytes from 'iso-random-stream/src/random.js'
import { sha256 } from 'multiformats/hashes/sha2'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { UnixFS } from 'ipfs-unixfs'
import * as dagPb from '@ipld/dag-pb'
import type { Blockstore } from 'interface-blockstore'
import { isNode } from 'wherearewe'

describe('exporter esoteric DAGs', () => {
  let block: Blockstore

  beforeEach(() => {
    block = new MemoryBlockstore()
  })

  async function storeBlock (buf: Uint8Array, codec: number): Promise<CID> {
    const mh = await sha256.digest(buf)
    const cid = CID.createV1(codec, mh)

    await block.put(cid, buf)

    return cid
  }

  it('exports an unbalanced DAG', async () => {
    const leaves = await Promise.all([
      randomBytes(5),
      randomBytes(3),
      randomBytes(6),
      randomBytes(10),
      randomBytes(4),
      randomBytes(7),
      randomBytes(8)
    ].map(async buf => {
      return {
        cid: await storeBlock(buf, raw.code),
        buf
      }
    }))

    // create an unbalanced DAG:
    //
    //      root
    //     / | | \
    //    0  *  5 6
    //      / | \
    //     1  *  4
    //       / \
    //      2   3

    const intermediateNode1 = {
      Data: new UnixFS({
        type: 'file',
        blockSizes: [
          BigInt(leaves[2].buf.byteLength),
          BigInt(leaves[3].buf.byteLength)
        ]
      }).marshal(),
      Links: [{
        Name: '',
        Hash: leaves[2].cid,
        Tsize: leaves[2].buf.byteLength
      }, {
        Name: '',
        Hash: leaves[3].cid,
        Tsize: leaves[3].buf.byteLength
      }]
    }
    const intermediateNode1Buf = dagPb.encode(intermediateNode1)
    const intermediateNode1Cid = await storeBlock(intermediateNode1Buf, dagPb.code)

    const intermediateNode2 = {
      Data: new UnixFS({
        type: 'file',
        blockSizes: [
          BigInt(leaves[1].buf.byteLength),
          BigInt(leaves[2].buf.byteLength + leaves[3].buf.byteLength),
          BigInt(leaves[4].buf.byteLength)
        ]
      }).marshal(),
      Links: [{
        Name: '',
        Hash: leaves[1].cid,
        Tsize: leaves[1].buf.byteLength
      }, {
        Name: '',
        Hash: intermediateNode1Cid,
        Tsize: intermediateNode1Buf.byteLength
      }, {
        Name: '',
        Hash: leaves[4].cid,
        Tsize: leaves[4].buf.byteLength
      }]
    }

    const intermediateNode2Buf = dagPb.encode(intermediateNode2)
    const intermediateNode2Cid = await storeBlock(intermediateNode2Buf, dagPb.code)

    const unixfs = new UnixFS({
      type: 'file',
      blockSizes: [
        BigInt(leaves[0].buf.byteLength),
        BigInt(leaves[1].buf.byteLength + leaves[2].buf.byteLength + leaves[3].buf.byteLength + leaves[4].buf.byteLength),
        BigInt(leaves[5].buf.byteLength),
        BigInt(leaves[6].buf.byteLength)
      ]
    })

    const rootNode = {
      Data: unixfs.marshal(),
      Links: [{
        Name: '',
        Hash: leaves[0].cid,
        Tsize: leaves[0].buf.byteLength
      }, {
        Name: '',
        Hash: intermediateNode2Cid,
        Tsize: intermediateNode2Buf.byteLength
      }, {
        Name: '',
        Hash: leaves[5].cid,
        Tsize: leaves[5].buf.byteLength
      }, {
        Name: '',
        Hash: leaves[6].cid,
        Tsize: leaves[6].buf.byteLength
      }]
    }

    const rootBuf = dagPb.encode(rootNode)
    const rootCid = await storeBlock(rootBuf, dagPb.code)
    const exported = await exporter(rootCid, block)

    if (exported.type !== 'file') {
      throw new Error('Unexpected type')
    }

    const data = uint8ArrayConcat(await all(exported.content()))
    expect(data).to.deep.equal(uint8ArrayConcat(
      leaves.map(l => l.buf)
    ))
  })
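
The unbalanced layout round-trips because every blockSizes entry records how many file bytes are reachable through the matching link, so a reader can compute each child's absolute offset in the file without fetching it; this is the same bookkeeping walkDAG uses to derive blockStart. A small illustration with a hypothetical helper, not part of the exporter API:

// returns the absolute file offset at which each child's bytes begin
function childOffsets (blockSizes: bigint[]): bigint[] {
  const offsets: bigint[] = []
  let position = 0n

  for (const size of blockSizes) {
    offsets.push(position)
    position += size
  }

  return offsets
}

// for the root node above, with subtree sizes 5, (3 + 6 + 10 + 4), 7 and 8:
// childOffsets([5n, 23n, 7n, 8n]) // => [0n, 5n, 28n, 35n]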

  it('exports a very deep DAG', async () => {
    if (!isNode) {
      // browsers are quite slow so only run on node
      return
    }

    const buf: Uint8Array = randomBytes(5)
    let child = {
      cid: await storeBlock(buf, raw.code),
      buf
    }

    // create a very deep DAG:
    //
    // root
    //   \
    //    *
    //     \
    //      *
    //       \
    //        ... many nodes here
    //         \
    //          0
    let rootCid: CID | undefined

    for (let i = 0; i < 100000; i++) {
      const parent = {
        Data: new UnixFS({
          type: 'file',
          blockSizes: [
            BigInt(buf.byteLength)
          ]
        }).marshal(),
        Links: [{
          Name: '',
          Hash: child.cid,
          Tsize: child.buf.byteLength
        }]
      }

      const parentBuf = dagPb.encode(parent)
      rootCid = await storeBlock(parentBuf, dagPb.code)

      child = {
        cid: rootCid,
        buf: parentBuf
      }
    }

    if (rootCid == null) {
      throw new Error('Root CID not set')
    }

    const exported = await exporter(rootCid, block)

    if (exported.type !== 'file') {
      throw new Error('Unexpected type')
    }

    const data = uint8ArrayConcat(await all(exported.content()))
    expect(data).to.deep.equal(buf)
  })
})
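
For completeness, DAG shape is invisible to consumers: the exporter streams the same bytes either way, and the offset/length options become the start/end bounds that walkDAG enforces. A hypothetical end-to-end sketch in an ESM context, with API usage assumed from the importer's and exporter's documented interfaces:

import { MemoryBlockstore } from 'blockstore-core'
import { importer } from 'ipfs-unixfs-importer'
import { exporter } from 'ipfs-unixfs-exporter'
import all from 'it-all'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'

const blockstore = new MemoryBlockstore()

// import an 8-byte file so there is a DAG to read back
const [imported] = await all(importer([{
  content: Uint8Array.from([0, 1, 2, 3, 4, 5, 6, 7])
}], blockstore))

const entry = await exporter(imported.cid, blockstore)

if (entry.type !== 'file') {
  throw new Error('Unexpected type')
}

// offset/length select a byte range; walkDAG uses the DAG's blockSizes
// to skip blocks that fall entirely outside it
const range = uint8ArrayConcat(await all(entry.content({
  offset: 2,
  length: 4
})))

console.info(range) // Uint8Array [ 2, 3, 4, 5 ]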