From 88cdb06186f1917494502d30606e5c33c9bdf578 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 16 Mar 2023 12:13:12 +0100
Subject: [PATCH 1/5] fix: add tests for unbalanced dags and very deep dags

To ensure that blocks are emitted in the correct order while also not
overflowing the stack, when we loop over the children of a node, use a
separate queue for each child and wait for the queue to be idle before
moving on to the next child.
---
 .../src/resolvers/unixfs-v1/content/file.ts   |   8 +
 .../test/exporter-esoteric.spec.ts            | 196 ++++++++++++++++++
 2 files changed, 204 insertions(+)
 create mode 100644 packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts

diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index d0d8f6db..1c16259d 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -98,9 +98,17 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
         return
       }
 
+      // create a sub-queue for this child
+      const walkQueue = new PQueue({
+        concurrency: 1
+      })
+
       void walkQueue.add(async () => {
         await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options)
       })
+
+      // wait for this child to complete before moving on to the next
+      await walkQueue.onIdle()
     }
   }
 )
diff --git a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
new file mode 100644
index 00000000..6183f421
--- /dev/null
+++ b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
@@ -0,0 +1,196 @@
+/* eslint-env mocha */
+
+import { expect } from 'aegir/chai'
+import all from 'it-all'
+import { MemoryBlockstore } from 'blockstore-core'
+import { concat, concat as uint8ArrayConcat } from 'uint8arrays/concat'
+import { exporter } from './../src/index.js'
+import randomBytes from 'iso-random-stream/src/random.js'
+import { sha256 } from 'multiformats/hashes/sha2'
+import { CID } from 'multiformats/cid'
+import * as raw from 'multiformats/codecs/raw'
+import { UnixFS } from 'ipfs-unixfs'
+import * as dagPb from '@ipld/dag-pb'
+import type { Blockstore } from 'interface-blockstore'
+
+describe('exporter esoteric DAGs', () => {
+  let block: Blockstore
+
+  beforeEach(() => {
+    block = new MemoryBlockstore()
+  })
+
+  async function storeBlock (buf: Uint8Array, codec: number): Promise<CID> {
+    const mh = await sha256.digest(buf)
+    const cid = CID.createV1(codec, mh)
+
+    await block.put(cid, buf)
+
+    return cid
+  }
+
+  it('exports an uneven DAG', async () => {
+    const leaves = await Promise.all([
+      randomBytes(5),
+      randomBytes(3),
+      randomBytes(6),
+      randomBytes(10),
+      randomBytes(4),
+      randomBytes(7),
+      randomBytes(8)
+    ].map(async buf => {
+      return {
+        cid: await storeBlock(buf, raw.code),
+        buf
+      }
+    }))
+
+    // create an unbalanced DAG:
+    //
+    //      root
+    //     / | | \
+    //    0  *  5 6
+    //      / | \
+    //     1  *  4
+    //       / \
+    //      2   3
+
+    const intermediateNode1 = {
+      Data: new UnixFS({ type: 'file', blockSizes: [
+        BigInt(leaves[2].buf.byteLength),
+        BigInt(leaves[3].buf.byteLength)
+      ]}).marshal(),
+      Links: [{
+        Name: '',
+        Hash: leaves[2].cid,
+        Tsize: leaves[2].buf.byteLength,
+      }, {
+        Name: '',
+        Hash: leaves[3].cid,
+        Tsize: leaves[3].buf.byteLength,
+      }]
+    }
+    const intermediateNode1Buf = dagPb.encode(intermediateNode1)
+    const intermediateNode1Cid = await storeBlock(intermediateNode1Buf, dagPb.code)
+
+    const intermediateNode2 = {
+      Data: new UnixFS({ type: 'file', blockSizes: [
+        BigInt(leaves[1].buf.byteLength),
+        BigInt(leaves[2].buf.byteLength + leaves[3].buf.byteLength),
+        BigInt(leaves[4].buf.byteLength)
+      ]}).marshal(),
+      Links: [{
+        Name: '',
+        Hash: leaves[1].cid,
+        Tsize: leaves[1].buf.byteLength,
+      }, {
+        Name: '',
+        Hash: intermediateNode1Cid,
+        Tsize: intermediateNode1Buf.length,
+      }, {
+        Name: '',
+        Hash: leaves[4].cid,
+        Tsize: leaves[4].buf.byteLength,
+      }]
+    }
+
+    const intermediateNode2Buf = dagPb.encode(intermediateNode2)
+    const intermediateNode2Cid = await storeBlock(intermediateNode2Buf, dagPb.code)
+
+    const unixfs = new UnixFS({ type: 'file', blockSizes: [
+      BigInt(leaves[0].buf.byteLength),
+      BigInt(leaves[1].buf.byteLength + leaves[2].buf.byteLength + leaves[3].buf.byteLength + leaves[4].buf.byteLength),
+      BigInt(leaves[5].buf.byteLength),
+      BigInt(leaves[6].buf.byteLength)
+    ]})
+
+    const rootNode = {
+      Data: unixfs.marshal(),
+      Links: [{
+        Name: '',
+        Hash: leaves[0].cid,
+        Tsize: leaves[0].buf.byteLength,
+      }, {
+        Name: '',
+        Hash: intermediateNode2Cid,
+        Tsize: intermediateNode2Buf.byteLength,
+      }, {
+        Name: '',
+        Hash: leaves[5].cid,
+        Tsize: leaves[5].buf.byteLength,
+      }, {
+        Name: '',
+        Hash: leaves[6].cid,
+        Tsize: leaves[6].buf.byteLength,
+      }]
+    }
+
+    const rootBuf = dagPb.encode(rootNode)
+    const rootCid = await storeBlock(rootBuf, dagPb.code)
+    const exported = await exporter(rootCid, block)
+
+    if (exported.type !== 'file') {
+      throw new Error('Unexpected type')
+    }
+
+    const data = uint8ArrayConcat(await all(exported.content()))
+    expect(data).to.deep.equal(concat(
+      leaves.map(l => l.buf)
+    ))
+  })
+
+  it('exports a very deep DAG', async () => {
+    const buf: Uint8Array = randomBytes(5)
+    let child = {
+      cid: await storeBlock(buf, raw.code),
+      buf
+    }
+
+    // create a very deep DAG:
+    //
+    // root
+    //   \
+    //    *
+    //     \
+    //      *
+    //       \
+    //   ... many nodes here
+    //         \
+    //          0
+    let rootCid: CID | undefined
+
+    for (let i = 0; i < 100000; i++) {
+      const parent = {
+        Data: new UnixFS({ type: 'file', blockSizes: [
+          BigInt(buf.byteLength)
+        ]}).marshal(),
+        Links: [{
+          Name: '',
+          Hash: child.cid,
+          Tsize: child.buf.byteLength,
+        }]
+      }
+
+      const parentBuf = dagPb.encode(parent)
+      rootCid = await storeBlock(parentBuf, dagPb.code)
+
+      child = {
+        cid: rootCid,
+        buf: parentBuf
+      }
+    }
+
+    if (rootCid == null) {
+      throw new Error('Root CID not set')
+    }
+
+    const exported = await exporter(rootCid, block)
+
+    if (exported.type !== 'file') {
+      throw new Error('Unexpected type')
+    }
+
+    const data = uint8ArrayConcat(await all(exported.content()))
+    expect(data).to.deep.equal(buf)
+  })
+})

From 8333d7b048c24c9b977331234c91a1e09b757dc1 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 16 Mar 2023 12:19:29 +0100
Subject: [PATCH 2/5] chore: fix linting

---
 .../src/resolvers/unixfs-v1/content/file.ts   | 32 ++++-----
 .../test/exporter-esoteric.spec.ts            | 68 +++++++++++--------
 2 files changed, 54 insertions(+), 46 deletions(-)

diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index 1c16259d..14a1fad5 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -11,7 +11,7 @@ import map from 'it-map'
 import PQueue from 'p-queue'
 import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, Blockstore } from '../../../index.js'
 
-async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, walkQueue: PQueue, options: ExporterOptions): Promise<void> {
+async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
   // a `raw` node
   if (node instanceof Uint8Array) {
     queue.push(extractDataFromBlock(node, streamPosition, start, end))
@@ -98,17 +98,21 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
         return
       }
 
-      // create a sub-queue for this child
-      const walkQueue = new PQueue({
+      // create a queue for this child - we use a queue instead of recursion
+      // to avoid overflowing the stack
+      const childQueue = new PQueue({
         concurrency: 1
       })
+      childQueue.on('error', error => {
+        queue.end(error)
+      })
 
-      void walkQueue.add(async () => {
-        await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options)
+      void childQueue.add(async () => {
+        await walkDAG(blockstore, child, queue, blockStart, start, end, options)
       })
 
       // wait for this child to complete before moving on to the next
-      await walkQueue.onIdle()
+      await childQueue.onIdle()
     }
   }
 )
@@ -131,20 +135,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
       return
     }
 
-    // use a queue to walk the DAG instead of recursion to ensure very deep DAGs
-    // don't overflow the stack
-    const walkQueue = new PQueue({
-      concurrency: 1
-    })
 
     const queue = pushable()
 
-    void walkQueue.add(async () => {
-      await walkDAG(blockstore, node, queue, 0n, offset, offset + length, walkQueue, options)
-    })
-
-    walkQueue.on('error', error => {
-      queue.end(error)
-    })
+    void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
+      .catch(err => {
+        queue.end(err)
+      })
 
     let read = 0n
diff --git a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
index 6183f421..f527e2c6 100644
--- a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
@@ -56,72 +56,81 @@ describe('exporter esoteric DAGs', () => {
     //      2   3
 
     const intermediateNode1 = {
-      Data: new UnixFS({ type: 'file', blockSizes: [
-        BigInt(leaves[2].buf.byteLength),
-        BigInt(leaves[3].buf.byteLength)
-      ]}).marshal(),
+      Data: new UnixFS({
+        type: 'file',
+        blockSizes: [
+          BigInt(leaves[2].buf.byteLength),
+          BigInt(leaves[3].buf.byteLength)
+        ]
+      }).marshal(),
       Links: [{
         Name: '',
         Hash: leaves[2].cid,
-        Tsize: leaves[2].buf.byteLength,
+        Tsize: leaves[2].buf.byteLength
       }, {
         Name: '',
         Hash: leaves[3].cid,
-        Tsize: leaves[3].buf.byteLength,
+        Tsize: leaves[3].buf.byteLength
       }]
     }
     const intermediateNode1Buf = dagPb.encode(intermediateNode1)
     const intermediateNode1Cid = await storeBlock(intermediateNode1Buf, dagPb.code)
 
     const intermediateNode2 = {
-      Data: new UnixFS({ type: 'file', blockSizes: [
-        BigInt(leaves[1].buf.byteLength),
-        BigInt(leaves[2].buf.byteLength + leaves[3].buf.byteLength),
-        BigInt(leaves[4].buf.byteLength)
-      ]}).marshal(),
+      Data: new UnixFS({
+        type: 'file',
+        blockSizes: [
+          BigInt(leaves[1].buf.byteLength),
+          BigInt(leaves[2].buf.byteLength + leaves[3].buf.byteLength),
+          BigInt(leaves[4].buf.byteLength)
+        ]
+      }).marshal(),
       Links: [{
         Name: '',
         Hash: leaves[1].cid,
-        Tsize: leaves[1].buf.byteLength,
+        Tsize: leaves[1].buf.byteLength
       }, {
         Name: '',
         Hash: intermediateNode1Cid,
-        Tsize: intermediateNode1Buf.length,
+        Tsize: intermediateNode1Buf.length
       }, {
         Name: '',
         Hash: leaves[4].cid,
-        Tsize: leaves[4].buf.byteLength,
+        Tsize: leaves[4].buf.byteLength
       }]
     }
 
     const intermediateNode2Buf = dagPb.encode(intermediateNode2)
     const intermediateNode2Cid = await storeBlock(intermediateNode2Buf, dagPb.code)
 
-    const unixfs = new UnixFS({ type: 'file', blockSizes: [
-      BigInt(leaves[0].buf.byteLength),
-      BigInt(leaves[1].buf.byteLength + leaves[2].buf.byteLength + leaves[3].buf.byteLength + leaves[4].buf.byteLength),
-      BigInt(leaves[5].buf.byteLength),
-      BigInt(leaves[6].buf.byteLength)
-    ]})
+    const unixfs = new UnixFS({
+      type: 'file',
+      blockSizes: [
+        BigInt(leaves[0].buf.byteLength),
+        BigInt(leaves[1].buf.byteLength + leaves[2].buf.byteLength + leaves[3].buf.byteLength + leaves[4].buf.byteLength),
+        BigInt(leaves[5].buf.byteLength),
+        BigInt(leaves[6].buf.byteLength)
+      ]
+    })
 
     const rootNode = {
       Data: unixfs.marshal(),
       Links: [{
         Name: '',
         Hash: leaves[0].cid,
-        Tsize: leaves[0].buf.byteLength,
+        Tsize: leaves[0].buf.byteLength
       }, {
         Name: '',
         Hash: intermediateNode2Cid,
-        Tsize: intermediateNode2Buf.byteLength,
+        Tsize: intermediateNode2Buf.byteLength
       }, {
         Name: '',
         Hash: leaves[5].cid,
-        Tsize: leaves[5].buf.byteLength,
+        Tsize: leaves[5].buf.byteLength
      }, {
         Name: '',
         Hash: leaves[6].cid,
-        Tsize: leaves[6].buf.byteLength,
+        Tsize: leaves[6].buf.byteLength
       }]
     }
 
@@ -161,13 +170,16 @@ describe('exporter esoteric DAGs', () => {
 
     for (let i = 0; i < 100000; i++) {
       const parent = {
-        Data: new UnixFS({ type: 'file', blockSizes: [
-          BigInt(buf.byteLength)
-        ]}).marshal(),
+        Data: new UnixFS({
+          type: 'file',
+          blockSizes: [
+            BigInt(buf.byteLength)
+          ]
+        }).marshal(),
         Links: [{
           Name: '',
           Hash: child.cid,
-          Tsize: child.buf.byteLength,
+          Tsize: child.buf.byteLength
         }]
       }
 

From 4f66cdd4aa55c591039c235f5048b648fb988158 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 16 Mar 2023 12:22:04 +0100
Subject: [PATCH 3/5] chore: add missing dev dep

---
 packages/ipfs-unixfs-exporter/package.json | 1 +
 1 file changed, 1 insertion(+)

diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json
index 15026bd1..89894a82 100644
--- a/packages/ipfs-unixfs-exporter/package.json
+++ b/packages/ipfs-unixfs-exporter/package.json
@@ -159,6 +159,7 @@
     "blockstore-core": "^4.0.1",
     "delay": "^5.0.0",
     "ipfs-unixfs-importer": "^15.0.0",
+    "iso-random-stream": "^2.0.2",
     "it-all": "^2.0.0",
     "it-buffer-stream": "^3.0.0",
     "it-first": "^2.0.0",

From 59afc75609b67d4df6ad6792259fcbee35692771 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 16 Mar 2023 12:37:20 +0100
Subject: [PATCH 4/5] chore: only run test on node

---
 packages/ipfs-unixfs-exporter/package.json               | 3 ++-
 .../ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts  | 6 ++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json
index 89894a82..2868bb65 100644
--- a/packages/ipfs-unixfs-exporter/package.json
+++ b/packages/ipfs-unixfs-exporter/package.json
@@ -164,7 +164,8 @@
     "it-buffer-stream": "^3.0.0",
     "it-first": "^2.0.0",
     "merge-options": "^3.0.4",
-    "sinon": "^15.0.0"
+    "sinon": "^15.0.0",
+    "wherearewe": "^2.0.1"
   },
   "browser": {
     "fs": false
diff --git a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
index f527e2c6..44981d80 100644
--- a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
@@ -12,6 +12,7 @@ import * as raw from 'multiformats/codecs/raw'
 import { UnixFS } from 'ipfs-unixfs'
 import * as dagPb from '@ipld/dag-pb'
 import type { Blockstore } from 'interface-blockstore'
+import { isNode } from 'wherearewe'
 
 describe('exporter esoteric DAGs', () => {
   let block: Blockstore
@@ -149,6 +150,11 @@ describe('exporter esoteric DAGs', () => {
   })
 
   it('exports a very deep DAG', async () => {
+    if (!isNode) {
+      // browsers are quite slow so only run on node
+      return
+    }
+
     const buf: Uint8Array = randomBytes(5)
     let child = {
       cid: await storeBlock(buf, raw.code),

From dcceceed77a6ea332fee3b2c587d0288bcc20ada Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Thu, 16 Mar 2023 12:42:24 +0100
Subject: [PATCH 5/5] chore: add more exposition

---
 .../src/resolvers/unixfs-v1/content/file.ts                  | 2 ++
 packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index 14a1fad5..21ff5005 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -103,10 +103,12 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
       const childQueue = new PQueue({
         concurrency: 1
       })
+      // if any of the child jobs error, end the read queue with the error
       childQueue.on('error', error => {
        queue.end(error)
       })
 
+      // if the job rejects the 'error' event will be emitted on the child queue
       void childQueue.add(async () => {
         await walkDAG(blockstore, child, queue, blockStart, start, end, options)
       })
diff --git a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
index 44981d80..ad10c2d4 100644
--- a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
@@ -30,7 +30,7 @@ describe('exporter esoteric DAGs', () => {
     return cid
   }
 
-  it('exports an uneven DAG', async () => {
+  it('exports an unbalanced DAG', async () => {
     const leaves = await Promise.all([
       randomBytes(5),
       randomBytes(3),
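
Taken together, the patches settle on one traversal pattern: an awaited block read at
every level of the DAG, plus a fresh single-concurrency PQueue per child whose
onIdle() is awaited before the next sibling starts. The sketch below shows that
pattern in isolation. It is a minimal model under stated assumptions, not the
exporter's API: the Node interface, the Map-based store and the getNode/walk helpers
are hypothetical stand-ins for the dag-pb nodes, the blockstore and walkDAG. Only
p-queue and it-pushable are used as the patches themselves use them.

import PQueue from 'p-queue'
import { pushable } from 'it-pushable'
import type { Pushable } from 'it-pushable'

// hypothetical stand-in for a dag-pb node: leaves carry bytes, internal
// nodes carry links (here just string ids into the store)
interface Node {
  value?: Uint8Array
  children: string[]
}

// stands in for blockstore.get - the important property is that it is
// async, so every level of the walk suspends at an await before recursing
async function getNode (store: Map<string, Node>, id: string): Promise<Node> {
  const node = store.get(id)

  if (node == null) {
    throw new Error(`missing node ${id}`)
  }

  return node
}

async function walk (store: Map<string, Node>, id: string, out: Pushable<Uint8Array>): Promise<void> {
  // after this await the function resumes on a fresh microtask stack, so the
  // native call stack stays flat however deep the DAG is - the suspended
  // frames live on the heap instead
  const node = await getNode(store, id)

  if (node.value != null) {
    out.push(node.value)
  }

  for (const childId of node.children) {
    // one single-concurrency queue per child, as in walkDAG
    const childQueue = new PQueue({ concurrency: 1 })

    // surface failures from the child job on the output stream
    childQueue.on('error', (err: Error) => {
      out.end(err)
    })

    childQueue.add(async () => {
      await walk(store, childId, out)
    }).catch(() => {
      // the rejection is already reported via the 'error' event above; the
      // patches use `void` here instead and rely solely on that event
    })

    // drain this child's entire subtree before starting the next sibling -
    // this is what keeps bytes flowing in leaf order
    await childQueue.onIdle()
  }
}

// usage: a three-node DAG with two leaves
const store = new Map<string, Node>([
  ['root', { children: ['a', 'b'] }],
  ['a', { value: Uint8Array.from([1]), children: [] }],
  ['b', { value: Uint8Array.from([2]), children: [] }]
])

const out = pushable<Uint8Array>()

void walk(store, 'root', out)
  .then(() => { out.end() })
  .catch(err => { out.end(err) })

for await (const chunk of out) {
  console.log(chunk) // Uint8Array [ 1 ], then Uint8Array [ 2 ]
}

The design split mirrors the series: the per-child queue exists for ordering and
error propagation, while the awaited read is what lets the very deep DAG test walk
its 100,000-node chain without exhausting the stack.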