From 080bc2c157be37fb056a9eed846364e088bcee6c Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 10 Jun 2019 16:22:26 +0100 Subject: [PATCH 1/4] feat: reprovider --- package.json | 2 +- .../files-regular/refs-local-pull-stream.js | 18 +- src/core/components/start.js | 10 + src/core/config.js | 4 + src/core/provider/index.js | 93 ++++++++ src/core/provider/queue.js | 110 ++++++++++ src/core/provider/reprovider.js | 89 ++++++++ src/core/runtime/config-browser.js | 4 + src/core/runtime/config-nodejs.js | 4 + src/core/utils.js | 41 ++++ test/core/provider.spec.js | 205 ++++++++++++++++++ 11 files changed, 565 insertions(+), 15 deletions(-) create mode 100644 src/core/provider/index.js create mode 100644 src/core/provider/queue.js create mode 100644 src/core/provider/reprovider.js create mode 100644 test/core/provider.spec.js diff --git a/package.json b/package.json index ce09553dc9..f3ac41fdf0 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,7 @@ "hashlru": "^2.3.0", "human-to-milliseconds": "^2.0.0", "interface-datastore": "~0.6.0", - "ipfs-bitswap": "~0.24.1", + "ipfs-bitswap": "ipfs/js-ipfs-bitswap#feat/use-ipfs-provider", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.15.1", "ipfs-http-client": "^32.0.1", diff --git a/src/core/components/files-regular/refs-local-pull-stream.js b/src/core/components/files-regular/refs-local-pull-stream.js index 5691df2cc6..e0ce6ae479 100644 --- a/src/core/components/files-regular/refs-local-pull-stream.js +++ b/src/core/components/files-regular/refs-local-pull-stream.js @@ -1,9 +1,8 @@ 'use strict' -const CID = require('cids') -const base32 = require('base32.js') const pull = require('pull-stream') const pullDefer = require('pull-defer') +const { blockKeyToCid } = require('../../utils') module.exports = function (self) { return () => { @@ -14,21 +13,12 @@ module.exports = function (self) { return deferred.resolve(pull.error(err)) } - const refs = blocks.map(b => dsKeyToRef(b.key)) + const refs = blocks.map(b => ({ + ref: blockKeyToCid(b.key).toString() + })) deferred.resolve(pull.values(refs)) }) return deferred } } - -function dsKeyToRef (key) { - try { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) - return { ref: new CID(buff).toString() } - } catch (err) { - return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } - } -} diff --git a/src/core/components/start.js b/src/core/components/start.js index 8fc220cdbc..290bd80a93 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -1,11 +1,13 @@ 'use strict' +const get = require('dlv') const series = require('async/series') const Bitswap = require('ipfs-bitswap') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const IPNS = require('../ipns') +const Provider = require('../provider') const routingConfig = require('../ipns/routing/config') const createLibp2pBundle = require('./libp2p') @@ -45,6 +47,8 @@ module.exports = (self) => { libp2p.start(err => { if (err) return cb(err) self.libp2p = libp2p + + self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider')) cb() }) }) @@ -56,9 +60,15 @@ module.exports = (self) => { self._bitswap = new Bitswap( self.libp2p, self._repo.blocks, + self._provider, { statsEnabled: true } ) + if (!get(self._options, 'offline') && + (get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) { + self._provider.start() + } + self._bitswap.start() self._blockService.setExchange(self._bitswap) diff --git a/src/core/config.js b/src/core/config.js index 2fb66bb558..b2493472d9 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -66,6 +66,10 @@ const configSchema = s({ Enabled: 'boolean?' })) })), + Reprovider: optional(s({ + Interval: 'string?', + Strategy: 'string?' + })), Bootstrap: optional(s(['multiaddr-ipfs'])) })), ipld: 'object?', diff --git a/src/core/provider/index.js b/src/core/provider/index.js new file mode 100644 index 0000000000..918a352309 --- /dev/null +++ b/src/core/provider/index.js @@ -0,0 +1,93 @@ +'use strict' + +const errCode = require('err-code') +const human = require('human-to-milliseconds') +const promisify = require('promisify-es6') + +const CID = require('cids') + +const Reprovider = require('./reprovider') + +class Provider { + /** + * Provider goal is to announce blocks to the network. + * It keeps track of which blocks are provided, and allow them to be reprovided + * @param {Libp2p} libp2p + * @param {Blockstore} blockstore + * @param {object} options + * @memberof Provider + */ + constructor (libp2p, blockstore, options) { + this._running = false + + this._contentRouting = libp2p.contentRouting + this._blockstore = blockstore + this._options = options + this.reprovider = undefined + } + + /** + * Begin processing the provider work + * @returns {void} + */ + async start () { + // do not run twice + if (this._running) { + return + } + + this._running = true + + // handle options + const strategy = this._options.strategy || 'all' + const humanInterval = this._options.Interval || '12h' + const interval = await promisify((callback) => human(humanInterval, callback))() + const options = { + interval, + strategy + } + + this.reprovider = new Reprovider(this._contentRouting, this._blockstore, options) + + // Start reprovider + this.reprovider.start() + } + + /** + * Stop the provider + * @returns {void} + */ + stop () { + this._running = true + + // stop the reprovider + this.reprovider.stop() + } + + /** + * Announce block to the network and add and entry to the tracker + * Takes a cid and makes an attempt to announce it to the network + * @param {CID} cid + */ + async provide (cid) { + if (!CID.isCID(cid)) { + throw errCode('invalid CID to provide', 'ERR_INVALID_CID') + } + + await promisify((callback) => { + this._contentRouting.provide(cid, callback) + })() + } + + async findProviders (cid, options) { // eslint-disable-line require-await + if (!CID.isCID(cid)) { + throw errCode('invalid CID to find', 'ERR_INVALID_CID') + } + + return promisify((callback) => { + this._contentRouting.findProviders(cid, options, callback) + })() + } +} + +exports = module.exports = Provider diff --git a/src/core/provider/queue.js b/src/core/provider/queue.js new file mode 100644 index 0000000000..1a54231eab --- /dev/null +++ b/src/core/provider/queue.js @@ -0,0 +1,110 @@ +'use strict' + +const queue = require('async/queue') + +const debug = require('debug') +const log = debug('ipfs:provider') +log.error = debug('ipfs:provider:error') + +class WorkerQueue { + /** + * Creates an instance of a WorkerQueue. + * @param {function} executeWork + * @param {number} [concurrency=3] + */ + constructor (executeWork, concurrency = 3) { + this.executeWork = executeWork + this._concurrency = concurrency + + this.running = false + this.queue = this._setupQueue() + } + + /** + * Create the underlying async queue. + * @returns {queue} + */ + _setupQueue () { + const q = queue(async (block) => { + await this._processNext(block) + }, this._concurrency) + + // If there is an error, stop the worker + q.error = (err) => { + log.error(err) + this.stop(err) + } + + q.buffer = 0 + + return q + } + + /** + * Use the queue from async to keep `concurrency` amount items running + * @param {Block[]} blocks + * @returns {Promise} + */ + async execute (blocks) { + this.running = true + + // store the promise resolution functions to be resolved at end of queue + this.execution = {} + const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject })) + + // When all blocks have been processed, stop the worker + this.queue.drain = () => { + log('queue:drain') + this.stop() + } + + // Fill queue with blocks + this.queue.push(blocks) + + await execPromise + } + + /** + * Stop the worker, optionally an error is thrown if received + * + * @param {object} error + */ + stop (error) { + if (!this.running) { + return + } + + this.running = false + this.queue.kill() + + if (error) { + this.execution && this.execution.reject(error) + } else { + this.execution && this.execution.resolve() + } + } + + /** + * Process the next block in the queue. + * @param {Block} block + */ + async _processNext (block) { + if (!this.running) { + return + } + + // Execute work + log('queue:work') + + let execErr + try { + await this.executeWork(block) + } catch (err) { + execErr = err + } + + log('queue:work:done', execErr) + } +} + +exports = module.exports = WorkerQueue diff --git a/src/core/provider/reprovider.js b/src/core/provider/reprovider.js new file mode 100644 index 0000000000..49c1efc823 --- /dev/null +++ b/src/core/provider/reprovider.js @@ -0,0 +1,89 @@ +'use strict' + +const promisify = require('promisify-es6') +const WorkerQueue = require('./queue') + +const { blockKeyToCid } = require('../utils') + +// const initialDelay = 15000 +const initialDelay = 3000 + +class Reprovider { + /** + * Reprovider goal is to reannounce blocks to the network. + * @param {object} contentRouting + * @param {Blockstore} blockstore + * @param {object} options + * @memberof Reprovider + */ + constructor (contentRouting, blockstore, options) { + this._contentRouting = contentRouting + this._blockstore = blockstore + this._options = options + + this._timeoutId = undefined + this._worker = new WorkerQueue(this._provideBlock) + } + + /** + * Begin processing the reprovider work and waiting for reprovide triggers + * @returns {void} + */ + start () { + // Start doing reprovides after the initial delay + this._timeoutId = setTimeout(() => { + this._runPeriodically() + }, initialDelay) + } + + /** + * Stops the reprovider. Any active reprovide actions should be aborted + * @returns {void} + */ + stop () { + if (this._timeoutId) { + clearTimeout(this._timeoutId) + this._timeoutId = undefined + } + this._worker.stop() + } + + /** + * Run reprovide on every `options.interval` ms + * @returns {void} + */ + async _runPeriodically () { + while (this._timeoutId) { + const blocks = await promisify((callback) => this._blockstore.query({}, callback))() + + // TODO strategy logic here + if (this._options.strategy === 'pinned') { + + } else if (this._options.strategy === 'pinned') { + + } + + await this._worker.execute(blocks) + + // Each subsequent walk should run on a `this._options.interval` interval + await new Promise(resolve => { + this._timeoutId = setTimeout(resolve, this._options.interval) + }) + } + } + + /** + * Do the reprovide work to libp2p content routing + * @param {Block} block + * @returns {void} + */ + async _provideBlock (block) { + const cid = blockKeyToCid(block.key.toBuffer()) + + await promisify((callback) => { + this._contentRouting.provide(cid, callback) + })() + } +} + +exports = module.exports = Reprovider diff --git a/src/core/runtime/config-browser.js b/src/core/runtime/config-browser.js index 537316d431..3ec47ef977 100644 --- a/src/core/runtime/config-browser.js +++ b/src/core/runtime/config-browser.js @@ -27,6 +27,10 @@ module.exports = () => ({ '/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic', '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], + Reprovider: { + Interval: '12h', + Strategy: 'all' + }, Swarm: { ConnMgr: { LowWater: 200, diff --git a/src/core/runtime/config-nodejs.js b/src/core/runtime/config-nodejs.js index 60169ef562..525483a238 100644 --- a/src/core/runtime/config-nodejs.js +++ b/src/core/runtime/config-nodejs.js @@ -40,6 +40,10 @@ module.exports = () => ({ '/dns4/node0.preload.ipfs.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic', '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], + Reprovider: { + Interval: '12h', + Strategy: 'all' + }, Swarm: { ConnMgr: { LowWater: 200, diff --git a/src/core/utils.js b/src/core/utils.js index 98760b0338..d64764e308 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -4,6 +4,11 @@ const promisify = require('promisify-es6') const map = require('async/map') const isIpfs = require('is-ipfs') const CID = require('cids') +const base32 = require('base32.js') + +// const { Key } = require('interface-datastore') + +// const PIN_DS_KEY = new Key('/local/pins') const ERR_BAD_PATH = 'ERR_BAD_PATH' exports.OFFLINE_ERROR = 'This command must be run in online mode. Try running \'ipfs daemon\' first.' @@ -134,7 +139,43 @@ const resolvePath = promisify(function (objectAPI, ipfsPaths, callback) { } }, callback) }) +/** + * Convert a block key to cid + * @param {Key} key form / + * @returns {CID} + */ +function blockKeyToCid (key) { + try { + const decoder = new base32.Decoder() + const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) + return new CID(buff) + } catch (err) { + return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } + } +} + +/* +async function getInternalCidBlocks (ipfs) { + let mh + try { + mh = await promisify((cb) => ipfs.repo.datastore.get(PIN_DS_KEY, cb))() + } catch (err) { + if (err.code === 'ERR_NOT_FOUND') { + return [] + } + throw err + } + + const cid = new CID(mh) + const obj = await promisify((cb) => ipfs.dag.get(cid, '', { preload: false }, cb))() + + // The pinner stores an object that has two links to pin sets: + // 1. The directly pinned CIDs + // 2. The recursively pinned CIDs + // If large enough, these pin sets may have links to buckets to hold the pins +} */ exports.normalizePath = normalizePath exports.parseIpfsPath = parseIpfsPath exports.resolvePath = resolvePath +exports.blockKeyToCid = blockKeyToCid diff --git a/test/core/provider.spec.js b/test/core/provider.spec.js new file mode 100644 index 0000000000..ccbe60d193 --- /dev/null +++ b/test/core/provider.spec.js @@ -0,0 +1,205 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const sinon = require('sinon') + +const CID = require('cids') + +const IPFS = require('../../src') +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'proc' }) + +const INTERVAL = '10s' +const STRATEGY = 'all' + +const config = { + Bootstrap: [], + Reprovider: { + Interval: INTERVAL, + Strategy: STRATEGY + } +} + +describe('record provider', () => { + // if no dht nor delegated routing enabled + describe('disabled', () => { + let node + let ipfsd + + before(function (done) { + this.timeout(50 * 1000) + + df.spawn({ + exec: IPFS + }, (err, _ipfsd) => { + expect(err).to.not.exist() + ipfsd = _ipfsd + node = _ipfsd.api + + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + it('should not be running', () => { + expect(node._provider._running).to.equal(false) + expect(node._provider.reprovider).to.not.exist() + }) + }) + + describe('enabled with default configuration', () => { + let node + let ipfsd + + before(function (done) { + this.timeout(50 * 1000) + + df.spawn({ + exec: IPFS, + libp2p: { + config: { + dht: { + enabled: true + } + } + } + }, (err, _ipfsd) => { + expect(err).to.not.exist() + ipfsd = _ipfsd + node = _ipfsd.api + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + it('should be running', () => { + expect(node._provider._running).to.equal(true) + expect(node._provider.reprovider).to.exist() + expect(node._provider.reprovider._timeoutId).to.exist() + }) + + it('should use the defaults', () => { + expect(node._provider._options.Interval).to.equal('12h') + expect(node._provider._options.Strategy).to.equal('all') + }) + + it('should be able to provide a valid CIDs', async () => { + const cid = new CID('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ') + + try { + await node._provider.provide(cid) + } catch (err) { + expect(err).to.not.exist() + throw err + } + }) + + it('should thrown providing an invalid CIDs', async () => { + const cid = 'Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ' + + try { + await node._provider.provide(cid) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_CID') + } + }) + + it('should be able to find providers of a valid CID', async () => { + const cid = new CID('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ') + + let providers + try { + providers = await node._provider.findProviders(cid) + } catch (err) { + expect(err).to.not.exist() + throw err + } + + expect(providers).to.exist() + }) + + it('should thrown finding providers of an invalid CID', async () => { + const cid = 'Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ' + + try { + await node._provider.findProviders(cid) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.equal('ERR_INVALID_CID') + } + }) + }) + + describe('enabled with custom config', () => { + let node + let ipfsd + + before(function (done) { + this.timeout(50 * 1000) + + df.spawn({ + exec: IPFS, + config, + libp2p: { + config: { + dht: { + enabled: true + } + } + } + }, (err, _ipfsd) => { + expect(err).to.not.exist() + ipfsd = _ipfsd + node = _ipfsd.api + + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + it('should be running', () => { + expect(node._provider._running).to.equal(true) + expect(node._provider.reprovider).to.exist() + expect(node._provider.reprovider._timeoutId).to.exist() + }) + + it('should use the provided configuration', () => { + expect(node._provider._options.Interval).to.equal(INTERVAL) + expect(node._provider._options.Strategy).to.equal(STRATEGY) + }) + + it('should reprovide after tens seconds', function (done) { + this.timeout(20 * 1000) + + const reprovider = node._provider.reprovider + sinon.spy(reprovider, '_runPeriodically') + sinon.spy(reprovider._worker, '_processNext') + + setTimeout(() => { + expect(reprovider._runPeriodically.called).to.equal(true) + expect(reprovider._worker._processNext.called).to.equal(true) + + sinon.restore() + done() + }, 10000) + }) + }) + + describe.skip('reprovide strategies', () => { + + }) +}) From 37ca840c41340a92141b4ed83a0960a8d19d88b5 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 28 Jun 2019 10:56:52 +0100 Subject: [PATCH 2/4] chore: address dirk review --- package.json | 1 + src/core/config.js | 1 + src/core/provider/index.js | 33 ++++++++++++------ src/core/provider/queue.js | 56 ++++++------------------------ src/core/provider/reprovider.js | 42 ++++++++++++---------- src/core/runtime/config-browser.js | 1 + src/core/runtime/config-nodejs.js | 1 + test/core/provider.spec.js | 3 ++ 8 files changed, 64 insertions(+), 74 deletions(-) diff --git a/package.json b/package.json index f3ac41fdf0..e74b8bdefb 100644 --- a/package.json +++ b/package.json @@ -146,6 +146,7 @@ "multihashes": "~0.4.14", "multihashing-async": "~0.6.0", "node-fetch": "^2.3.0", + "p-queue": "^6.0.2", "peer-book": "~0.9.0", "peer-id": "~0.12.0", "peer-info": "~0.15.0", diff --git a/src/core/config.js b/src/core/config.js index b2493472d9..0c240e1050 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -67,6 +67,7 @@ const configSchema = s({ })) })), Reprovider: optional(s({ + Delay: 'string?', Interval: 'string?', Strategy: 'string?' })), diff --git a/src/core/provider/index.js b/src/core/provider/index.js index 918a352309..c6cb8a4761 100644 --- a/src/core/provider/index.js +++ b/src/core/provider/index.js @@ -12,12 +12,14 @@ class Provider { /** * Provider goal is to announce blocks to the network. * It keeps track of which blocks are provided, and allow them to be reprovided - * @param {Libp2p} libp2p - * @param {Blockstore} blockstore - * @param {object} options - * @memberof Provider + * @param {Libp2p} libp2p libp2p instance + * @param {Blockstore} blockstore blockstore instance + * @param {object} options reprovider options + * @param {string} options.delay reprovider initial delay in human friendly time + * @param {string} options.interval reprovider interval in human friendly time + * @param {string} options.strategy reprovider strategy */ - constructor (libp2p, blockstore, options) { + constructor (libp2p, blockstore, options = {}) { this._running = false this._contentRouting = libp2p.contentRouting @@ -38,11 +40,14 @@ class Provider { this._running = true - // handle options - const strategy = this._options.strategy || 'all' - const humanInterval = this._options.Interval || '12h' - const interval = await promisify((callback) => human(humanInterval, callback))() + // handle options (config uses uppercase) + const humanDelay = this._options.Delay || this._options.delay || '15s' + const delay = await human(humanDelay) + const humanInterval = this._options.Interval || this._options.interval || '12h' + const interval = await human(humanInterval) + const strategy = this._options.Strategy || this._options.strategy || 'all' const options = { + delay, interval, strategy } @@ -65,7 +70,7 @@ class Provider { } /** - * Announce block to the network and add and entry to the tracker + * Announce block to the network * Takes a cid and makes an attempt to announce it to the network * @param {CID} cid */ @@ -79,6 +84,14 @@ class Provider { })() } + /** + * Find providers of a block in the network + * @param {CID} cid cid of the block + * @param {object} options + * @param {number} options.timeout - how long the query should maximally run, in ms (default: 60000) + * @param {number} options.maxNumProviders - maximum number of providers to find + * @returns {Promise} + */ async findProviders (cid, options) { // eslint-disable-line require-await if (!CID.isCID(cid)) { throw errCode('invalid CID to find', 'ERR_INVALID_CID') diff --git a/src/core/provider/queue.js b/src/core/provider/queue.js index 1a54231eab..52c8492ddc 100644 --- a/src/core/provider/queue.js +++ b/src/core/provider/queue.js @@ -1,6 +1,6 @@ 'use strict' -const queue = require('async/queue') +const { default: PQueue } = require('p-queue') const debug = require('debug') const log = debug('ipfs:provider') @@ -17,71 +17,35 @@ class WorkerQueue { this._concurrency = concurrency this.running = false - this.queue = this._setupQueue() - } - - /** - * Create the underlying async queue. - * @returns {queue} - */ - _setupQueue () { - const q = queue(async (block) => { - await this._processNext(block) - }, this._concurrency) - - // If there is an error, stop the worker - q.error = (err) => { - log.error(err) - this.stop(err) - } - - q.buffer = 0 - - return q + this.queue = new PQueue({ concurrency }) } /** * Use the queue from async to keep `concurrency` amount items running * @param {Block[]} blocks - * @returns {Promise} */ async execute (blocks) { this.running = true - // store the promise resolution functions to be resolved at end of queue - this.execution = {} - const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject })) - - // When all blocks have been processed, stop the worker - this.queue.drain = () => { - log('queue:drain') - this.stop() - } + // Fill queue with the processing blocks function + this.queue.addAll(blocks.map((block) => async () => this._processNext(block))) // eslint-disable-line require-await - // Fill queue with blocks - this.queue.push(blocks) + // Wait for finishing + await this.queue.onIdle() - await execPromise + this.stop() } /** - * Stop the worker, optionally an error is thrown if received - * - * @param {object} error + * Stop the worker */ - stop (error) { + stop () { if (!this.running) { return } this.running = false - this.queue.kill() - - if (error) { - this.execution && this.execution.reject(error) - } else { - this.execution && this.execution.resolve() - } + this.queue.clear() } /** diff --git a/src/core/provider/reprovider.js b/src/core/provider/reprovider.js index 49c1efc823..a23bf9b995 100644 --- a/src/core/provider/reprovider.js +++ b/src/core/provider/reprovider.js @@ -5,16 +5,15 @@ const WorkerQueue = require('./queue') const { blockKeyToCid } = require('../utils') -// const initialDelay = 15000 -const initialDelay = 3000 - class Reprovider { /** * Reprovider goal is to reannounce blocks to the network. * @param {object} contentRouting * @param {Blockstore} blockstore * @param {object} options - * @memberof Reprovider + * @param {string} options.delay reprovider initial delay in human friendly time + * @param {string} options.interval reprovider interval in human friendly time + * @param {string} options.strategy reprovider strategy */ constructor (contentRouting, blockstore, options) { this._contentRouting = contentRouting @@ -33,7 +32,7 @@ class Reprovider { // Start doing reprovides after the initial delay this._timeoutId = setTimeout(() => { this._runPeriodically() - }, initialDelay) + }, this._options.delay) } /** @@ -49,27 +48,34 @@ class Reprovider { } /** - * Run reprovide on every `options.interval` ms + * Run reprovide on every `options.interval` ms recursively * @returns {void} */ async _runPeriodically () { - while (this._timeoutId) { - const blocks = await promisify((callback) => this._blockstore.query({}, callback))() - - // TODO strategy logic here - if (this._options.strategy === 'pinned') { + // Verify if stopped + if (!this._timeoutId) return - } else if (this._options.strategy === 'pinned') { + // TODO strategy logic here + const blocks = await promisify((callback) => this._blockstore.query({}, callback))() - } + if (this._options.strategy === 'pinned') { - await this._worker.execute(blocks) + } else if (this._options.strategy === 'pinned') { - // Each subsequent walk should run on a `this._options.interval` interval - await new Promise(resolve => { - this._timeoutId = setTimeout(resolve, this._options.interval) - }) } + + // Verify if stopped + if (!this._timeoutId) return + + await this._worker.execute(blocks) + + // Verify if stopped + if (!this._timeoutId) return + + // Each subsequent walk should run on a `this._options.interval` interval + this._timeoutId = setTimeout(() => { + this._runPeriodically() + }, this._options.interval) } /** diff --git a/src/core/runtime/config-browser.js b/src/core/runtime/config-browser.js index 3ec47ef977..bb673907d0 100644 --- a/src/core/runtime/config-browser.js +++ b/src/core/runtime/config-browser.js @@ -28,6 +28,7 @@ module.exports = () => ({ '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], Reprovider: { + Delay: '15s', Interval: '12h', Strategy: 'all' }, diff --git a/src/core/runtime/config-nodejs.js b/src/core/runtime/config-nodejs.js index 525483a238..45d5a56294 100644 --- a/src/core/runtime/config-nodejs.js +++ b/src/core/runtime/config-nodejs.js @@ -41,6 +41,7 @@ module.exports = () => ({ '/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6' ], Reprovider: { + Delay: '15s', Interval: '12h', Strategy: 'all' }, diff --git a/test/core/provider.spec.js b/test/core/provider.spec.js index ccbe60d193..9f7b5390d4 100644 --- a/test/core/provider.spec.js +++ b/test/core/provider.spec.js @@ -13,12 +13,14 @@ const IPFS = require('../../src') const DaemonFactory = require('ipfsd-ctl') const df = DaemonFactory.create({ type: 'proc' }) +const DELAY = '3s' const INTERVAL = '10s' const STRATEGY = 'all' const config = { Bootstrap: [], Reprovider: { + Delay: DELAY, Interval: INTERVAL, Strategy: STRATEGY } @@ -74,6 +76,7 @@ describe('record provider', () => { expect(err).to.not.exist() ipfsd = _ipfsd node = _ipfsd.api + done() }) }) From ca7ab95e193a13d4dadc3eb7c71c42e9a2a073a1 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 28 Jun 2019 15:39:15 +0100 Subject: [PATCH 3/4] chore: apply suggestions from code review Co-Authored-By: dirkmc --- src/core/components/start.js | 18 +++++++++++++----- src/core/provider/index.js | 18 +++++++++++++++++- src/core/provider/reprovider.js | 6 +++--- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/core/components/start.js b/src/core/components/start.js index 290bd80a93..c7f9c10208 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -1,6 +1,7 @@ 'use strict' const get = require('dlv') +const callbackify = require('callbackify') const series = require('async/series') const Bitswap = require('ipfs-bitswap') const setImmediate = require('async/setImmediate') @@ -48,11 +49,23 @@ module.exports = (self) => { if (err) return cb(err) self.libp2p = libp2p + // create provider self._provider = new Provider(libp2p, self._repo.blocks, get(config, 'Reprovider')) cb() }) }) }, + (cb) => { + // start provider if libp2p routing enabled + if (!get(self._options, 'offline') && + (get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) { + const providerStart = callbackify(() => self._provider.start()) + + providerStart(cb) + } else { + cb() + } + }, (cb) => { const ipnsRouting = routingConfig(self) self._ipns = new IPNS(ipnsRouting, self._repo.datastore, self._peerInfo, self._keychain, self._options) @@ -64,11 +77,6 @@ module.exports = (self) => { { statsEnabled: true } ) - if (!get(self._options, 'offline') && - (get(self._options, 'libp2p.config.dht.enabled', false) || get(self._options, 'libp2p.modules.contentRouting', false))) { - self._provider.start() - } - self._bitswap.start() self._blockService.setExchange(self._bitswap) diff --git a/src/core/provider/index.js b/src/core/provider/index.js index c6cb8a4761..308372c5de 100644 --- a/src/core/provider/index.js +++ b/src/core/provider/index.js @@ -3,6 +3,7 @@ const errCode = require('err-code') const human = require('human-to-milliseconds') const promisify = require('promisify-es6') +const assert = require('assert') const CID = require('cids') @@ -26,6 +27,8 @@ class Provider { this._blockstore = blockstore this._options = options this.reprovider = undefined + + this._validateOptions() } /** @@ -63,7 +66,7 @@ class Provider { * @returns {void} */ stop () { - this._running = true + this._running = false // stop the reprovider this.reprovider.stop() @@ -101,6 +104,19 @@ class Provider { this._contentRouting.findProviders(cid, options, callback) })() } + + // Validate Provider options + _validateOptions () { + const delay = (this._options.Delay || this._options.delay) + assert(delay && parseInt(delay) !== 0, '0 delay is not a valid value for reprovider') + + const interval = (this._options.Interval || this._options.interval) + assert(interval && parseInt(interval) !== 0, '0 interval is not a valid value for reprovider') + + const strategy = (this._options.Strategy || this._options.strategy) + assert(strategy && (strategy === 'all' || strategy === 'pinned' || strategy === 'roots'), + 'Reprovider must have one of the following strategies: `all`, `pinned` or `roots`') + } } exports = module.exports = Provider diff --git a/src/core/provider/reprovider.js b/src/core/provider/reprovider.js index a23bf9b995..dbe2ba5bbc 100644 --- a/src/core/provider/reprovider.js +++ b/src/core/provider/reprovider.js @@ -11,8 +11,8 @@ class Reprovider { * @param {object} contentRouting * @param {Blockstore} blockstore * @param {object} options - * @param {string} options.delay reprovider initial delay in human friendly time - * @param {string} options.interval reprovider interval in human friendly time + * @param {string} options.delay reprovider initial delay in milliseconds + * @param {string} options.interval reprovider interval in milliseconds * @param {string} options.strategy reprovider strategy */ constructor (contentRouting, blockstore, options) { @@ -60,7 +60,7 @@ class Reprovider { if (this._options.strategy === 'pinned') { - } else if (this._options.strategy === 'pinned') { + } else if (this._options.strategy === 'roots') { } From 8141f038d45685e654f4a35c834cd96e43795ded Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 8 Jul 2019 17:51:32 +0200 Subject: [PATCH 4/4] chore: use new human-to-milliseconds --- src/core/provider/index.js | 43 +++++++++++++++++++------------------- test/core/provider.spec.js | 11 +++++----- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/core/provider/index.js b/src/core/provider/index.js index 308372c5de..e54a0c88c8 100644 --- a/src/core/provider/index.js +++ b/src/core/provider/index.js @@ -21,14 +21,29 @@ class Provider { * @param {string} options.strategy reprovider strategy */ constructor (libp2p, blockstore, options = {}) { + // Assert options + this._validateOptions(options) + this._running = false this._contentRouting = libp2p.contentRouting this._blockstore = blockstore - this._options = options - this.reprovider = undefined - this._validateOptions() + // handle options (config uses uppercase) + const humanDelay = options.Delay || options.delay || '15s' + const delay = human(humanDelay) + const humanInterval = options.Interval || options.interval || '12h' + const interval = human(humanInterval) + const strategy = options.Strategy || options.strategy || 'all' + + this._options = { + delay, + interval, + strategy + } + + this.reprovider = new Reprovider(this._contentRouting, this._blockstore, this._options) + } /** @@ -43,20 +58,6 @@ class Provider { this._running = true - // handle options (config uses uppercase) - const humanDelay = this._options.Delay || this._options.delay || '15s' - const delay = await human(humanDelay) - const humanInterval = this._options.Interval || this._options.interval || '12h' - const interval = await human(humanInterval) - const strategy = this._options.Strategy || this._options.strategy || 'all' - const options = { - delay, - interval, - strategy - } - - this.reprovider = new Reprovider(this._contentRouting, this._blockstore, options) - // Start reprovider this.reprovider.start() } @@ -106,14 +107,14 @@ class Provider { } // Validate Provider options - _validateOptions () { - const delay = (this._options.Delay || this._options.delay) + _validateOptions (options) { + const delay = (options.Delay || options.delay) assert(delay && parseInt(delay) !== 0, '0 delay is not a valid value for reprovider') - const interval = (this._options.Interval || this._options.interval) + const interval = (options.Interval || options.interval) assert(interval && parseInt(interval) !== 0, '0 interval is not a valid value for reprovider') - const strategy = (this._options.Strategy || this._options.strategy) + const strategy = (options.Strategy || options.strategy) assert(strategy && (strategy === 'all' || strategy === 'pinned' || strategy === 'roots'), 'Reprovider must have one of the following strategies: `all`, `pinned` or `roots`') } diff --git a/test/core/provider.spec.js b/test/core/provider.spec.js index 9f7b5390d4..cce293ee74 100644 --- a/test/core/provider.spec.js +++ b/test/core/provider.spec.js @@ -8,6 +8,7 @@ chai.use(dirtyChai) const sinon = require('sinon') const CID = require('cids') +const human = require('human-to-milliseconds') const IPFS = require('../../src') const DaemonFactory = require('ipfsd-ctl') @@ -52,7 +53,7 @@ describe('record provider', () => { it('should not be running', () => { expect(node._provider._running).to.equal(false) - expect(node._provider.reprovider).to.not.exist() + expect(node._provider.reprovider._timeoutId).to.not.exist() }) }) @@ -92,8 +93,8 @@ describe('record provider', () => { }) it('should use the defaults', () => { - expect(node._provider._options.Interval).to.equal('12h') - expect(node._provider._options.Strategy).to.equal('all') + expect(node._provider._options.interval).to.equal(human('12h')) + expect(node._provider._options.strategy).to.equal('all') }) it('should be able to provide a valid CIDs', async () => { @@ -181,8 +182,8 @@ describe('record provider', () => { }) it('should use the provided configuration', () => { - expect(node._provider._options.Interval).to.equal(INTERVAL) - expect(node._provider._options.Strategy).to.equal(STRATEGY) + expect(node._provider._options.interval).to.equal(human(INTERVAL)) + expect(node._provider._options.strategy).to.equal(STRATEGY) }) it('should reprovide after tens seconds', function (done) {