From 326ba7848a8b0e31f7baac3fb5154b9c1c95b365 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 22 Jul 2021 15:52:05 +0100 Subject: [PATCH 1/3] fix: use streaming methods where possible The base adapter class calls `.put` repeatedly when `.putMany` is invoked in order to reduce the number of methods a datastore has to implement to be compatible with the datastore interface. This hurts performance where datastores wrap other datastores that can take advantage of the streaming methods to exectute certain operations in parallel. The change here is to call streaming methods from streaming methods where it makes sense to do so, instead of letting the adapter class convert streaming method invocations into serial method invocations. --- package.json | 2 ++ src/keytransform.js | 79 +++++++++++++++++++++++++++++++++++++++++++++ src/mount.js | 5 --- src/sharding.js | 34 ++++++++++++++++++- src/tiered.js | 77 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 191 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index ccaec9f..4eb81d5 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,8 @@ "it-filter": "^1.0.2", "it-map": "^1.0.5", "it-merge": "^1.0.1", + "it-pipe": "^1.1.0", + "it-pushable": "^1.4.2", "it-take": "^1.0.1", "uint8arrays": "^2.1.5" }, diff --git a/src/keytransform.js b/src/keytransform.js index 9ba7765..a5fb88f 100644 --- a/src/keytransform.js +++ b/src/keytransform.js @@ -2,6 +2,7 @@ const { Adapter } = require('interface-datastore') const map = require('it-map') +const { pipe } = require('it-pipe') /** * @typedef {import('interface-datastore').Datastore} Datastore @@ -10,9 +11,15 @@ const map = require('it-map') * @typedef {import('interface-datastore').Query} Query * @typedef {import('interface-datastore').KeyQuery} KeyQuery * @typedef {import('interface-datastore').Key} Key + * @typedef {import('interface-datastore').Pair} Pair * @typedef {import('./types').KeyTransform} KeyTransform */ +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable + */ + /** * A datastore shim, that wraps around a given datastore, changing * the way keys look to the user, for example namespacing @@ -69,6 +76,78 @@ class KeyTransformDatastore extends Adapter { return this.child.delete(this.transform.convert(key), options) } + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * putMany (source, options = {}) { + const transform = this.transform + const child = this.child + + yield * pipe( + source, + async function * (source) { + yield * map(source, ({ key, value }) => ({ + key: transform.convert(key), + value + })) + }, + async function * (source) { + yield * child.putMany(source, options) + }, + async function * (source) { + yield * map(source, ({ key, value }) => ({ + key: transform.invert(key), + value + })) + } + ) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * getMany (source, options = {}) { + const transform = this.transform + const child = this.child + + yield * pipe( + source, + async function * (source) { + yield * map(source, key => transform.convert(key)) + }, + async function * (source) { + yield * child.getMany(source, options) + } + ) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * deleteMany (source, options = {}) { + const transform = this.transform + const child = this.child + + yield * pipe( + source, + async function * (source) { + yield * map(source, key => transform.convert(key)) + }, + async function * (source) { + yield * child.deleteMany(source, options) + }, + async function * (source) { + yield * map(source, key => transform.invert(key)) + } + ) + } + /** * @returns {Batch} */ diff --git a/src/mount.js b/src/mount.js index a5ff3a2..1d5972d 100644 --- a/src/mount.js +++ b/src/mount.js @@ -22,11 +22,6 @@ const Keytransform = require('./keytransform') * @typedef {import('./types').KeyTransform} KeyTransform */ -/** - * @template TEntry - * @typedef {import('./types').AwaitIterable} AwaitIterable - */ - /** * A datastore that can combine multiple stores inside various * key prefixes diff --git a/src/sharding.js b/src/sharding.js index a493556..d506394 100644 --- a/src/sharding.js +++ b/src/sharding.js @@ -22,7 +22,12 @@ const shardReadmeKey = new Key(sh.README_FN) */ /** * @template TValue - * @typedef {import('./types').Await } Await + * @typedef {import('interface-store').Await } Await + */ + +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable */ /** @@ -162,6 +167,33 @@ class ShardingDatastore extends Adapter { return this.child.delete(key, options) } + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * putMany (source, options = {}) { + yield * this.child.putMany(source, options) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * getMany (source, options = {}) { + yield * this.child.getMany(source, options) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * deleteMany (source, options = {}) { + yield * this.child.deleteMany(source, options) + } + batch () { return this.child.batch() } diff --git a/src/tiered.js b/src/tiered.js index a988d7f..b117ea6 100644 --- a/src/tiered.js +++ b/src/tiered.js @@ -2,6 +2,9 @@ const { Adapter, Errors } = require('interface-datastore') const log = require('debug')('datastore:core:tiered') +const pushable = require('it-pushable') +const drain = require('it-drain') + /** * @typedef {import('interface-datastore').Datastore} Datastore * @typedef {import('interface-datastore').Options} Options @@ -9,6 +12,12 @@ const log = require('debug')('datastore:core:tiered') * @typedef {import('interface-datastore').Query} Query * @typedef {import('interface-datastore').KeyQuery} KeyQuery * @typedef {import('interface-datastore').Key} Key + * @typedef {import('interface-datastore').Pair} Pair + */ + +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable */ /** @@ -90,6 +99,74 @@ class TieredDatastore extends Adapter { } } + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * putMany (source, options = {}) { + let error + const pushables = this.stores.map(store => { + const source = pushable() + + drain(store.putMany(source, options)) + .catch(err => { + // store threw while putting, make sure we bubble the error up + error = err + }) + + return source + }) + + try { + for await (const pair of source) { + if (error) { + throw error + } + + pushables.forEach(p => p.push(pair)) + + yield pair + } + } finally { + pushables.forEach(p => p.end()) + } + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * deleteMany (source, options = {}) { + let error + const pushables = this.stores.map(store => { + const source = pushable() + + drain(store.deleteMany(source, options)) + .catch(err => { + // store threw while putting, make sure we bubble the error up + error = err + }) + + return source + }) + + try { + for await (const key of source) { + if (error) { + throw error + } + + pushables.forEach(p => p.push(key)) + + yield key + } + } finally { + pushables.forEach(p => p.end()) + } + } + async close () { await Promise.all(this.stores.map(store => store.close())) } From 26f1534dd6ff227c948ee51fe2439912d3c04f39 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 23 Jul 2021 17:33:50 +0100 Subject: [PATCH 2/3] chore: linting --- src/tiered.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiered.js b/src/tiered.js index b117ea6..3bbb7f5 100644 --- a/src/tiered.js +++ b/src/tiered.js @@ -145,7 +145,7 @@ class TieredDatastore extends Adapter { drain(store.deleteMany(source, options)) .catch(err => { - // store threw while putting, make sure we bubble the error up + // store threw while deleting, make sure we bubble the error up error = err }) From 119b05f19101e0e94dde20cc86cfb6ef2263b6bf Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 23 Jul 2021 17:35:29 +0100 Subject: [PATCH 3/3] chore: add missing dep --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 4eb81d5..8c98f26 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "dependencies": { "debug": "^4.1.1", "interface-datastore": "^5.1.1", + "it-drain": "^1.0.4", "it-filter": "^1.0.2", "it-map": "^1.0.5", "it-merge": "^1.0.1",