Skip to content

fix: pass onProgress option to blockstore #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/ipfs-unixfs-exporter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
"it-pushable": "^3.1.0",
"multiformats": "^11.0.0",
"p-queue": "^7.3.0",
"progress-events": "^1.0.0",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-unixfs-exporter/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import type { UnixFS } from 'ipfs-unixfs'
import type { PBNode } from '@ipld/dag-pb'
import type { Blockstore as InterfaceBlockstore } from 'interface-blockstore'
import type { Bucket } from 'hamt-sharding'
import type { ProgressOptions } from 'progress-events'

export interface ExporterOptions {
export interface ExporterOptions extends ProgressOptions {
offset?: number
length?: number
signal?: AbortSignal
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-exporter/src/resolvers/dag-cbor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as dagCbor from '@ipld/dag-cbor'
import type { Resolver } from '../index.js'

const resolve: Resolver = async (cid, name, path, toResolve, resolve, depth, blockstore, options) => {
const block = await blockstore.get(cid)
const block = await blockstore.get(cid, options)
const object = dagCbor.decode<any>(block)
let subObject = object
let subPath = path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ async function walkDAG (blockstore: Blockstore, node: dagPb.PBNode | Uint8Array,
childOps,
(source) => map(source, (op) => {
return async () => {
const block = await blockstore.get(op.link.Hash, {
signal: options.signal
})
const block = await blockstore.get(op.link.Hash, options)

return {
...op,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de
return { entries: result.entry == null ? [] : [result.entry] }
} else {
// descend into subshard
const block = await blockstore.get(link.Hash)
const block = await blockstore.get(link.Hash, options)
node = decode(block)

return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
Expand Down
11 changes: 6 additions & 5 deletions packages/ipfs-unixfs-exporter/test/importer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,13 +634,13 @@ strategies.forEach((strategy) => {
}
})

it('will call an optional progress function', async () => {
it('will call an optional onProgress function', async () => {
const chunkSize = 2048
const path = '1.2MiB.txt'
const progress = sinon.stub()
const onProgress = sinon.stub()

const options: Partial<ImporterOptions> = {
progress,
onProgress,
chunker: fixedSize({
chunkSize
})
Expand All @@ -651,8 +651,9 @@ strategies.forEach((strategy) => {
content: asAsyncIterable(bigFile)
}], block, options))

expect(progress.called).to.equal(true)
expect(progress.args[0]).to.deep.equal([chunkSize, path])
expect(onProgress.called).to.equal(true)
expect(onProgress.getCall(0).args[0]).to.have.property('type', 'unixfs:importer:progress')
expect(onProgress.getCall(0).args[0]).to.have.deep.property('detail', { bytes: chunkSize, path })
})

it('will import files with CID version 1', async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-unixfs-importer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,15 @@
"@multiformats/murmur3": "^2.0.0",
"err-code": "^3.0.1",
"hamt-sharding": "^3.0.0",
"ipfs-unixfs": "^11.0.0",
"interface-blockstore": "^5.0.0",
"interface-store": "^4.0.0",
"ipfs-unixfs": "^11.0.0",
"it-all": "^2.0.0",
"it-batch": "^2.0.0",
"it-first": "^2.0.0",
"it-parallel-batch": "^2.0.0",
"multiformats": "^11.0.0",
"progress-events": "^1.0.0",
"rabin-wasm": "^0.1.4",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.2"
Expand Down
30 changes: 25 additions & 5 deletions packages/ipfs-unixfs-importer/src/dag-builder/buffer-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,46 @@ import { UnixFS } from 'ipfs-unixfs'
import { persist, PersistOptions } from '../utils/persist.js'
import * as dagPb from '@ipld/dag-pb'
import * as raw from 'multiformats/codecs/raw'
import type { BufferImporter, ProgressHandler } from '../index.js'
import type { BufferImporter } from '../index.js'
import type { Version } from 'multiformats/cid'
import { CustomProgressEvent } from 'progress-events'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export interface BufferImporterOptions {
/**
* Passed to the onProgress callback while importing files
*/
export interface ImportProgressData {
/**
* The size of the current chunk
*/
bytes: number

/**
* The path of the file being imported, if one was specified
*/
path?: string
}

export type BufferImportProgressEvents =
ProgressEvent<'unixfs:importer:progress', ImportProgressData>

export interface BufferImporterOptions extends ProgressOptions<BufferImportProgressEvents> {
cidVersion: Version
rawLeaves: boolean
leafType: 'file' | 'raw'
progress?: ProgressHandler
}

export function defaultBufferImporter (options: BufferImporterOptions): BufferImporter {
return async function * bufferImporter (file, block) {
for await (let buffer of file.content) {
yield async () => {
options.progress?.(buffer.length, file.path)
options.onProgress?.(new CustomProgressEvent<ImportProgressData>('unixfs:importer:progress', { bytes: buffer.length, path: file.path }))
let unixfs

const opts: PersistOptions = {
codec: dagPb,
cidVersion: options.cidVersion
cidVersion: options.cidVersion,
onProgress: options.onProgress
}

if (options.rawLeaves) {
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-unixfs-importer/src/dag-builder/dir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { persist } from '../utils/persist.js'
import { encode, prepare } from '@ipld/dag-pb'
import type { Directory, InProgressImportResult, Blockstore } from '../index.js'
import type { Version } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

export interface DirBuilderOptions {
export interface DirBuilderOptions extends ProgressOptions {
cidVersion: Version
signal?: AbortSignal
}
Expand Down
3 changes: 2 additions & 1 deletion packages/ipfs-unixfs-importer/src/dag-builder/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as rawCodec from 'multiformats/codecs/raw'
import type { BufferImporter, File, InProgressImportResult, Blockstore } from '../index.js'
import type { FileLayout, Reducer } from '../layout/index.js'
import type { Version } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

interface BuildFileBatchOptions {
bufferImporter: BufferImporter
Expand Down Expand Up @@ -36,7 +37,7 @@ async function * buildFileBatch (file: File, blockstore: Blockstore, options: Bu
}
}

interface ReduceOptions {
interface ReduceOptions extends ProgressOptions {
reduceSingleLeafToSelf: boolean
cidVersion: Version
signal?: AbortSignal
Expand Down
23 changes: 11 additions & 12 deletions packages/ipfs-unixfs-importer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import { ChunkValidator, defaultChunkValidator } from './dag-builder/validate-ch
import { fixedSize } from './chunker/fixed-size.js'
import type { Chunker } from './chunker/index.js'
import { balanced, FileLayout } from './layout/index.js'
import { defaultBufferImporter } from './dag-builder/buffer-importer.js'
import { BufferImportProgressEvents, defaultBufferImporter } from './dag-builder/buffer-importer.js'
import first from 'it-first'
import errcode from 'err-code'
import type { AwaitIterable } from 'interface-store'
import type { ProgressOptions } from 'progress-events'

export type ByteStream = AwaitIterable<Uint8Array>
export type ImportContent = ByteStream | Uint8Array
Expand Down Expand Up @@ -64,15 +65,17 @@ export interface InProgressImportResult extends ImportResult {
originalPath?: string
}

export interface ProgressHandler { (chunkSize: number, path?: string): void }
export interface HamtHashFn { (value: Uint8Array): Promise<Uint8Array> }
export interface TreeBuilder { (source: AsyncIterable<InProgressImportResult>, blockstore: Blockstore): AsyncIterable<ImportResult> }
export interface BufferImporter { (file: File, blockstore: Blockstore): AsyncIterable<() => Promise<InProgressImportResult>> }

export type ImportProgressEvents =
BufferImportProgressEvents

/**
* Options to control the importer's behaviour
*/
export interface ImporterOptions {
export interface ImporterOptions extends ProgressOptions<ImportProgressEvents> {
/**
* When a file would span multiple DAGNodes, if this is true the leaf nodes
* will not be wrapped in `UnixFS` protobufs and will instead contain the
Expand Down Expand Up @@ -104,12 +107,6 @@ export interface ImporterOptions {
*/
cidVersion?: CIDVersion

/**
* A function that will be called with the byte length of chunks as a file
* is added to ipfs.
*/
progress?: ProgressHandler

/**
* If the serialized node is larger than this it might be converted to a HAMT
* sharded directory. Default: 256KiB
Expand Down Expand Up @@ -252,16 +249,18 @@ export async function * importer (source: ImportCandidateStream, blockstore: Blo
cidVersion,
rawLeaves,
leafType,
progress: options.progress
onProgress: options.onProgress
}),
blockWriteConcurrency,
reduceSingleLeafToSelf,
cidVersion
cidVersion,
onProgress: options.onProgress
})
const buildTree: TreeBuilder = options.treeBuilder ?? defaultTreeBuilder({
wrapWithDirectory,
shardSplitThresholdBytes,
cidVersion
cidVersion,
onProgress: options.onProgress
})

for await (const entry of buildTree(parallelBatch(buildDag(candidates, blockstore), fileImportConcurrency), blockstore)) {
Expand Down
7 changes: 3 additions & 4 deletions packages/ipfs-unixfs-importer/src/utils/persist.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { sha256 } from 'multiformats/hashes/sha2'
import type { Blockstore } from '../index.js'
import type { BlockCodec } from 'multiformats/codecs/interface'
import type { Version as CIDVersion } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

export interface PersistOptions {
export interface PersistOptions extends ProgressOptions {
codec?: BlockCodec<any, any>
cidVersion: CIDVersion
signal?: AbortSignal
Expand All @@ -19,9 +20,7 @@ export const persist = async (buffer: Uint8Array, blockstore: Blockstore, option
const multihash = await sha256.digest(buffer)
const cid = CID.create(options.cidVersion, options.codec.code, multihash)

await blockstore.put(cid, buffer, {
signal: options.signal
})
await blockstore.put(cid, buffer, options)

return cid
}
8 changes: 4 additions & 4 deletions packages/ipfs-unixfs-importer/test/benchmark.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-env mocha */

import { importer } from '../src/index.js'
import { importer, ImporterOptions } from '../src/index.js'
import bufferStream from 'it-buffer-stream'
import { MemoryBlockstore } from 'blockstore-core'
import drain from 'it-drain'
Expand Down Expand Up @@ -32,9 +32,9 @@ describe.skip('benchmark', function () {
let lastDate = Date.now()
let lastPercent = 0

const options = {
progress: (prog: number) => {
read += prog
const options: Partial<ImporterOptions> = {
onProgress: (evt) => {
read += evt.detail.bytes

const percent = Math.round((read / size) * 100)

Expand Down