diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json
index 15026bd1..2868bb65 100644
--- a/packages/ipfs-unixfs-exporter/package.json
+++ b/packages/ipfs-unixfs-exporter/package.json
@@ -159,11 +159,13 @@
     "blockstore-core": "^4.0.1",
     "delay": "^5.0.0",
     "ipfs-unixfs-importer": "^15.0.0",
+    "iso-random-stream": "^2.0.2",
     "it-all": "^2.0.0",
     "it-buffer-stream": "^3.0.0",
     "it-first": "^2.0.0",
     "merge-options": "^3.0.4",
-    "sinon": "^15.0.0"
+    "sinon": "^15.0.0",
+    "wherearewe": "^2.0.1"
   },
   "browser": {
     "fs": false
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index d0d8f6db..21ff5005 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -11,7 +11,7 @@ import map from 'it-map'
 import PQueue from 'p-queue'
 import type { ExporterOptions, UnixfsV1FileContent, UnixfsV1Resolver, Blockstore } from '../../../index.js'
 
-async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, walkQueue: PQueue, options: ExporterOptions): Promise<void> {
+async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
   // a `raw` node
   if (node instanceof Uint8Array) {
     queue.push(extractDataFromBlock(node, streamPosition, start, end))
@@ -98,9 +98,23 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
            return
          }
 
-          void walkQueue.add(async () => {
-            await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options)
+          // create a queue for this child - we use a queue instead of recursion
+          // to avoid overflowing the stack
+          const childQueue = new PQueue({
+            concurrency: 1
           })
+          // if any of the child jobs error, end the read queue with the error
+          childQueue.on('error', error => {
+            queue.end(error)
+          })
+
+          // if the job rejects, the 'error' event will be emitted on the child queue
+          void childQueue.add(async () => {
+            await walkDAG(blockstore, child, queue, blockStart, start, end, options)
+          })
+
+          // wait for this child to complete before moving on to the next
+          await childQueue.onIdle()
        }
      }
    )
@@ -123,20 +137,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
      return
    }
 
-    // use a queue to walk the DAG instead of recursion to ensure very deep DAGs
-    // don't overflow the stack
-    const walkQueue = new PQueue({
-      concurrency: 1
-    })
     const queue = pushable()
 
-    void walkQueue.add(async () => {
-      await walkDAG(blockstore, node, queue, 0n, offset, offset + length, walkQueue, options)
-    })
-
-    walkQueue.on('error', error => {
-      queue.end(error)
-    })
+    void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
+      .catch(err => {
+        queue.end(err)
+      })
 
     let read = 0n
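
The hunks above replace the single shared `walkQueue` with a per-child `PQueue`: each child subtree is walked as a queue job, a rejected job surfaces as the queue's 'error' event, and that event ends the `pushable` read queue so consumers see the failure. Below is a minimal sketch of that error-propagation pattern in isolation, not part of this diff, assuming `p-queue` v7.2+ (which emits 'error' when a job rejects) and `it-pushable` v3; the always-failing job is a hypothetical stand-in for a `walkDAG` call that cannot read a block:

```typescript
import { pushable } from 'it-pushable'
import PQueue from 'p-queue'

async function demo (): Promise<void> {
  const queue = pushable<Uint8Array>()
  const childQueue = new PQueue({ concurrency: 1 })

  // if any child job rejects, end the read queue with the error so
  // consumers fail instead of waiting on a stream that never completes
  childQueue.on('error', error => {
    queue.end(error)
  })

  // the same rejection also comes back through the promise returned by
  // add(); swallow it here so it does not become an unhandled rejection
  childQueue.add(async () => {
    throw new Error('block not found')
  }).catch(() => {})

  // wait for this child to complete before moving on to the next
  await childQueue.onIdle()

  try {
    for await (const chunk of queue) {
      console.info('read', chunk.byteLength, 'bytes')
    }
  } catch (err) {
    console.error('stream ended with', err) // Error: block not found
  }
}

void demo()
```

Ending the pushable with the error makes the consumer's for-await loop rethrow it, which is why the diff wires the 'error' event (and, in file.ts, the top-level `.catch`) straight into `queue.end`.
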
diff --git a/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
new file mode 100644
index 00000000..ad10c2d4
--- /dev/null
+++ b/packages/ipfs-unixfs-exporter/test/exporter-esoteric.spec.ts
@@ -0,0 +1,214 @@
+/* eslint-env mocha */
+
+import { expect } from 'aegir/chai'
+import all from 'it-all'
+import { MemoryBlockstore } from 'blockstore-core'
+import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
+import { exporter } from './../src/index.js'
+import randomBytes from 'iso-random-stream/src/random.js'
+import { sha256 } from 'multiformats/hashes/sha2'
+import { CID } from 'multiformats/cid'
+import * as raw from 'multiformats/codecs/raw'
+import { UnixFS } from 'ipfs-unixfs'
+import * as dagPb from '@ipld/dag-pb'
+import type { Blockstore } from 'interface-blockstore'
+import { isNode } from 'wherearewe'
+
+describe('exporter esoteric DAGs', () => {
+  let block: Blockstore
+
+  beforeEach(() => {
+    block = new MemoryBlockstore()
+  })
+
+  async function storeBlock (buf: Uint8Array, codec: number): Promise<CID> {
+    const mh = await sha256.digest(buf)
+    const cid = CID.createV1(codec, mh)
+
+    await block.put(cid, buf)
+
+    return cid
+  }
+
+  it('exports an unbalanced DAG', async () => {
+    const leaves = await Promise.all([
+      randomBytes(5),
+      randomBytes(3),
+      randomBytes(6),
+      randomBytes(10),
+      randomBytes(4),
+      randomBytes(7),
+      randomBytes(8)
+    ].map(async buf => {
+      return {
+        cid: await storeBlock(buf, raw.code),
+        buf
+      }
+    }))
+
+    // create an unbalanced DAG:
+    //
+    //     root
+    //    / | | \
+    //   0  *  5 6
+    //     / | \
+    //    1  *  4
+    //      / \
+    //     2   3
+
+    const intermediateNode1 = {
+      Data: new UnixFS({
+        type: 'file',
+        blockSizes: [
+          BigInt(leaves[2].buf.byteLength),
+          BigInt(leaves[3].buf.byteLength)
+        ]
+      }).marshal(),
+      Links: [{
+        Name: '',
+        Hash: leaves[2].cid,
+        Tsize: leaves[2].buf.byteLength
+      }, {
+        Name: '',
+        Hash: leaves[3].cid,
+        Tsize: leaves[3].buf.byteLength
+      }]
+    }
+    const intermediateNode1Buf = dagPb.encode(intermediateNode1)
+    const intermediateNode1Cid = await storeBlock(intermediateNode1Buf, dagPb.code)
+
+    const intermediateNode2 = {
+      Data: new UnixFS({
+        type: 'file',
+        blockSizes: [
+          BigInt(leaves[1].buf.byteLength),
+          BigInt(leaves[2].buf.byteLength + leaves[3].buf.byteLength),
+          BigInt(leaves[4].buf.byteLength)
+        ]
+      }).marshal(),
+      Links: [{
+        Name: '',
+        Hash: leaves[1].cid,
+        Tsize: leaves[1].buf.byteLength
+      }, {
+        Name: '',
+        Hash: intermediateNode1Cid,
+        Tsize: intermediateNode1Buf.length
+      }, {
+        Name: '',
+        Hash: leaves[4].cid,
+        Tsize: leaves[4].buf.byteLength
+      }]
+    }
+
+    const intermediateNode2Buf = dagPb.encode(intermediateNode2)
+    const intermediateNode2Cid = await storeBlock(intermediateNode2Buf, dagPb.code)
+
+    const unixfs = new UnixFS({
+      type: 'file',
+      blockSizes: [
+        BigInt(leaves[0].buf.byteLength),
+        BigInt(leaves[1].buf.byteLength + leaves[2].buf.byteLength + leaves[3].buf.byteLength + leaves[4].buf.byteLength),
+        BigInt(leaves[5].buf.byteLength),
+        BigInt(leaves[6].buf.byteLength)
+      ]
+    })
+
+    const rootNode = {
+      Data: unixfs.marshal(),
+      Links: [{
+        Name: '',
+        Hash: leaves[0].cid,
+        Tsize: leaves[0].buf.byteLength
+      }, {
+        Name: '',
+        Hash: intermediateNode2Cid,
+        Tsize: intermediateNode2Buf.byteLength
+      }, {
+        Name: '',
+        Hash: leaves[5].cid,
+        Tsize: leaves[5].buf.byteLength
+      }, {
+        Name: '',
+        Hash: leaves[6].cid,
+        Tsize: leaves[6].buf.byteLength
+      }]
+    }
+
+    const rootBuf = dagPb.encode(rootNode)
+    const rootCid = await storeBlock(rootBuf, dagPb.code)
+    const exported = await exporter(rootCid, block)
+
+    if (exported.type !== 'file') {
+      throw new Error('Unexpected type')
+    }
+
+    const data = uint8ArrayConcat(await all(exported.content()))
+    expect(data).to.deep.equal(uint8ArrayConcat(
+      leaves.map(l => l.buf)
+    ))
+  })
+
+  it('exports a very deep DAG', async () => {
+    if (!isNode) {
+      // browsers are quite slow so only run on node
+      return
+    }
+
+    const buf: Uint8Array = randomBytes(5)
+    let child = {
+      cid: await storeBlock(buf, raw.code),
+      buf
+    }
+
+    // create a very deep DAG:
+    //
+    // root
+    //   \
+    //    *
+    //     \
+    //      *
+    //       \
+    //    ... many nodes here
+    //         \
+    //          0
+    let rootCid: CID | undefined
+
+    for (let i = 0; i < 100000; i++) {
+      const parent = {
+        Data: new UnixFS({
+          type: 'file',
+          blockSizes: [
+            BigInt(buf.byteLength)
+          ]
+        }).marshal(),
+        Links: [{
+          Name: '',
+          Hash: child.cid,
+          Tsize: child.buf.byteLength
+        }]
+      }
+
+      const parentBuf = dagPb.encode(parent)
+      rootCid = await storeBlock(parentBuf, dagPb.code)
+
+      child = {
+        cid: rootCid,
+        buf: parentBuf
+      }
+    }
+
+    if (rootCid == null) {
+      throw new Error('Root CID not set')
+    }
+
+    const exported = await exporter(rootCid, block)
+
+    if (exported.type !== 'file') {
+      throw new Error('Unexpected type')
+    }
+
+    const data = uint8ArrayConcat(await all(exported.content()))
+    expect(data).to.deep.equal(buf)
+  })
+})
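
The tests above hand-build DAG nodes with `dag-pb` and `ipfs-unixfs` so they can force unbalanced and very deep shapes; in normal use the DAG comes from `ipfs-unixfs-importer` (already in the devDependencies above). A round-trip sketch under that assumption, using the published package name `ipfs-unixfs-exporter` where the tests use the relative `./../src/index.js` import:

```typescript
import { importer } from 'ipfs-unixfs-importer'
import { exporter } from 'ipfs-unixfs-exporter'
import { MemoryBlockstore } from 'blockstore-core'
import all from 'it-all'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'

async function roundTrip (): Promise<void> {
  const blockstore = new MemoryBlockstore()
  const input = new Uint8Array(1024).fill(1)

  // import a small file - the importer yields one entry per node it
  // creates, with the root entry last
  const entries = await all(importer([{ content: input }], blockstore))
  const root = entries[entries.length - 1].cid

  // export it again and reassemble the bytes, as the tests above do
  const exported = await exporter(root, blockstore)

  if (exported.type !== 'file') {
    throw new Error('Unexpected type')
  }

  const output = uint8ArrayConcat(await all(exported.content()))
  console.info('round trip ok:', output.byteLength === input.byteLength)
}

void roundTrip()
```

Whatever shape the importer produced, `exported.content()` yields the file bytes in order, which is exactly the property the unbalanced-DAG test pins down.
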