Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit a1dfb3b

Browse files
committed
chore: address review
1 parent c97a0f3 commit a1dfb3b

File tree

6 files changed

+58
-115
lines changed

6 files changed

+58
-115
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646
"libp2p-utils": "~0.1.0",
4747
"mafmt": "^7.0.0",
4848
"multiaddr": "^7.1.0",
49-
"multiaddr-to-uri": "^5.0.0"
49+
"multiaddr-to-uri": "^5.0.0",
50+
"p-timeout": "^3.2.0"
5051
},
5152
"devDependencies": {
5253
"abort-controller": "^3.0.0",

src/index.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ class WebSockets {
3737
async dial (ma, options = {}) {
3838
log('dialing %s', ma)
3939

40-
const stream = await this._connect(ma, options)
41-
const maConn = toConnection(stream, { socket: stream.socket, remoteAddr: ma, signal: options.signal })
40+
const socket = await this._connect(ma, options)
41+
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
4242
log('new outbound connection %s', maConn.remoteAddr)
4343

4444
const conn = await this._upgrader.upgradeOutbound(maConn)
@@ -51,7 +51,7 @@ class WebSockets {
5151
* @param {Multiaddr} ma
5252
* @param {object} [options]
5353
* @param {AbortSignal} [options.signal] Used to abort dial requests
54-
* @returns {Promise<Socket>} Resolves a TCP Socket
54+
* @returns {Promise<WebSocket>} Resolves a extended duplex iterable on top of a WebSocket
5555
*/
5656
async _connect (ma, options = {}) {
5757
if (options.signal && options.signal.aborted) {
@@ -101,12 +101,11 @@ class WebSockets {
101101
* @param {function (Connection)} handler
102102
* @returns {Listener} A Websockets listener
103103
*/
104-
createListener (options, handler) {
104+
createListener (options = {}, handler) {
105105
if (typeof options === 'function') {
106106
handler = options
107107
options = {}
108108
}
109-
options = options || {}
110109

111110
return createListener({ handler, upgrader: this._upgrader }, options)
112111
}

src/listener.js

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,13 @@ const { createServer } = require('it-ws')
77

88
const log = require('debug')('libp2p:websockets:listener')
99

10-
const { CODE_P2P } = require('./constants')
1110
const toConnection = require('./socket-to-conn')
1211

1312
module.exports = ({ handler, upgrader }, options = {}) => {
1413
const listener = new EventEmitter()
1514

16-
const server = createServer(options, async (stream, req) => {
17-
const maConn = toConnection(stream, {
18-
socket: req.socket
19-
})
15+
const server = createServer(options, async (stream) => {
16+
const maConn = toConnection(stream)
2017

2118
log('new inbound connection %s', maConn.remoteAddr)
2219

@@ -37,7 +34,7 @@ module.exports = ({ handler, upgrader }, options = {}) => {
3734
// Keep track of open connections to destroy in case of timeout
3835
server.__connections = []
3936

40-
let peerId, listeningMultiaddr
37+
let listeningMultiaddr
4138

4239
listener.close = () => {
4340
server.__connections.forEach(maConn => maConn.close())
@@ -46,11 +43,6 @@ module.exports = ({ handler, upgrader }, options = {}) => {
4643

4744
listener.listen = (ma) => {
4845
listeningMultiaddr = ma
49-
peerId = listeningMultiaddr.getPeerId()
50-
51-
if (peerId) {
52-
ma = ma.decapsulateCode(CODE_P2P)
53-
}
5446

5547
return server.listen(ma.toOptions())
5648
}
@@ -96,10 +88,4 @@ module.exports = ({ handler, upgrader }, options = {}) => {
9688

9789
function trackConn (server, maConn) {
9890
server.__connections.push(maConn)
99-
100-
const untrackConn = () => {
101-
server.__connections = server.__connections.filter(c => c !== maConn)
102-
}
103-
104-
maConn.conn.once('close', untrackConn)
10591
}

src/socket-to-conn.js

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,63 +4,56 @@ const abortable = require('abortable-iterator')
44
const { CLOSE_TIMEOUT } = require('./constants')
55
const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr')
66

7-
const log = require('debug')('libp2p:websockets:socket')
7+
const pTimeout = require('p-timeout')
8+
9+
const debug = require('debug')
10+
const log = debug('libp2p:websockets:socket')
11+
log.error = debug('libp2p:websockets:socket:error')
812

913
// Convert a stream into a MultiaddrConnection
1014
// https://github.com/libp2p/interface-transport#multiaddrconnection
11-
module.exports = (stream, options = {}) => {
12-
const socket = options.socket
13-
15+
module.exports = (socket, options = {}) => {
1416
const maConn = {
1517
async sink (source) {
1618
if (options.signal) {
1719
source = abortable(source, options.signal)
1820
}
1921

2022
try {
21-
await stream.sink(source)
23+
await socket.sink(source)
2224
} catch (err) {
23-
// Re-throw non-aborted errors
24-
if (err.type !== 'aborted') throw err
25-
// Otherwise, this is fine...
26-
await stream.close()
25+
if (err.type !== 'aborted') {
26+
log.error(err)
27+
}
2728
}
2829
},
2930

30-
source: options.signal ? abortable(stream.source, options.signal) : stream.source,
31+
source: options.signal ? abortable(socket.source, options.signal) : socket.source,
3132

3233
conn: socket,
3334

34-
localAddr: undefined,
35+
localAddr: options.localAddr || (socket.localAddress && socket.localPort
36+
? toMultiaddr(socket.localAddress, socket.localPort) : undefined),
3537

3638
// If the remote address was passed, use it - it may have the peer ID encapsulated
37-
remoteAddr: options.remoteAddr || toMultiaddr(stream.remoteAddress, stream.remotePort),
39+
remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort),
3840

3941
timeline: { open: Date.now() },
4042

41-
close () {
42-
return new Promise(async (resolve) => { // eslint-disable-line no-async-promise-executor
43-
const start = Date.now()
44-
45-
// Attempt to end the socket. If it takes longer to close than the
46-
// timeout, destroy it manually.
47-
const timeout = setTimeout(() => {
48-
const { host, port } = maConn.remoteAddr.toOptions()
49-
log('timeout closing socket to %s:%s after %dms, destroying it manually',
50-
host, port, Date.now() - start)
51-
52-
socket.terminate()
53-
maConn.timeline.close = Date.now()
54-
return resolve()
55-
}, CLOSE_TIMEOUT)
43+
async close () {
44+
const start = Date.now()
5645

57-
await stream.close()
46+
try {
47+
await pTimeout(socket.close(), CLOSE_TIMEOUT)
48+
} catch (err) {
49+
const { host, port } = maConn.remoteAddr.toOptions()
50+
log('timeout closing socket to %s:%s after %dms, destroying it manually',
51+
host, port, Date.now() - start)
5852

59-
clearTimeout(timeout)
53+
socket.destroy()
54+
} finally {
6055
maConn.timeline.close = Date.now()
61-
62-
resolve()
63-
})
56+
}
6457
}
6558
}
6659

test/browser.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ describe('libp2p-websockets', () => {
4747
})
4848

4949
it('many writes', async function () {
50-
this.timeout(100000)
50+
this.timeout(10000)
5151
const s = goodbye({
5252
source: pipe(
5353
{

test/node.js

Lines changed: 23 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ const expect = chai.expect
1111
chai.use(dirtyChai)
1212
const multiaddr = require('multiaddr')
1313
const goodbye = require('it-goodbye')
14-
const { collect, consume } = require('streaming-iterables')
14+
const { collect } = require('streaming-iterables')
1515
const pipe = require('it-pipe')
16-
const AbortController = require('abort-controller')
1716

1817
const WS = require('../src')
1918

@@ -217,33 +216,31 @@ describe('dial', () => {
217216
expect(result).to.be.eql([Buffer.from('hey')])
218217
})
219218

220-
it('should be abortable after connect', async () => {
221-
const controller = new AbortController()
222-
const conn = await ws.dial(ma, { signal: controller.signal })
223-
const s = goodbye({
224-
source: {
225-
[Symbol.asyncIterator] () {
226-
return this
227-
},
228-
next () {
229-
return new Promise(resolve => {
230-
setTimeout(() => resolve(Math.random()), 1000)
231-
})
232-
}
233-
},
234-
sink: consume
235-
})
219+
it('should resolve port 0', async () => {
220+
const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws')
221+
const ws = new WS({ upgrader: mockUpgrader })
222+
223+
// Create a Promise that resolves when a connection is handled
224+
let handled
225+
const handlerPromise = new Promise(resolve => { handled = resolve })
226+
const handler = conn => handled(conn)
227+
228+
const listener = ws.createListener(handler)
229+
230+
// Listen on the multiaddr
231+
await listener.listen(ma)
236232

237-
setTimeout(() => controller.abort(), 500)
233+
const localAddrs = listener.getAddrs()
234+
expect(localAddrs.length).to.equal(1)
238235

239-
try {
240-
await pipe(s, conn, s)
241-
} catch (err) {
242-
expect(err.type).to.equal('aborted')
243-
return
244-
}
236+
// Dial to that address
237+
await ws.dial(localAddrs[0])
245238

246-
throw new Error('connection was not aborted')
239+
// Wait for the incoming dial to be handled
240+
await handlerPromise
241+
242+
// close the listener
243+
await listener.close()
247244
})
248245
})
249246

@@ -446,36 +443,3 @@ describe('filter addrs', () => {
446443
done()
447444
})
448445
})
449-
450-
describe.skip('valid localAddr and remoteAddr', () => {
451-
const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws')
452-
453-
it('should resolve port 0', async () => {
454-
const ws = new WS({ upgrader: mockUpgrader })
455-
456-
// Create a Promise that resolves when a connection is handled
457-
let handled
458-
const handlerPromise = new Promise(resolve => { handled = resolve })
459-
const handler = conn => handled(conn)
460-
461-
const listener = ws.createListener(handler)
462-
463-
// Listen on the multiaddr
464-
await listener.listen(ma)
465-
466-
const localAddrs = listener.getAddrs()
467-
expect(localAddrs.length).to.equal(1)
468-
469-
// Dial to that address
470-
const dialerConn = await ws.dial(localAddrs[0])
471-
472-
// Wait for the incoming dial to be handled
473-
const listenerConn = await handlerPromise
474-
475-
// close the listener
476-
await listener.close()
477-
478-
expect(dialerConn.localAddr.toString()).to.equal(listenerConn.remoteAddr.toString())
479-
expect(dialerConn.remoteAddr.toString()).to.equal(listenerConn.localAddr.toString())
480-
})
481-
})

0 commit comments

Comments
 (0)