From 8bfbb20c91daec683429f5cb8e30f2079c57dafd Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 5 May 2016 21:36:57 +0200 Subject: [PATCH 1/5] feat: add unwantBlocks method --- API.md | 8 + src/index.js | 66 +++++++-- src/wantlist/index.js | 6 + src/wantmanager/index.js | 14 +- test/index-test.js | 313 ++++++++++++++++++++++----------------- 5 files changed, 254 insertions(+), 153 deletions(-) diff --git a/API.md b/API.md index 53c150d7..9768a025 100644 --- a/API.md +++ b/API.md @@ -27,6 +27,14 @@ Fetch a single block. Fetch multiple blocks. +### `unwantBlocks(keys)` + +- `keys: []Multihash` + +Cancel previously requested keys, forcefully. That means they are removed from the +wantlist independent of how many other resources requested these keys. Callbacks +attached to `getBlock` are errored with `Error('manual unwant: key')`. + ### `cancelWants(keys)` - `keys: []Multihash` diff --git a/src/index.js b/src/index.js index 7c968f38..e213113a 100644 --- a/src/index.js +++ b/src/index.js @@ -138,9 +138,9 @@ module.exports = class Bitwap { cb(err, block) } - this.getBlocks([key], (err, res) => { - if (err) { - return done(err) + this.getBlocks([key], (errs, res) => { + if (errs) { + return done(errs[0]) } done(null, res[0]) @@ -154,14 +154,53 @@ module.exports = class Bitwap { getBlocks (keys, cb) { const blocks = [] - const finish = (block) => { - blocks.push(block) - log('finish: %s/%s', blocks.length, keys.length) - if (blocks.length === keys.length) { + const errs = [] + const unwantListeners = {} + const blockListeners = {} + const unwantEvent = (key) => `unwant:${key.toString('hex')}` + const blockEvent = (key) => `block:${key.toString('hex')}` + + const cleanupListeners = () => { + keys.forEach((key) => { + this.notifications.removeListener(unwantEvent(key), unwantListeners[key]) + this.notifications.removeListener(blockEvent(key), blockListeners[key]) + }) + } + + const addListeners = () => { + keys.forEach((key) => { + unwantListeners[key] = () => { + finish(new Error(`manual unwant: ${key.toString('hex')}`)) + } + + blockListeners[key] = (block) => { + finish(null, block) + } + + this.notifications.once(unwantEvent(key), unwantListeners[key]) + this.notifications.once(blockEvent(key), blockListeners[key]) + }) + } + + const finish = (err, block) => { + if (err) { + errs.push(err) + } + if (block) { + blocks.push(block) + } + + if (blocks.length + errs.length === keys.length) { + cleanupListeners() + if (errs.length > 0) { + return cb(errs, blocks) + } cb(null, blocks) } } + addListeners() + keys.forEach((key) => { // Sanity check, we don't want to announce looking for blocks // when we might have them ourselves @@ -175,7 +214,7 @@ module.exports = class Bitwap { this.datastore.get(key, (err, res) => { if (!err && res) { this.wm.cancelWants([key]) - finish(res) + finish(null, res) return } @@ -185,14 +224,19 @@ module.exports = class Bitwap { }) } }) - this.notifications.once(`block:${key.toString('hex')}`, (block) => { - finish(block) - }) }) this.wm.wantBlocks(keys) } + // removes the given keys from the want list independent of any ref counts + unwantBlocks (keys) { + this.wm.unwantBlocks(keys) + keys.forEach((key) => { + this.notifications.emit(`unwant:${key.toString('hex')}`) + }) + } + // removes the given keys from the want list cancelWants (keys) { this.wm.cancelWants(keys) diff --git a/src/wantlist/index.js b/src/wantlist/index.js index dfd912e9..d08f68bd 100644 --- a/src/wantlist/index.js +++ b/src/wantlist/index.js @@ -35,6 +35,12 @@ class Wantlist { this.set.delete(key.toString('hex')) } + removeForce (key) { + if (this.set.has(key)) { + this.set.delete(key) + } + } + entries () { return this.set.entries() } diff --git a/src/wantmanager/index.js b/src/wantmanager/index.js index e618b474..1f0c77d8 100644 --- a/src/wantmanager/index.js +++ b/src/wantmanager/index.js @@ -23,7 +23,7 @@ module.exports = class Wantmanager { return new MsgQueue(peerId, this.network) } - _addEntries (keys, cancel) { + _addEntries (keys, cancel, force) { let i = -1 _(keys) .map((key) => { @@ -33,7 +33,11 @@ module.exports = class Wantmanager { .tap((e) => { // add changes to our wantlist if (e.cancel) { - this.wl.remove(e.key) + if (force) { + this.wl.removeForce(e.key) + } else { + this.wl.remove(e.key) + } } else { this.wl.add(e.key, e.priority) } @@ -90,6 +94,12 @@ module.exports = class Wantmanager { this._addEntries(keys, false) } + // remove blocks of all the given keys without respecting refcounts + unwantBlocks (keys) { + log('unwant blocks:', keys) + this._addEntries(keys, true, true) + } + // cancel wanting all of the given keys cancelWants (keys) { log('cancel wants: ', keys) diff --git a/test/index-test.js b/test/index-test.js index 32e11daa..139de17e 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -120,170 +120,203 @@ module.exports = (repo) => { }, done) }) }) - }) + describe('getBlock', () => { + let store - describe('getBlock', () => { - let store + before((done) => { + repo.create('hello', (err, r) => { + if (err) return done(err) + store = r.datastore + done() + }) + }) - before((done) => { - repo.create('hello', (err, r) => { - if (err) return done(err) - store = r.datastore - done() + after((done) => { + repo.remove(done) }) - }) - after((done) => { - repo.remove(done) - }) + it('block exists locally', (done) => { + const me = PeerId.create({bits: 64}) + const block = new Block('hello') + store.put(block, (err) => { + if (err) throw err + const bs = new Bitswap(me, libp2pMock, store) + + bs.getBlock(block.key, (err, res) => { + if (err) throw err - it('block exists locally', (done) => { - const me = PeerId.create({bits: 64}) - const block = new Block('hello') - store.put(block, (err) => { - if (err) throw err + expect(res).to.be.eql(block) + done() + }) + }) + }) + + // Not sure if I understand what is going on here + // test fails because now the network is not properly mocked + // what are these net.stores and mockNet.bitswaps? + it.skip('block is retrived from peer', (done) => { + const block = new Block('hello world') + + let mockNet + async.waterfall([ + (cb) => utils.createMockNet(repo, 2, cb), + (net, cb) => { + mockNet = net + net.stores[1].put(block, cb) + }, + (val, cb) => { + mockNet.bitswaps[0]._onPeerConnected(mockNet.ids[1]) + mockNet.bitswaps[1]._onPeerConnected(mockNet.ids[0]) + mockNet.bitswaps[0].getBlock(block.key, cb) + }, + (res, cb) => { + expect(res).to.be.eql(res) + cb() + } + ], done) + }) + + it('block is added locally afterwards', (done) => { + const me = PeerId.create({bits: 64}) + const block = new Block('world') const bs = new Bitswap(me, libp2pMock, store) + const net = utils.mockNetwork() + bs.network = net + bs.wm.network = net + bs.engine.network = net bs.start() bs.getBlock(block.key, (err, res) => { if (err) throw err - expect(res).to.be.eql(block) done() }) + setTimeout(() => { + bs.hasBlock(block, () => {}) + }, 200) }) - }) - // Not sure if I understand what is going on here - // test fails because now the network is not properly mocked - // what are these net.stores and mockNet.bitswaps? - it.skip('block is retrived from peer', (done) => { - const block = new Block('hello world') - - let mockNet - async.waterfall([ - (cb) => utils.createMockNet(repo, 2, cb), - (net, cb) => { - mockNet = net - net.stores[1].put(block, cb) - }, - (val, cb) => { - mockNet.bitswaps[0]._onPeerConnected(mockNet.ids[1]) - mockNet.bitswaps[1]._onPeerConnected(mockNet.ids[0]) - mockNet.bitswaps[0].getBlock(block.key, cb) - }, - (res, cb) => { - expect(res).to.be.eql(res) - cb() + it('block is sent after local add', (done) => { + const me = PeerId.create({bits: 64}) + const other = PeerId.create({bits: 64}) + const block = new Block('hello world local add') + let bs1 + let bs2 + let n1 + let n2 + + n1 = { + connectTo (id, cb) { + let err + if (id.toHexString() !== other.toHexString()) { + err = new Error('unkown peer') + } + async.setImmediate(() => cb(err)) + }, + sendMessage (id, msg, cb) { + if (id.toHexString() === other.toHexString()) { + bs2._receiveMessage(me, msg, cb) + } else { + async.setImmediate(() => cb(new Error('unkown peer'))) + } + } } - ], done) + n2 = { + connectTo (id, cb) { + let err + if (id.toHexString() !== me.toHexString()) { + err = new Error('unkown peer') + } + async.setImmediate(() => cb(err)) + }, + sendMessage (id, msg, cb) { + if (id.toHexString() === me.toHexString()) { + bs1._receiveMessage(other, msg, cb) + } else { + async.setImmediate(() => cb(new Error('unkown peer'))) + } + } + } + bs1 = new Bitswap(me, libp2pMock, store) + utils.applyNetwork(bs1, n1) + + let store2 + + async.waterfall([ + (cb) => repo.create('world', cb), + (repo, cb) => { + store2 = repo.datastore + bs2 = new Bitswap(other, libp2pMock, store2) + utils.applyNetwork(bs2, n2) + bs1._onPeerConnected(other) + bs2._onPeerConnected(me) + bs1.getBlock(block.key, cb) + + setTimeout(() => { + bs2.hasBlock(block) + }, 1000) + }, + (res, cb) => { + expect(res).to.be.eql(res) + cb() + } + ], done) + }) }) - it('block is added locally afterwards', (done) => { - const me = PeerId.create({bits: 64}) - const block = new Block('world') - const bs = new Bitswap(me, libp2pMock, store) - const net = utils.mockNetwork() - bs.network = net - bs.wm.network = net - bs.engine.network = net - bs.start() - - bs.getBlock(block.key, (err, res) => { - if (err) throw err - expect(res).to.be.eql(block) - done() + describe('stat', () => { + it('has initial stats', () => { + const me = PeerId.create({bits: 64}) + const bs = new Bitswap(me, libp2pMock, {}) + + const stats = bs.stat() + expect(stats).to.have.property('wantlist') + expect(stats).to.have.property('blocksReceived', 0) + expect(stats).to.have.property('dupBlksReceived', 0) + expect(stats).to.have.property('dupDataReceived', 0) + expect(stats).to.have.property('peers') }) - setTimeout(() => { - bs.hasBlock(block, () => {}) - }, 200) }) - it('block is sent after local add', (done) => { - const me = PeerId.create({bits: 64}) - const other = PeerId.create({bits: 64}) - const block = new Block('hello world local add') - let bs1 - let bs2 - let n1 - let n2 - - n1 = { - connectTo (id, cb) { - let err - if (id.toHexString() !== other.toHexString()) { - err = new Error('unkown peer') - } - async.setImmediate(() => cb(err)) - }, - sendMessage (id, msg, cb) { - if (id.toHexString() === other.toHexString()) { - bs2._receiveMessage(me, msg, cb) - } else { - async.setImmediate(() => cb(new Error('unkown peer'))) - } - }, - start () { - } - } - n2 = { - connectTo (id, cb) { - let err - if (id.toHexString() !== me.toHexString()) { - err = new Error('unkown peer') - } - async.setImmediate(() => cb(err)) - }, - sendMessage (id, msg, cb) { - if (id.toHexString() === me.toHexString()) { - bs1._receiveMessage(other, msg, cb) - } else { - async.setImmediate(() => cb(new Error('unkown peer'))) + describe('unwantBlocks', () => { + let store + beforeEach((done) => { + repo.create('hello', (err, r) => { + if (err) return done(err) + store = r.datastore + done() + }) + }) + + afterEach((done) => { + repo.remove(done) + }) + + it('removes blocks that are wanted multiple times', (done) => { + const me = PeerId.create({bits: 64}) + const bs = new Bitswap(me, libp2pMock, store) + bs.start() + const b = new Block('hello') + + let i = 0 + const finish = () => { + i++ + if (i === 2) { + done() } - }, - start () { } - } - bs1 = new Bitswap(me, libp2pMock, store) - utils.applyNetwork(bs1, n1) - bs1.start() - - let store2 - - async.waterfall([ - (cb) => repo.create('world', cb), - (repo, cb) => { - store2 = repo.datastore - bs2 = new Bitswap(other, libp2pMock, store2) - utils.applyNetwork(bs2, n2) - bs2.start() - bs1._onPeerConnected(other) - bs2._onPeerConnected(me) - bs1.getBlock(block.key, cb) - - setTimeout(() => { - bs2.hasBlock(block) - }, 1000) - }, - (res, cb) => { - expect(res).to.be.eql(res) - cb() - } - ], done) - }) - }) - describe('stat', () => { - it('has initial stats', () => { - const me = PeerId.create({bits: 64}) - const bs = new Bitswap(me, libp2pMock, {}) - - const stats = bs.stat() - expect(stats).to.have.property('wantlist') - expect(stats).to.have.property('blocksReceived', 0) - expect(stats).to.have.property('dupBlksReceived', 0) - expect(stats).to.have.property('dupDataReceived', 0) - expect(stats).to.have.property('peers') + bs.getBlock(b.key, (err, res) => { + expect(err.message).to.be.eql(`manual unwant: ${b.key.toString('hex')}`) + finish() + }) + bs.getBlock(b.key, (err, res) => { + expect(err.message).to.be.eql(`manual unwant: ${b.key.toString('hex')}`) + finish() + }) + + bs.unwantBlocks([b.key]) + }) }) }) } From 6506f5503ec97274b51ca58adeb01be776ceab76 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sun, 8 May 2016 10:11:02 +0200 Subject: [PATCH 2/5] refactor: getBlocks returns a result object --- API.md | 9 ++++++++- src/index.js | 35 ++++++++++++++--------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/API.md b/API.md index 9768a025..cbb5baa0 100644 --- a/API.md +++ b/API.md @@ -25,7 +25,14 @@ Fetch a single block. - `keys: []Multihash` - `cb: Function` -Fetch multiple blocks. +Fetch multiple blocks. The `cb` is called with a result object of the form +```js +{ + [key1]: {error: errorOrUndefined, block: blockOrUndefined}, + [key2]: {error: errorOrUndefined, block: blockOrUndefined}, + ... +} +``` ### `unwantBlocks(keys)` diff --git a/src/index.js b/src/index.js index e213113a..e064494f 100644 --- a/src/index.js +++ b/src/index.js @@ -138,12 +138,11 @@ module.exports = class Bitwap { cb(err, block) } - this.getBlocks([key], (errs, res) => { - if (errs) { - return done(errs[0]) - } + this.getBlocks([key], (results) => { + const err = results[key].error + const block = results[key].block - done(null, res[0]) + done(err, block) }) } @@ -153,8 +152,7 @@ module.exports = class Bitwap { } getBlocks (keys, cb) { - const blocks = [] - const errs = [] + const results = {} const unwantListeners = {} const blockListeners = {} const unwantEvent = (key) => `unwant:${key.toString('hex')}` @@ -170,11 +168,11 @@ module.exports = class Bitwap { const addListeners = () => { keys.forEach((key) => { unwantListeners[key] = () => { - finish(new Error(`manual unwant: ${key.toString('hex')}`)) + finish(key, new Error(`manual unwant: ${key.toString('hex')}`)) } blockListeners[key] = (block) => { - finish(null, block) + finish(key, null, block) } this.notifications.once(unwantEvent(key), unwantListeners[key]) @@ -182,20 +180,15 @@ module.exports = class Bitwap { }) } - const finish = (err, block) => { - if (err) { - errs.push(err) - } - if (block) { - blocks.push(block) + const finish = (key, err, block) => { + results[key] = { + error: err, + block: block } - if (blocks.length + errs.length === keys.length) { + if (Object.keys(results).length === keys.length) { cleanupListeners() - if (errs.length > 0) { - return cb(errs, blocks) - } - cb(null, blocks) + cb(results) } } @@ -214,7 +207,7 @@ module.exports = class Bitwap { this.datastore.get(key, (err, res) => { if (!err && res) { this.wm.cancelWants([key]) - finish(null, res) + finish(key, null, res) return } From d21de6a1b7d1eb2c62c551df775a58c7e4cca919 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sun, 8 May 2016 10:41:05 +0200 Subject: [PATCH 3/5] fix unwant testing --- src/index.js | 2 +- test/index-test.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/index.js b/src/index.js index e064494f..54741b33 100644 --- a/src/index.js +++ b/src/index.js @@ -206,8 +206,8 @@ module.exports = class Bitwap { if (exists) { this.datastore.get(key, (err, res) => { if (!err && res) { - this.wm.cancelWants([key]) finish(key, null, res) + this.wm.cancelWants([key]) return } diff --git a/test/index-test.js b/test/index-test.js index 139de17e..465b4969 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -296,7 +296,7 @@ module.exports = (repo) => { const me = PeerId.create({bits: 64}) const bs = new Bitswap(me, libp2pMock, store) bs.start() - const b = new Block('hello') + const b = new Block(`hello ${Math.random()}`) let i = 0 const finish = () => { From dd3efc679e671f350f3b29631ded4b0a93986b88 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sun, 8 May 2016 16:41:05 +0200 Subject: [PATCH 4/5] fix: remove broken log line --- package.json | 2 +- src/decision/engine.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/package.json b/package.json index 4961b646..3eeb7b6e 100644 --- a/package.json +++ b/package.json @@ -71,4 +71,4 @@ "David Dias ", "Friedel Ziegelmayer " ] -} \ No newline at end of file +} diff --git a/src/decision/engine.js b/src/decision/engine.js index 66d324fe..f46bf4f2 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -55,7 +55,6 @@ module.exports = class Engine { if (!nextTask) return push(null, _.nil) this.datastore.get(nextTask.entry.key, (err, block) => { - log('fetched: %s', block.key.toString('hex'), block.data.toString()) if (err || !block) { nextTask.done() } else { From 9e7793bea7ee4c1476d6b868d1906e6b4ab0553a Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 9 May 2016 11:53:47 +0200 Subject: [PATCH 5/5] fix for codereview --- API.md | 2 ++ src/index.js | 4 ++-- test/index-test.js | 6 ++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/API.md b/API.md index cbb5baa0..d02bcf94 100644 --- a/API.md +++ b/API.md @@ -34,6 +34,8 @@ Fetch multiple blocks. The `cb` is called with a result object of the form } ``` +Where `key` is the multihash of the block. + ### `unwantBlocks(keys)` - `keys: []Multihash` diff --git a/src/index.js b/src/index.js index 54741b33..45037a1a 100644 --- a/src/index.js +++ b/src/index.js @@ -195,8 +195,8 @@ module.exports = class Bitwap { addListeners() keys.forEach((key) => { - // Sanity check, we don't want to announce looking for blocks - // when we might have them ourselves + // We don't want to announce looking for blocks + // when we might have them ourselves. this.datastore.has(key, (err, exists) => { if (err) { log('error in datastore.has: ', err.message) diff --git a/test/index-test.js b/test/index-test.js index 465b4969..63cae277 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -202,10 +202,8 @@ module.exports = (repo) => { const block = new Block('hello world local add') let bs1 let bs2 - let n1 - let n2 - n1 = { + const n1 = { connectTo (id, cb) { let err if (id.toHexString() !== other.toHexString()) { @@ -221,7 +219,7 @@ module.exports = (repo) => { } } } - n2 = { + const n2 = { connectTo (id, cb) { let err if (id.toHexString() !== me.toHexString()) {