From 20ea0932339d9fa102b591033107a0a9a4f38659 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 13 Nov 2019 22:05:39 +0000 Subject: [PATCH 1/9] refactor: convert dht.findPeer to async/await --- src/dht/find-peer.js | 36 ++++++++++++++ src/dht/{findprovs.js => find-provs.js} | 0 src/dht/findpeer.js | 63 ------------------------- src/dht/index.js | 7 +-- 4 files changed, 40 insertions(+), 66 deletions(-) create mode 100644 src/dht/find-peer.js rename src/dht/{findprovs.js => find-provs.js} (100%) delete mode 100644 src/dht/findpeer.js diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js new file mode 100644 index 000000000..2e1c835e1 --- /dev/null +++ b/src/dht/find-peer.js @@ -0,0 +1,36 @@ +'use strict' + +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const errCode = require('err-code') +const configure = require('../lib/configure') + +module.exports = configure(({ ky }) => { + return async (peerId, options) => { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${peerId}`) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + const res = await ky.get('dht/findpeer', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }).json() + + // 2 = FinalPeer + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 + if (res.Type !== 2) { + throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND') + } + + const { ID, Addrs } = res.Responses[0] + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + + return peerInfo + } +}) diff --git a/src/dht/findprovs.js b/src/dht/find-provs.js similarity index 100% rename from src/dht/findprovs.js rename to src/dht/find-provs.js diff --git a/src/dht/findpeer.js b/src/dht/findpeer.js deleted file mode 100644 index 8bcc80fb5..000000000 --- a/src/dht/findpeer.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') - -const multiaddr = require('multiaddr') -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const errcode = require('err-code') - -module.exports = (send) => { - return promisify((peerId, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } - - const handleResult = (res, callback) => { - // Inconsistent return values in the browser - if (Array.isArray(res)) { - res = res.find(r => r.Type === 2) - } - - // Type 2 keys - // 2 = FinalPeer - // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 - if (!res || res.Type !== 2) { - const errMsg = 'key was not found (type 4)' - return callback(errcode(new Error(errMsg), 'ERR_KEY_TYPE_4_NOT_FOUND')) - } - - const responseReceived = res.Responses[0] - const peerInfo = new PeerInfo(PeerId.createFromB58String(responseReceived.ID)) - - responseReceived.Addrs.forEach((addr) => { - const ma = multiaddr(addr) - - peerInfo.multiaddrs.add(ma) - }) - - callback(null, peerInfo) - } - - send({ - path: 'dht/findpeer', - args: peerId.toString(), - qs: opts - }, (err, result) => { - if (err) { - return callback(err) - } - - streamToValueWithTransformer(result, handleResult, callback) - }) - }) -} diff --git a/src/dht/index.js b/src/dht/index.js index b4ab8c640..ab3a2b888 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -1,15 +1,16 @@ 'use strict' +const callbackify = require('callbackify') const moduleConfig = require('../utils/module-config') -module.exports = (arg) => { +module.exports = (arg, config) => { const send = moduleConfig(arg) return { get: require('./get')(send), put: require('./put')(send), - findProvs: require('./findprovs')(send), - findPeer: require('./findpeer')(send), + findProvs: require('./find-provs')(send), + findPeer: callbackify.variadic(require('./find-peer')(config)), provide: require('./provide')(send), // find closest peerId to given peerId query: require('./query')(send) From 4c4d11c57c57c3c8020f3b79812d5351e5c651f3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 13 Nov 2019 22:45:41 +0000 Subject: [PATCH 2/9] refactor: convert dht.findProvs to async/await --- src/dht/find-peer.js | 30 ++++++++------- src/dht/find-provs.js | 88 ++++++++++++++++--------------------------- src/dht/index.js | 26 ++++++++++++- 3 files changed, 72 insertions(+), 72 deletions(-) diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index 2e1c835e1..09263deab 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -3,11 +3,13 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') -const errCode = require('err-code') +const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { - return async (peerId, options) => { + return (peerId, options) => (async function * () { options = options || {} const searchParams = new URLSearchParams(options.searchParams) @@ -19,18 +21,18 @@ module.exports = configure(({ ky }) => { signal: options.signal, headers: options.headers, searchParams - }).json() + }) - // 2 = FinalPeer - // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 - if (res.Type !== 2) { - throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND') + for await (let message of ndjson(toIterable(res.body))) { + message = toCamel(message) + if (message.responses) { + message.responses = message.responses.map(({ ID, Addrs }) => { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + return peerInfo + }) + } + yield message } - - const { ID, Addrs } = res.Responses[0] - const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) - Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) - - return peerInfo - } + })() }) diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index 695ef04df..63d4cad95 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -1,63 +1,39 @@ 'use strict' -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') - -const multiaddr = require('multiaddr') const PeerId = require('peer-id') const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const toCamel = require('../lib/object-to-camel') + +module.exports = configure(({ ky }) => { + return (cid, options) => (async function * () { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${cid}`) + if (options.numProviders) searchParams.set('num-providers', options.numProviders) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + const res = await ky.get('dht/findprovs', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) -module.exports = (send) => { - return promisify((cid, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } - - const handleResult = (res, callback) => { - // Inconsistent return values in the browser vs node - if (!Array.isArray(res)) { - res = [res] - } - - const responses = [] - res.forEach(result => { - // 4 = Provider - if (result.Type !== 4) return - result.Responses.forEach(response => { - const peerInfo = new PeerInfo(PeerId.createFromB58String(response.ID)) - - if (response.Addrs) { - response.Addrs.forEach((addr) => { - const ma = multiaddr(addr) - peerInfo.multiaddrs.add(ma) - }) - } - - responses.push(peerInfo) + for await (let message of ndjson(toIterable(res.body))) { + message = toCamel(message) + if (message.responses) { + message.responses = message.responses.map(({ ID, Addrs }) => { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + return peerInfo }) - }) - - callback(null, responses) - } - - send({ - path: 'dht/findprovs', - args: cid.toString(), - qs: opts - }, (err, result) => { - if (err) { - return callback(err) } - - streamToValueWithTransformer(result, handleResult, callback) - }) - }) -} + yield message + } + })() +}) diff --git a/src/dht/index.js b/src/dht/index.js index ab3a2b888..f894120be 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -1,16 +1,38 @@ 'use strict' const callbackify = require('callbackify') +const errCode = require('err-code') const moduleConfig = require('../utils/module-config') module.exports = (arg, config) => { const send = moduleConfig(arg) + const findProvs = require('./find-provs')(config) + const findPeer = require('./find-peer')(config) return { get: require('./get')(send), put: require('./put')(send), - findProvs: require('./find-provs')(send), - findPeer: callbackify.variadic(require('./find-peer')(config)), + findProvs: callbackify.variadic(async (cid, options) => { + const providers = [] + for await (const message of findProvs(cid, options)) { + // 4 = Provider + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 + if (message.type === 4) { + providers.push(...message.responses) + } + } + return providers + }), + findPeer: callbackify.variadic(async (peerId, options) => { + for await (const message of findPeer(peerId, options)) { + // 2 = FinalPeer + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 + if (message.type === 2) { + return message.responses[0] + } + } + throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND') + }), provide: require('./provide')(send), // find closest peerId to given peerId query: require('./query')(send) From 7a020c5f29567fd3d64ced724876bfa18a1d2e04 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 10:03:55 +0000 Subject: [PATCH 3/9] fix: the Addrs prop may be null --- src/dht/find-peer.js | 2 +- src/dht/find-provs.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index 09263deab..e1902475a 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -28,7 +28,7 @@ module.exports = configure(({ ky }) => { if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) - Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + if (Addrs) Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) return peerInfo }) } diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index 63d4cad95..71601fb4a 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -29,7 +29,7 @@ module.exports = configure(({ ky }) => { if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) - Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + if (Addrs) Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) return peerInfo }) } From c11fd4ca9b1a7e05ea6a8e2162baae12a503bfd2 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 12:09:19 +0000 Subject: [PATCH 4/9] feat: convert dht.get to async/await --- src/dht/find-peer.js | 17 ++++++------ src/dht/find-provs.js | 17 ++++++------ src/dht/get.js | 64 ++++++++++++++++--------------------------- src/dht/index.js | 27 +++++++----------- 4 files changed, 52 insertions(+), 73 deletions(-) diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index e1902475a..e24029909 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -4,9 +4,9 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') +const log = require('debug')('ipfs-http-client:dht:find-peer') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') -const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { return (peerId, options) => (async function * () { @@ -23,16 +23,17 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let message of ndjson(toIterable(res.body))) { - message = toCamel(message) - if (message.responses) { - message.responses = message.responses.map(({ ID, Addrs }) => { + for await (const message of ndjson(toIterable(res.body))) { + log(message) + // 2 = FinalPeer + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 + if (message.Type === 2 && message.Responses) { + for (const { ID, Addrs } of message.Responses) { const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) if (Addrs) Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) - return peerInfo - }) + yield peerInfo + } } - yield message } })() }) diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index 71601fb4a..299091de0 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -4,9 +4,9 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') +const log = require('debug')('ipfs-http-client:dht:find-provs') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') -const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { return (cid, options) => (async function * () { @@ -24,16 +24,17 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let message of ndjson(toIterable(res.body))) { - message = toCamel(message) - if (message.responses) { - message.responses = message.responses.map(({ ID, Addrs }) => { + for await (const message of ndjson(toIterable(res.body))) { + log(message) + // 4 = Provider + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 + if (message.Type === 4 && message.Responses) { + for (const { ID, Addrs } of message.Responses) { const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) if (Addrs) Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) - return peerInfo - }) + yield peerInfo + } } - yield message } })() }) diff --git a/src/dht/get.js b/src/dht/get.js index ab64fa892..4c09f0cdd 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -1,48 +1,32 @@ 'use strict' -const promisify = require('promisify-es6') +const ndjson = require('iterable-ndjson') +const log = require('debug')('ipfs-http-client:dht:get') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') -module.exports = (send) => { - return promisify((key, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } +module.exports = configure(({ ky }) => { + return (key, options) => (async function * () { + options = options || {} - function handleResult (done, err, res) { - if (err) { - return done(err) - } - if (!res) { - return done(new Error('empty response')) - } - if (res.length === 0) { - return done(new Error('no value returned for key')) - } + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${key}`) + if (options.verbose != null) searchParams.set('verbose', options.verbose) - // Inconsistent return values in the browser vs node - if (Array.isArray(res)) { - res = res[0] - } + const res = await ky.get('dht/get', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) - if (res.Type === 5) { - done(null, res.Extra) - } else { - done(new Error('key was not found (type 6)')) + for await (const message of ndjson(toIterable(res.body))) { + log(message) + // 5 = Value + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21 + if (message.Type === 5) { + yield message.Extra } } - - send({ - path: 'dht/get', - args: key, - qs: opts - }, handleResult.bind(null, callback)) - }) -} + })() +}) diff --git a/src/dht/index.js b/src/dht/index.js index f894120be..8af07193d 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -2,34 +2,27 @@ const callbackify = require('callbackify') const errCode = require('err-code') +const { collectify } = require('../lib/converters') const moduleConfig = require('../utils/module-config') module.exports = (arg, config) => { const send = moduleConfig(arg) + const get = require('./get')(config) const findProvs = require('./find-provs')(config) const findPeer = require('./find-peer')(config) return { - get: require('./get')(send), - put: require('./put')(send), - findProvs: callbackify.variadic(async (cid, options) => { - const providers = [] - for await (const message of findProvs(cid, options)) { - // 4 = Provider - // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 - if (message.type === 4) { - providers.push(...message.responses) - } + get: callbackify.variadic(async (key, options) => { + for await (const value of get(key, options)) { + return value } - return providers + throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND') }), + put: require('./put')(send), + findProvs: callbackify.variadic(collectify(findProvs)), findPeer: callbackify.variadic(async (peerId, options) => { - for await (const message of findPeer(peerId, options)) { - // 2 = FinalPeer - // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 - if (message.type === 2) { - return message.responses[0] - } + for await (const peerInfo of findPeer(peerId, options)) { + return peerInfo } throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND') }), From 5cf569e2a7efd42ec1ff9aa8403cab8f9c003543 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 13:46:52 +0000 Subject: [PATCH 5/9] refactor: convert dht.provide to async/await --- src/dht/find-peer.js | 2 +- src/dht/find-provs.js | 2 +- src/dht/index.js | 5 ++-- src/dht/provide.js | 65 +++++++++++++++++++++++-------------------- 4 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index e24029909..ab5b6a23c 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -30,7 +30,7 @@ module.exports = configure(({ ky }) => { if (message.Type === 2 && message.Responses) { for (const { ID, Addrs } of message.Responses) { const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) - if (Addrs) Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) yield peerInfo } } diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index 299091de0..cc8449063 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -31,7 +31,7 @@ module.exports = configure(({ ky }) => { if (message.Type === 4 && message.Responses) { for (const { ID, Addrs } of message.Responses) { const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) - if (Addrs) Addrs.forEach(addr => peerInfo.multiaddrs.add(multiaddr(addr))) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) yield peerInfo } } diff --git a/src/dht/index.js b/src/dht/index.js index 8af07193d..5158d7964 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -8,7 +8,6 @@ const moduleConfig = require('../utils/module-config') module.exports = (arg, config) => { const send = moduleConfig(arg) const get = require('./get')(config) - const findProvs = require('./find-provs')(config) const findPeer = require('./find-peer')(config) return { @@ -19,14 +18,14 @@ module.exports = (arg, config) => { throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND') }), put: require('./put')(send), - findProvs: callbackify.variadic(collectify(findProvs)), + findProvs: callbackify.variadic(collectify(require('./find-provs')(config))), findPeer: callbackify.variadic(async (peerId, options) => { for await (const peerInfo of findPeer(peerId, options)) { return peerInfo } throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND') }), - provide: require('./provide')(send), + provide: callbackify.variadic(collectify(require('./provide')(config))), // find closest peerId to given peerId query: require('./query')(send) } diff --git a/src/dht/provide.js b/src/dht/provide.js index 08bcad6d7..1b18c6f73 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -1,37 +1,42 @@ 'use strict' -const promisify = require('promisify-es6') -const CID = require('cids') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const log = require('debug')('ipfs-http-client:dht:provide') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const toCamel = require('../lib/object-to-camel') -module.exports = (send) => { - return promisify((cids, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } +module.exports = configure(({ ky }) => { + return (cids, options) => (async function * () { + cids = Array.isArray(cids) ? cids : [cids] + options = options || {} - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } + const searchParams = new URLSearchParams(options.searchParams) + cids.forEach(cid => searchParams.append('arg', `${cid}`)) + if (options.recursive != null) searchParams.set('recursive', options.recursive) + if (options.verbose != null) searchParams.set('verbose', options.verbose) - if (!Array.isArray(cids)) { - cids = [cids] - } + const res = await ky.get('dht/provide', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) - // Validate CID(s) and serialize - try { - cids = cids.map(cid => new CID(cid).toBaseEncodedString('base58btc')) - } catch (err) { - return callback(err) + for await (let message of ndjson(toIterable(res.body))) { + log(message) + message = toCamel(message) + if (message.responses) { + message.responses = message.responses.map(({ ID, Addrs }) => { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) + return peerInfo + }) + } + yield message } - - send({ - path: 'dht/provide', - args: cids, - qs: opts - }, callback) - }) -} + })() +}) From 45fc00fee8884a5da4c4499fef01c0371ad4aeed Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 14:43:27 +0000 Subject: [PATCH 6/9] refactor: convert dht.put to async/await --- src/dht/index.js | 2 +- src/dht/put.js | 58 +++++++++++++++++--------- src/lib/encode-buffer-uri-component.js | 23 ++++++++++ src/pubsub/publish.js | 24 +---------- 4 files changed, 63 insertions(+), 44 deletions(-) create mode 100644 src/lib/encode-buffer-uri-component.js diff --git a/src/dht/index.js b/src/dht/index.js index 5158d7964..50b4920f4 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -17,7 +17,7 @@ module.exports = (arg, config) => { } throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND') }), - put: require('./put')(send), + put: callbackify.variadic(collectify(require('./put')(config))), findProvs: callbackify.variadic(collectify(require('./find-provs')(config))), findPeer: callbackify.variadic(async (peerId, options) => { for await (const peerInfo of findPeer(peerId, options)) { diff --git a/src/dht/put.js b/src/dht/put.js index c0937d158..f2a271e8f 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -1,25 +1,43 @@ 'use strict' -const promisify = require('promisify-es6') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const log = require('debug')('ipfs-http-client:dht:put') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') +const toCamel = require('../lib/object-to-camel') -module.exports = (send) => { - return promisify((key, value, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } +module.exports = configure(({ ky }) => { + return (key, value, options) => (async function * () { + options = options || {} - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } + const searchParams = new URLSearchParams(options.searchParams) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + key = Buffer.isBuffer(key) ? encodeBufferURIComponent(key) : encodeURIComponent(key) + value = Buffer.isBuffer(value) ? encodeBufferURIComponent(value) : encodeURIComponent(value) - send({ - path: 'dht/put', - args: [key, value], - qs: opts - }, callback) - }) -} + const url = `dht/put?arg=${key}&arg=${value}&${searchParams}` + const res = await ky.get(url, { + timeout: options.timeout, + signal: options.signal, + headers: options.headers + }) + + for await (let message of ndjson(toIterable(res.body))) { + log(message) + message = toCamel(message) + if (message.responses) { + message.responses = message.responses.map(({ ID, Addrs }) => { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) + return peerInfo + }) + } + yield message + } + })() +}) diff --git a/src/lib/encode-buffer-uri-component.js b/src/lib/encode-buffer-uri-component.js new file mode 100644 index 000000000..228858f57 --- /dev/null +++ b/src/lib/encode-buffer-uri-component.js @@ -0,0 +1,23 @@ +// https://github.com/ipfs/js-ipfs-http-client/issues/569 +module.exports = function encodeBuffer (buf) { + let uriEncoded = '' + for (const byte of buf) { + // https://tools.ietf.org/html/rfc3986#page-14 + // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), + // underscore (%5F), or tilde (%7E) + if ( + (byte >= 0x41 && byte <= 0x5A) || + (byte >= 0x61 && byte <= 0x7A) || + (byte >= 0x30 && byte <= 0x39) || + (byte === 0x2D) || + (byte === 0x2E) || + (byte === 0x5F) || + (byte === 0x7E) + ) { + uriEncoded += String.fromCharCode(byte) + } else { + uriEncoded += `%${byte.toString(16).padStart(2, '0')}` + } + } + return uriEncoded +} diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js index a41c8fba0..10a5017cc 100644 --- a/src/pubsub/publish.js +++ b/src/pubsub/publish.js @@ -2,6 +2,7 @@ const { Buffer } = require('buffer') const configure = require('../lib/configure') +const encodeBuffer = require('../lib/encode-buffer-uri-component') module.exports = configure(({ ky }) => { return async (topic, data, options) => { @@ -20,26 +21,3 @@ module.exports = configure(({ ky }) => { return res } }) - -function encodeBuffer (buf) { - let uriEncoded = '' - for (const byte of buf) { - // https://tools.ietf.org/html/rfc3986#page-14 - // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), - // underscore (%5F), or tilde (%7E) - if ( - (byte >= 0x41 && byte <= 0x5A) || - (byte >= 0x61 && byte <= 0x7A) || - (byte >= 0x30 && byte <= 0x39) || - (byte === 0x2D) || - (byte === 0x2E) || - (byte === 0x5F) || - (byte === 0x7E) - ) { - uriEncoded += String.fromCharCode(byte) - } else { - uriEncoded += `%${byte.toString(16).padStart(2, '0')}` - } - } - return uriEncoded -} From 08f196b20c8e87d487568e213df20139e988a5c8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 14:58:38 +0000 Subject: [PATCH 7/9] refactor: convert dht.query to async/await --- src/dht/index.js | 6 ++-- src/dht/query.js | 59 ++++++++++++++++---------------------- src/utils/load-commands.js | 4 +-- 3 files changed, 28 insertions(+), 41 deletions(-) diff --git a/src/dht/index.js b/src/dht/index.js index 50b4920f4..5478876ff 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -3,10 +3,8 @@ const callbackify = require('callbackify') const errCode = require('err-code') const { collectify } = require('../lib/converters') -const moduleConfig = require('../utils/module-config') -module.exports = (arg, config) => { - const send = moduleConfig(arg) +module.exports = config => { const get = require('./get')(config) const findPeer = require('./find-peer')(config) @@ -27,6 +25,6 @@ module.exports = (arg, config) => { }), provide: callbackify.variadic(collectify(require('./provide')(config))), // find closest peerId to given peerId - query: require('./query')(send) + query: callbackify.variadic(collectify(require('./query')(config))) } } diff --git a/src/dht/query.js b/src/dht/query.js index 2dbc62a47..412b86d57 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -1,41 +1,30 @@ 'use strict' -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') - const PeerId = require('peer-id') const PeerInfo = require('peer-info') +const ndjson = require('iterable-ndjson') +const log = require('debug')('ipfs-http-client:dht:query') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') + +module.exports = configure(({ ky }) => { + return (peerId, options) => (async function * () { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${peerId}`) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + const res = await ky.get('dht/query', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) -module.exports = (send) => { - return promisify((peerId, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } - - const handleResult = (res, callback) => { - const peerIds = res.map((r) => (new PeerInfo(PeerId.createFromB58String(r.ID)))) - - callback(null, peerIds) + for await (const message of ndjson(toIterable(res.body))) { + log(message) + yield new PeerInfo(PeerId.createFromB58String(message.ID)) } - - send({ - path: 'dht/query', - args: peerId, - qs: opts - }, (err, result) => { - if (err) { - return callback(err) - } - - streamToValueWithTransformer(result, handleResult, callback) - }) - }) -} + })() +}) diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 242b52463..bd67dd7ce 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -91,7 +91,8 @@ function requireCommands (send, config) { getEndpointConfig: require('../get-endpoint-config')(config), bitswap: require('../bitswap')(config), block: require('../block')(config), - dag: require('../dag')(config) + dag: require('../dag')(config), + dht: require('../dht')(config) } Object.assign(cmds.refs, { @@ -111,7 +112,6 @@ function requireCommands (send, config) { // Network bootstrap: require('../bootstrap'), - dht: require('../dht'), name: require('../name'), ping: require('../ping'), pingReadableStream: require('../ping-readable-stream'), From a50bdc9babc1f9c3a22e643cd6264a77e1863fd8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 15:06:55 +0000 Subject: [PATCH 8/9] chore: appease linter --- src/lib/encode-buffer-uri-component.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib/encode-buffer-uri-component.js b/src/lib/encode-buffer-uri-component.js index 228858f57..c231970cb 100644 --- a/src/lib/encode-buffer-uri-component.js +++ b/src/lib/encode-buffer-uri-component.js @@ -1,3 +1,5 @@ +'use strict' + // https://github.com/ipfs/js-ipfs-http-client/issues/569 module.exports = function encodeBuffer (buf) { let uriEncoded = '' From 43f21e27d7e2e7a2ed27a8ece3c0ad4e262a2090 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 14 Nov 2019 17:23:40 +0000 Subject: [PATCH 9/9] refactor: remove logging --- src/dht/find-peer.js | 2 -- src/dht/find-provs.js | 2 -- src/dht/get.js | 2 -- src/dht/provide.js | 2 -- src/dht/put.js | 2 -- src/dht/query.js | 2 -- 6 files changed, 12 deletions(-) diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index ab5b6a23c..953e9fc78 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -4,7 +4,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') -const log = require('debug')('ipfs-http-client:dht:find-peer') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') @@ -24,7 +23,6 @@ module.exports = configure(({ ky }) => { }) for await (const message of ndjson(toIterable(res.body))) { - log(message) // 2 = FinalPeer // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 if (message.Type === 2 && message.Responses) { diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index cc8449063..b04d001d8 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -4,7 +4,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') -const log = require('debug')('ipfs-http-client:dht:find-provs') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') @@ -25,7 +24,6 @@ module.exports = configure(({ ky }) => { }) for await (const message of ndjson(toIterable(res.body))) { - log(message) // 4 = Provider // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 if (message.Type === 4 && message.Responses) { diff --git a/src/dht/get.js b/src/dht/get.js index 4c09f0cdd..8a7b8c6a3 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -1,7 +1,6 @@ 'use strict' const ndjson = require('iterable-ndjson') -const log = require('debug')('ipfs-http-client:dht:get') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') @@ -21,7 +20,6 @@ module.exports = configure(({ ky }) => { }) for await (const message of ndjson(toIterable(res.body))) { - log(message) // 5 = Value // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21 if (message.Type === 5) { diff --git a/src/dht/provide.js b/src/dht/provide.js index 1b18c6f73..1f05710ec 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -4,7 +4,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') -const log = require('debug')('ipfs-http-client:dht:provide') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') const toCamel = require('../lib/object-to-camel') @@ -27,7 +26,6 @@ module.exports = configure(({ ky }) => { }) for await (let message of ndjson(toIterable(res.body))) { - log(message) message = toCamel(message) if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { diff --git a/src/dht/put.js b/src/dht/put.js index f2a271e8f..50137e528 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -4,7 +4,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') -const log = require('debug')('ipfs-http-client:dht:put') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') @@ -28,7 +27,6 @@ module.exports = configure(({ ky }) => { }) for await (let message of ndjson(toIterable(res.body))) { - log(message) message = toCamel(message) if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { diff --git a/src/dht/query.js b/src/dht/query.js index 412b86d57..8ebca71f7 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -3,7 +3,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const ndjson = require('iterable-ndjson') -const log = require('debug')('ipfs-http-client:dht:query') const configure = require('../lib/configure') const toIterable = require('../lib/stream-to-iterable') @@ -23,7 +22,6 @@ module.exports = configure(({ ky }) => { }) for await (const message of ndjson(toIterable(res.body))) { - log(message) yield new PeerInfo(PeerId.createFromB58String(message.ID)) } })()