From 26d68927681a826c2a45674082c003a75988bb90 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 29 Mar 2019 07:53:08 +0000 Subject: [PATCH 01/18] refactor: wip switch to it-ws and async iterators License: MIT Signed-off-by: Alan Shaw --- package.json | 4 +- src/index.js | 39 +++------------ src/listener.js | 32 +++++------- test/node.js | 129 +++++++++++++++++++----------------------------- 4 files changed, 73 insertions(+), 131 deletions(-) diff --git a/package.json b/package.json index 6a32f17..3869ca8 100644 --- a/package.json +++ b/package.json @@ -43,9 +43,9 @@ "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", + "it-ws": "^2.1.0", "mafmt": "^6.0.7", - "multiaddr-to-uri": "^5.0.0", - "pull-ws": "hugomrdias/pull-ws#fix/bundle-size" + "multiaddr-to-uri": "^5.0.0" }, "devDependencies": { "aegir": "^20.0.0", diff --git a/src/index.js b/src/index.js index b3053b0..e6b2947 100644 --- a/src/index.js +++ b/src/index.js @@ -1,47 +1,24 @@ 'use strict' -const connect = require('pull-ws/client') +const connect = require('it-ws/client') const mafmt = require('mafmt') const withIs = require('class-is') -const Connection = require('interface-connection').Connection - const toUri = require('multiaddr-to-uri') -const debug = require('debug') -const log = debug('libp2p:websockets:dialer') +const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = callback || function () { } - + async dial (ma, options) { + log('dialing %s', ma) const url = toUri(ma) - log('dialing %s', url) - const socket = connect(url, { - binary: true, - onConnect: (err) => { - callback(err) - } - }) - - const conn = new Connection(socket) - conn.getObservedAddrs = (cb) => cb(null, [ma]) - conn.close = (cb) => socket.close(cb) - - return conn + const socket = connect(url, { binary: true }) + socket.getObservedAddrs = () => [ma] + log('connected %s', ma) + return socket } createListener (options, handler) { - if (typeof options === 'function') { - handler = options - options = {} - } - return createListener(options, handler) } diff --git a/src/listener.js b/src/listener.js index 7c5538b..64252b1 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,43 +1,35 @@ 'use strict' -const Connection = require('interface-connection').Connection const multiaddr = require('multiaddr') const os = require('os') -function noop () {} - -const createServer = require('pull-ws/server') || noop +const createServer = require('it-ws/server') module.exports = (options, handler) => { - const listener = createServer(options, (socket) => { - socket.getObservedAddrs = (callback) => { - // TODO research if we can reuse the address in anyway - return callback(null, []) - } - - handler(new Connection(socket)) + const server = createServer(options, socket => { + socket.getObservedAddrs = () => [] + handler(socket) }) let listeningMultiaddr - listener._listen = listener.listen - listener.listen = (ma, callback) => { - callback = callback || noop + const listen = server.listen + server.listen = ma => { listeningMultiaddr = ma if (ma.protoNames().includes('ipfs')) { ma = ma.decapsulate('ipfs') } - listener._listen(ma.toOptions(), callback) + return listen(ma.toOptions()) } - listener.getAddrs = (callback) => { + server.getAddrs = async () => { const multiaddrs = [] - const address = listener.address() + const address = server.address() if (!address) { - return callback(new Error('Listener is not ready yet')) + throw new Error('Listener is not ready yet') } const ipfsId = listeningMultiaddr.getPeerId() @@ -65,8 +57,8 @@ module.exports = (options, handler) => { } } - callback(null, multiaddrs) + return multiaddrs } - return listener + return server } diff --git a/test/node.js b/test/node.js index f6ba702..d67852e 100644 --- a/test/node.js +++ b/test/node.js @@ -30,19 +30,18 @@ describe('listen', () => { ws = new WS() }) - it('listen, check for callback', (done) => { + it('listen, check for promise', async () => { const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = ws.createListener((conn) => { }) - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(ma) @@ -59,14 +58,12 @@ describe('listen', () => { listener.listen(ma) }) - it('listen on addr with /ipfs/QmHASH', (done) => { + it('listen on addr with /ipfs/QmHASH', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it.skip('close listener with connections, through timeout', (done) => { @@ -82,73 +79,53 @@ describe('listen', () => { // TODO 0.0.0.0 not supported yet }) - it('getAddrs', (done) => { - const listener = ws.createListener((conn) => { - }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + it('getAddrs', async () => { + const listener = ws.createListener((conn) => { }) + await listener.listen(ma) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + await listener.close() }) - it('getAddrs on port 0 listen', (done) => { + it('getAddrs on port 0 listen', async () => { const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + await listener.close() }) - it('getAddrs from listening on 0.0.0.0', (done) => { + it('getAddrs from listening on 0.0.0.0', async () => { const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - listener.close(done) - }) - }) + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + await listener.close() }) - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + await listener.close() }) - it('getAddrs preserves IPFS Id', (done) => { + it('getAddrs preserves IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + await listener.listen(ma) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + await listener.close() }) }) @@ -160,19 +137,18 @@ describe('listen', () => { ws = new WS() }) - it('listen, check for callback', (done) => { + it('listen, check for promise', async () => { const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = ws.createListener((conn) => { }) - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(ma) @@ -189,14 +165,11 @@ describe('listen', () => { listener.listen(ma) }) - it('listen on addr with /ipfs/QmHASH', (done) => { + it('listen on addr with /ipfs/QmHASH', async () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) }) }) From 18086666ffa97ce94f29566623eb5ed0719916e8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 1 Apr 2019 14:27:25 +0100 Subject: [PATCH 02/18] feat: adaper with passing tests --- package.json | 1 + src/adapter.js | 73 +++++ src/index.js | 17 +- test/adapter/compliance.node.js | 24 ++ test/adapter/node.js | 563 ++++++++++++++++++++++++++++++++ 5 files changed, 669 insertions(+), 9 deletions(-) create mode 100644 src/adapter.js create mode 100644 test/adapter/compliance.node.js create mode 100644 test/adapter/node.js diff --git a/package.json b/package.json index 3869ca8..6dc91cb 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..15015b9 --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,73 @@ +'use strict' + +const { Connection } = require('interface-connection') +const withIs = require('class-is') +const toPull = require('async-iterator-to-pull-stream') +const WebSockets = require('./') +const noop = () => {} + +function callbackify (fn) { + return async function (...args) { + let cb = args.pop() + if (typeof cb !== 'function') { + args.push(cb) + cb = noop + } + let res + try { + res = await fn(...args) + } catch (err) { + return cb(err) + } + cb(null, res) + } +} + +// Legacy adapter to old transport & connection interface +class WebSocketsAdapter extends WebSockets { + dial (ma, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + callback = callback || noop + + const socket = super.dial(ma, options) + const conn = new Connection(toPull.duplex(socket)) + + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + + socket.connected().then(callback).catch(callback) + + return conn + } + + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = {} + } + + const server = super.createListener(options, socket => { + const conn = new Connection(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + handler(conn) + }) + + const proxy = { + listen: callbackify(server.listen.bind(server)), + close: callbackify(server.close.bind(server)), + getAddrs: callbackify(server.getAddrs.bind(server)), + getObservedAddrs: callbackify(() => server.getObservedAddrs()) + } + + return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) + } +} + +module.exports = withIs(WebSocketsAdapter, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/src/index.js b/src/index.js index e6b2947..0237dc6 100644 --- a/src/index.js +++ b/src/index.js @@ -9,10 +9,9 @@ const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - async dial (ma, options) { + dial (ma, options) { log('dialing %s', ma) - const url = toUri(ma) - const socket = connect(url, { binary: true }) + const socket = connect(toUri(ma), { binary: true }) socket.getObservedAddrs = () => [ma] log('connected %s', ma) return socket @@ -23,9 +22,7 @@ class WebSockets { } filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { if (ma.protoNames().includes('p2p-circuit')) { @@ -36,10 +33,12 @@ class WebSockets { ma = ma.decapsulate('ipfs') } - return mafmt.WebSockets.matches(ma) || - mafmt.WebSocketsSecure.matches(ma) + return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma) }) } } -module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' }) +module.exports = withIs(WebSockets, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js new file mode 100644 index 0000000..e940090 --- /dev/null +++ b/test/adapter/compliance.node.js @@ -0,0 +1,24 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('interface-transport') +const multiaddr = require('multiaddr') +const WS = require('../../src/adapter') + +describe('compliance', () => { + tests({ + setup (callback) { + let ws = new WS() + const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), + multiaddr('/dns4/ipfs.io/tcp/9092/ws'), + multiaddr('/dns4/ipfs.io/tcp/9092/wss') + ] + callback(null, ws, addrs) + }, + teardown (callback) { + callback() + } + }) +}) diff --git a/test/adapter/node.js b/test/adapter/node.js new file mode 100644 index 0000000..7537ba9 --- /dev/null +++ b/test/adapter/node.js @@ -0,0 +1,563 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + +const WS = require('../../src/adapter') + +require('./compliance.node') + +describe('instantiate the transport', () => { + it('create', () => { + const ws = new WS() + expect(ws).to.exist() + }) +}) + +describe('listen', () => { + describe('ip4', () => { + let ws + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.close(done) + }) + + listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + + listener.listen(ma) + }) + + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it.skip('close listener with connections, through timeout', (done) => { + // TODO `ws` closes all anyway, we need to make it not close + // first - https://github.com/diasdavid/simple-websocket-server + }) + + it.skip('listen on port 0', (done) => { + // TODO port 0 not supported yet + }) + + it.skip('listen on any Interface', (done) => { + // TODO 0.0.0.0 not supported yet + }) + + it('getAddrs', (done) => { + const listener = ws.createListener((conn) => { + }) + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + + it('getAddrs on port 0 listen', (done) => { + const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0', (done) => { + const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + listener.close(done) + }) + }) + }) + + it('getAddrs preserves IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + }) + + describe('ip6', () => { + let ws + const ma = multiaddr('/ip6/::1/tcp/9091/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.close(done) + }) + + listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + + listener.listen(ma) + }) + + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + }) +}) + +describe('dial', () => { + describe('ip4', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial', (done) => { + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + + it('dial with IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + }) + + describe('ip6', () => { + let ws + let listener + const ma = multiaddr('/ip6/::1/tcp/9091') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial', (done) => { + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + + it('dial with IPFS Id', (done) => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + }) +}) + +describe('filter addrs', () => { + let ws + + before(() => { + ws = new WS() + }) + + describe('filter valid addrs for this transport', function () { + it('should fail invalid WS addresses', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/9090') + const ma2 = multiaddr('/ip4/127.0.0.1/udp/9090') + const ma3 = multiaddr('/ip6/::1/tcp/80') + const ma4 = multiaddr('/dnsaddr/ipfs.io/tcp/80') + + const valid = ws.filter([ma1, ma2, ma3, ma4]) + expect(valid.length).to.equal(0) + }) + + it('should filter correct ipv4 addresses', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv4 addresses with ipfs id', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/80/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv6 address', function () { + const ma1 = multiaddr('/ip6/::1/tcp/80/ws') + const ma2 = multiaddr('/ip6/::1/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv6 addresses with ipfs id', function () { + const ma1 = multiaddr('/ip6/::1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip6/::1/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns address', function () { + const ma1 = multiaddr('/dnsaddr/ipfs.io/ws') + const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws') + const ma3 = multiaddr('/dnsaddr/ipfs.io/tcp/80/wss') + + const valid = ws.filter([ma1, ma2, ma3]) + expect(valid.length).to.equal(3) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + expect(valid[2]).to.deep.equal(ma3) + }) + + it('should filter correct dns address with ipfs id', function () { + const ma1 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns4 address', function () { + const ma1 = multiaddr('/dns4/ipfs.io/tcp/80/ws') + const ma2 = multiaddr('/dns4/ipfs.io/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns6 address', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws') + const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns6 address with ipfs id', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter mixed addresses', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/9090') + const ma3 = multiaddr('/ip4/127.0.0.1/udp/9090') + const ma4 = multiaddr('/dns6/ipfs.io/ws') + const mh5 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + + '/p2p-circuit/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2, ma3, ma4, mh5]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma4) + }) + }) + + it('filter a single addr for this transport', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter(ma) + expect(valid.length).to.equal(1) + expect(valid[0]).to.deep.equal(ma) + done() + }) +}) + +describe('valid Connection', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') + + it('get observed addrs', (done) => { + let dialerObsAddrs + let listenerObsAddrs + + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + dialerObsAddrs = addrs + }) + + pull(conn, conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + listenerObsAddrs = addrs + + listener.close(onClose) + + function onClose () { + expect(listenerObsAddrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(0) + done() + } + }) + } + }) + }) + + it('get Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + }) + + pull(conn, conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + listener.close(done) + }) + } + }) + }) + + it('set Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + conn.setPeerInfo('a') + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('a') + }) + + pull(conn, conn) + }) + + listener.listen(ma, onListen) + + function onListen () { + const conn = ws.dial(ma) + conn.setPeerInfo('b') + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('b') + listener.close(done) + }) + } + } + }) +}) + +describe.skip('turbolence', () => { + it('dialer - emits error on the other end is terminated abruptly', (done) => { + }) + it('listener - emits error on the other end is terminated abruptly', (done) => { + }) +}) From 7713c6f829a3c82a520ea491f68c6676de73982f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 2 Apr 2019 10:05:45 +0100 Subject: [PATCH 03/18] refactor: async dial that resolves on connection open License: MIT Signed-off-by: Alan Shaw --- src/adapter.js | 20 ++++++++++++++------ src/index.js | 3 ++- src/listener.js | 1 - 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/adapter.js b/src/adapter.js index 15015b9..76453bf 100644 --- a/src/adapter.js +++ b/src/adapter.js @@ -3,6 +3,8 @@ const { Connection } = require('interface-connection') const withIs = require('class-is') const toPull = require('async-iterator-to-pull-stream') +const error = require('pull-stream/sources/error') +const drain = require('pull-stream/sinks/drain') const WebSockets = require('./') const noop = () => {} @@ -33,13 +35,19 @@ class WebSocketsAdapter extends WebSockets { callback = callback || noop - const socket = super.dial(ma, options) - const conn = new Connection(toPull.duplex(socket)) + const conn = new Connection() - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - conn.close = callbackify(socket.close.bind(socket)) - - socket.connected().then(callback).catch(callback) + super.dial(ma, options) + .then(socket => { + conn.setInnerConn(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + callback(null, conn) + }) + .catch(err => { + conn.setInnerConn({ sink: drain(), source: error(err) }) + callback(err) + }) return conn } diff --git a/src/index.js b/src/index.js index 0237dc6..c1f620d 100644 --- a/src/index.js +++ b/src/index.js @@ -9,9 +9,10 @@ const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - dial (ma, options) { + async dial (ma, options) { log('dialing %s', ma) const socket = connect(toUri(ma), { binary: true }) + await socket.connected() socket.getObservedAddrs = () => [ma] log('connected %s', ma) return socket diff --git a/src/listener.js b/src/listener.js index 64252b1..505e811 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,7 +2,6 @@ const multiaddr = require('multiaddr') const os = require('os') - const createServer = require('it-ws/server') module.exports = (options, handler) => { From 083a806f32bfdaf7e7c8a011e83a434a7bb1bc88 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 2 Apr 2019 15:18:30 +0100 Subject: [PATCH 04/18] fix: listener params and adapter tests License: MIT Signed-off-by: Alan Shaw --- package.json | 6 +- src/listener.js | 7 + test/adapter/compliance.node.js | 4 +- test/adapter/compliance/dial-test.js | 73 +++++++++ test/adapter/compliance/index.js | 12 ++ test/adapter/compliance/listen-test.js | 124 ++++++++++++++ test/adapter/index.js | 2 + test/adapter/node.js | 17 +- test/node.js | 213 +++++-------------------- 9 files changed, 274 insertions(+), 184 deletions(-) create mode 100644 test/adapter/compliance/dial-test.js create mode 100644 test/adapter/compliance/index.js create mode 100644 test/adapter/compliance/listen-test.js create mode 100644 test/adapter/index.js diff --git a/package.json b/package.json index 6dc91cb..e55279e 100644 --- a/package.json +++ b/package.json @@ -53,9 +53,13 @@ "chai": "^4.2.0", "dirty-chai": "^2.0.1", "interface-transport": "~0.3.7", + "it-goodbye": "^1.0.0", + "it-pipe": "^1.0.0", "multiaddr": "^6.0.6", "pull-goodbye": "0.0.2", - "pull-stream": "^3.6.9" + "pull-serializer": "~0.3.2", + "pull-stream": "^3.6.9", + "streaming-iterables": "^4.0.2" }, "contributors": [ "Chris Campbell ", diff --git a/src/listener.js b/src/listener.js index 505e811..8250bcc 100644 --- a/src/listener.js +++ b/src/listener.js @@ -5,6 +5,13 @@ const os = require('os') const createServer = require('it-ws/server') module.exports = (options, handler) => { + if (typeof options === 'function') { + handler = options + options = {} + } + + options = options || {} + const server = createServer(options, socket => { socket.getObservedAddrs = () => [] handler(socket) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js index e940090..b309240 100644 --- a/test/adapter/compliance.node.js +++ b/test/adapter/compliance.node.js @@ -1,11 +1,11 @@ /* eslint-env mocha */ 'use strict' -const tests = require('interface-transport') +const tests = require('./compliance') const multiaddr = require('multiaddr') const WS = require('../../src/adapter') -describe('compliance', () => { +describe('adapter compliance', () => { tests({ setup (callback) { let ws = new WS() diff --git a/test/adapter/compliance/dial-test.js b/test/adapter/compliance/dial-test.js new file mode 100644 index 0000000..85c1a6c --- /dev/null +++ b/test/adapter/compliance/dial-test.js @@ -0,0 +1,73 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') +const serializer = require('pull-serializer') + +module.exports = (common) => { + describe('dial', () => { + let addrs + let transport + let listener + + before((done) => { + common.setup((err, _transport, _addrs) => { + if (err) return done(err) + transport = _transport + addrs = _addrs + done() + }) + }) + + after((done) => { + common.teardown(done) + }) + + beforeEach((done) => { + listener = transport.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(addrs[0], done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('simple', (done) => { + const s = serializer(goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, values) => { + expect(err).to.not.exist() + expect( + values + ).to.be.eql( + ['hey'] + ) + done() + }) + })) + + pull( + s, + transport.dial(addrs[0]), + s + ) + }) + + it('to non existent listener', (done) => { + pull( + transport.dial(addrs[1]), + pull.onEnd((err) => { + expect(err).to.exist() + done() + }) + ) + }) + }) +} diff --git a/test/adapter/compliance/index.js b/test/adapter/compliance/index.js new file mode 100644 index 0000000..e8173e2 --- /dev/null +++ b/test/adapter/compliance/index.js @@ -0,0 +1,12 @@ +/* eslint-env mocha */ +'use strict' + +const dial = require('./dial-test') +const listen = require('./listen-test') + +module.exports = (common) => { + describe('interface-transport', () => { + dial(common) + listen(common) + }) +} diff --git a/test/adapter/compliance/listen-test.js b/test/adapter/compliance/listen-test.js new file mode 100644 index 0000000..082361a --- /dev/null +++ b/test/adapter/compliance/listen-test.js @@ -0,0 +1,124 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const pull = require('pull-stream') + +module.exports = (common) => { + describe('listen', () => { + let addrs + let transport + + before((done) => { + common.setup((err, _transport, _addrs) => { + if (err) return done(err) + transport = _transport + addrs = _addrs + done() + }) + }) + + after((done) => { + common.teardown(done) + }) + + it('simple', (done) => { + const listener = transport.createListener((conn) => {}) + listener.listen(addrs[0], () => { + listener.close(done) + }) + }) + + it('close listener with connections, through timeout', (done) => { + const finish = plan(3, done) + const listener = transport.createListener((conn) => { + pull(conn, conn) + }) + + listener.listen(addrs[0], () => { + const socket1 = transport.dial(addrs[0], () => { + listener.close(finish) + }) + + pull( + transport.dial(addrs[0]), + pull.onEnd(() => { + finish() + }) + ) + + pull( + pull.values([Buffer.from('Some data that is never handled')]), + socket1, + pull.onEnd(() => { + finish() + }) + ) + }) + }) + + describe('events', () => { + // eslint-disable-next-line + // TODO: figure out why it fails in the full test suite + it.skip('connection', (done) => { + const finish = plan(2, done) + + const listener = transport.createListener() + + listener.on('connection', (conn) => { + expect(conn).to.exist() + finish() + }) + + listener.listen(addrs[0], () => { + transport.dial(addrs[0], () => { + listener.close(finish) + }) + }) + }) + + it('listening', (done) => { + const listener = transport.createListener() + listener.on('listening', () => { + listener.close(done) + }) + listener.listen(addrs[0]) + }) + + // eslint-disable-next-line + // TODO: how to get the listener to emit an error? + it.skip('error', (done) => { + const listener = transport.createListener() + listener.on('error', (err) => { + expect(err).to.exist() + listener.close(done) + }) + }) + + it('close', (done) => { + const finish = plan(2, done) + const listener = transport.createListener() + listener.on('close', finish) + + listener.listen(addrs[0], () => { + listener.close(finish) + }) + }) + }) + }) +} + +function plan (n, done) { + let i = 0 + return (err) => { + if (err) return done(err) + i++ + + if (i === n) done() + } +} diff --git a/test/adapter/index.js b/test/adapter/index.js new file mode 100644 index 0000000..24be09b --- /dev/null +++ b/test/adapter/index.js @@ -0,0 +1,2 @@ +require('./compliance.node') +require('./node') diff --git a/test/adapter/node.js b/test/adapter/node.js index 7537ba9..9441cbd 100644 --- a/test/adapter/node.js +++ b/test/adapter/node.js @@ -14,14 +14,14 @@ const WS = require('../../src/adapter') require('./compliance.node') -describe('instantiate the transport', () => { +describe('adapter instantiate the transport', () => { it('create', () => { const ws = new WS() expect(ws).to.exist() }) }) -describe('listen', () => { +describe('adapter listen', () => { describe('ip4', () => { let ws const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') @@ -201,7 +201,7 @@ describe('listen', () => { }) }) -describe('dial', () => { +describe('adapter dial', () => { describe('ip4', () => { let ws let listener @@ -305,7 +305,7 @@ describe('dial', () => { }) }) -describe('filter addrs', () => { +describe('adapter filter addrs', () => { let ws before(() => { @@ -440,7 +440,7 @@ describe('filter addrs', () => { }) }) -describe('valid Connection', () => { +describe('adapter valid Connection', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') it('get observed addrs', (done) => { @@ -554,10 +554,3 @@ describe('valid Connection', () => { } }) }) - -describe.skip('turbolence', () => { - it('dialer - emits error on the other end is terminated abruptly', (done) => { - }) - it('listener - emits error on the other end is terminated abruptly', (done) => { - }) -}) diff --git a/test/node.js b/test/node.js index d67852e..6ff4d48 100644 --- a/test/node.js +++ b/test/node.js @@ -7,12 +7,13 @@ const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') +const goodbye = require('it-goodbye') +const { collect, consume } = require('streaming-iterables') +const pipe = require('it-pipe') const WS = require('../src') -require('./compliance.node') +// require('./compliance.node') describe('instantiate the transport', () => { it('create', () => { @@ -180,49 +181,31 @@ describe('dial', () => { let listener const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') - beforeEach((done) => { + beforeEach(() => { ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + return listener.listen(ma) }) - afterEach((done) => { - listener.close(done) - }) + afterEach(() => listener.close()) - it('dial', (done) => { - const conn = ws.dial(ma) + it('dial', async () => { + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql(['hey']) }) - it('dial with IPFS Id', (done) => { + it('dial with IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql(['hey']) }) }) @@ -231,49 +214,34 @@ describe('dial', () => { let listener const ma = multiaddr('/ip6/::1/tcp/9091') - beforeEach((done) => { + beforeEach(() => { ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + return listener.listen(ma) }) - afterEach((done) => { - listener.close(done) - }) + afterEach(() => listener.close()) - it('dial', (done) => { - const conn = ws.dial(ma) + it('dial', async () => { + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql(['hey']) }) - it('dial with IPFS Id', (done) => { + it('dial with IPFS Id', async () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) + const conn = await ws.dial(ma) const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) + source: ['hey'], + sink: collect }) - pull(s, conn, s) + const result = await pipe(s, conn, s) + expect(result).to.be.eql(['hey']) }) }) }) @@ -416,121 +384,28 @@ describe('filter addrs', () => { describe('valid Connection', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') - it('get observed addrs', (done) => { + it('get observed addrs', async () => { let dialerObsAddrs let listenerObsAddrs const ws = new WS() - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - listenerObsAddrs = addrs - - listener.close(onClose) - - function onClose () { - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) - done() - } - }) - } - }) - }) - - it('get Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { + const listener = ws.createListener(async conn => { expect(conn).to.exist() - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - }) - - pull(conn, conn) + dialerObsAddrs = await conn.getObservedAddrs() + pipe(conn, conn) }) - listener.listen(ma, () => { - const conn = ws.dial(ma) + await listener.listen(ma) + const conn = await ws.dial(ma) - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) + await pipe([], conn, consume) - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - listener.close(done) - }) - } - }) - }) + listenerObsAddrs = await conn.getObservedAddrs() - it('set Peer Info', (done) => { - const ws = new WS() + await listener.close() - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('a') - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('a') - }) - - pull(conn, conn) - }) - - listener.listen(ma, onListen) - - function onListen () { - const conn = ws.dial(ma) - conn.setPeerInfo('b') - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('b') - listener.close(done) - }) - } - } - }) -}) - -describe.skip('turbolence', () => { - it('dialer - emits error on the other end is terminated abruptly', (done) => { - }) - it('listener - emits error on the other end is terminated abruptly', (done) => { + expect(listenerObsAddrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(0) }) }) From 71965faae5cef6061e8aa5deece611467da73deb Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 3 Apr 2019 10:38:35 +0100 Subject: [PATCH 05/18] test: add interface tests License: MIT Signed-off-by: Alan Shaw --- .aegir.js | 11 ++++-- src/listener.js | 4 +- test/adapter/browser.js | 81 +++++++++++++++++++++++++++++++++++++++++ test/adapter/index.js | 2 + test/browser.js | 68 ++++++++++++++-------------------- test/compliance.node.js | 10 ++--- test/node.js | 2 +- 7 files changed, 124 insertions(+), 54 deletions(-) create mode 100644 test/adapter/browser.js diff --git a/.aegir.js b/.aegir.js index f43cfb2..3f672dc 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,21 +1,24 @@ 'use strict' const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') const WS = require('./src') let listener function boot (done) { + console.log('boot!') const ws = new WS() const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - listener = ws.createListener((conn) => pull(conn, conn)) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + listener.listen(ma).then(() => done()).catch(done) + listener.on('error', console.error) } function shutdown (done) { - listener.close(done) + console.log('shutdown') + listener.close().then(done).catch(done) } module.exports = { diff --git a/src/listener.js b/src/listener.js index 8250bcc..13c2d05 100644 --- a/src/listener.js +++ b/src/listener.js @@ -12,10 +12,10 @@ module.exports = (options, handler) => { options = options || {} - const server = createServer(options, socket => { + const server = createServer(options, handler ? socket => { socket.getObservedAddrs = () => [] handler(socket) - }) + } : null) let listeningMultiaddr diff --git a/test/adapter/browser.js b/test/adapter/browser.js new file mode 100644 index 0000000..39cc612 --- /dev/null +++ b/test/adapter/browser.js @@ -0,0 +1,81 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + +const WS = require('../../src/adapter') + +describe('adapter libp2p-websockets', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') + let ws + let conn + + beforeEach((done) => { + ws = new WS() + expect(ws).to.exist() + conn = ws.dial(ma, (err, res) => { + expect(err).to.not.exist() + done() + }) + }) + + it('echo', (done) => { + const message = 'Hello World!' + + const s = goodbye({ + source: pull.values([message]), + sink: pull.collect((err, results) => { + expect(err).to.not.exist() + expect(results).to.eql([message]) + done() + }) + }) + + pull(s, conn, s) + }) + + describe('stress', () => { + it('one big write', (done) => { + const rawMessage = Buffer.allocUnsafe(1000000).fill('a') + + const s = goodbye({ + source: pull.values([rawMessage]), + sink: pull.collect((err, results) => { + expect(err).to.not.exist() + expect(results).to.eql([rawMessage]) + done() + }) + }) + pull(s, conn, s) + }) + + it('many writes', function (done) { + this.timeout(100000) + const s = goodbye({ + source: pull( + pull.infinite(), + pull.take(20000), + pull.map((val) => Buffer.from(val.toString())) + ), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + expect(result).to.have.length(20000) + done() + }) + }) + + pull(s, conn, s) + }) + }) +}) + +it('.createServer throws in browser', () => { + expect(new WS().createListener).to.throw() +}) diff --git a/test/adapter/index.js b/test/adapter/index.js index 24be09b..a09dc02 100644 --- a/test/adapter/index.js +++ b/test/adapter/index.js @@ -1,2 +1,4 @@ +'use strict' + require('./compliance.node') require('./node') diff --git a/test/browser.js b/test/browser.js index bc4e9db..f627fb3 100644 --- a/test/browser.js +++ b/test/browser.js @@ -7,71 +7,57 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') +const pipe = require('it-pipe') +const goodbye = require('it-goodbye') +const { collect, take } = require('streaming-iterables') const WS = require('../src') +// require('./adapter/browser') + describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws let conn - beforeEach((done) => { + beforeEach(async () => { ws = new WS() - expect(ws).to.exist() - conn = ws.dial(ma, (err, res) => { - expect(err).to.not.exist() - done() - }) + conn = await ws.dial(ma) }) - it('echo', (done) => { + it('echo', async () => { const message = 'Hello World!' + const s = goodbye({ source: [message], sink: collect }) - const s = goodbye({ - source: pull.values([message]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([message]) - done() - }) - }) - - pull(s, conn, s) + const results = await pipe(s, conn, s) + expect(results).to.eql([message]) }) describe('stress', () => { - it('one big write', (done) => { + it('one big write', async () => { const rawMessage = Buffer.allocUnsafe(1000000).fill('a') - const s = goodbye({ - source: pull.values([rawMessage]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([rawMessage]) - done() - }) - }) - pull(s, conn, s) + const s = goodbye({ source: [rawMessage], sink: collect }) + + const results = await pipe(s, conn, s) + expect(results).to.eql([rawMessage]) }) - it('many writes', function (done) { - this.timeout(10000) + it('many writes', async function () { + this.timeout(100000) const s = goodbye({ - source: pull( - pull.infinite(), - pull.take(1000), - pull.map((val) => Buffer.from(val.toString())) + source: pipe( + { + [Symbol.iterator] () { return this }, + next: () => ({ done: false, value: Buffer.from(Math.random().toString()) }) + }, + take(20000) ), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - expect(result).to.have.length(1000) - done() - }) + sink: collect }) - pull(s, conn, s) + const result = await pipe(s, conn, s) + expect(result).to.have.length(20000) }) }) }) diff --git a/test/compliance.node.js b/test/compliance.node.js index a4977d9..e625142 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -5,9 +5,9 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') const WS = require('../src') -describe('compliance', () => { +describe('adapter compliance', () => { tests({ - setup (callback) { + async setup () { const ws = new WS() const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), @@ -15,10 +15,8 @@ describe('compliance', () => { multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] - callback(null, ws, addrs) + return { transport: ws, addrs } }, - teardown (callback) { - callback() - } + async teardown () {} }) }) diff --git a/test/node.js b/test/node.js index 6ff4d48..95add5c 100644 --- a/test/node.js +++ b/test/node.js @@ -13,7 +13,7 @@ const pipe = require('it-pipe') const WS = require('../src') -// require('./compliance.node') +require('./compliance.node') describe('instantiate the transport', () => { it('create', () => { From 0a97cc8c3feb4075f3c4f7bef7c9519f14509ab3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 17 Apr 2019 12:34:56 +0100 Subject: [PATCH 06/18] fix: tests License: MIT Signed-off-by: Alan Shaw --- .aegir.js | 3 --- package.json | 2 +- src/index.js | 2 +- test/adapter/browser.js | 6 +++--- test/browser.js | 10 +++++----- test/node.js | 8 ++++---- 6 files changed, 14 insertions(+), 17 deletions(-) diff --git a/.aegir.js b/.aegir.js index 3f672dc..ddf424a 100644 --- a/.aegir.js +++ b/.aegir.js @@ -2,13 +2,11 @@ const multiaddr = require('multiaddr') const pipe = require('it-pipe') - const WS = require('./src') let listener function boot (done) { - console.log('boot!') const ws = new WS() const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') listener = ws.createListener(conn => pipe(conn, conn)) @@ -17,7 +15,6 @@ function boot (done) { } function shutdown (done) { - console.log('shutdown') listener.close().then(done).catch(done) } diff --git a/package.json b/package.json index e55279e..a620634 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "chai": "^4.2.0", "dirty-chai": "^2.0.1", "interface-transport": "~0.3.7", - "it-goodbye": "^1.0.0", + "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.6", "pull-goodbye": "0.0.2", diff --git a/src/index.js b/src/index.js index c1f620d..ab4bbf9 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,7 @@ const createListener = require('./listener') class WebSockets { async dial (ma, options) { log('dialing %s', ma) - const socket = connect(toUri(ma), { binary: true }) + const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) await socket.connected() socket.getObservedAddrs = () => [ma] log('connected %s', ma) diff --git a/test/adapter/browser.js b/test/adapter/browser.js index 39cc612..52a1513 100644 --- a/test/adapter/browser.js +++ b/test/adapter/browser.js @@ -74,8 +74,8 @@ describe('adapter libp2p-websockets', () => { pull(s, conn, s) }) }) -}) -it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + it('.createServer throws in browser', () => { + expect(new WS().createListener).to.throw() + }) }) diff --git a/test/browser.js b/test/browser.js index f627fb3..78134c7 100644 --- a/test/browser.js +++ b/test/browser.js @@ -13,7 +13,7 @@ const { collect, take } = require('streaming-iterables') const WS = require('../src') -// require('./adapter/browser') +require('./adapter/browser') describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') @@ -26,7 +26,7 @@ describe('libp2p-websockets', () => { }) it('echo', async () => { - const message = 'Hello World!' + const message = Buffer.from('Hello World!') const s = goodbye({ source: [message], sink: collect }) const results = await pipe(s, conn, s) @@ -60,8 +60,8 @@ describe('libp2p-websockets', () => { expect(result).to.have.length(20000) }) }) -}) -it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + it('.createServer throws in browser', () => { + expect(new WS().createListener).to.throw() + }) }) diff --git a/test/node.js b/test/node.js index 95add5c..b7fe695 100644 --- a/test/node.js +++ b/test/node.js @@ -195,7 +195,7 @@ describe('dial', () => { const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) it('dial with IPFS Id', async () => { @@ -205,7 +205,7 @@ describe('dial', () => { const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) }) @@ -228,7 +228,7 @@ describe('dial', () => { const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) it('dial with IPFS Id', async () => { @@ -241,7 +241,7 @@ describe('dial', () => { }) const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) }) }) From b600f6820f1a06d1ab225cbcf2fd3669fd8681d9 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 09:52:49 +0100 Subject: [PATCH 07/18] feat: abortable dials License: MIT Signed-off-by: Alan Shaw --- package.json | 2 ++ src/index.js | 50 ++++++++++++++++++++++++++++++++++++++++++++++---- test/node.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index a620634..157e240 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "abortable-iterator": "^1.0.4", "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", @@ -49,6 +50,7 @@ "multiaddr-to-uri": "^5.0.0" }, "devDependencies": { + "abort-controller": "^3.0.0", "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", diff --git a/src/index.js b/src/index.js index ab4bbf9..16110e3 100644 --- a/src/index.js +++ b/src/index.js @@ -5,17 +5,59 @@ const mafmt = require('mafmt') const withIs = require('class-is') const toUri = require('multiaddr-to-uri') const log = require('debug')('libp2p:websockets:transport') - +const abortable = require('abortable-iterator') const createListener = require('./listener') +const { AbortError } = abortable class WebSockets { async dial (ma, options) { + options = options || {} log('dialing %s', ma) + const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) - await socket.connected() - socket.getObservedAddrs = () => [ma] + const getObservedAddrs = () => [ma] + + if (!options.signal) { + socket.getObservedAddrs = getObservedAddrs + await socket.connected() + log('connected %s', ma) + return socket + } + + // Allow abort via signal during connect + let onAbort + const abort = new Promise((resolve, reject) => { + onAbort = () => { + socket.close() + reject(new AbortError('connection aborted')) + } + + // Already aborted? + if (options.signal.aborted) return onAbort() + options.signal.addEventListener('abort', onAbort) + }) + + try { + await Promise.race([abort, socket.connected()]) + } finally { + options.signal.removeEventListener('abort', onAbort) + } + log('connected %s', ma) - return socket + return { + sink: async source => { + try { + await socket.sink(abortable(source, options.signal)) + } catch (err) { + // Re-throw non-aborted errors + if (err.type !== 'aborted') throw err + // Otherwise, this is fine... + await socket.close() + } + }, + source: abortable(socket.source, options.signal), + getObservedAddrs + } } createListener (options, handler) { diff --git a/test/node.js b/test/node.js index b7fe695..14574a0 100644 --- a/test/node.js +++ b/test/node.js @@ -10,6 +10,7 @@ const multiaddr = require('multiaddr') const goodbye = require('it-goodbye') const { collect, consume } = require('streaming-iterables') const pipe = require('it-pipe') +const AbortController = require('abort-controller') const WS = require('../src') @@ -207,6 +208,49 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) + + it('should be abortable after connect', async () => { + const controller = new AbortController() + const conn = await ws.dial(ma, { signal: controller.signal }) + const s = goodbye({ + source: { + [Symbol.asyncIterator] () { + return this + }, + next () { + return new Promise(resolve => { + setTimeout(() => resolve(Math.random()), 1000) + }) + } + }, + sink: consume + }) + + setTimeout(() => controller.abort(), 500) + + try { + await pipe(s, conn, s) + } catch (err) { + expect(err.type).to.equal('aborted') + return + } + + throw new Error('connection was not aborted') + }) + + it('should be abortable before connect', async () => { + const controller = new AbortController() + controller.abort() // Abort before connect + + try { + await ws.dial(ma, { signal: controller.signal }) + } catch (err) { + expect(err.type).to.equal('aborted') + return + } + + throw new Error('connection was not aborted') + }) }) describe('ip6', () => { From 01a82178572a9b4dfa1c067bb9260fb2ecbd7322 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 10:22:24 +0100 Subject: [PATCH 08/18] chore: update interface-transport dependency Async await changes merged but not released :( License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 157e240..1e0339b 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.7", + "interface-transport": "~0.6.1", "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.6", From e2af4a620ad84b8026934bcf6f1fee0dde45ec91 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 10:53:59 +0100 Subject: [PATCH 09/18] fix: describe name License: MIT Signed-off-by: Alan Shaw --- test/compliance.node.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/compliance.node.js b/test/compliance.node.js index e625142..18ef713 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -5,7 +5,7 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') const WS = require('../src') -describe('adapter compliance', () => { +describe('compliance', () => { tests({ async setup () { const ws = new WS() From 3e50f5f0ee87f5f430434be909ab8dd4789b19db Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 13:31:26 +0100 Subject: [PATCH 10/18] fix: tests License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- src/index.js | 2 +- src/listener.js | 2 +- test/compliance.node.js | 43 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 1e0339b..c2fe525 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { - "abortable-iterator": "^1.0.4", + "abortable-iterator": "^2.0.0", "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", diff --git a/src/index.js b/src/index.js index 16110e3..bf93f81 100644 --- a/src/index.js +++ b/src/index.js @@ -28,8 +28,8 @@ class WebSockets { let onAbort const abort = new Promise((resolve, reject) => { onAbort = () => { + reject(new AbortError()) socket.close() - reject(new AbortError('connection aborted')) } // Already aborted? diff --git a/src/listener.js b/src/listener.js index 13c2d05..6334934 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,7 +2,7 @@ const multiaddr = require('multiaddr') const os = require('os') -const createServer = require('it-ws/server') +const { createServer } = require('it-ws') module.exports = (options, handler) => { if (typeof options === 'function') { diff --git a/test/compliance.node.js b/test/compliance.node.js index 18ef713..13e4605 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -3,6 +3,7 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') +const http = require('http') const WS = require('../src') describe('compliance', () => { @@ -15,7 +16,47 @@ describe('compliance', () => { multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] - return { transport: ws, addrs } + + let delayMs = 0 + const delayedCreateListener = (options, handler) => { + if (typeof options === 'function') { + handler = options + options = {} + } + + options = options || {} + + // A server that will delay the upgrade event by delayMs + options.server = new Proxy(http.createServer(), { + get (server, prop) { + if (prop === 'on') { + return (event, handler) => { + server.on(event, (...args) => { + if (event !== 'upgrade' || !delayMs) { + return handler(...args) + } + setTimeout(() => handler(...args), delayMs) + }) + } + } + return server[prop] + } + }) + + return ws.createListener(options, handler) + } + + const wsProxy = new Proxy(ws, { + get: (_, prop) => prop === 'createListener' ? delayedCreateListener : ws[prop] + }) + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (ms) { delayMs = ms }, + restore () { delayMs = 0 } + } + + return { transport: wsProxy, addrs, connector } }, async teardown () {} }) From a0b039d3a8d1b97c75aa076aeaf3a0fef1c51cb4 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:03:10 +0100 Subject: [PATCH 11/18] refactor: use adapter class in interface-transport License: MIT Signed-off-by: Alan Shaw --- package.json | 1 - src/adapter.js | 72 +++----------------------------------------------- 2 files changed, 4 insertions(+), 69 deletions(-) diff --git a/package.json b/package.json index c2fe525..e460d58 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,6 @@ "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { "abortable-iterator": "^2.0.0", - "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", diff --git a/src/adapter.js b/src/adapter.js index 76453bf..c27d009 100644 --- a/src/adapter.js +++ b/src/adapter.js @@ -1,77 +1,13 @@ 'use strict' -const { Connection } = require('interface-connection') +const { Adapter } = require('interface-transport') const withIs = require('class-is') -const toPull = require('async-iterator-to-pull-stream') -const error = require('pull-stream/sources/error') -const drain = require('pull-stream/sinks/drain') const WebSockets = require('./') -const noop = () => {} - -function callbackify (fn) { - return async function (...args) { - let cb = args.pop() - if (typeof cb !== 'function') { - args.push(cb) - cb = noop - } - let res - try { - res = await fn(...args) - } catch (err) { - return cb(err) - } - cb(null, res) - } -} // Legacy adapter to old transport & connection interface -class WebSocketsAdapter extends WebSockets { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = callback || noop - - const conn = new Connection() - - super.dial(ma, options) - .then(socket => { - conn.setInnerConn(toPull.duplex(socket)) - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - conn.close = callbackify(socket.close.bind(socket)) - callback(null, conn) - }) - .catch(err => { - conn.setInnerConn({ sink: drain(), source: error(err) }) - callback(err) - }) - - return conn - } - - createListener (options, handler) { - if (typeof options === 'function') { - handler = options - options = {} - } - - const server = super.createListener(options, socket => { - const conn = new Connection(toPull.duplex(socket)) - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - handler(conn) - }) - - const proxy = { - listen: callbackify(server.listen.bind(server)), - close: callbackify(server.close.bind(server)), - getAddrs: callbackify(server.getAddrs.bind(server)), - getObservedAddrs: callbackify(() => server.getObservedAddrs()) - } - - return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) +class WebSocketsAdapter extends Adapter { + constructor () { + super(new WebSockets()) } } From 583f8e6e7d4418a4056529fb7f01e580478d9192 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:08:17 +0100 Subject: [PATCH 12/18] fix: remove async from non-async function License: MIT Signed-off-by: Alan Shaw --- src/listener.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/listener.js b/src/listener.js index 6334934..48d3300 100644 --- a/src/listener.js +++ b/src/listener.js @@ -30,7 +30,7 @@ module.exports = (options, handler) => { return listen(ma.toOptions()) } - server.getAddrs = async () => { + server.getAddrs = () => { const multiaddrs = [] const address = server.address() From aa8eeb8e90dcbee14723f34ef44ddfa55a682008 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:27:15 +0100 Subject: [PATCH 13/18] chore: cleanup License: MIT Signed-off-by: Alan Shaw --- .gitignore | 2 +- ci/Jenkinsfile | 2 -- package.json | 3 +-- 3 files changed, 2 insertions(+), 5 deletions(-) delete mode 100644 ci/Jenkinsfile diff --git a/.gitignore b/.gitignore index c2b6311..f338286 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ lib-cov # Coverage directory used by tools like istanbul coverage +.nyc_output # Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) .grunt @@ -40,4 +41,3 @@ node_modules *.swp dist - diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile deleted file mode 100644 index a7da2e5..0000000 --- a/ci/Jenkinsfile +++ /dev/null @@ -1,2 +0,0 @@ -// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -javascript() diff --git a/package.json b/package.json index e460d58..e502598 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,7 @@ "release": "aegir release -t node -t browser ", "release-minor": "aegir release --type minor -t node -t browser", "release-major": "aegir release --type major -t node -t browser", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=lcov --reporter=text npm run test:node" }, "browser": { "src/listener": "./src/listener.browser.js" From eb071fbdbf34be260b64e4c3defcbeba119d5014 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:28:46 +0100 Subject: [PATCH 14/18] chore: remove old interface-transport tests License: MIT Signed-off-by: Alan Shaw --- package.json | 3 - test/adapter/browser.js | 81 ---- test/adapter/compliance.node.js | 24 -- test/adapter/compliance/dial-test.js | 73 ---- test/adapter/compliance/index.js | 12 - test/adapter/compliance/listen-test.js | 124 ------ test/adapter/index.js | 4 - test/adapter/node.js | 556 ------------------------- test/browser.js | 2 - 9 files changed, 879 deletions(-) delete mode 100644 test/adapter/browser.js delete mode 100644 test/adapter/compliance.node.js delete mode 100644 test/adapter/compliance/dial-test.js delete mode 100644 test/adapter/compliance/index.js delete mode 100644 test/adapter/compliance/listen-test.js delete mode 100644 test/adapter/index.js delete mode 100644 test/adapter/node.js diff --git a/package.json b/package.json index e502598..28b5051 100644 --- a/package.json +++ b/package.json @@ -56,9 +56,6 @@ "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.6", - "pull-goodbye": "0.0.2", - "pull-serializer": "~0.3.2", - "pull-stream": "^3.6.9", "streaming-iterables": "^4.0.2" }, "contributors": [ diff --git a/test/adapter/browser.js b/test/adapter/browser.js deleted file mode 100644 index 52a1513..0000000 --- a/test/adapter/browser.js +++ /dev/null @@ -1,81 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') - -const WS = require('../../src/adapter') - -describe('adapter libp2p-websockets', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - let ws - let conn - - beforeEach((done) => { - ws = new WS() - expect(ws).to.exist() - conn = ws.dial(ma, (err, res) => { - expect(err).to.not.exist() - done() - }) - }) - - it('echo', (done) => { - const message = 'Hello World!' - - const s = goodbye({ - source: pull.values([message]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([message]) - done() - }) - }) - - pull(s, conn, s) - }) - - describe('stress', () => { - it('one big write', (done) => { - const rawMessage = Buffer.allocUnsafe(1000000).fill('a') - - const s = goodbye({ - source: pull.values([rawMessage]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([rawMessage]) - done() - }) - }) - pull(s, conn, s) - }) - - it('many writes', function (done) { - this.timeout(100000) - const s = goodbye({ - source: pull( - pull.infinite(), - pull.take(20000), - pull.map((val) => Buffer.from(val.toString())) - ), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - expect(result).to.have.length(20000) - done() - }) - }) - - pull(s, conn, s) - }) - }) - - it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() - }) -}) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js deleted file mode 100644 index b309240..0000000 --- a/test/adapter/compliance.node.js +++ /dev/null @@ -1,24 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const tests = require('./compliance') -const multiaddr = require('multiaddr') -const WS = require('../../src/adapter') - -describe('adapter compliance', () => { - tests({ - setup (callback) { - let ws = new WS() - const addrs = [ - multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), - multiaddr('/dns4/ipfs.io/tcp/9092/ws'), - multiaddr('/dns4/ipfs.io/tcp/9092/wss') - ] - callback(null, ws, addrs) - }, - teardown (callback) { - callback() - } - }) -}) diff --git a/test/adapter/compliance/dial-test.js b/test/adapter/compliance/dial-test.js deleted file mode 100644 index 85c1a6c..0000000 --- a/test/adapter/compliance/dial-test.js +++ /dev/null @@ -1,73 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') -const serializer = require('pull-serializer') - -module.exports = (common) => { - describe('dial', () => { - let addrs - let transport - let listener - - before((done) => { - common.setup((err, _transport, _addrs) => { - if (err) return done(err) - transport = _transport - addrs = _addrs - done() - }) - }) - - after((done) => { - common.teardown(done) - }) - - beforeEach((done) => { - listener = transport.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(addrs[0], done) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('simple', (done) => { - const s = serializer(goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, values) => { - expect(err).to.not.exist() - expect( - values - ).to.be.eql( - ['hey'] - ) - done() - }) - })) - - pull( - s, - transport.dial(addrs[0]), - s - ) - }) - - it('to non existent listener', (done) => { - pull( - transport.dial(addrs[1]), - pull.onEnd((err) => { - expect(err).to.exist() - done() - }) - ) - }) - }) -} diff --git a/test/adapter/compliance/index.js b/test/adapter/compliance/index.js deleted file mode 100644 index e8173e2..0000000 --- a/test/adapter/compliance/index.js +++ /dev/null @@ -1,12 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const dial = require('./dial-test') -const listen = require('./listen-test') - -module.exports = (common) => { - describe('interface-transport', () => { - dial(common) - listen(common) - }) -} diff --git a/test/adapter/compliance/listen-test.js b/test/adapter/compliance/listen-test.js deleted file mode 100644 index 082361a..0000000 --- a/test/adapter/compliance/listen-test.js +++ /dev/null @@ -1,124 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const pull = require('pull-stream') - -module.exports = (common) => { - describe('listen', () => { - let addrs - let transport - - before((done) => { - common.setup((err, _transport, _addrs) => { - if (err) return done(err) - transport = _transport - addrs = _addrs - done() - }) - }) - - after((done) => { - common.teardown(done) - }) - - it('simple', (done) => { - const listener = transport.createListener((conn) => {}) - listener.listen(addrs[0], () => { - listener.close(done) - }) - }) - - it('close listener with connections, through timeout', (done) => { - const finish = plan(3, done) - const listener = transport.createListener((conn) => { - pull(conn, conn) - }) - - listener.listen(addrs[0], () => { - const socket1 = transport.dial(addrs[0], () => { - listener.close(finish) - }) - - pull( - transport.dial(addrs[0]), - pull.onEnd(() => { - finish() - }) - ) - - pull( - pull.values([Buffer.from('Some data that is never handled')]), - socket1, - pull.onEnd(() => { - finish() - }) - ) - }) - }) - - describe('events', () => { - // eslint-disable-next-line - // TODO: figure out why it fails in the full test suite - it.skip('connection', (done) => { - const finish = plan(2, done) - - const listener = transport.createListener() - - listener.on('connection', (conn) => { - expect(conn).to.exist() - finish() - }) - - listener.listen(addrs[0], () => { - transport.dial(addrs[0], () => { - listener.close(finish) - }) - }) - }) - - it('listening', (done) => { - const listener = transport.createListener() - listener.on('listening', () => { - listener.close(done) - }) - listener.listen(addrs[0]) - }) - - // eslint-disable-next-line - // TODO: how to get the listener to emit an error? - it.skip('error', (done) => { - const listener = transport.createListener() - listener.on('error', (err) => { - expect(err).to.exist() - listener.close(done) - }) - }) - - it('close', (done) => { - const finish = plan(2, done) - const listener = transport.createListener() - listener.on('close', finish) - - listener.listen(addrs[0], () => { - listener.close(finish) - }) - }) - }) - }) -} - -function plan (n, done) { - let i = 0 - return (err) => { - if (err) return done(err) - i++ - - if (i === n) done() - } -} diff --git a/test/adapter/index.js b/test/adapter/index.js deleted file mode 100644 index a09dc02..0000000 --- a/test/adapter/index.js +++ /dev/null @@ -1,4 +0,0 @@ -'use strict' - -require('./compliance.node') -require('./node') diff --git a/test/adapter/node.js b/test/adapter/node.js deleted file mode 100644 index 9441cbd..0000000 --- a/test/adapter/node.js +++ /dev/null @@ -1,556 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 6] */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') - -const WS = require('../../src/adapter') - -require('./compliance.node') - -describe('adapter instantiate the transport', () => { - it('create', () => { - const ws = new WS() - expect(ws).to.exist() - }) -}) - -describe('adapter listen', () => { - describe('ip4', () => { - let ws - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - - beforeEach(() => { - ws = new WS() - }) - - it('listen, check for callback', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - - it('listen, check for listening event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.close(done) - }) - - listener.listen(ma) - }) - - it('listen, check for the close event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.on('close', done) - listener.close() - }) - - listener.listen(ma) - }) - - it('listen on addr with /ipfs/QmHASH', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - - it.skip('close listener with connections, through timeout', (done) => { - // TODO `ws` closes all anyway, we need to make it not close - // first - https://github.com/diasdavid/simple-websocket-server - }) - - it.skip('listen on port 0', (done) => { - // TODO port 0 not supported yet - }) - - it.skip('listen on any Interface', (done) => { - // TODO 0.0.0.0 not supported yet - }) - - it('getAddrs', (done) => { - const listener = ws.createListener((conn) => { - }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) - }) - - it('getAddrs on port 0 listen', (done) => { - const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) - }) - - it('getAddrs from listening on 0.0.0.0', (done) => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - listener.close(done) - }) - }) - }) - - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) - }) - - it('getAddrs preserves IPFS Id', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) - }) - }) - - describe('ip6', () => { - let ws - const ma = multiaddr('/ip6/::1/tcp/9091/ws') - - beforeEach(() => { - ws = new WS() - }) - - it('listen, check for callback', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - - it('listen, check for listening event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.close(done) - }) - - listener.listen(ma) - }) - - it('listen, check for the close event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.on('close', done) - listener.close() - }) - - listener.listen(ma) - }) - - it('listen on addr with /ipfs/QmHASH', (done) => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - }) -}) - -describe('adapter dial', () => { - describe('ip4', () => { - let ws - let listener - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') - - beforeEach((done) => { - ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('dial', (done) => { - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - - it('dial with IPFS Id', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - }) - - describe('ip6', () => { - let ws - let listener - const ma = multiaddr('/ip6/::1/tcp/9091') - - beforeEach((done) => { - ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('dial', (done) => { - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - - it('dial with IPFS Id', (done) => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - }) -}) - -describe('adapter filter addrs', () => { - let ws - - before(() => { - ws = new WS() - }) - - describe('filter valid addrs for this transport', function () { - it('should fail invalid WS addresses', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/9090') - const ma2 = multiaddr('/ip4/127.0.0.1/udp/9090') - const ma3 = multiaddr('/ip6/::1/tcp/80') - const ma4 = multiaddr('/dnsaddr/ipfs.io/tcp/80') - - const valid = ws.filter([ma1, ma2, ma3, ma4]) - expect(valid.length).to.equal(0) - }) - - it('should filter correct ipv4 addresses', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct ipv4 addresses with ipfs id', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/80/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct ipv6 address', function () { - const ma1 = multiaddr('/ip6/::1/tcp/80/ws') - const ma2 = multiaddr('/ip6/::1/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct ipv6 addresses with ipfs id', function () { - const ma1 = multiaddr('/ip6/::1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/ip6/::1/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns address', function () { - const ma1 = multiaddr('/dnsaddr/ipfs.io/ws') - const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws') - const ma3 = multiaddr('/dnsaddr/ipfs.io/tcp/80/wss') - - const valid = ws.filter([ma1, ma2, ma3]) - expect(valid.length).to.equal(3) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - expect(valid[2]).to.deep.equal(ma3) - }) - - it('should filter correct dns address with ipfs id', function () { - const ma1 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns4 address', function () { - const ma1 = multiaddr('/dns4/ipfs.io/tcp/80/ws') - const ma2 = multiaddr('/dns4/ipfs.io/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns6 address', function () { - const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws') - const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns6 address with ipfs id', function () { - const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter mixed addresses', function () { - const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/9090') - const ma3 = multiaddr('/ip4/127.0.0.1/udp/9090') - const ma4 = multiaddr('/dns6/ipfs.io/ws') - const mh5 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + - '/p2p-circuit/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2, ma3, ma4, mh5]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma4) - }) - }) - - it('filter a single addr for this transport', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter(ma) - expect(valid.length).to.equal(1) - expect(valid[0]).to.deep.equal(ma) - done() - }) -}) - -describe('adapter valid Connection', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') - - it('get observed addrs', (done) => { - let dialerObsAddrs - let listenerObsAddrs - - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - listenerObsAddrs = addrs - - listener.close(onClose) - - function onClose () { - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) - done() - } - }) - } - }) - }) - - it('get Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - listener.close(done) - }) - } - }) - }) - - it('set Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('a') - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('a') - }) - - pull(conn, conn) - }) - - listener.listen(ma, onListen) - - function onListen () { - const conn = ws.dial(ma) - conn.setPeerInfo('b') - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('b') - listener.close(done) - }) - } - } - }) -}) diff --git a/test/browser.js b/test/browser.js index 78134c7..c3f1ff5 100644 --- a/test/browser.js +++ b/test/browser.js @@ -13,8 +13,6 @@ const { collect, take } = require('streaming-iterables') const WS = require('../src') -require('./adapter/browser') - describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws From cc53e7ac548960e4068eb9ed20e0417b384d5662 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:30:53 +0100 Subject: [PATCH 15/18] refactor: remove test covered by interface-transport tests License: MIT Signed-off-by: Alan Shaw --- test/node.js | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/test/node.js b/test/node.js index 14574a0..5ed3c9d 100644 --- a/test/node.js +++ b/test/node.js @@ -237,20 +237,6 @@ describe('dial', () => { throw new Error('connection was not aborted') }) - - it('should be abortable before connect', async () => { - const controller = new AbortController() - controller.abort() // Abort before connect - - try { - await ws.dial(ma, { signal: controller.signal }) - } catch (err) { - expect(err.type).to.equal('aborted') - return - } - - throw new Error('connection was not aborted') - }) }) describe('ip6', () => { From 20358d3d7dfdf4235b7b14e012162c7eaab593f7 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:42:17 +0100 Subject: [PATCH 16/18] refactor: use abort error from interface-transport License: MIT Signed-off-by: Alan Shaw --- src/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.js b/src/index.js index bf93f81..26de44a 100644 --- a/src/index.js +++ b/src/index.js @@ -6,8 +6,8 @@ const withIs = require('class-is') const toUri = require('multiaddr-to-uri') const log = require('debug')('libp2p:websockets:transport') const abortable = require('abortable-iterator') +const { AbortError } = require('interface-transport') const createListener = require('./listener') -const { AbortError } = abortable class WebSockets { async dial (ma, options) { From c97a0f37dd54bbe43aef96f72039089dcc36ee1a Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 20 Sep 2019 15:27:52 +0200 Subject: [PATCH 17/18] refactory: async with multiaddr conn --- .aegir.js | 6 +- .gitignore | 41 +----------- .travis.yml | 2 +- README.md | 40 ++++++------ package.json | 24 +++---- src/adapter.js | 17 ----- src/constants.js | 8 +++ src/index.js | 115 ++++++++++++++++++++++++---------- src/listener.js | 75 ++++++++++++++++------ src/socket-to-conn.js | 68 ++++++++++++++++++++ test/browser.js | 9 ++- test/compliance.node.js | 8 +-- test/fixtures/certificate.pem | 13 ++++ test/fixtures/key.pem | 15 +++++ test/node.js | 102 +++++++++++++++++++++--------- 15 files changed, 360 insertions(+), 183 deletions(-) delete mode 100644 src/adapter.js create mode 100644 src/constants.js create mode 100644 src/socket-to-conn.js create mode 100644 test/fixtures/certificate.pem create mode 100644 test/fixtures/key.pem diff --git a/.aegir.js b/.aegir.js index ddf424a..4d2f8bd 100644 --- a/.aegir.js +++ b/.aegir.js @@ -4,10 +4,14 @@ const multiaddr = require('multiaddr') const pipe = require('it-pipe') const WS = require('./src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} let listener function boot (done) { - const ws = new WS() + const ws = new WS({ upgrader: mockUpgrader }) const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') listener = ws.createListener(conn => pipe(conn, conn)) listener.listen(ma).then(() => done()).catch(done) diff --git a/.gitignore b/.gitignore index f338286..9faa8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,43 +1,4 @@ -docs +node_modules package-lock.json -yarn.lock - -# Logs -logs -*.log -npm-debug.log* - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul coverage .nyc_output - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# node-waf configuration -.lock-wscript - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Dependency directory -node_modules - -# Optional npm cache directory -.npm - -# Optional REPL history -.node_repl_history - -# Vim editor swap files -*.swp - -dist diff --git a/.travis.yml b/.travis.yml index dba6e9d..ff17c4c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,7 @@ jobs: - stage: check script: - - npx aegir commitlint --travis + - npx aegir build --bundlesize - npx aegir dep-check -- -i wrtc -i electron-webrtc - npm run lint diff --git a/README.md b/README.md index 57f4969..b99fd8a 100644 --- a/README.md +++ b/README.md @@ -40,37 +40,33 @@ ```js const WS = require('libp2p-websockets') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/0.0.0.0/tcp/9090/ws') +const addr = multiaddr('/ip4/0.0.0.0/tcp/9090/ws') -const ws = new WS() +const ws = new WS({ upgrader }) const listener = ws.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') - - pull( - ws.dial(mh), - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } - - // Close connection after reading - listener.close() - }), - ) -}) +await listener.listen(addr) +console.log('listening') + +const socket = await ws.dial(addr) +const values = await pipe( + socket, + collect +) +console.log(`Value: ${values.toString()}`) + +// Close connection after reading +await listener.close() ``` ## API diff --git a/package.json b/package.json index 28b5051..2fc1407 100644 --- a/package.json +++ b/package.json @@ -23,8 +23,7 @@ "dist" ], "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -39,24 +38,25 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { - "abortable-iterator": "^2.0.0", + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "debug": "^4.1.1", - "interface-connection": "~0.3.3", - "it-ws": "^2.1.0", - "mafmt": "^6.0.7", + "err-code": "^2.0.0", + "it-ws": "vasco-santos/it-ws#feat/add-properties-and-functions-to-client-and-server", + "libp2p-utils": "~0.1.0", + "mafmt": "^7.0.0", + "multiaddr": "^7.1.0", "multiaddr-to-uri": "^5.0.0" }, "devDependencies": { "abort-controller": "^3.0.0", - "aegir": "^20.0.0", + "aegir": "^20.3.1", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "~0.6.1", - "it-goodbye": "^2.0.0", - "it-pipe": "^1.0.0", - "multiaddr": "^6.0.6", - "streaming-iterables": "^4.0.2" + "interface-transport": "^0.7.0", + "it-goodbye": "^2.0.1", + "it-pipe": "^1.0.1", + "streaming-iterables": "^4.1.0" }, "contributors": [ "Chris Campbell ", diff --git a/src/adapter.js b/src/adapter.js deleted file mode 100644 index c27d009..0000000 --- a/src/adapter.js +++ /dev/null @@ -1,17 +0,0 @@ -'use strict' - -const { Adapter } = require('interface-transport') -const withIs = require('class-is') -const WebSockets = require('./') - -// Legacy adapter to old transport & connection interface -class WebSocketsAdapter extends Adapter { - constructor () { - super(new WebSockets()) - } -} - -module.exports = withIs(WebSocketsAdapter, { - className: 'WebSockets', - symbolName: '@libp2p/js-libp2p-websockets/websockets' -}) diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..b7ab8fe --- /dev/null +++ b/src/constants.js @@ -0,0 +1,8 @@ +'use strict' + +// p2p multi-address code +exports.CODE_P2P = 421 +exports.CODE_CIRCUIT = 290 + +// Time to wait for a connection to close gracefully before destroying it manually +exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index 26de44a..3cbed18 100644 --- a/src/index.js +++ b/src/index.js @@ -4,24 +4,69 @@ const connect = require('it-ws/client') const mafmt = require('mafmt') const withIs = require('class-is') const toUri = require('multiaddr-to-uri') -const log = require('debug')('libp2p:websockets:transport') -const abortable = require('abortable-iterator') -const { AbortError } = require('interface-transport') +const { AbortError } = require('abortable-iterator') + +const log = require('debug')('libp2p:websockets') +const assert = require('assert') + const createListener = require('./listener') +const toConnection = require('./socket-to-conn') +const { CODE_CIRCUIT, CODE_P2P } = require('./constants') +/** + * @class WebSockets + */ class WebSockets { - async dial (ma, options) { - options = options || {} + /** + * @constructor + * @param {object} options + * @param {Upgrader} options.upgrader + */ + constructor ({ upgrader }) { + assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') + this._upgrader = upgrader + } + + /** + * @async + * @param {Multiaddr} ma + * @param {object} [options] + * @param {AbortSignal} [options.signal] Used to abort dial requests + * @returns {Connection} An upgraded Connection + */ + async dial (ma, options = {}) { log('dialing %s', ma) - const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) - const getObservedAddrs = () => [ma] + const stream = await this._connect(ma, options) + const maConn = toConnection(stream, { socket: stream.socket, remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + + const conn = await this._upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) + return conn + } + + /** + * @private + * @param {Multiaddr} ma + * @param {object} [options] + * @param {AbortSignal} [options.signal] Used to abort dial requests + * @returns {Promise} Resolves a TCP Socket + */ + async _connect (ma, options = {}) { + if (options.signal && options.signal.aborted) { + throw new AbortError() + } + const cOpts = ma.toOptions() + log('dialing %s:%s', cOpts.host, cOpts.port) + + const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options)) if (!options.signal) { - socket.getObservedAddrs = getObservedAddrs - await socket.connected() + await rawSocket.connected() + log('connected %s', ma) - return socket + return rawSocket } // Allow abort via signal during connect @@ -29,7 +74,7 @@ class WebSockets { const abort = new Promise((resolve, reject) => { onAbort = () => { reject(new AbortError()) - socket.close() + rawSocket.close() } // Already aborted? @@ -38,45 +83,49 @@ class WebSockets { }) try { - await Promise.race([abort, socket.connected()]) + await Promise.race([abort, rawSocket.connected()]) } finally { options.signal.removeEventListener('abort', onAbort) } log('connected %s', ma) - return { - sink: async source => { - try { - await socket.sink(abortable(source, options.signal)) - } catch (err) { - // Re-throw non-aborted errors - if (err.type !== 'aborted') throw err - // Otherwise, this is fine... - await socket.close() - } - }, - source: abortable(socket.source, options.signal), - getObservedAddrs - } + return rawSocket } + /** + * Creates a Websockets listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + * @param {object} [options] + * @param {http.Server} [options.server] A pre-created Node.js HTTP/S server. + * @param {function (Connection)} handler + * @returns {Listener} A Websockets listener + */ createListener (options, handler) { - return createListener(options, handler) + if (typeof options === 'function') { + handler = options + options = {} + } + options = options || {} + + return createListener({ handler, upgrader: this._upgrader }, options) } + /** + * Takes a list of `Multiaddr`s and returns only valid Websockets addresses + * @param {Multiaddr[]} multiaddrs + * @returns {Multiaddr[]} Valid Websockets multiaddrs + */ filter (multiaddrs) { multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { - if (ma.protoNames().includes('p2p-circuit')) { + if (ma.protoNames().includes(CODE_CIRCUIT)) { return false } - if (ma.protoNames().includes('ipfs')) { - ma = ma.decapsulate('ipfs') - } - - return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma) + return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) || + mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P)) }) } } diff --git a/src/listener.js b/src/listener.js index 48d3300..309fa8b 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,36 +1,61 @@ 'use strict' -const multiaddr = require('multiaddr') +const EventEmitter = require('events') const os = require('os') +const multiaddr = require('multiaddr') const { createServer } = require('it-ws') -module.exports = (options, handler) => { - if (typeof options === 'function') { - handler = options - options = {} - } +const log = require('debug')('libp2p:websockets:listener') + +const { CODE_P2P } = require('./constants') +const toConnection = require('./socket-to-conn') + +module.exports = ({ handler, upgrader }, options = {}) => { + const listener = new EventEmitter() + + const server = createServer(options, async (stream, req) => { + const maConn = toConnection(stream, { + socket: req.socket + }) + + log('new inbound connection %s', maConn.remoteAddr) + + const conn = await upgrader.upgradeInbound(maConn) + log('inbound connection %s upgraded', maConn.remoteAddr) + + trackConn(server, maConn) - options = options || {} + if (handler) handler(conn) + listener.emit('connection', conn) + }) - const server = createServer(options, handler ? socket => { - socket.getObservedAddrs = () => [] - handler(socket) - } : null) + server + .on('listening', () => listener.emit('listening')) + .on('error', err => listener.emit('error', err)) + .on('close', () => listener.emit('close')) - let listeningMultiaddr + // Keep track of open connections to destroy in case of timeout + server.__connections = [] - const listen = server.listen - server.listen = ma => { + let peerId, listeningMultiaddr + + listener.close = () => { + server.__connections.forEach(maConn => maConn.close()) + return server.close() + } + + listener.listen = (ma) => { listeningMultiaddr = ma + peerId = listeningMultiaddr.getPeerId() - if (ma.protoNames().includes('ipfs')) { - ma = ma.decapsulate('ipfs') + if (peerId) { + ma = ma.decapsulateCode(CODE_P2P) } - return listen(ma.toOptions()) + return server.listen(ma.toOptions()) } - server.getAddrs = () => { + listener.getAddrs = () => { const multiaddrs = [] const address = server.address() @@ -46,7 +71,7 @@ module.exports = (options, handler) => { let m = listeningMultiaddr.decapsulate('tcp') m = m.encapsulate('/tcp/' + address.port + '/ws') if (listeningMultiaddr.getPeerId()) { - m = m.encapsulate('/ipfs/' + ipfsId) + m = m.encapsulate('/p2p/' + ipfsId) } if (m.toString().indexOf('0.0.0.0') !== -1) { @@ -66,5 +91,15 @@ module.exports = (options, handler) => { return multiaddrs } - return server + return listener +} + +function trackConn (server, maConn) { + server.__connections.push(maConn) + + const untrackConn = () => { + server.__connections = server.__connections.filter(c => c !== maConn) + } + + maConn.conn.once('close', untrackConn) } diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js new file mode 100644 index 0000000..84f984e --- /dev/null +++ b/src/socket-to-conn.js @@ -0,0 +1,68 @@ +'use strict' + +const abortable = require('abortable-iterator') +const { CLOSE_TIMEOUT } = require('./constants') +const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') + +const log = require('debug')('libp2p:websockets:socket') + +// Convert a stream into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection +module.exports = (stream, options = {}) => { + const socket = options.socket + + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await stream.sink(source) + } catch (err) { + // Re-throw non-aborted errors + if (err.type !== 'aborted') throw err + // Otherwise, this is fine... + await stream.close() + } + }, + + source: options.signal ? abortable(stream.source, options.signal) : stream.source, + + conn: socket, + + localAddr: undefined, + + // If the remote address was passed, use it - it may have the peer ID encapsulated + remoteAddr: options.remoteAddr || toMultiaddr(stream.remoteAddress, stream.remotePort), + + timeline: { open: Date.now() }, + + close () { + return new Promise(async (resolve) => { // eslint-disable-line no-async-promise-executor + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + + socket.terminate() + maConn.timeline.close = Date.now() + return resolve() + }, CLOSE_TIMEOUT) + + await stream.close() + + clearTimeout(timeout) + maConn.timeline.close = Date.now() + + resolve() + }) + } + } + + return maConn +} diff --git a/test/browser.js b/test/browser.js index c3f1ff5..f7079f1 100644 --- a/test/browser.js +++ b/test/browser.js @@ -13,13 +13,18 @@ const { collect, take } = require('streaming-iterables') const WS = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws let conn beforeEach(async () => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) conn = await ws.dial(ma) }) @@ -60,6 +65,6 @@ describe('libp2p-websockets', () => { }) it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + expect(new WS({ upgrader: mockUpgrader }).createListener).to.throw() }) }) diff --git a/test/compliance.node.js b/test/compliance.node.js index 13e4605..591026c 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -6,13 +6,13 @@ const multiaddr = require('multiaddr') const http = require('http') const WS = require('../src') -describe('compliance', () => { +describe('interface-transport compliance', () => { tests({ - async setup () { - const ws = new WS() + async setup ({ upgrader }) { // eslint-disable-line require-await + const ws = new WS({ upgrader }) const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), + multiaddr('/ip4/127.0.0.1/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] diff --git a/test/fixtures/certificate.pem b/test/fixtures/certificate.pem new file mode 100644 index 0000000..840776c --- /dev/null +++ b/test/fixtures/certificate.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICATCCAWoCCQDPufXH86n2QzANBgkqhkiG9w0BAQUFADBFMQswCQYDVQQGEwJu +bzETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0 +cyBQdHkgTHRkMB4XDTEyMDEwMTE0NDQwMFoXDTIwMDMxOTE0NDQwMFowRTELMAkG +A1UEBhMCbm8xEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0 +IFdpZGdpdHMgUHR5IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAtrQ7 ++r//2iV/B6F+4boH0XqFn7alcV9lpjvAmwRXNKnxAoa0f97AjYPGNLKrjpkNXXhB +JROIdbRbZnCNeC5fzX1a+JCo7KStzBXuGSZr27TtFmcV4H+9gIRIcNHtZmJLnxbJ +sIhkGR8yVYdmJZe4eT5ldk1zoB1adgPF1hZhCBMCAwEAATANBgkqhkiG9w0BAQUF +AAOBgQCeWBEHYJ4mCB5McwSSUox0T+/mJ4W48L/ZUE4LtRhHasU9hiW92xZkTa7E +QLcoJKQiWfiLX2ysAro0NX4+V8iqLziMqvswnPzz5nezaOLE/9U/QvH3l8qqNkXu +rNbsW1h/IO6FV8avWFYVFoutUwOaZ809k7iMh2F2JMgXQ5EymQ== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/test/fixtures/key.pem b/test/fixtures/key.pem new file mode 100644 index 0000000..3649a93 --- /dev/null +++ b/test/fixtures/key.pem @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQC2tDv6v//aJX8HoX7hugfReoWftqVxX2WmO8CbBFc0qfEChrR/ +3sCNg8Y0squOmQ1deEElE4h1tFtmcI14Ll/NfVr4kKjspK3MFe4ZJmvbtO0WZxXg +f72AhEhw0e1mYkufFsmwiGQZHzJVh2Yll7h5PmV2TXOgHVp2A8XWFmEIEwIDAQAB +AoGAAlVY8sHi/aE+9xT77twWX3mGHV0SzdjfDnly40fx6S1Gc7bOtVdd9DC7pk6l +3ENeJVR02IlgU8iC5lMHq4JEHPE272jtPrLlrpWLTGmHEqoVFv9AITPqUDLhB9Kk +Hjl7h8NYBKbr2JHKICr3DIPKOT+RnXVb1PD4EORbJ3ooYmkCQQDfknUnVxPgxUGs +ouABw1WJIOVgcCY/IFt4Ihf6VWTsxBgzTJKxn3HtgvE0oqTH7V480XoH0QxHhjLq +DrgobWU9AkEA0TRJ8/ouXGnFEPAXjWr9GdPQRZ1Use2MrFjneH2+Sxc0CmYtwwqL +Kr5kS6mqJrxprJeluSjBd+3/ElxURrEXjwJAUvmlN1OPEhXDmRHd92mKnlkyKEeX +OkiFCiIFKih1S5Y/sRJTQ0781nyJjtJqO7UyC3pnQu1oFEePL+UEniRztQJAMfav +AtnpYKDSM+1jcp7uu9BemYGtzKDTTAYfoiNF42EzSJiGrWJDQn4eLgPjY0T0aAf/ +yGz3Z9ErbhMm/Ysl+QJBAL4kBxRT8gM4ByJw4sdOvSeCCANFq8fhbgm8pGWlCPb5 +JGmX3/GHFM8x2tbWMGpyZP1DLtiNEFz7eCGktWK5rqE= +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/test/node.js b/test/node.js index 5ed3c9d..83d8f4d 100644 --- a/test/node.js +++ b/test/node.js @@ -2,6 +2,9 @@ /* eslint max-nested-callbacks: ["error", 6] */ 'use strict' +const https = require('https') +const fs = require('fs') + const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect @@ -16,9 +19,14 @@ const WS = require('../src') require('./compliance.node') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('instantiate the transport', () => { it('create', () => { - const ws = new WS() + const ws = new WS({ upgrader: mockUpgrader }) expect(ws).to.exist() }) }) @@ -29,7 +37,7 @@ describe('listen', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) it('listen, check for promise', async () => { @@ -91,7 +99,7 @@ describe('listen', () => { }) it('getAddrs on port 0 listen', async () => { - const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) + const addr = multiaddr('/ip4/127.0.0.1/tcp/0/ws') const listener = ws.createListener((conn) => { }) await listener.listen(addr) const addrs = await listener.getAddrs() @@ -101,7 +109,7 @@ describe('listen', () => { }) it('getAddrs from listening on 0.0.0.0', async () => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) + const addr = multiaddr('/ip4/0.0.0.0/tcp/9003/ws') const listener = ws.createListener((conn) => { }) await listener.listen(addr) const addrs = await listener.getAddrs() @@ -110,7 +118,7 @@ describe('listen', () => { }) it('getAddrs from listening on 0.0.0.0 and port 0', async () => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) + const addr = multiaddr('/ip4/0.0.0.0/tcp/0/ws') const listener = ws.createListener((conn) => { }) await listener.listen(addr) const addrs = await listener.getAddrs() @@ -119,8 +127,8 @@ describe('listen', () => { await listener.close() }) - it('getAddrs preserves IPFS Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + it('getAddrs preserves p2p Id', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = ws.createListener((conn) => { }) await listener.listen(ma) @@ -136,7 +144,7 @@ describe('listen', () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) it('listen, check for promise', async () => { @@ -183,7 +191,7 @@ describe('dial', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) listener = ws.createListener(conn => pipe(conn, conn)) return listener.listen(ma) }) @@ -199,8 +207,8 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial with IPFS Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + it('dial with p2p Id', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const conn = await ws.dial(ma) const s = goodbye({ source: ['hey'], sink: collect }) @@ -239,13 +247,41 @@ describe('dial', () => { }) }) + describe('ip4 with wss', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/wss') + + const server = https.createServer({ + cert: fs.readFileSync('./test/fixtures/certificate.pem'), + key: fs.readFileSync('./test/fixtures/key.pem') + }) + + beforeEach(() => { + ws = new WS({ upgrader: mockUpgrader }) + listener = ws.createListener({ server }, conn => pipe(conn, conn)) + return listener.listen(ma) + }) + + afterEach(() => listener.close()) + + it('dial', async () => { + const conn = await ws.dial(ma, { websocket: { rejectUnauthorized: false } }) + const s = goodbye({ source: ['hey'], sink: collect }) + + const result = await pipe(s, conn, s) + + expect(result).to.be.eql([Buffer.from('hey')]) + }) + }) + describe('ip6', () => { let ws let listener const ma = multiaddr('/ip6/::1/tcp/9091') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) listener = ws.createListener(conn => pipe(conn, conn)) return listener.listen(ma) }) @@ -261,8 +297,8 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial with IPFS Id', async () => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + it('dial with p2p Id', async () => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const conn = await ws.dial(ma) const s = goodbye({ @@ -280,7 +316,7 @@ describe('filter addrs', () => { let ws before(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) describe('filter valid addrs for this transport', function () { @@ -411,31 +447,35 @@ describe('filter addrs', () => { }) }) -describe('valid Connection', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') +describe.skip('valid localAddr and remoteAddr', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') - it('get observed addrs', async () => { - let dialerObsAddrs - let listenerObsAddrs + it('should resolve port 0', async () => { + const ws = new WS({ upgrader: mockUpgrader }) - const ws = new WS() + // Create a Promise that resolves when a connection is handled + let handled + const handlerPromise = new Promise(resolve => { handled = resolve }) + const handler = conn => handled(conn) - const listener = ws.createListener(async conn => { - expect(conn).to.exist() - dialerObsAddrs = await conn.getObservedAddrs() - pipe(conn, conn) - }) + const listener = ws.createListener(handler) + // Listen on the multiaddr await listener.listen(ma) - const conn = await ws.dial(ma) - await pipe([], conn, consume) + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // Dial to that address + const dialerConn = await ws.dial(localAddrs[0]) - listenerObsAddrs = await conn.getObservedAddrs() + // Wait for the incoming dial to be handled + const listenerConn = await handlerPromise + // close the listener await listener.close() - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) + expect(dialerConn.localAddr.toString()).to.equal(listenerConn.remoteAddr.toString()) + expect(dialerConn.remoteAddr.toString()).to.equal(listenerConn.localAddr.toString()) }) }) From f610735dbc0ffdf11527e9a821993ed93a9d149c Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 24 Sep 2019 19:56:32 +0200 Subject: [PATCH 18/18] chore: address review --- package.json | 5 +-- src/index.js | 11 +++--- src/listener.js | 20 ++--------- src/socket-to-conn.js | 57 +++++++++++++----------------- test/browser.js | 2 +- test/node.js | 82 ++++++++++++------------------------------- 6 files changed, 60 insertions(+), 117 deletions(-) diff --git a/package.json b/package.json index 2fc1407..d795309 100644 --- a/package.json +++ b/package.json @@ -42,11 +42,12 @@ "class-is": "^1.1.0", "debug": "^4.1.1", "err-code": "^2.0.0", - "it-ws": "vasco-santos/it-ws#feat/add-properties-and-functions-to-client-and-server", + "it-ws": "vasco-santos/it-ws#v2.1.1-rc.0", "libp2p-utils": "~0.1.0", "mafmt": "^7.0.0", "multiaddr": "^7.1.0", - "multiaddr-to-uri": "^5.0.0" + "multiaddr-to-uri": "^5.0.0", + "p-timeout": "^3.2.0" }, "devDependencies": { "abort-controller": "^3.0.0", diff --git a/src/index.js b/src/index.js index 3cbed18..e152c14 100644 --- a/src/index.js +++ b/src/index.js @@ -37,8 +37,8 @@ class WebSockets { async dial (ma, options = {}) { log('dialing %s', ma) - const stream = await this._connect(ma, options) - const maConn = toConnection(stream, { socket: stream.socket, remoteAddr: ma, signal: options.signal }) + const socket = await this._connect(ma, options) + const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal }) log('new outbound connection %s', maConn.remoteAddr) const conn = await this._upgrader.upgradeOutbound(maConn) @@ -51,7 +51,7 @@ class WebSockets { * @param {Multiaddr} ma * @param {object} [options] * @param {AbortSignal} [options.signal] Used to abort dial requests - * @returns {Promise} Resolves a TCP Socket + * @returns {Promise} Resolves a extended duplex iterable on top of a WebSocket */ async _connect (ma, options = {}) { if (options.signal && options.signal.aborted) { @@ -101,12 +101,11 @@ class WebSockets { * @param {function (Connection)} handler * @returns {Listener} A Websockets listener */ - createListener (options, handler) { + createListener (options = {}, handler) { if (typeof options === 'function') { handler = options options = {} } - options = options || {} return createListener({ handler, upgrader: this._upgrader }, options) } @@ -120,7 +119,7 @@ class WebSockets { multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { - if (ma.protoNames().includes(CODE_CIRCUIT)) { + if (ma.protoCodes().includes(CODE_CIRCUIT)) { return false } diff --git a/src/listener.js b/src/listener.js index 309fa8b..1104a8b 100644 --- a/src/listener.js +++ b/src/listener.js @@ -7,16 +7,13 @@ const { createServer } = require('it-ws') const log = require('debug')('libp2p:websockets:listener') -const { CODE_P2P } = require('./constants') const toConnection = require('./socket-to-conn') module.exports = ({ handler, upgrader }, options = {}) => { const listener = new EventEmitter() - const server = createServer(options, async (stream, req) => { - const maConn = toConnection(stream, { - socket: req.socket - }) + const server = createServer(options, async (stream) => { + const maConn = toConnection(stream) log('new inbound connection %s', maConn.remoteAddr) @@ -37,7 +34,7 @@ module.exports = ({ handler, upgrader }, options = {}) => { // Keep track of open connections to destroy in case of timeout server.__connections = [] - let peerId, listeningMultiaddr + let listeningMultiaddr listener.close = () => { server.__connections.forEach(maConn => maConn.close()) @@ -46,11 +43,6 @@ module.exports = ({ handler, upgrader }, options = {}) => { listener.listen = (ma) => { listeningMultiaddr = ma - peerId = listeningMultiaddr.getPeerId() - - if (peerId) { - ma = ma.decapsulateCode(CODE_P2P) - } return server.listen(ma.toOptions()) } @@ -96,10 +88,4 @@ module.exports = ({ handler, upgrader }, options = {}) => { function trackConn (server, maConn) { server.__connections.push(maConn) - - const untrackConn = () => { - server.__connections = server.__connections.filter(c => c !== maConn) - } - - maConn.conn.once('close', untrackConn) } diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js index 84f984e..d819995 100644 --- a/src/socket-to-conn.js +++ b/src/socket-to-conn.js @@ -4,13 +4,15 @@ const abortable = require('abortable-iterator') const { CLOSE_TIMEOUT } = require('./constants') const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') -const log = require('debug')('libp2p:websockets:socket') +const pTimeout = require('p-timeout') + +const debug = require('debug') +const log = debug('libp2p:websockets:socket') +log.error = debug('libp2p:websockets:socket:error') // Convert a stream into a MultiaddrConnection // https://github.com/libp2p/interface-transport#multiaddrconnection -module.exports = (stream, options = {}) => { - const socket = options.socket - +module.exports = (socket, options = {}) => { const maConn = { async sink (source) { if (options.signal) { @@ -18,49 +20,40 @@ module.exports = (stream, options = {}) => { } try { - await stream.sink(source) + await socket.sink(source) } catch (err) { - // Re-throw non-aborted errors - if (err.type !== 'aborted') throw err - // Otherwise, this is fine... - await stream.close() + if (err.type !== 'aborted') { + log.error(err) + } } }, - source: options.signal ? abortable(stream.source, options.signal) : stream.source, + source: options.signal ? abortable(socket.source, options.signal) : socket.source, conn: socket, - localAddr: undefined, + localAddr: options.localAddr || (socket.localAddress && socket.localPort + ? toMultiaddr(socket.localAddress, socket.localPort) : undefined), // If the remote address was passed, use it - it may have the peer ID encapsulated - remoteAddr: options.remoteAddr || toMultiaddr(stream.remoteAddress, stream.remotePort), + remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort), timeline: { open: Date.now() }, - close () { - return new Promise(async (resolve) => { // eslint-disable-line no-async-promise-executor - const start = Date.now() - - // Attempt to end the socket. If it takes longer to close than the - // timeout, destroy it manually. - const timeout = setTimeout(() => { - const { host, port } = maConn.remoteAddr.toOptions() - log('timeout closing socket to %s:%s after %dms, destroying it manually', - host, port, Date.now() - start) - - socket.terminate() - maConn.timeline.close = Date.now() - return resolve() - }, CLOSE_TIMEOUT) + async close () { + const start = Date.now() - await stream.close() + try { + await pTimeout(socket.close(), CLOSE_TIMEOUT) + } catch (err) { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) - clearTimeout(timeout) + socket.destroy() + } finally { maConn.timeline.close = Date.now() - - resolve() - }) + } } } diff --git a/test/browser.js b/test/browser.js index f7079f1..b3492a6 100644 --- a/test/browser.js +++ b/test/browser.js @@ -47,7 +47,7 @@ describe('libp2p-websockets', () => { }) it('many writes', async function () { - this.timeout(100000) + this.timeout(10000) const s = goodbye({ source: pipe( { diff --git a/test/node.js b/test/node.js index 83d8f4d..ec80930 100644 --- a/test/node.js +++ b/test/node.js @@ -11,9 +11,8 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') const goodbye = require('it-goodbye') -const { collect, consume } = require('streaming-iterables') +const { collect } = require('streaming-iterables') const pipe = require('it-pipe') -const AbortController = require('abort-controller') const WS = require('../src') @@ -217,33 +216,31 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) - it('should be abortable after connect', async () => { - const controller = new AbortController() - const conn = await ws.dial(ma, { signal: controller.signal }) - const s = goodbye({ - source: { - [Symbol.asyncIterator] () { - return this - }, - next () { - return new Promise(resolve => { - setTimeout(() => resolve(Math.random()), 1000) - }) - } - }, - sink: consume - }) + it('should resolve port 0', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') + const ws = new WS({ upgrader: mockUpgrader }) + + // Create a Promise that resolves when a connection is handled + let handled + const handlerPromise = new Promise(resolve => { handled = resolve }) + const handler = conn => handled(conn) + + const listener = ws.createListener(handler) + + // Listen on the multiaddr + await listener.listen(ma) - setTimeout(() => controller.abort(), 500) + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) - try { - await pipe(s, conn, s) - } catch (err) { - expect(err.type).to.equal('aborted') - return - } + // Dial to that address + await ws.dial(localAddrs[0]) - throw new Error('connection was not aborted') + // Wait for the incoming dial to be handled + await handlerPromise + + // close the listener + await listener.close() }) }) @@ -446,36 +443,3 @@ describe('filter addrs', () => { done() }) }) - -describe.skip('valid localAddr and remoteAddr', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') - - it('should resolve port 0', async () => { - const ws = new WS({ upgrader: mockUpgrader }) - - // Create a Promise that resolves when a connection is handled - let handled - const handlerPromise = new Promise(resolve => { handled = resolve }) - const handler = conn => handled(conn) - - const listener = ws.createListener(handler) - - // Listen on the multiaddr - await listener.listen(ma) - - const localAddrs = listener.getAddrs() - expect(localAddrs.length).to.equal(1) - - // Dial to that address - const dialerConn = await ws.dial(localAddrs[0]) - - // Wait for the incoming dial to be handled - const listenerConn = await handlerPromise - - // close the listener - await listener.close() - - expect(dialerConn.localAddr.toString()).to.equal(listenerConn.remoteAddr.toString()) - expect(dialerConn.remoteAddr.toString()).to.equal(listenerConn.localAddr.toString()) - }) -})