diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json index 013dd9a0..3a2cf959 100644 --- a/packages/ipfs-unixfs-exporter/package.json +++ b/packages/ipfs-unixfs-exporter/package.json @@ -150,6 +150,7 @@ "it-pushable": "^3.1.0", "multiformats": "^11.0.0", "p-queue": "^7.3.0", + "progress-events": "^1.0.0", "uint8arrays": "^4.0.2" }, "devDependencies": { diff --git a/packages/ipfs-unixfs-exporter/src/index.ts b/packages/ipfs-unixfs-exporter/src/index.ts index 4528ece3..e456e5d0 100644 --- a/packages/ipfs-unixfs-exporter/src/index.ts +++ b/packages/ipfs-unixfs-exporter/src/index.ts @@ -6,8 +6,9 @@ import type { UnixFS } from 'ipfs-unixfs' import type { PBNode } from '@ipld/dag-pb' import type { Blockstore as InterfaceBlockstore } from 'interface-blockstore' import type { Bucket } from 'hamt-sharding' +import type { ProgressOptions } from 'progress-events' -export interface ExporterOptions { +export interface ExporterOptions extends ProgressOptions { offset?: number length?: number signal?: AbortSignal diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/dag-cbor.ts b/packages/ipfs-unixfs-exporter/src/resolvers/dag-cbor.ts index 13e5c2e4..8531e2df 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/dag-cbor.ts +++ b/packages/ipfs-unixfs-exporter/src/resolvers/dag-cbor.ts @@ -4,7 +4,7 @@ import * as dagCbor from '@ipld/dag-cbor' import type { Resolver } from '../index.js' const resolve: Resolver = async (cid, name, path, toResolve, resolve, depth, blockstore, options) => { - const block = await blockstore.get(cid) + const block = await blockstore.get(cid, options) const object = dagCbor.decode(block) let subObject = object let subPath = path diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts index 191d97b1..d0d8f6db 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts +++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts @@ -72,9 +72,7 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array, childOps, (source) => map(source, (op) => { return async () => { - const block = await blockstore.get(op.link.Hash, { - signal: options.signal - }) + const block = await blockstore.get(op.link.Hash, options) return { ...op, diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts index 0d5086fe..bc6fbcc0 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts +++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/hamt-sharded-directory.ts @@ -27,7 +27,7 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de return { entries: result.entry == null ? [] : [result.entry] } } else { // descend into subshard - const block = await blockstore.get(link.Hash) + const block = await blockstore.get(link.Hash, options) node = decode(block) return { entries: listDirectory(node, path, resolve, depth, blockstore, options) } diff --git a/packages/ipfs-unixfs-exporter/test/importer.spec.ts b/packages/ipfs-unixfs-exporter/test/importer.spec.ts index 7d5cea99..afdb6a91 100644 --- a/packages/ipfs-unixfs-exporter/test/importer.spec.ts +++ b/packages/ipfs-unixfs-exporter/test/importer.spec.ts @@ -634,13 +634,13 @@ strategies.forEach((strategy) => { } }) - it('will call an optional progress function', async () => { + it('will call an optional onProgress function', async () => { const chunkSize = 2048 const path = '1.2MiB.txt' - const progress = sinon.stub() + const onProgress = sinon.stub() const options: Partial = { - progress, + onProgress, chunker: fixedSize({ chunkSize }) @@ -651,8 +651,9 @@ strategies.forEach((strategy) => { content: asAsyncIterable(bigFile) }], block, options)) - expect(progress.called).to.equal(true) - expect(progress.args[0]).to.deep.equal([chunkSize, path]) + expect(onProgress.called).to.equal(true) + expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress') + expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytes: chunkSize, path }) }) it('will import files with CID version 1', async () => { diff --git a/packages/ipfs-unixfs-importer/package.json b/packages/ipfs-unixfs-importer/package.json index 1f526443..b6348c6c 100644 --- a/packages/ipfs-unixfs-importer/package.json +++ b/packages/ipfs-unixfs-importer/package.json @@ -163,14 +163,15 @@ "@multiformats/murmur3": "^2.0.0", "err-code": "^3.0.1", "hamt-sharding": "^3.0.0", - "ipfs-unixfs": "^11.0.0", "interface-blockstore": "^5.0.0", "interface-store": "^4.0.0", + "ipfs-unixfs": "^11.0.0", "it-all": "^2.0.0", "it-batch": "^2.0.0", "it-first": "^2.0.0", "it-parallel-batch": "^2.0.0", "multiformats": "^11.0.0", + "progress-events": "^1.0.0", "rabin-wasm": "^0.1.4", "uint8arraylist": "^2.4.3", "uint8arrays": "^4.0.2" diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts b/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts index f9b77eab..f2a561f9 100644 --- a/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts +++ b/packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts @@ -2,26 +2,46 @@ import { UnixFS } from 'ipfs-unixfs' import { persist, PersistOptions } from '../utils/persist.js' import * as dagPb from '@ipld/dag-pb' import * as raw from 'multiformats/codecs/raw' -import type { BufferImporter, ProgressHandler } from '../index.js' +import type { BufferImporter } from '../index.js' import type { Version } from 'multiformats/cid' +import { CustomProgressEvent } from 'progress-events' +import type { ProgressOptions, ProgressEvent } from 'progress-events' -export interface BufferImporterOptions { +/** + * Passed to the onProgress callback while importing files + */ +export interface ImportProgressData { + /** + * The size of the current chunk + */ + bytes: number + + /** + * The path of the file being imported, if one was specified + */ + path?: string +} + +export type BufferImportProgressEvents = + ProgressEvent<'unixfs:importer:progress', ImportProgressData> + +export interface BufferImporterOptions extends ProgressOptions { cidVersion: Version rawLeaves: boolean leafType: 'file' | 'raw' - progress?: ProgressHandler } export function defaultBufferImporter (options: BufferImporterOptions): BufferImporter { return async function * bufferImporter (file, block) { for await (let buffer of file.content) { yield async () => { - options.progress?.(buffer.length, file.path) + options.onProgress?.(new CustomProgressEvent('unixfs:importer:progress', { bytes: buffer.length, path: file.path })) let unixfs const opts: PersistOptions = { codec: dagPb, - cidVersion: options.cidVersion + cidVersion: options.cidVersion, + onProgress: options.onProgress } if (options.rawLeaves) { diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts b/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts index b367119b..57a54e6e 100644 --- a/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts +++ b/packages/ipfs-unixfs-importer/src/dag-builder/dir.ts @@ -3,8 +3,9 @@ import { persist } from '../utils/persist.js' import { encode, prepare } from '@ipld/dag-pb' import type { Directory, InProgressImportResult, Blockstore } from '../index.js' import type { Version } from 'multiformats/cid' +import type { ProgressOptions } from 'progress-events' -export interface DirBuilderOptions { +export interface DirBuilderOptions extends ProgressOptions { cidVersion: Version signal?: AbortSignal } diff --git a/packages/ipfs-unixfs-importer/src/dag-builder/file.ts b/packages/ipfs-unixfs-importer/src/dag-builder/file.ts index 6329ecec..89be051b 100644 --- a/packages/ipfs-unixfs-importer/src/dag-builder/file.ts +++ b/packages/ipfs-unixfs-importer/src/dag-builder/file.ts @@ -6,6 +6,7 @@ import * as rawCodec from 'multiformats/codecs/raw' import type { BufferImporter, File, InProgressImportResult, Blockstore } from '../index.js' import type { FileLayout, Reducer } from '../layout/index.js' import type { Version } from 'multiformats/cid' +import type { ProgressOptions } from 'progress-events' interface BuildFileBatchOptions { bufferImporter: BufferImporter @@ -36,7 +37,7 @@ async function * buildFileBatch (file: File, blockstore: Blockstore, options: Bu } } -interface ReduceOptions { +interface ReduceOptions extends ProgressOptions { reduceSingleLeafToSelf: boolean cidVersion: Version signal?: AbortSignal diff --git a/packages/ipfs-unixfs-importer/src/index.ts b/packages/ipfs-unixfs-importer/src/index.ts index 2311eafa..47d960d3 100644 --- a/packages/ipfs-unixfs-importer/src/index.ts +++ b/packages/ipfs-unixfs-importer/src/index.ts @@ -8,10 +8,11 @@ import { ChunkValidator, defaultChunkValidator } from './dag-builder/validate-ch import { fixedSize } from './chunker/fixed-size.js' import type { Chunker } from './chunker/index.js' import { balanced, FileLayout } from './layout/index.js' -import { defaultBufferImporter } from './dag-builder/buffer-importer.js' +import { BufferImportProgressEvents, defaultBufferImporter } from './dag-builder/buffer-importer.js' import first from 'it-first' import errcode from 'err-code' import type { AwaitIterable } from 'interface-store' +import type { ProgressOptions } from 'progress-events' export type ByteStream = AwaitIterable export type ImportContent = ByteStream | Uint8Array @@ -64,15 +65,17 @@ export interface InProgressImportResult extends ImportResult { originalPath?: string } -export interface ProgressHandler { (chunkSize: number, path?: string): void } export interface HamtHashFn { (value: Uint8Array): Promise } export interface TreeBuilder { (source: AsyncIterable, blockstore: Blockstore): AsyncIterable } export interface BufferImporter { (file: File, blockstore: Blockstore): AsyncIterable<() => Promise> } +export type ImportProgressEvents = + BufferImportProgressEvents + /** * Options to control the importer's behaviour */ -export interface ImporterOptions { +export interface ImporterOptions extends ProgressOptions { /** * When a file would span multiple DAGNodes, if this is true the leaf nodes * will not be wrapped in `UnixFS` protobufs and will instead contain the @@ -104,12 +107,6 @@ export interface ImporterOptions { */ cidVersion?: CIDVersion - /** - * A function that will be called with the byte length of chunks as a file - * is added to ipfs. - */ - progress?: ProgressHandler - /** * If the serialized node is larger than this it might be converted to a HAMT * sharded directory. Default: 256KiB @@ -252,16 +249,18 @@ export async function * importer (source: ImportCandidateStream, blockstore: Blo cidVersion, rawLeaves, leafType, - progress: options.progress + onProgress: options.onProgress }), blockWriteConcurrency, reduceSingleLeafToSelf, - cidVersion + cidVersion, + onProgress: options.onProgress }) const buildTree: TreeBuilder = options.treeBuilder ?? defaultTreeBuilder({ wrapWithDirectory, shardSplitThresholdBytes, - cidVersion + cidVersion, + onProgress: options.onProgress }) for await (const entry of buildTree(parallelBatch(buildDag(candidates, blockstore), fileImportConcurrency), blockstore)) { diff --git a/packages/ipfs-unixfs-importer/src/utils/persist.ts b/packages/ipfs-unixfs-importer/src/utils/persist.ts index 06b6557b..7a86fa03 100644 --- a/packages/ipfs-unixfs-importer/src/utils/persist.ts +++ b/packages/ipfs-unixfs-importer/src/utils/persist.ts @@ -4,8 +4,9 @@ import { sha256 } from 'multiformats/hashes/sha2' import type { Blockstore } from '../index.js' import type { BlockCodec } from 'multiformats/codecs/interface' import type { Version as CIDVersion } from 'multiformats/cid' +import type { ProgressOptions } from 'progress-events' -export interface PersistOptions { +export interface PersistOptions extends ProgressOptions { codec?: BlockCodec cidVersion: CIDVersion signal?: AbortSignal @@ -19,9 +20,7 @@ export const persist = async (buffer: Uint8Array, blockstore: Blockstore, option const multihash = await sha256.digest(buffer) const cid = CID.create(options.cidVersion, options.codec.code, multihash) - await blockstore.put(cid, buffer, { - signal: options.signal - }) + await blockstore.put(cid, buffer, options) return cid } diff --git a/packages/ipfs-unixfs-importer/test/benchmark.spec.ts b/packages/ipfs-unixfs-importer/test/benchmark.spec.ts index cde008b8..5c843fdc 100644 --- a/packages/ipfs-unixfs-importer/test/benchmark.spec.ts +++ b/packages/ipfs-unixfs-importer/test/benchmark.spec.ts @@ -1,6 +1,6 @@ /* eslint-env mocha */ -import { importer } from '../src/index.js' +import { importer, ImporterOptions } from '../src/index.js' import bufferStream from 'it-buffer-stream' import { MemoryBlockstore } from 'blockstore-core' import drain from 'it-drain' @@ -32,9 +32,9 @@ describe.skip('benchmark', function () { let lastDate = Date.now() let lastPercent = 0 - const options = { - progress: (prog: number) => { - read += prog + const options: Partial = { + onProgress: (evt) => { + read += evt.detail.bytes const percent = Math.round((read / size) * 100)