diff --git a/.aegir.cjs b/.aegir.cjs new file mode 100644 index 0000000..d7af96f --- /dev/null +++ b/.aegir.cjs @@ -0,0 +1,63 @@ +'use strict' + +const wrtc = require('wrtc') + +// TODO: Temporary fix per wrtc issue +// https://github.com/node-webrtc/node-webrtc/issues/636#issuecomment-774171409 +process.on('beforeExit', (code) => process.exit(code)) + +const ECHO_PROTOCOL = '/echo/1.0.0' + +async function before () { + const { WebRTCDirect } = await import('./dist/src/index.js') + const { pipe } = await import('it-pipe') + const { Multiaddr } = await import('@multiformats/multiaddr') + const { mockUpgrader, mockRegistrar } = await import('@libp2p/interface-compliance-tests/mocks') + + const REMOTE_MULTIADDR_IP4 = new Multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') + const REMOTE_MULTIADDR_IP6 = new Multiaddr('/ip6/::1/tcp/12346/http/p2p-webrtc-direct') + + const registrar = mockRegistrar() + void registrar.handle(ECHO_PROTOCOL, ({ stream }) => { + void pipe( + stream, + stream + ).catch() + }) + const upgrader = mockUpgrader({ + registrar + }) + + const wd = new WebRTCDirect({ + wrtc + }) + + const listeners = await Promise.all( + [REMOTE_MULTIADDR_IP4, REMOTE_MULTIADDR_IP6].map(async ma => { + const listener = wd.createListener({ + upgrader + }) + await listener.listen(ma) + + return listener + }) + ) + + return { + listeners + } +} + +async function after (testOptions, beforeReturn) { + await Promise.all( + beforeReturn.listeners.map(listener => listener.close()) + ) +} + +/** @type {import('aegir').PartialOptions} */ +module.exports = { + test: { + before, + after + } +} diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index c4f0352..0000000 --- a/.aegir.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict' - -const WebRTCDirect = require('./src') -const pipe = require('it-pipe') -const { Multiaddr } = require('multiaddr') - -const ma = new Multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') - -const mockUpgrader = { - upgradeInbound: maConn => maConn, - upgradeOutbound: maConn => maConn -} - -function before () { - const wd = new WebRTCDirect({ upgrader: mockUpgrader }) - const listener = wd.createListener((conn) => pipe(conn, conn)) - listener.on('listening', () => { - console.log('listener started on:', ma.toString()) - }) - listener.on('error', console.error) - listener.listen(ma) - - return { - listener - } -} - -async function after (testOptions, beforeReturn) { - await beforeReturn.listener.close() - // TODO: Temporary fix per wrtc issue - // https://github.com/node-webrtc/node-webrtc/issues/636 - process.exit(0) -} - -module.exports = { - test: { - before, - after - } -} diff --git a/.github/dependabot.yml b/.github/dependabot.yml index de46e32..290ad02 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -4,5 +4,5 @@ updates: directory: "/" schedule: interval: daily - time: "11:00" + time: "10:00" open-pull-requests-limit: 10 diff --git a/.github/workflows/automerge.yml b/.github/workflows/automerge.yml index d94b021..13da9c1 100644 --- a/.github/workflows/automerge.yml +++ b/.github/workflows/automerge.yml @@ -11,22 +11,22 @@ jobs: outputs: status: ${{ steps.should-automerge.outputs.status }} steps: - - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - name: Check if we should automerge - id: should-automerge - run: | - for commit in $(git rev-list --first-parent origin/${{ github.event.pull_request.base.ref }}..${{ github.event.pull_request.head.sha }}); do - committer=$(git show --format=$'%ce' -s $commit) - echo "Committer: $committer" - if [[ "$committer" != "web3-bot@users.noreply.github.com" ]]; then - echo "Commit $commit wasn't committed by web3-bot, but by $committer." - echo "::set-output name=status::false" - exit - fi - done - echo "::set-output name=status::true" + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Check if we should automerge + id: should-automerge + run: | + for commit in $(git rev-list --first-parent origin/${{ github.event.pull_request.base.ref }}..${{ github.event.pull_request.head.sha }}); do + committer=$(git show --format=$'%ce' -s $commit) + echo "Committer: $committer" + if [[ "$committer" != "web3-bot@users.noreply.github.com" ]]; then + echo "Commit $commit wasn't committed by web3-bot, but by $committer." + echo "::set-output name=status::false" + exit + fi + done + echo "::set-output name=status::true" automerge: needs: automerge-check runs-on: ubuntu-latest @@ -34,17 +34,17 @@ jobs: # but it prevents this job from spinning up, just to be skipped shortly after. if: github.event.pull_request.user.login == 'web3-bot' && needs.automerge-check.outputs.status == 'true' steps: - - name: Wait on tests - uses: lewagon/wait-on-check-action@bafe56a6863672c681c3cf671f5e10b20abf2eaa # v0.2 - with: - ref: ${{ github.event.pull_request.head.sha }} - repo-token: ${{ secrets.GITHUB_TOKEN }} - wait-interval: 10 - running-workflow-name: 'automerge' # the name of this job - - name: Merge PR - uses: pascalgn/automerge-action@741c311a47881be9625932b0a0de1b0937aab1ae # v0.13.1 - env: - GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" - MERGE_LABELS: "" - MERGE_METHOD: "squash" - MERGE_DELETE_BRANCH: true + - name: Wait on tests + uses: lewagon/wait-on-check-action@bafe56a6863672c681c3cf671f5e10b20abf2eaa # v0.2 + with: + ref: ${{ github.event.pull_request.head.sha }} + repo-token: ${{ secrets.GITHUB_TOKEN }} + wait-interval: 10 + running-workflow-name: 'automerge' # the name of this job + - name: Merge PR + uses: pascalgn/automerge-action@741c311a47881be9625932b0a0de1b0937aab1ae # v0.13.1 + env: + GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" + MERGE_LABELS: "" + MERGE_METHOD: "squash" + MERGE_DELETE_BRANCH: true diff --git a/.github/workflows/js-test-and-release.yml b/.github/workflows/js-test-and-release.yml index 2987bd2..8630dc5 100644 --- a/.github/workflows/js-test-and-release.yml +++ b/.github/workflows/js-test-and-release.yml @@ -12,14 +12,13 @@ jobs: check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/setup-node@v2 - with: - node-version: lts/* - - run: npm install -g @mapbox/node-pre-gyp - - uses: ipfs/aegir/actions/cache-node-modules@master - - run: npm run --if-present lint - - run: npm run --if-present dep-check + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: lts/* + - uses: ipfs/aegir/actions/cache-node-modules@master + - run: npm run --if-present lint + - run: npm run --if-present dep-check test-node: needs: check @@ -34,7 +33,6 @@ jobs: - uses: actions/setup-node@v2 with: node-version: ${{ matrix.node }} - - run: npm install -g @mapbox/node-pre-gyp - uses: ipfs/aegir/actions/cache-node-modules@master - run: npm run --if-present test:node - uses: codecov/codecov-action@f32b3a3741e1053eb607407145bc9619351dc93b # v2.1.0 @@ -42,21 +40,6 @@ jobs: directory: ./.nyc_output flags: node - test-browser: - needs: check - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-node@v2 - with: - node-version: lts/* - - uses: ipfs/aegir/actions/cache-node-modules@master - - run: npm run --if-present test:browser - - uses: codecov/codecov-action@f32b3a3741e1053eb607407145bc9619351dc93b # v2.1.0 - with: - directory: ./.nyc_output - flags: browser - test-chrome: needs: check runs-on: ubuntu-latest @@ -148,7 +131,7 @@ jobs: flags: electron-renderer release: - needs: [test-node, test-browser, test-chrome, test-chrome-webworker, test-firefox, test-firefox-webworker, test-electron-main, test-electron-renderer] + needs: [test-node, test-chrome, test-chrome-webworker, test-firefox, test-firefox-webworker, test-electron-main, test-electron-renderer] runs-on: ubuntu-latest if: github.event_name == 'push' && github.ref == 'refs/heads/master' # with #262 - 'refs/heads/${{{ github.default_branch }}}' steps: diff --git a/LICENSE b/LICENSE index bbfffbf..20ce483 100644 --- a/LICENSE +++ b/LICENSE @@ -1,21 +1,4 @@ -MIT License +This project is dual licensed under MIT and Apache-2.0. -Copyright (c) 2017 libp2p - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +MIT: https://www.opensource.org/licenses/mit +Apache-2.0: https://www.apache.org/licenses/license-2.0 diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..14478a3 --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,5 @@ +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..72dc60d --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md index e0b239b..87977f9 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ -# js-libp2p-webrtc-direct +# js-libp2p-webrtc-direct [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai) [![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) [![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) [![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io) [![](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-webrtc-direct.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-webrtc-direct) -[![](https://img.shields.io/travis/libp2p/js-libp2p-webrtc-direct.svg?style=flat-square)](https://travis-ci.com/libp2p/js-libp2p-webrtc-direct) +[![Build Status](https://github.com/libp2p/js-libp2p-webrtc-direct/actions/workflows/js-test-and-release.yml/badge.svg?branch=main)](https://github.com/libp2p/js-libp2p-webrtc-direct/actions/workflows/js-test-and-release.yml) [![Dependency Status](https://david-dm.org/libp2p/js-libp2p-webrtc-direct.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-webrtc-direct) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) ![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/packages/libp2p-interfaces/src/connection/img/badge.png) @@ -13,24 +13,20 @@ > A WebRTC transport built for libp2p (not mandatory to use with libp2p) that doesn't require the set up a signalling server. Caveat, you can only establish Browser to Node.js and Node.js to Node.js connections. -## Lead Maintainer - -[Vasco Santos](https://github.com/vasco-santos). - -## Table of Contents +## Table of Contents - [Install](#install) - - [npm](#npm) - [Usage](#usage) - [API](#api) - + - [Transport](#transport) + - [Connection](#connection) +- [Contribute](#contribute) +- [License](#license) ## Install -### npm - ```bash -> npm install libp2p-webrtc-direct +> npm install @libp2p/webrtc-direct ``` **NOTE:** To run build scripts `node-pre-gyp` is required. You can install it by running `npm install -g node-pre-gyp`. @@ -38,40 +34,43 @@ ## Usage ```js -const WebRTCDirect = require('libp2p-webrtc-direct') -const multiaddr = require('multiaddr') -const pipe = require('pull-stream') -const { collect } = require('streaming-iterables') - -;(async () => { - const addr = new multiaddr.Multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') - const upgrader = { - upgradeInbound: async maConn => maConn, - upgradeOutbound: async maConn => maConn, - } - const webRTCDirect = new WebRTCDirect({upgrader}) - - const listener = webRTCDirect.createListener((socket) => { +import { WebRTCDirect } from '@libp2p/webrtc-direct' +import { Multiaddr } from '@multiformats/multiaddr' +import { pipe } from 'it-pipe' +import all from 'it-all' + +const ECHO_PROTOCOL = '/echo/1.0.0' +const addr = new Multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') +const webRTCDirect = new WebRTCDirect() + +const listener = webRTCDirect.createListener({ + handler: (connection) => { console.log('new connection opened') - pipe( - ['hello'], - socket - ) - }) - - await listener.listen(addr) - console.log('listening') - - const conn = await webRTCDirect.dial(addr) - const values = await pipe( - conn, - collect - ) - console.log(`Value: ${values.toString()}`) - - // Close connection after reading - await listener.close() -})() + + connection.newStream([ECHO_PROTOCOL]) + .then(({ stream }) => { + void pipe(stream, stream) + }) + }, + upgrader +}) + +await listener.listen(addr) +console.log('listening') + +const connection = await webRTCDirect.dial(addr, { + upgrader +}) +const { stream } = await connection.newStream([ECHO_PROTOCOL]) +const values = await pipe( + [uint8arrayFromString('hello')], + stream, + (source) => all(source) +) +console.log(`Value: ${uint8ArrayToString(values[0])}`) + +// Close connection after reading +await listener.close() ``` Outputs: @@ -103,4 +102,4 @@ The libp2p implementation in JavaScript is a work in progress. As such, there ar ## License -[MIT](LICENSE) © Protocol Labs +[MIT](LICENCE-MIT) & [Apache](LICENCE-APACHE) - Protocol Labs 2019 diff --git a/package.json b/package.json index b67008c..8e20a94 100644 --- a/package.json +++ b/package.json @@ -1,82 +1,183 @@ { - "name": "libp2p-webrtc-direct", + "name": "@libp2p/webrtc-direct", "version": "0.7.1", "description": "Dial using WebRTC without the need to set up any Signalling Rendezvous Point! ", - "leadMaintainer": "Vasco Santos ", - "main": "src/index.js", - "browser": { - "wrtc": false, - "http": false, - "request": "xhr" - }, - "scripts": { - "lint": "aegir lint", - "build": "aegir build", - "docs": "aegir docs", - "test": "aegir test --target node --target browser --timeout 60000 -- --exit", - "test:node": "aegir test --target node --timeout 60000 -- --exit", - "test:browser": "aegir test --target browser --timeout 60000 -- --exit", - "release": "aegir release --target node --target browser", - "release-minor": "aegir release --type minor --target node --target browser", - "release-major": "aegir release --type major --target node --target browser", - "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" - }, + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p-webrtc-direct#readme", "repository": { "type": "git", "url": "git+https://github.com/libp2p/js-libp2p-webrtc-direct.git" }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p-webrtc-direct/issues" + }, "keywords": [ - "libp2p", - "webrtc", + "connection", "dial", + "libp2p", "stream", - "connection" + "webrtc" ], - "license": "MIT", - "bugs": { - "url": "https://github.com/libp2p/js-libp2p-webrtc-direct/issues" - }, - "homepage": "https://github.com/libp2p/js-libp2p-webrtc-direct#readme", "engines": { - "node": ">=14.0.0" + "node": ">=16.0.0", + "npm": ">=7.0.0" }, - "devDependencies": { - "aegir": "^36.0.0", - "chai": "^4.3.4", - "dirty-chai": "^2.0.1", - "it-pipe": "^1.1.0", - "libp2p-interfaces-compliance-tests": "https://gitpkg.now.sh/libp2p/js-libp2p-interfaces/packages/compliance-tests?chore/skip-abort-while-reading-for-webrtc", - "multiaddr": "^10.0.0", - "streaming-iterables": "^6.0.0", - "util": "^0.12.4", - "webrtcsupport": "^2.2.0" + "type": "module", + "types": "types/src/index.d.ts", + "typesVersions": { + "*": { + "*": [ + "types/*", + "types/src/*" + ], + "types/*": [ + "types/*", + "types/src/*" + ] + } + }, + "files": [ + "src", + "dist", + "!**/*.tsbuildinfo", + "!**/browser-test", + "!**/node-test", + "!**/test" + ], + "exports": { + ".": { + "import": "./dist/src/index.js" + } + }, + "eslintConfig": { + "extends": "ipfs", + "parserOptions": { + "sourceType": "module" + } + }, + "release": { + "branches": [ + "master" + ], + "plugins": [ + [ + "@semantic-release/commit-analyzer", + { + "preset": "conventionalcommits", + "releaseRules": [ + { + "breaking": true, + "release": "major" + }, + { + "revert": true, + "release": "patch" + }, + { + "type": "feat", + "release": "minor" + }, + { + "type": "fix", + "release": "patch" + }, + { + "type": "chore", + "release": "patch" + }, + { + "type": "docs", + "release": "patch" + }, + { + "type": "test", + "release": "patch" + }, + { + "scope": "no-release", + "release": false + } + ] + } + ], + [ + "@semantic-release/release-notes-generator", + { + "preset": "conventionalcommits", + "presetConfig": { + "types": [ + { + "type": "feat", + "section": "Features" + }, + { + "type": "fix", + "section": "Bug Fixes" + }, + { + "type": "chore", + "section": "Trivial Changes" + }, + { + "type": "docs", + "section": "Trivial Changes" + }, + { + "type": "test", + "section": "Tests" + } + ] + } + } + ], + "@semantic-release/changelog", + [ + "@semantic-release/npm", + { + "pkgRoot": "dist" + } + ], + "@semantic-release/github", + "@semantic-release/git" + ] + }, + "scripts": { + "lint": "aegir lint", + "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", + "build": "tsc", + "pretest": "npm run build", + "test": "aegir test -f \"./dist/test/**/*.spec.js\"", + "test:node": "npm run test -- -t node -f ./dist/test/node.js", + "test:chrome": "npm run test -- -t browser -f ./dist/test/browser.js ", + "test:firefox": "npm run test -- -t browser -- --browser firefox -f ./dist/test/browser.js", + "release": "semantic-release" }, "dependencies": { - "abortable-iterator": "^3.0.0", - "class-is": "^1.1.0", - "concat-stream": "^2.0.0", - "debug": "^4.3.1", - "detect-node": "^2.0.4", + "@libp2p/interfaces": "^1.3.18", + "@libp2p/logger": "^1.1.2", + "@libp2p/utils": "^1.0.9" , + "@libp2p/webrtc-peer": "^1.0.5", + "@multiformats/mafmt": "^11.0.2", + "@multiformats/multiaddr": "^10.1.7", + "abortable-iterator": "^4.0.2", "err-code": "^3.0.0", - "libp2p-utils": "^0.4.1", - "libp2p-webrtc-peer": "^10.0.1", - "mafmt": "^10.0.0", "multiformats": "^9.4.5", "native-fetch": "^4.0.2", - "once": "^1.4.0", - "stream-to-it": "^0.2.2", "uint8arrays": "^3.0.0", - "undici": "^4.14.1", - "wrtc": "~0.4.6", - "xhr": "^2.5.0" + "undici": "^4.16.0", + "wherearewe": "^1.0.1" }, - "contributors": [ - "Vasco Santos ", - "David Dias ", - "Michiel De Backker <38858977+backkem@users.noreply.github.com>", - "ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ ", - "Diogo Silva ", - "Dmitriy Ryajov ", - "Alex Potsides " - ] + "devDependencies": { + "@libp2p/interface-compliance-tests": "^1.1.20", + "@mapbox/node-pre-gyp": "^1.0.8", + "aegir": "^36.0.0", + "it-all": "^1.0.6", + "it-pipe": "^2.0.3", + "multiaddr": "^10.0.0", + "p-event": "^5.0.1", + "wrtc": "^0.4.6" + }, + "browser": { + "./dist/src/listener.js": "./dist/src/listener.browser.js" + } } diff --git a/src/constants.js b/src/constants.ts similarity index 53% rename from src/constants.js rename to src/constants.ts index aaf2bc9..6762b67 100644 --- a/src/constants.js +++ b/src/constants.ts @@ -1,9 +1,8 @@ -'use strict' // p2p multi-address code -exports.CODE_P2P = 421 -exports.CODE_CIRCUIT = 290 +export const CODE_P2P = 421 +export const CODE_CIRCUIT = 290 // Time to wait for a connection to close gracefully before destroying it // manually -module.exports.CLOSE_TIMEOUT = 2000 +export const CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js deleted file mode 100644 index 6e1d626..0000000 --- a/src/index.js +++ /dev/null @@ -1,192 +0,0 @@ -'use strict' - -const assert = require('debug') -const debug = require('debug') -const log = debug('libp2p:webrtcdirect') -log.error = debug('libp2p:webrtcdirect:error') -const errcode = require('err-code') - -const wrtc = require('wrtc') -const SimplePeer = require('libp2p-webrtc-peer') -const isNode = require('detect-node') -const mafmt = require('mafmt') -const { base58btc } = require('multiformats/bases/base58') -const { fetch } = require('native-fetch') -const withIs = require('class-is') -const { AbortError } = require('abortable-iterator') -const { toString } = require('uint8arrays/to-string') -const { fromString } = require('uint8arrays/from-string') - -const { CODE_CIRCUIT, CODE_P2P } = require('./constants') -const toConnection = require('./socket-to-conn') -const createListener = require('./listener') - -function noop () {} - -/** - * @typedef {import('multiaddr').Multiaddr} Multiaddr - */ - -class WebRTCDirect { - /** - * @class - * @param {object} options - * @param {Upgrader} options.upgrader - */ - constructor ({ upgrader }) { - assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') - this._upgrader = upgrader - } - - /** - * @param {Multiaddr} ma - * @param {object} options - * @param {AbortSignal} options.signal - Used to abort dial requests - * @returns {Promise} An upgraded Connection - */ - async dial (ma, options = {}) { - const socket = await this._connect(ma, options) - const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal }) - log('new outbound connection %s', maConn.remoteAddr) - const conn = await this._upgrader.upgradeOutbound(maConn) - log('outbound connection %s upgraded', maConn.remoteAddr) - return conn - } - - /** - * @private - * @param {Multiaddr} ma - * @param {object} options - * @param {AbortSignal} options.signal - Used to abort dial requests - * @returns {Promise} Resolves a SimplePeer Webrtc channel - */ - _connect (ma, options = {}) { - if (options.signal && options.signal.aborted) { - throw new AbortError() - } - - options = { - initiator: true, - trickle: false, - wrtc: isNode ? wrtc : undefined, - ...options - } - - return new Promise((resolve, reject) => { - const start = Date.now() - let connected - - const cOpts = ma.toOptions() - log('Dialing %s:%s', cOpts.host, cOpts.port) - - const channel = new SimplePeer(options) - - const onError = (err) => { - if (!connected) { - const msg = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}` - - log.error(msg) - err.message = msg - done(err) - } - } - - const onTimeout = () => { - log('connnection timeout %s:%s', cOpts.host, cOpts.port) - const err = errcode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') - // Note: this will result in onError() being called - channel.emit('error', err) - } - - const onConnect = () => { - connected = true - - log('connection opened %s:%s', cOpts.host, cOpts.port) - done(null) - } - - const onAbort = () => { - log.error('connection aborted %s:%s', cOpts.host, cOpts.port) - channel.destroy() - done(new AbortError()) - } - - const done = (err) => { - channel.removeListener('error', onError) - channel.removeListener('timeout', onTimeout) - channel.removeListener('connect', onConnect) - channel.removeAllListeners('signal') - options.signal && options.signal.removeEventListener('abort', onAbort) - - err ? reject(err) : resolve(channel) - } - - channel.once('error', onError) - channel.once('timeout', onTimeout) - channel.once('connect', onConnect) - channel.on('close', () => channel.destroy()) - options.signal && options.signal.addEventListener('abort', onAbort) - - channel.on('signal', async (signal) => { - const signalStr = JSON.stringify(signal) - const url = 'http://' + cOpts.host + ':' + cOpts.port - const path = '/?signal=' + base58btc.encode(fromString(signalStr)) - const uri = url + path - - try { - const res = await fetch(uri) - const incSignalBuf = base58btc.decode(await res.text()) - const incSignalStr = toString(incSignalBuf) - const incSignal = JSON.parse(incSignalStr) - channel.signal(incSignal) - } catch (err) { - reject(err) - } - }) - }) - } - - /** - * Creates a WebrtcDirect listener. The provided `handler` function will be called - * anytime a new incoming Connection has been successfully upgraded via - * `upgrader.upgradeInbound`. - * - * @param {*} [options] - * @param {function(Connection)} handler - * @returns {Listener} A WebrtcDirect listener - */ - createListener (options = {}, handler) { - if (!isNode) { - throw errcode(new Error('Can\'t listen if run from the Browser'), 'ERR_NO_SUPPORT_FROM_BROWSER') - } - - if (typeof options === 'function') { - handler = options - options = {} - } - - handler = handler || noop - - return createListener({ handler, upgrader: this._upgrader }, options) - } - - /** - * Takes a list of `Multiaddr`s and returns only valid addresses - * - * @param {Multiaddr[]} multiaddrs - * @returns {Multiaddr[]} Valid multiaddrs - */ - filter (multiaddrs) { - multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] - - return multiaddrs.filter((ma) => { - if (ma.protoCodes().includes(CODE_CIRCUIT)) { - return false - } - - return mafmt.WebRTCDirect.matches(ma.decapsulateCode(CODE_P2P)) - }) - } -} - -module.exports = withIs(WebRTCDirect, { className: 'WebRTCDirect', symbolName: '@libp2p/js-libp2p-webrtc-direct/webrtcdirect' }) diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..0e412f3 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,200 @@ +import { logger } from '@libp2p/logger' +import * as mafmt from '@multiformats/mafmt' +import { base58btc } from 'multiformats/bases/base58' +import { fetch } from 'native-fetch' +import { AbortError } from 'abortable-iterator' +import { toString } from 'uint8arrays/to-string' +import { fromString } from 'uint8arrays/from-string' +import { CODE_CIRCUIT, CODE_P2P } from './constants.js' +import { toMultiaddrConnection } from './socket-to-conn.js' +import { createListener } from './listener.js' +import { Signal, WebRTCInitiator, WebRTCInitiatorInit, WebRTCReceiverInit, WRTC } from '@libp2p/webrtc-peer' +import { symbol } from '@libp2p/interfaces/transport' +import type { CreateListenerOptions, DialOptions, Listener, Transport } from '@libp2p/interfaces/transport' +import type { Multiaddr } from '@multiformats/multiaddr' + +const log = logger('libp2p:webrtc-direct') + +export interface WebRTCDirectInit { + wrtc?: WRTC + initiatorOptions?: WebRTCInitiatorInit + recieverOptions?: WebRTCReceiverInit +} + +export class WebRTCDirect implements Transport { + private readonly initiatorOptions?: WebRTCInitiatorInit + private readonly recieverOptions?: WebRTCReceiverInit + public wrtc?: WRTC + + constructor (init?: WebRTCDirectInit) { + this.initiatorOptions = init?.initiatorOptions + this.recieverOptions = init?.recieverOptions + this.wrtc = init?.wrtc + } + + get [symbol] (): true { + return true + } + + get [Symbol.toStringTag] () { + return '@libp2p/webrtc-direct' + } + + async dial (ma: Multiaddr, options: DialOptions) { + const socket = await this._connect(ma, options) + const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + const conn = await options.upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) + return conn + } + + async _connect (ma: Multiaddr, options: DialOptions) { + if (options.signal?.aborted === true) { + throw new AbortError() + } + + const channelOptions = { + initiator: true, + trickle: false, + ...this.initiatorOptions + } + + // Use custom WebRTC implementation + if (this.wrtc != null) { + channelOptions.wrtc = this.wrtc + } + + return await new Promise((resolve, reject) => { + let connected: boolean + + const cOpts = ma.toOptions() + log('Dialing %s:%s', cOpts.host, cOpts.port) + + const channel = new WebRTCInitiator(channelOptions) + + const onError = (evt: CustomEvent) => { + const err = evt.detail + + if (!connected) { + const msg = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}` + + log.error(msg) + err.message = msg + done(err) + } + } + + const onReady = () => { + connected = true + + log('connection opened %s:%s', cOpts.host, cOpts.port) + done() + } + + const onAbort = () => { + log.error('connection aborted %s:%s', cOpts.host, cOpts.port) + void channel.close().finally(() => { + done(new AbortError()) + }) + } + + const done = (err?: Error) => { + channel.removeEventListener('error', onError) + channel.removeEventListener('ready', onReady) + options.signal?.removeEventListener('abort', onAbort) + + if (err != null) { + reject(err) + } else { + resolve(channel) + } + } + + channel.addEventListener('error', onError, { + once: true + }) + channel.addEventListener('ready', onReady, { + once: true + }) + channel.addEventListener('close', () => { + channel.removeEventListener('error', onError) + }) + options.signal?.addEventListener('abort', onAbort) + + const onSignal = async (signal: Signal) => { + if (signal.type !== 'offer') { + // skip candidates, just send the offer as it includes the candidates + return + } + + const signalStr = JSON.stringify(signal) + + let host = cOpts.host + + if (cOpts.family === 6 && !host.startsWith('[')) { + host = `[${host}]` + } + + const url = `http://${host}:${cOpts.port}` + const path = `/?signal=${base58btc.encode(fromString(signalStr))}` + const uri = url + path + + try { + const res = await fetch(uri) + const body = await res.text() + + if (body.trim() === '') { + // no response to this signal + return + } + + const incSignalBuf = base58btc.decode(body) + const incSignalStr = toString(incSignalBuf) + const incSignal = JSON.parse(incSignalStr) + + channel.handleSignal(incSignal) + } catch (err: any) { + await channel.close(err) + reject(err) + } + } + + channel.addEventListener('signal', (evt) => { + const signal = evt.detail + + void onSignal(signal).catch(async err => { + await channel.close(err) + }) + }) + }) + } + + /** + * Creates a WebrtcDirect listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + */ + createListener (options: CreateListenerOptions): Listener { + return createListener({ + ...options, + receiverOptions: this.recieverOptions, + wrtc: this.wrtc + }) + } + + /** + * Takes a list of `Multiaddr`s and returns only valid addresses + */ + filter (multiaddrs: Multiaddr[]): Multiaddr[] { + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] + + return multiaddrs.filter((ma) => { + if (ma.protoCodes().includes(CODE_CIRCUIT)) { + return false + } + + return mafmt.WebRTCDirect.matches(ma.decapsulateCode(CODE_P2P)) + }) + } +} diff --git a/src/listener.browser.ts b/src/listener.browser.ts new file mode 100644 index 0000000..ff0c0d6 --- /dev/null +++ b/src/listener.browser.ts @@ -0,0 +1,4 @@ + +export function createListener () { + throw new Error('WebRTCDirect Servers can not be created in the browser!') +} diff --git a/src/listener.js b/src/listener.js deleted file mode 100644 index 60031b1..0000000 --- a/src/listener.js +++ /dev/null @@ -1,132 +0,0 @@ -'use strict' - -const http = require('http') -const EventEmitter = require('events') -const debug = require('debug') -const log = debug('libp2p:webrtcdirect:listener') -log.error = debug('libp2p:webrtcdirect:listener:error') - -const isNode = require('detect-node') -const wrtc = require('wrtc') -const SimplePeer = require('libp2p-webrtc-peer') -const { base58btc } = require('multiformats/bases/base58') -const { toString } = require('uint8arrays/to-string') -const { fromString } = require('uint8arrays/from-string') -const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') - -const toConnection = require('./socket-to-conn') - -module.exports = ({ handler, upgrader }, options = {}) => { - const listener = new EventEmitter() - const server = http.createServer() - - let maSelf - - // Keep track of open connections to destroy in case of timeout - listener.__connections = [] - - server.on('request', async (req, res) => { - res.setHeader('Content-Type', 'text/plain') - res.setHeader('Access-Control-Allow-Origin', '*') - - const path = req.url - const incSignalStr = path.split('?signal=')[1] - const incSignalBuf = base58btc.decode(incSignalStr) - const incSignal = JSON.parse(toString(incSignalBuf)) - - options = { - trickle: false, - wrtc: isNode ? wrtc : undefined, - ...options - } - - const channel = new SimplePeer(options) - - const maConn = toConnection(channel, { - remoteAddr: toMultiaddr(req.connection.remoteAddress, req.connection.remotePort) - }) - log('new inbound connection %s', maConn.remoteAddr) - - channel.on('error', (err) => { - log.error(`incoming connectioned errored with ${err}`) - }) - channel.once('close', () => { - channel.removeAllListeners('error') - }) - channel.on('signal', (signal) => { - const signalStr = JSON.stringify(signal) - const signalEncoded = base58btc.encode(fromString(signalStr)) - res.end(Buffer.from(signalEncoded)) - }) - - channel.signal(incSignal) - - let conn - try { - conn = await upgrader.upgradeInbound(maConn) - } catch (err) { - log.error('inbound connection failed to upgrade', err) - return maConn.close() - } - log('inbound connection %s upgraded', maConn.remoteAddr) - - trackConn(listener, maConn) - - channel.on('connect', () => { - listener.emit('connection', conn) - handler(conn) - - channel.removeAllListeners('connect') - channel.removeAllListeners('signal') - }) - }) - - server.on('error', (err) => listener.emit('error', err)) - server.on('close', () => listener.emit('close')) - - listener.listen = (ma) => { - maSelf = ma - const lOpts = ma.toOptions() - - return new Promise((resolve, reject) => { - server.on('listening', (err) => { - if (err) { - return reject(err) - } - - listener.emit('listening') - log('Listening on %s %s', lOpts.port, lOpts.host) - resolve() - }) - - server.listen(lOpts) - }) - } - - listener.close = async () => { - if (!server.listening) { - return - } - - await Promise.all(listener.__connections.map(c => c.close())) - return new Promise((resolve, reject) => { - server.close((err) => err ? reject(err) : resolve()) - }) - } - - listener.getAddrs = () => { - return [maSelf] - } - - return listener -} - -function trackConn (listener, maConn) { - listener.__connections.push(maConn) - - const untrackConn = () => { - listener.__connections = listener.__connections.filter(c => c !== maConn) - } - - maConn.conn.once('close', untrackConn) -} diff --git a/src/listener.ts b/src/listener.ts new file mode 100644 index 0000000..fdba86d --- /dev/null +++ b/src/listener.ts @@ -0,0 +1,243 @@ +import http from 'http' +import { logger } from '@libp2p/logger' +import { base58btc } from 'multiformats/bases/base58' +import { toString } from 'uint8arrays/to-string' +import { fromString } from 'uint8arrays/from-string' +import type { Multiaddr } from '@multiformats/multiaddr' +import type { IncomingMessage, ServerResponse } from 'http' +import { EventEmitter, CustomEvent } from '@libp2p/interfaces' +import type { Connection } from '@libp2p/interfaces/connection' +import type { Listener, CreateListenerOptions, MultiaddrConnection, ConnectionHandler, ListenerEvents, Upgrader } from '@libp2p/interfaces/transport' +import { ipPortToMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' +import { toMultiaddrConnection } from './socket-to-conn.js' +import { Signal, WebRTCReceiver, WebRTCReceiverInit, WRTC } from '@libp2p/webrtc-peer' +import errCode from 'err-code' +import { pEvent } from 'p-event' + +const log = logger('libp2p:webrtc-direct:listener') + +interface WebRTCDirectListenerOptions extends CreateListenerOptions { + receiverOptions?: WebRTCReceiverInit + wrtc?: WRTC +} + +interface WebRTCDirectServerEvents { + 'error': CustomEvent + 'listening': CustomEvent + 'connection': CustomEvent +} + +class WebRTCDirectServer extends EventEmitter { + private readonly server: http.Server + private readonly wrtc?: WRTC + private readonly receiverOptions?: WebRTCReceiverInit + private connections: MultiaddrConnection[] + private channels: WebRTCReceiver[] + + constructor (multiaddr: Multiaddr, wrtc?: WRTC, receiverOptions?: WebRTCReceiverInit) { + super() + + this.connections = [] + this.channels = [] + this.wrtc = wrtc + this.receiverOptions = receiverOptions + this.server = http.createServer() + + this.server.on('request', (req: IncomingMessage, res: ServerResponse) => { + void this.processRequest(req, res).catch(err => { + log.error(err) + }) + }) + + this.server.on('error', (err) => this.dispatchEvent(new CustomEvent('error', { detail: err }))) + + const lOpts = multiaddr.toOptions() + + this.server.on('listening', (err: Error) => { + if (err != null) { + this.dispatchEvent(new CustomEvent('error', { detail: err })) + + return + } + + this.dispatchEvent(new CustomEvent('listening')) + log('Listening on %s %s', lOpts.port, lOpts.host) + }) + + this.server.listen(lOpts) + } + + async processRequest (req: IncomingMessage, res: ServerResponse) { + const remoteAddress = req?.socket?.remoteAddress + const remotePort = req?.socket.remotePort + const remoteHost = req.headers.host + const requestUrl = req.url + + if (remoteAddress == null || remotePort == null || requestUrl == null || remoteHost == null) { + const err = new Error('Invalid listener request. Specify request\'s url, remoteAddress, remotePort.') + log.error(err) + res.writeHead(500) + res.end(err) + return + } + res.setHeader('Content-Type', 'text/plain') + res.setHeader('Access-Control-Allow-Origin', '*') + + const url = new URL(requestUrl, `http://${remoteHost}`) + const incSignalStr = url.searchParams.get('signal') + + if (incSignalStr == null) { + const err = new Error('Invalid listener request. Signal not found.') + log.error(err) + res.writeHead(500) + res.end(err) + return + } + + const incSignalBuf = base58btc.decode(incSignalStr) + const incSignal: Signal = JSON.parse(toString(incSignalBuf)) + + if (incSignal.type !== 'offer') { + // offers contain candidates so only respond to the offer + res.end() + return + } + + const channel = new WebRTCReceiver({ + wrtc: this.wrtc, + ...this.receiverOptions + }) + this.channels.push(channel) + + channel.addEventListener('signal', (evt) => { + const signal = evt.detail + const signalStr = JSON.stringify(signal) + const signalEncoded = base58btc.encode(fromString(signalStr)) + + res.end(signalEncoded) + }) + channel.addEventListener('error', (evt) => { + const err = evt.detail + + log.error('incoming connection errored with', err) + res.end() + void channel.close().catch(err => { + log.error(err) + }) + }) + channel.addEventListener('ready', () => { + const maConn = toMultiaddrConnection(channel, { + remoteAddr: ipPortToMultiaddr(remoteAddress, remotePort) + }) + log('new inbound connection %s', maConn.remoteAddr) + + this.connections.push(maConn) + + const untrackConn = () => { + this.connections = this.connections.filter(c => c !== maConn) + this.channels = this.channels.filter(c => c !== channel) + } + + channel.addEventListener('close', untrackConn, { + once: true + }) + + this.dispatchEvent(new CustomEvent('connection', { detail: maConn })) + }) + + channel.handleSignal(incSignal) + } + + async close () { + await Promise.all( + this.channels.map(async channel => await channel.close()) + ) + + await new Promise((resolve, reject) => { + this.server.close((err) => { + if (err != null) { + return reject(err) + } + + resolve() + }) + }) + } +} + +class WebRTCDirectListener extends EventEmitter implements Listener { + private server?: WebRTCDirectServer + private multiaddr?: Multiaddr + private readonly wrtc?: WRTC + private readonly receiverOptions?: WebRTCReceiverInit + private readonly handler?: ConnectionHandler + private readonly upgrader: Upgrader + + constructor (upgrader: Upgrader, wrtc?: WRTC, receiverOptions?: WebRTCReceiverInit, handler?: ConnectionHandler) { + super() + + this.upgrader = upgrader + this.wrtc = wrtc + this.receiverOptions = receiverOptions + this.handler = handler + } + + async listen (multiaddr: Multiaddr) { + // Should only be used if not already listening + if (this.multiaddr != null) { + throw errCode(new Error('listener already in use'), 'ERR_ALREADY_LISTENING') + } + + this.multiaddr = multiaddr + const server = new WebRTCDirectServer(multiaddr, this.wrtc, this.receiverOptions) + this.server = server + + this.server.addEventListener('connection', (evt) => { + void this.onConnection(evt.detail).catch(err => { + log.error(err) + }) + }) + + await pEvent(server, 'listening') + + this.dispatchEvent(new CustomEvent('listening')) + } + + async onConnection (maConn: MultiaddrConnection) { + let connection: Connection + + try { + connection = await this.upgrader.upgradeInbound(maConn) + } catch (err) { + log.error('inbound connection failed to upgrade', err) + return await maConn.close() + } + log('inbound connection %s upgraded', maConn.remoteAddr) + + if (this.handler != null) { + this.handler(connection) + } + + this.dispatchEvent(new CustomEvent('connection', { detail: connection })) + } + + async close () { + if (this.server != null) { + await this.server.close() + } + + this.dispatchEvent(new CustomEvent('close')) + } + + getAddrs () { + if (this.multiaddr != null) { + return [this.multiaddr] + } + + return [] + } +} + +export function createListener (options: WebRTCDirectListenerOptions) { + return new WebRTCDirectListener(options.upgrader, options.wrtc, options.receiverOptions, options.handler) +} diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js deleted file mode 100644 index 026f373..0000000 --- a/src/socket-to-conn.js +++ /dev/null @@ -1,114 +0,0 @@ -'use strict' - -const abortable = require('abortable-iterator') -const toIterable = require('stream-to-it') - -const { CLOSE_TIMEOUT } = require('./constants') -const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') - -const debug = require('debug') -const log = debug('libp2p:webrtcdirect:socket') -log.error = debug('libp2p:webrtcdirect:socket:error') - -// Convert a socket into a MultiaddrConnection -// https://github.com/libp2p/interface-transport#multiaddrconnection - -module.exports = (socket, options = {}) => { - const { sink, source } = toIterable.duplex(socket) - - const maConn = { - async sink (source) { - if (options.signal) { - source = abortable(source, options.signal) - } - - try { - await sink((async function * () { - for await (const chunk of source) { - // Convert BufferList to Buffer - yield chunk instanceof Uint8Array ? chunk : chunk.slice() - } - })()) - } catch (err) { - // If aborted we can safely ignore - if (err.type !== 'aborted') { - // If the source errored the socket will already have been destroyed by - // toIterable.duplex(). If the socket errored it will already be - // destroyed. There's nothing to do here except log the error & return. - log.error(err) - } - } - }, - - source: options.signal ? abortable(source, options.signal) : source, - - conn: socket, - - localAddr: toLocalAddr(socket), - - // If the remote address was passed, use it - it may have the peer ID encapsulated - remoteAddr: options.remoteAddr, - - timeline: { open: Date.now() }, - - close () { - if (socket.destroyed) return - - return new Promise((resolve, reject) => { - const start = Date.now() - - // Attempt to end the socket. If it takes longer to close than the - // timeout, destroy it manually. - const timeout = setTimeout(() => { - if (maConn.remoteAddr) { - const { host, port } = maConn.remoteAddr.toOptions() - log('timeout closing socket to %s:%s after %dms, destroying it manually', - host, port, Date.now() - start) - } - - if (!socket.destroyed) { - socket.destroy() - } - }, CLOSE_TIMEOUT) - - socket.once('close', () => { - resolve() - }) - - socket.end((err) => { - clearTimeout(timeout) - - maConn.timeline.close = Date.now() - if (err) return reject(err) - }) - }) - } - } - - socket.once('close', () => { - // In instances where `close` was not explicitly called, - // such as an iterable stream ending, ensure we have set the close - // timeline - if (!maConn.timeline.close) { - maConn.timeline.close = Date.now() - } - }) - - return maConn -} - -/** - * Get local multiaddr from socket. - * - * @param {SimplePeer} socket - * @returns {Multiaddr|undefined} - */ -function toLocalAddr (socket) { - if (socket.localAddress && socket.localPort) { - try { - return toMultiaddr(socket.localAddress, socket.localPort) - } catch { - // Might fail if the socket.localAddress is fqdn - } - } -} diff --git a/src/socket-to-conn.ts b/src/socket-to-conn.ts new file mode 100644 index 0000000..bfe2bbd --- /dev/null +++ b/src/socket-to-conn.ts @@ -0,0 +1,86 @@ +import { abortableSource } from 'abortable-iterator' +import { CLOSE_TIMEOUT } from './constants.js' +import { logger } from '@libp2p/logger' +import type { MultiaddrConnection } from '@libp2p/interfaces/transport' +import type { WebRTCPeer } from '@libp2p/webrtc-peer' +import type { AbortOptions } from '@libp2p/interfaces' +import type { Multiaddr } from '@multiformats/multiaddr' + +const log = logger('libp2p:webrtc-direct:socket') + +export interface ToMultiaddrConnectionOptions extends AbortOptions { + remoteAddr: Multiaddr +} + +export function toMultiaddrConnection (socket: WebRTCPeer, options: ToMultiaddrConnectionOptions): MultiaddrConnection { + const { sink, source } = socket + + const maConn: MultiaddrConnection = { + remoteAddr: options.remoteAddr, + + async sink (source) { + if (options.signal != null) { + source = abortableSource(source, options.signal) + } + + try { + await sink(source) + } catch (err: any) { + // If aborted we can safely ignore + if (err.type !== 'aborted') { + // If the source errored the socket will already have been destroyed by + // toIterable.duplex(). If the socket errored it will already be + // destroyed. There's nothing to do here except log the error & return. + log.error(err) + } + } + }, + + source: (options.signal != null) ? abortableSource(source, options.signal) : source, + + timeline: { open: Date.now() }, + + async close () { + if (socket.closed) { + return + } + + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + if (maConn.remoteAddr != null) { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + } + + if (!socket.closed) { + socket.close().catch(err => { + log.error('could not close socket', err) + }) + } + }, CLOSE_TIMEOUT) + + try { + await socket.close() + } finally { + clearTimeout(timeout) + } + } + } + + socket.addEventListener('close', () => { + // In instances where `close` was not explicitly called, + // such as an iterable stream ending, ensure we have set the close + // timeline + if (maConn.timeline.close == null) { + maConn.timeline.close = Date.now() + } + }, { + once: true + }) + + return maConn +} diff --git a/test/browser.ts b/test/browser.ts new file mode 100644 index 0000000..44b95ea --- /dev/null +++ b/test/browser.ts @@ -0,0 +1,15 @@ + +import listenTests from './listen.js' +import dialTests from './dial.js' +import { WebRTCDirect } from '../src/index.js' + +describe('browser RTC', () => { + const create = async () => { + const ws = new WebRTCDirect() + + return ws + } + + dialTests(create) + listenTests(create) +}) diff --git a/test/compliance.js b/test/compliance.js deleted file mode 100644 index 16f5299..0000000 --- a/test/compliance.js +++ /dev/null @@ -1,30 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const tests = require('libp2p-interfaces-compliance-tests/src/transport') -const { Multiaddr } = require('multiaddr') - -const WDirect = require('../src') - -describe('interface-transport compliance', () => { - tests({ - setup ({ upgrader }) { - const ws = new WDirect({ upgrader }) - - const addrs = [ - new Multiaddr('/ip4/127.0.0.1/tcp/22222/http/p2p-webrtc-direct'), - new Multiaddr('/ip4/127.0.0.1/tcp/33333/http/p2p-webrtc-direct'), - new Multiaddr('/ip4/127.0.0.1/tcp/44444/http/p2p-webrtc-direct'), - new Multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') - ] - - // Used by the dial tests to simulate a delayed connect - const connector = { - delay () {}, - restore () {} - } - - return { transport: ws, addrs, connector } - } - }) -}) diff --git a/test/compliance.ts b/test/compliance.ts new file mode 100644 index 0000000..f2979ce --- /dev/null +++ b/test/compliance.ts @@ -0,0 +1,33 @@ +/* eslint-env mocha */ + +import tests from '@libp2p/interface-compliance-tests/transport' +import { Multiaddr } from '@multiformats/multiaddr' +import type { WebRTCDirect } from '../src/index.js' + +export default (create: () => Promise) => { + describe('interface-transport compliance', function () { + this.timeout(20 * 1000) + + tests({ + async setup () { + const ws = await create() + + const addrs = [ + new Multiaddr('/ip4/127.0.0.1/tcp/22222/http/p2p-webrtc-direct'), + new Multiaddr('/ip4/127.0.0.1/tcp/33333/http/p2p-webrtc-direct'), + new Multiaddr('/ip4/127.0.0.1/tcp/44444/http/p2p-webrtc-direct'), + new Multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') + ] + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay () {}, + restore () {} + } + + return { transport: ws, addrs, connector } + }, + async teardown () {} + }) + }) +} diff --git a/test/dial.spec.js b/test/dial.spec.js deleted file mode 100644 index 61db6cd..0000000 --- a/test/dial.spec.js +++ /dev/null @@ -1,62 +0,0 @@ -/* eslint-env mocha */ - -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const { Multiaddr } = require('multiaddr') - -const pipe = require('it-pipe') -const { collect } = require('streaming-iterables') -const { fromString } = require('uint8arrays/from-string') - -const WebRTCDirect = require('../src') - -const mockUpgrader = { - upgradeInbound: maConn => maConn, - upgradeOutbound: maConn => maConn -} - -describe('dial', function () { - this.timeout(20 * 1000) - - const ma = new Multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') - let wd - - before(() => { - wd = new WebRTCDirect({ upgrader: mockUpgrader }) - }) - - it('dial on IPv4', async () => { - const conn = await wd.dial(ma) - const data = fromString('some data') - - const values = await pipe( - [data], - conn, - collect - ) - - expect(values).to.eql([data]) - }) - - it('dial offline / non-existent node on IPv4, check callback', async () => { - const maOffline = new Multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') - - try { - await wd.dial(maOffline, { config: {} }) - } catch (err) { - expect(err).to.exist() - return - } - - throw new Error('dial did not fail') - }) - - it.skip('dial on IPv6', () => { - // TODO IPv6 not supported yet - }) -}) diff --git a/test/dial.ts b/test/dial.ts new file mode 100644 index 0000000..927ec75 --- /dev/null +++ b/test/dial.ts @@ -0,0 +1,107 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/utils/chai.js' +import { Multiaddr } from '@multiformats/multiaddr' +import { pipe } from 'it-pipe' +import all from 'it-all' +import { fromString } from 'uint8arrays/from-string' +import { mockRegistrar, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import type { WebRTCDirect } from '../src/index.js' +import type { Upgrader } from '@libp2p/interfaces/transport' + +// this node is started in aegir.cjs before the test run +const REMOTE_MULTIADDR_IP4 = new Multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') +const REMOTE_MULTIADDR_IP6 = new Multiaddr('/ip6/::1/tcp/12346/http/p2p-webrtc-direct') + +const ECHO_PROTOCOL = '/echo/1.0.0' + +export default (create: () => Promise) => { + describe('dial', function () { + this.timeout(20 * 1000) + + let upgrader: Upgrader + + beforeEach(() => { + const protocol = '/echo/1.0.0' + const registrar = mockRegistrar() + void registrar.handle(protocol, ({ stream }) => { + void pipe( + stream, + stream + ) + }) + upgrader = mockUpgrader({ + registrar + }) + }) + + it('dial on IPv4', async () => { + const wd = await create() + const conn = await wd.dial(REMOTE_MULTIADDR_IP4, { upgrader }) + const { stream } = await conn.newStream(ECHO_PROTOCOL) + const data = fromString('some data') + + const values = await pipe( + [data], + stream, + async (source) => await all(source) + ) + + expect(values).to.deep.equal([data]) + await conn.close() + }) + + it('dials the same server twice', async () => { + const wd = await create() + const conn1 = await wd.dial(REMOTE_MULTIADDR_IP4, { upgrader }) + const conn2 = await wd.dial(REMOTE_MULTIADDR_IP4, { upgrader }) + + const values = await Promise.all( + [conn1, conn2].map(async conn => { + const { stream } = await conn1.newStream(ECHO_PROTOCOL) + const data = fromString('some data ' + conn.id) + + const values = await pipe( + [data], + stream, + async (source) => await all(source) + ) + + return values + }) + ) + + expect(values).to.deep.equal([[ + fromString('some data ' + conn1.id) + ], [ + fromString('some data ' + conn2.id) + ]]) + + await conn1.close() + await conn2.close() + }) + + it('dial offline / non-existent node on IPv4, check callback', async () => { + const wd = await create() + const maOffline = new Multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') + + await expect(wd.dial(maOffline, { upgrader })).to.eventually.be.rejected() + }) + + it('dial on IPv6', async () => { + const wd = await create() + const conn = await wd.dial(REMOTE_MULTIADDR_IP6, { upgrader }) + const { stream } = await conn.newStream(['/echo/1.0.0']) + const data = fromString('some data') + + const values = await pipe( + [data], + stream, + async (source) => await all(source) + ) + + expect(values).to.deep.equal([data]) + await conn.close() + }) + }) +} diff --git a/test/filter.spec.js b/test/filter.spec.ts similarity index 66% rename from test/filter.spec.js rename to test/filter.spec.ts index 9c82985..d5354e5 100644 --- a/test/filter.spec.js +++ b/test/filter.spec.ts @@ -1,22 +1,12 @@ /* eslint-env mocha */ -'use strict' -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const { Multiaddr } = require('multiaddr') - -const WebRTCDirect = require('../src') - -const mockUpgrader = { - upgradeInbound: maConn => maConn, - upgradeOutbound: maConn => maConn -} +import { expect } from 'aegir/utils/chai.js' +import { Multiaddr } from '@multiformats/multiaddr' +import { WebRTCDirect } from '../src/index.js' describe('filter', () => { it('filters non valid webrtc-direct multiaddrs', () => { - const wd = new WebRTCDirect({ upgrader: mockUpgrader }) + const wd = new WebRTCDirect() const maArr = [ new Multiaddr('/ip4/1.2.3.4/tcp/3456/http/p2p-webrtc-direct'), new Multiaddr('/ip4/127.0.0.1/tcp/9090/ws'), @@ -31,10 +21,10 @@ describe('filter', () => { }) it('filter a single addr for this transport', () => { - const wd = new WebRTCDirect({ upgrader: mockUpgrader }) + const wd = new WebRTCDirect() const ma = new Multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') - const filtered = wd.filter(ma) + const filtered = wd.filter([ma]) expect(filtered.length).to.equal(1) }) }) diff --git a/test/instance.spec.js b/test/instance.spec.js deleted file mode 100644 index 681a01f..0000000 --- a/test/instance.spec.js +++ /dev/null @@ -1,27 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const WebRTCDirect = require('../src') - -const mockUpgrader = { - upgradeInbound: maConn => maConn, - upgradeOutbound: maConn => maConn -} - -describe('instances', () => { - it('create', (done) => { - const wdirect = new WebRTCDirect({ upgrader: mockUpgrader }) - expect(wdirect).to.exist() - done() - }) - - it('create without new throws', (done) => { - expect(() => WebRTCDirect()).to.throw() - done() - }) -}) diff --git a/test/instance.spec.ts b/test/instance.spec.ts new file mode 100644 index 0000000..4b42089 --- /dev/null +++ b/test/instance.spec.ts @@ -0,0 +1,18 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/utils/chai.js' +import { WebRTCDirect } from '../src/index.js' + +describe('instances', () => { + it('create', (done) => { + const wdirect = new WebRTCDirect() + expect(wdirect).to.exist() + done() + }) + + it('create without new throws', (done) => { + // @ts-expect-error need new keyword + expect(() => WebRTCDirect()).to.throw() + done() + }) +}) diff --git a/test/listen.js b/test/listen.js deleted file mode 100644 index 6336cfa..0000000 --- a/test/listen.js +++ /dev/null @@ -1,127 +0,0 @@ -/* eslint-env mocha */ - -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const { Multiaddr } = require('multiaddr') -const pipe = require('it-pipe') - -const WebRTCDirect = require('../src') - -const mockUpgrader = { - upgradeInbound: maConn => maConn, - upgradeOutbound: maConn => maConn -} - -describe('listen', () => { - let wd - - const ma = new Multiaddr('/ip4/127.0.0.1/tcp/20123/http/p2p-webrtc-direct') - - before(() => { - wd = new WebRTCDirect({ upgrader: mockUpgrader }) - }) - - it('listen, check for promise', async () => { - const listener = wd.createListener({ config: {} }, (_) => { }) - - await listener.listen(ma) - await listener.close() - }) - - it('listen, check for listening event', (done) => { - const listener = wd.createListener({ config: {} }, (conn) => {}) - - listener.once('listening', async () => { - await listener.close() - done() - }) - listener.listen(ma) - }) - - it('listen, check for the close event', (done) => { - const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.listen(ma).then(() => { - listener.once('close', done) - - listener.close() - }) - }) - - it.skip('listen in 0.0.0.0', (done) => { - // TODO - }) - - it.skip('listen in port 0', (done) => { - // TODO - }) - - it.skip('listen on IPv6 addr', (done) => { - // TODO IPv6 not supported yet - }) - - it('getAddrs', async () => { - const listener = wd.createListener({ config: {} }, (conn) => {}) - - await listener.listen(ma) - - const addrs = listener.getAddrs() - expect(addrs[0]).to.deep.equal(ma) - - await listener.close() - }) - - it('should untrack conn after being closed', async function () { - this.timeout(20e3) - - const ma1 = new Multiaddr('/ip4/127.0.0.1/tcp/12346/http/p2p-webrtc-direct') - - const wd1 = new WebRTCDirect({ upgrader: mockUpgrader }) - const listener1 = wd1.createListener((conn) => pipe(conn, conn)) - - await listener1.listen(ma1) - expect(listener1.__connections).to.have.lengthOf(0) - - const conn = await wd.dial(ma1) - expect(listener1.__connections).to.have.lengthOf(1) - - await conn.close() - - // wait for listener to know of the disconnect - await new Promise((resolve) => { - setTimeout(resolve, 1000) - }) - - expect(listener1.__connections).to.have.lengthOf(0) - - await listener1.close() - }) - - it('should have remoteAddress in listener connection', async function () { - this.timeout(20e3) - - const ma1 = new Multiaddr('/ip4/127.0.0.1/tcp/12346/http/p2p-webrtc-direct') - - const wd1 = new WebRTCDirect({ upgrader: mockUpgrader }) - const listener1 = wd1.createListener((conn) => { - expect(conn.remoteAddr).to.exist() - pipe(conn, conn) - }) - - await listener1.listen(ma1) - const conn = await wd.dial(ma1) - expect(conn.remoteAddr).to.exist() - - await conn.close() - - // wait for listener to know of the disconnect - await new Promise((resolve) => { - setTimeout(resolve, 1000) - }) - await listener1.close() - }) -}) diff --git a/test/listen.ts b/test/listen.ts new file mode 100644 index 0000000..d80f154 --- /dev/null +++ b/test/listen.ts @@ -0,0 +1,189 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/utils/chai.js' +import { Multiaddr } from '@multiformats/multiaddr' +import { mockRegistrar, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import { isBrowser } from 'wherearewe' +import type { WebRTCDirect } from '../src/index.js' +import { pipe } from 'it-pipe' +import { pEvent } from 'p-event' + +const ECHO_PROTOCOL = '/echo/1.0.0' + +export default (create: () => Promise) => { + describe('listen', function () { + this.timeout(20 * 1000) + + if (isBrowser) { + return + } + + let wd: WebRTCDirect + + const ma = new Multiaddr('/ip4/127.0.0.1/tcp/20123/http/p2p-webrtc-direct') + + before(async () => { + wd = await create() + }) + + it('listen, check for promise', async () => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + + await listener.listen(ma) + await listener.close() + }) + + it('listen, check for listening event', (done) => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + + listener.addEventListener('listening', () => { + void listener.close() + .then(done, done) + }, { + once: true + }) + void listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + void listener.listen(ma).then(async () => { + listener.addEventListener('close', () => done(), { + once: true + }) + + await listener.close() + }) + }) + + it('listen in 0.0.0.0', async () => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + + await listener.listen(new Multiaddr('/ip4/0.0.0.0/tcp/48322')) + await listener.close() + }) + + it('listen in port 0', async () => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + + await listener.listen(new Multiaddr('/ip4/127.0.0.1/tcp/0')) + await listener.close() + }) + + it('listen on IPv6 addr', async () => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + + await listener.listen(new Multiaddr('/ip6/::1/tcp/48322')) + await listener.close() + }) + + it('getAddrs', async () => { + const listener = wd.createListener({ + upgrader: mockUpgrader() + }) + + await listener.listen(ma) + + const addrs = listener.getAddrs() + expect(addrs[0]).to.deep.equal(ma) + + await listener.close() + }) + + it('should untrack conn after being closed', async function () { + const ma1 = new Multiaddr('/ip4/127.0.0.1/tcp/12346/http/p2p-webrtc-direct') + const registrar = mockRegistrar() + void registrar.handle(ECHO_PROTOCOL, ({ stream }) => { + void pipe( + stream, + stream + ) + }) + const upgrader = mockUpgrader({ + registrar + }) + + const wd1 = await create() + const listener1 = wd1.createListener({ + upgrader, + handler: (conn) => { + void conn.newStream([ECHO_PROTOCOL]) + .then(({ stream }) => { + void pipe(stream, stream) + }) + } + }) + + await listener1.listen(ma1) + expect(listener1).to.have.nested.property('server.connections').that.has.lengthOf(0) + + const conn = await wd.dial(ma1, { + upgrader + }) + + // wait for listener to know of the connect + await pEvent(listener1, 'connection') + + expect(listener1).to.have.nested.property('server.connections').that.has.lengthOf(1) + + await conn.close() + + // wait for listener to know of the disconnect + await new Promise((resolve) => { + setTimeout(resolve, 1000) + }) + + expect(listener1).to.have.nested.property('server.connections').that.has.lengthOf(0) + + await listener1.close() + }) + + it('should have remoteAddress in listener connection', async function () { + const ma1 = new Multiaddr('/ip4/127.0.0.1/tcp/12346/http/p2p-webrtc-direct') + const registrar = mockRegistrar() + void registrar.handle(ECHO_PROTOCOL, ({ stream }) => { + void pipe( + stream, + stream + ) + }) + const upgrader = mockUpgrader({ + registrar + }) + + const wd1 = await create() + const listener1 = wd1.createListener({ + handler: (conn) => { + expect(conn.remoteAddr).to.exist() + + void conn.newStream([ECHO_PROTOCOL]) + .then(({ stream }) => { + void pipe(stream, stream) + }) + }, + upgrader + }) + + await listener1.listen(ma1) + const conn = await wd.dial(ma1, { + upgrader + }) + expect(conn.remoteAddr).to.exist() + + await conn.close() + await listener1.close() + }) + }) +} diff --git a/test/node.js b/test/node.js deleted file mode 100644 index 58b90fd..0000000 --- a/test/node.js +++ /dev/null @@ -1,4 +0,0 @@ -'use strict' - -require('./listen.js') -require('./compliance.js') diff --git a/test/node.ts b/test/node.ts new file mode 100644 index 0000000..ebe1076 --- /dev/null +++ b/test/node.ts @@ -0,0 +1,25 @@ + +import listenTests from './listen.js' +import complianceTests from './compliance.js' +import dialTests from './dial.js' +// @ts-expect-error no types +import wrtc from 'wrtc' +import { WebRTCDirect } from '../src/index.js' + +// TODO: Temporary fix per wrtc issue +// https://github.com/node-webrtc/node-webrtc/issues/636#issuecomment-774171409 +process.on('beforeExit', (code) => process.exit(code)) + +describe('transport: with wrtc', () => { + const create = async () => { + const ws = new WebRTCDirect({ + wrtc + }) + + return ws + } + + dialTests(create) + listenTests(create) + complianceTests(create) +}) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..f296f99 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist", + "emitDeclarationOnly": false, + "module": "ES2020" + }, + "include": [ + "src", + "test" + ] +}