diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json
index c3208626..d8a842bd 100644
--- a/packages/ipfs-unixfs-exporter/package.json
+++ b/packages/ipfs-unixfs-exporter/package.json
@@ -154,6 +154,7 @@
     "uint8arrays": "^4.0.2"
   },
   "devDependencies": {
+    "@types/readable-stream": "^2.3.15",
     "@types/sinon": "^10.0.0",
     "aegir": "^38.1.2",
     "blockstore-core": "^4.0.1",
@@ -163,12 +164,15 @@
     "it-all": "^3.0.2",
     "it-buffer-stream": "^3.0.0",
     "it-first": "^3.0.2",
+    "it-to-buffer": "^4.0.2",
     "merge-options": "^3.0.4",
+    "readable-stream": "^4.4.0",
     "sinon": "^15.0.0",
     "wherearewe": "^2.0.1"
   },
   "browser": {
-    "fs": false
+    "fs": false,
+    "readable-stream": false
   },
   "typedoc": {
     "entryPoint": "./src/index.ts"
diff --git a/packages/ipfs-unixfs-exporter/src/index.ts b/packages/ipfs-unixfs-exporter/src/index.ts
index d155bdbd..c9ed8fcb 100644
--- a/packages/ipfs-unixfs-exporter/src/index.ts
+++ b/packages/ipfs-unixfs-exporter/src/index.ts
@@ -59,7 +59,7 @@ export interface Exportable {
   cid: CID
   depth: number
   size: bigint
-  content: (options?: ExporterOptions) => AsyncIterable
+  content: (options?: ExporterOptions) => AsyncGenerator
 }
 
 export interface UnixFSFile extends Exportable {
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts b/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
index d63a6786..31de81e6 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
@@ -8,15 +8,15 @@ import { CustomProgressEvent } from 'progress-events'
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator {
     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(node.length, options.offset, options.length)
 
-    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, start, end)
 
     options.onProgress?.(new CustomProgressEvent('unixfs:exporter:progress:identity', {
       bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
       fileSize: BigInt(node.byteLength)
     }))
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts b/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
index 502dba9f..d90cc026 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
@@ -7,15 +7,15 @@ import { CustomProgressEvent } from 'progress-events'
 const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator) => {
   async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator {
     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(node.length, options.offset, options.length)
 
-    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, start, end)
 
     options.onProgress?.(new CustomProgressEvent('unixfs:exporter:progress:raw', {
       bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
       fileSize: BigInt(node.byteLength)
     }))
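The identity and raw resolvers above now pass absolute `start`/`end` byte offsets straight through to `extractDataFromBlock`. A minimal standalone sketch of that slicing behaviour, inferred only from the call sites in this diff — the `sliceBlock` name and body are illustrative, not the library's implementation:

```ts
// Illustrative helper mirroring the shape of the
// extractDataFromBlock(block, blockStart, start, end) calls above.
function sliceBlock (block: Uint8Array, blockStart: bigint, start: bigint, end: bigint): Uint8Array {
  const blockEnd = blockStart + BigInt(block.length)

  // the requested absolute range does not overlap this block at all
  if (start >= blockEnd || end <= blockStart) {
    return new Uint8Array(0)
  }

  // clamp the absolute range to this block and convert to local indexes
  const from = start > blockStart ? Number(start - blockStart) : 0
  const to = end < blockEnd ? Number(end - blockStart) : block.length

  return block.subarray(from, to)
}

// e.g. a 10-byte block at stream position 0, exporting bytes [2, 7):
// sliceBlock(new Uint8Array(10), 0n, 2n, 7n).length === 5
```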
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
index 4a0abf63..07947804 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -15,7 +15,9 @@ import { CustomProgressEvent } from 'progress-events'
 async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise {
   // a `raw` node
   if (node instanceof Uint8Array) {
-    queue.push(extractDataFromBlock(node, streamPosition, start, end))
+    const buf = extractDataFromBlock(node, streamPosition, start, end)
+
+    queue.push(buf)
 
     return
   }
@@ -123,6 +125,10 @@
       }
     }
   )
+
+  if (streamPosition >= end) {
+    queue.end()
+  }
 }
 
 const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
@@ -134,34 +140,23 @@
     }
 
     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(fileSize, options.offset, options.length)
 
-    if (length === 0n) {
+    if (end === 0n) {
       return
     }
 
     let read = 0n
-    const wanted = length - offset
+    const wanted = end - start
     const queue = pushable()
 
     options.onProgress?.(new CustomProgressEvent('unixfs:exporter:walk:file', {
       cid
     }))
 
-    void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
-      .then(() => {
-        if (read < wanted) {
-          throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
-        }
-
-        if (read > wanted) {
-          throw errCode(new Error('Read too many bytes - the file size reported by the UnixFS data in the root node may be incorrect'), 'ERR_OVER_READ')
-        }
-
-        queue.end()
-      })
+    void walkDAG(blockstore, node, queue, 0n, start, end, options)
       .catch(err => {
         queue.end(err)
       })
@@ -173,7 +168,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
 
       read += BigInt(buf.byteLength)
 
-      if (read === length) {
+      if (read > wanted) {
+        queue.end()
+        throw errCode(new Error('Read too many bytes - the file size reported by the UnixFS data in the root node may be incorrect'), 'ERR_OVER_READ')
+      }
+
+      if (read === wanted) {
         queue.end()
       }
 
@@ -185,6 +185,10 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
 
       yield buf
     }
+
+    if (read < wanted) {
+      throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
+    }
   }
 
   return yieldFileContent
diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
index e85471b0..74a2ba30 100644
--- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
+++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
@@ -12,19 +12,19 @@ const rawContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, b
     const size = unixfs.data.length
 
     const {
-      offset,
-      length
+      start,
+      end
     } = validateOffsetAndLength(size, options.offset, options.length)
 
     options.onProgress?.(new CustomProgressEvent('unixfs:exporter:walk:raw', {
       cid
     }))
 
-    const buf = extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(unixfs.data, 0n, start, end)
 
     options.onProgress?.(new CustomProgressEvent('unixfs:exporter:progress:unixfs:raw', {
       bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
       fileSize: BigInt(unixfs.data.byteLength)
     }))
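With the `.then()` block removed from the `walkDAG` call, the over- and under-read guards now run inside `yieldFileContent`, so a mismatch between the root node's reported file size and the DAG contents surfaces to whoever is iterating the content rather than in an unawaited promise. A consumer-side sketch of what that looks like — the `readAll` helper is hypothetical; only `exporter` and `entry.content()` are the package's real API:

```ts
import { exporter } from 'ipfs-unixfs-exporter'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

// Count the exported bytes of a UnixFS file, letting size-mismatch errors
// propagate from the iteration itself.
async function readAll (cid: CID, blockstore: Blockstore): Promise<number> {
  const entry = await exporter(cid, blockstore)

  if (entry.type !== 'file') {
    throw new Error('not a file')
  }

  let bytes = 0

  try {
    for await (const chunk of entry.content()) {
      bytes += chunk.byteLength
    }
  } catch (err: any) {
    // a DAG that disagrees with the root fileSize now fails here,
    // as ERR_UNDER_READ or ERR_OVER_READ
    console.error('export failed', err.code)
    throw err
  }

  return bytes
}
```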
diff --git a/packages/ipfs-unixfs-exporter/src/utils/validate-offset-and-length.ts b/packages/ipfs-unixfs-exporter/src/utils/validate-offset-and-length.ts
index 0984aea9..da6d9427 100644
--- a/packages/ipfs-unixfs-exporter/src/utils/validate-offset-and-length.ts
+++ b/packages/ipfs-unixfs-exporter/src/utils/validate-offset-and-length.ts
@@ -1,36 +1,37 @@
 import errCode from 'err-code'
 
-const validateOffsetAndLength = (size: number | bigint, offset: number | bigint = 0, length: number | bigint = size): { offset: bigint, length: bigint } => {
-  offset = BigInt(offset ?? 0)
-  length = BigInt(length ?? size)
+const validateOffsetAndLength = (size: number | bigint, offset: number | bigint = 0, length: number | bigint = size): { start: bigint, end: bigint } => {
+  const fileSize = BigInt(size)
+  const start = BigInt(offset ?? 0)
+  let end = BigInt(length)
 
-  if (offset == null) {
-    offset = 0n
+  if (end !== fileSize) {
+    end = start + end
   }
 
-  if (offset < 0n) {
-    throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
+  if (end > fileSize) {
+    end = fileSize
   }
 
-  if (offset > size) {
-    throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS')
+  if (start < 0n) {
+    throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
   }
 
-  if (length == null) {
-    length = BigInt(size) - offset
+  if (start > fileSize) {
+    throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS')
   }
 
-  if (length < 0n) {
+  if (end < 0n) {
     throw errCode(new Error('Length must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
   }
 
-  if (offset + length > size) {
-    length = BigInt(size) - offset
+  if (end > fileSize) {
+    throw errCode(new Error('Length must be less than the file size'), 'ERR_INVALID_PARAMS')
   }
 
   return {
-    offset,
-    length
+    start,
+    end
   }
 }
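The rewritten helper returns an absolute byte range instead of an offset/length pair: `end` stays at the file size when `length` defaulted to it, otherwise it becomes `start + length`, and it is then clamped to the file size. A standalone sketch with worked values, assuming only the semantics shown above (`toByteRange` is an illustrative name, not the exported function):

```ts
// Mirrors the start/end computation above; error handling is reduced to the
// essentials for readability.
function toByteRange (size: bigint, offset: bigint = 0n, length: bigint = size): { start: bigint, end: bigint } {
  const start = offset
  let end = length === size ? size : start + length

  if (end > size) {
    end = size
  }

  if (start < 0n || start > size) {
    throw new Error('invalid offset')
  }

  return { start, end }
}

// toByteRange(100n)           -> { start: 0n, end: 100n }  (whole file)
// toByteRange(100n, 10n, 5n)  -> { start: 10n, end: 15n }  (offset + length)
// toByteRange(100n, 90n, 50n) -> { start: 90n, end: 100n } (clamped to EOF)
```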
diff --git a/packages/ipfs-unixfs-exporter/test/exporter.spec.ts b/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
index 12cb9373..367611aa 100644
--- a/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
+++ b/packages/ipfs-unixfs-exporter/test/exporter.spec.ts
@@ -25,6 +25,9 @@ import type { Blockstore } from 'interface-blockstore'
 import { balanced, FileLayout, flat, trickle } from 'ipfs-unixfs-importer/layout'
 import type { Chunker } from 'ipfs-unixfs-importer/chunker'
 import { fixedSize } from 'ipfs-unixfs-importer/chunker'
+import toBuffer from 'it-to-buffer'
+import { Readable } from 'readable-stream'
+import { isNode } from 'wherearewe'
 
 const ONE_MEG = Math.pow(1024, 2)
 
@@ -1229,4 +1232,81 @@
       signal: abortController.signal
     })).to.eventually.be.rejectedWith(message)
   })
+
+  it('should support being used with readable-stream', async () => {
+    if (!isNode) {
+      // node-only test
+      return
+    }
+
+    let dataSizeInBytes = 10
+
+    // iterate through order of magnitude in size until hitting 10MB
+    while (dataSizeInBytes <= 10_000_000) {
+      const bytes = await toBuffer(randomBytes(dataSizeInBytes))
+
+      // chunk up the bytes to simulate a more real-world like behavior
+      const chunkLength = 100_000
+      let currentIndex = 0
+
+      const readableStream = new Readable({
+        read (): void {
+          // if this is the last chunk
+          if (currentIndex + chunkLength > bytes.length) {
+            this.push(bytes.subarray(currentIndex))
+            this.push(null)
+          } else {
+            this.push(bytes.subarray(currentIndex, currentIndex + chunkLength))
+
+            currentIndex = currentIndex + chunkLength
+          }
+        }
+      })
+
+      const result = await last(importer([{
+        content: readableStream
+      }], block))
+
+      if (result == null) {
+        throw new Error('Import failed')
+      }
+
+      const file = await exporter(result.cid, block)
+      const contentIterator = file.content()
+
+      const readableStreamToBytes = async (readableStream: Readable): Promise => {
+        return await new Promise((resolve, reject) => {
+          const chunks: any[] = []
+          readableStream.on('data', chunk => {
+            chunks.push(chunk)
+          })
+
+          readableStream.on('end', () => {
+            const uint8Array = uint8ArrayConcat(chunks)
+            resolve(uint8Array)
+          })
+
+          readableStream.on('error', reject)
+        })
+      }
+
+      const dataStream = new Readable({
+        async read (): Promise {
+          const result = await contentIterator.next()
+          if (result.done === true) {
+            this.push(null) // end the stream
+          } else {
+            this.push(result.value)
+          }
+        }
+      })
+
+      const data = await readableStreamToBytes(dataStream)
+
+      expect(data.byteLength).to.equal(dataSizeInBytes)
+      expect(data).to.equalBytes(bytes)
+
+      dataSizeInBytes *= 10
+    }
+  })
 })
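Beyond the manual `read()` bridge used in the test, the switch to `AsyncGenerator` means the content iterator is async iterable, so Node's `Readable.from` can wrap it directly. A sketch using the built-in `node:stream` module rather than the `readable-stream` package — the `toNodeStream` helper is illustrative and not part of this change:

```ts
import { Readable } from 'node:stream'
import { exporter } from 'ipfs-unixfs-exporter'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

// Wrap an exported file's content in a Node Readable; each chunk emitted by
// the stream is a Uint8Array yielded by the exporter.
export async function toNodeStream (cid: CID, blockstore: Blockstore): Promise<Readable> {
  const entry = await exporter(cid, blockstore)

  if (entry.type !== 'file' && entry.type !== 'raw') {
    throw new Error('entry has no byte content')
  }

  return Readable.from(entry.content())
}
```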