diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js new file mode 100644 index 000000000..953e9fc78 --- /dev/null +++ b/src/dht/find-peer.js @@ -0,0 +1,37 @@ +'use strict' + +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') + +module.exports = configure(({ ky }) => { + return (peerId, options) => (async function * () { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${peerId}`) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + const res = await ky.get('dht/findpeer', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) + + for await (const message of ndjson(toIterable(res.body))) { + // 2 = FinalPeer + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 + if (message.Type === 2 && message.Responses) { + for (const { ID, Addrs } of message.Responses) { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) + yield peerInfo + } + } + } + })() +}) diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js new file mode 100644 index 000000000..b04d001d8 --- /dev/null +++ b/src/dht/find-provs.js @@ -0,0 +1,38 @@ +'use strict' + +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') + +module.exports = configure(({ ky }) => { + return (cid, options) => (async function * () { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${cid}`) + if (options.numProviders) searchParams.set('num-providers', options.numProviders) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + const res = await ky.get('dht/findprovs', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) + + for await (const message of ndjson(toIterable(res.body))) { + // 4 = Provider + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 + if (message.Type === 4 && message.Responses) { + for (const { ID, Addrs } of message.Responses) { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) + yield peerInfo + } + } + } + })() +}) diff --git a/src/dht/findpeer.js b/src/dht/findpeer.js deleted file mode 100644 index 8bcc80fb5..000000000 --- a/src/dht/findpeer.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') - -const multiaddr = require('multiaddr') -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const errcode = require('err-code') - -module.exports = (send) => { - return promisify((peerId, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } - - const handleResult = (res, callback) => { - // Inconsistent return values in the browser - if (Array.isArray(res)) { - res = res.find(r => r.Type === 2) - } - - // Type 2 keys - // 2 = FinalPeer - // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 - if (!res || res.Type !== 2) { - const errMsg = 'key was not found (type 4)' - return callback(errcode(new Error(errMsg), 'ERR_KEY_TYPE_4_NOT_FOUND')) - } - - const responseReceived = res.Responses[0] - const peerInfo = new PeerInfo(PeerId.createFromB58String(responseReceived.ID)) - - responseReceived.Addrs.forEach((addr) => { - const ma = multiaddr(addr) - - peerInfo.multiaddrs.add(ma) - }) - - callback(null, peerInfo) - } - - send({ - path: 'dht/findpeer', - args: peerId.toString(), - qs: opts - }, (err, result) => { - if (err) { - return callback(err) - } - - streamToValueWithTransformer(result, handleResult, callback) - }) - }) -} diff --git a/src/dht/findprovs.js b/src/dht/findprovs.js deleted file mode 100644 index 695ef04df..000000000 --- a/src/dht/findprovs.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') - -const multiaddr = require('multiaddr') -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') - -module.exports = (send) => { - return promisify((cid, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } - - const handleResult = (res, callback) => { - // Inconsistent return values in the browser vs node - if (!Array.isArray(res)) { - res = [res] - } - - const responses = [] - res.forEach(result => { - // 4 = Provider - if (result.Type !== 4) return - result.Responses.forEach(response => { - const peerInfo = new PeerInfo(PeerId.createFromB58String(response.ID)) - - if (response.Addrs) { - response.Addrs.forEach((addr) => { - const ma = multiaddr(addr) - peerInfo.multiaddrs.add(ma) - }) - } - - responses.push(peerInfo) - }) - }) - - callback(null, responses) - } - - send({ - path: 'dht/findprovs', - args: cid.toString(), - qs: opts - }, (err, result) => { - if (err) { - return callback(err) - } - - streamToValueWithTransformer(result, handleResult, callback) - }) - }) -} diff --git a/src/dht/get.js b/src/dht/get.js index ab64fa892..8a7b8c6a3 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -1,48 +1,30 @@ 'use strict' -const promisify = require('promisify-es6') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') -module.exports = (send) => { - return promisify((key, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } +module.exports = configure(({ ky }) => { + return (key, options) => (async function * () { + options = options || {} - function handleResult (done, err, res) { - if (err) { - return done(err) - } - if (!res) { - return done(new Error('empty response')) - } - if (res.length === 0) { - return done(new Error('no value returned for key')) - } + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${key}`) + if (options.verbose != null) searchParams.set('verbose', options.verbose) - // Inconsistent return values in the browser vs node - if (Array.isArray(res)) { - res = res[0] - } + const res = await ky.get('dht/get', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) - if (res.Type === 5) { - done(null, res.Extra) - } else { - done(new Error('key was not found (type 6)')) + for await (const message of ndjson(toIterable(res.body))) { + // 5 = Value + // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21 + if (message.Type === 5) { + yield message.Extra } } - - send({ - path: 'dht/get', - args: key, - qs: opts - }, handleResult.bind(null, callback)) - }) -} + })() +}) diff --git a/src/dht/index.js b/src/dht/index.js index b4ab8c640..5478876ff 100644 --- a/src/dht/index.js +++ b/src/dht/index.js @@ -1,17 +1,30 @@ 'use strict' -const moduleConfig = require('../utils/module-config') +const callbackify = require('callbackify') +const errCode = require('err-code') +const { collectify } = require('../lib/converters') -module.exports = (arg) => { - const send = moduleConfig(arg) +module.exports = config => { + const get = require('./get')(config) + const findPeer = require('./find-peer')(config) return { - get: require('./get')(send), - put: require('./put')(send), - findProvs: require('./findprovs')(send), - findPeer: require('./findpeer')(send), - provide: require('./provide')(send), + get: callbackify.variadic(async (key, options) => { + for await (const value of get(key, options)) { + return value + } + throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND') + }), + put: callbackify.variadic(collectify(require('./put')(config))), + findProvs: callbackify.variadic(collectify(require('./find-provs')(config))), + findPeer: callbackify.variadic(async (peerId, options) => { + for await (const peerInfo of findPeer(peerId, options)) { + return peerInfo + } + throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND') + }), + provide: callbackify.variadic(collectify(require('./provide')(config))), // find closest peerId to given peerId - query: require('./query')(send) + query: callbackify.variadic(collectify(require('./query')(config))) } } diff --git a/src/dht/provide.js b/src/dht/provide.js index 08bcad6d7..1f05710ec 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -1,37 +1,40 @@ 'use strict' -const promisify = require('promisify-es6') -const CID = require('cids') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const toCamel = require('../lib/object-to-camel') -module.exports = (send) => { - return promisify((cids, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } +module.exports = configure(({ ky }) => { + return (cids, options) => (async function * () { + cids = Array.isArray(cids) ? cids : [cids] + options = options || {} - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } + const searchParams = new URLSearchParams(options.searchParams) + cids.forEach(cid => searchParams.append('arg', `${cid}`)) + if (options.recursive != null) searchParams.set('recursive', options.recursive) + if (options.verbose != null) searchParams.set('verbose', options.verbose) - if (!Array.isArray(cids)) { - cids = [cids] - } + const res = await ky.get('dht/provide', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) - // Validate CID(s) and serialize - try { - cids = cids.map(cid => new CID(cid).toBaseEncodedString('base58btc')) - } catch (err) { - return callback(err) + for await (let message of ndjson(toIterable(res.body))) { + message = toCamel(message) + if (message.responses) { + message.responses = message.responses.map(({ ID, Addrs }) => { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) + return peerInfo + }) + } + yield message } - - send({ - path: 'dht/provide', - args: cids, - qs: opts - }, callback) - }) -} + })() +}) diff --git a/src/dht/put.js b/src/dht/put.js index c0937d158..50137e528 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -1,25 +1,41 @@ 'use strict' -const promisify = require('promisify-es6') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') +const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') +const toCamel = require('../lib/object-to-camel') -module.exports = (send) => { - return promisify((key, value, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } +module.exports = configure(({ ky }) => { + return (key, value, options) => (async function * () { + options = options || {} - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } + const searchParams = new URLSearchParams(options.searchParams) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + key = Buffer.isBuffer(key) ? encodeBufferURIComponent(key) : encodeURIComponent(key) + value = Buffer.isBuffer(value) ? encodeBufferURIComponent(value) : encodeURIComponent(value) - send({ - path: 'dht/put', - args: [key, value], - qs: opts - }, callback) - }) -} + const url = `dht/put?arg=${key}&arg=${value}&${searchParams}` + const res = await ky.get(url, { + timeout: options.timeout, + signal: options.signal, + headers: options.headers + }) + + for await (let message of ndjson(toIterable(res.body))) { + message = toCamel(message) + if (message.responses) { + message.responses = message.responses.map(({ ID, Addrs }) => { + const peerInfo = new PeerInfo(PeerId.createFromB58String(ID)) + if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a))) + return peerInfo + }) + } + yield message + } + })() +}) diff --git a/src/dht/query.js b/src/dht/query.js index 2dbc62a47..8ebca71f7 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -1,41 +1,28 @@ 'use strict' -const promisify = require('promisify-es6') -const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer') - const PeerId = require('peer-id') const PeerInfo = require('peer-info') +const ndjson = require('iterable-ndjson') +const configure = require('../lib/configure') +const toIterable = require('../lib/stream-to-iterable') + +module.exports = configure(({ ky }) => { + return (peerId, options) => (async function * () { + options = options || {} + + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', `${peerId}`) + if (options.verbose != null) searchParams.set('verbose', options.verbose) + + const res = await ky.get('dht/query', { + timeout: options.timeout, + signal: options.signal, + headers: options.headers, + searchParams + }) -module.exports = (send) => { - return promisify((peerId, opts, callback) => { - if (typeof opts === 'function' && !callback) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && typeof callback === 'function') { - callback = opts - opts = {} - } - - const handleResult = (res, callback) => { - const peerIds = res.map((r) => (new PeerInfo(PeerId.createFromB58String(r.ID)))) - - callback(null, peerIds) + for await (const message of ndjson(toIterable(res.body))) { + yield new PeerInfo(PeerId.createFromB58String(message.ID)) } - - send({ - path: 'dht/query', - args: peerId, - qs: opts - }, (err, result) => { - if (err) { - return callback(err) - } - - streamToValueWithTransformer(result, handleResult, callback) - }) - }) -} + })() +}) diff --git a/src/lib/encode-buffer-uri-component.js b/src/lib/encode-buffer-uri-component.js new file mode 100644 index 000000000..c231970cb --- /dev/null +++ b/src/lib/encode-buffer-uri-component.js @@ -0,0 +1,25 @@ +'use strict' + +// https://github.com/ipfs/js-ipfs-http-client/issues/569 +module.exports = function encodeBuffer (buf) { + let uriEncoded = '' + for (const byte of buf) { + // https://tools.ietf.org/html/rfc3986#page-14 + // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), + // underscore (%5F), or tilde (%7E) + if ( + (byte >= 0x41 && byte <= 0x5A) || + (byte >= 0x61 && byte <= 0x7A) || + (byte >= 0x30 && byte <= 0x39) || + (byte === 0x2D) || + (byte === 0x2E) || + (byte === 0x5F) || + (byte === 0x7E) + ) { + uriEncoded += String.fromCharCode(byte) + } else { + uriEncoded += `%${byte.toString(16).padStart(2, '0')}` + } + } + return uriEncoded +} diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js index a41c8fba0..10a5017cc 100644 --- a/src/pubsub/publish.js +++ b/src/pubsub/publish.js @@ -2,6 +2,7 @@ const { Buffer } = require('buffer') const configure = require('../lib/configure') +const encodeBuffer = require('../lib/encode-buffer-uri-component') module.exports = configure(({ ky }) => { return async (topic, data, options) => { @@ -20,26 +21,3 @@ module.exports = configure(({ ky }) => { return res } }) - -function encodeBuffer (buf) { - let uriEncoded = '' - for (const byte of buf) { - // https://tools.ietf.org/html/rfc3986#page-14 - // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), - // underscore (%5F), or tilde (%7E) - if ( - (byte >= 0x41 && byte <= 0x5A) || - (byte >= 0x61 && byte <= 0x7A) || - (byte >= 0x30 && byte <= 0x39) || - (byte === 0x2D) || - (byte === 0x2E) || - (byte === 0x5F) || - (byte === 0x7E) - ) { - uriEncoded += String.fromCharCode(byte) - } else { - uriEncoded += `%${byte.toString(16).padStart(2, '0')}` - } - } - return uriEncoded -} diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 34a7591e5..fae44d26c 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -93,7 +93,8 @@ function requireCommands (send, config) { block: require('../block')(config), bootstrap: require('../bootstrap')(config), config: require('../config')(config), - dag: require('../dag')(config) + dag: require('../dag')(config), + dht: require('../dht')(config) } Object.assign(cmds.refs, { @@ -112,7 +113,6 @@ function requireCommands (send, config) { pin: require('../pin'), // Network - dht: require('../dht'), name: require('../name'), ping: require('../ping'), pingReadableStream: require('../ping-readable-stream'),