diff --git a/src/connection/connection.d.ts b/src/connection/connection.d.ts index f74e29693..9bd1af388 100644 --- a/src/connection/connection.d.ts +++ b/src/connection/connection.d.ts @@ -1,5 +1,24 @@ declare const _exports: typeof Connection; export = _exports; +export type Sink = (source: Uint8Array) => Promise; +export type DuplexIterableStream = { + sink: Sink; + source: () => AsyncIterator; +}; +export type PeerId = import("peer-id"); +export type Multiaddr = import("multiaddr"); +/** + * @callback Sink + * @param {Uint8Array} source + * @returns {Promise} + * + * @typedef {object} DuplexIterableStream + * @property {Sink} sink + * @property {() AsyncIterator} source + * + * @typedef {import('peer-id')} PeerId + * @typedef {import('multiaddr')} Multiaddr + */ /** * An implementation of the js-libp2p connection. * Any libp2p transport should use an upgrader to return this connection. @@ -8,13 +27,13 @@ declare class Connection { /** * Creates an instance of Connection. * @param {object} properties properties of the connection. - * @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known. - * @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection. + * @param {Multiaddr} [properties.localAddr] local multiaddr of the connection if known. + * @param {Multiaddr} [properties.remoteAddr] remote multiaddr of the connection. * @param {PeerId} properties.localPeer local peer-id. * @param {PeerId} properties.remotePeer remote peer-id. * @param {function} properties.newStream new stream muxer function. * @param {function} properties.close close raw connection function. - * @param {function(): Stream[]} properties.getStreams get streams from muxer function. + * @param {function(): DuplexIterableStream[]} properties.getStreams get streams from muxer function. * @param {object} properties.stat metadata of the connection. * @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound"). * @param {object} properties.stat.timeline connection relevant events timestamp. @@ -30,7 +49,7 @@ declare class Connection { remotePeer: import("peer-id"); newStream: Function; close: Function; - getStreams: () => any[]; + getStreams: () => DuplexIterableStream[]; stat: { direction: string; timeline: { @@ -85,7 +104,7 @@ declare class Connection { /** * Reference to the getStreams function of the muxer */ - _getStreams: () => any[]; + _getStreams: () => DuplexIterableStream[]; /** * Connection streams registry */ @@ -113,25 +132,25 @@ declare class Connection { * Get all the streams of the muxer. * @this {Connection} */ - get streams(): any[]; + get streams(): DuplexIterableStream[]; /** * Create a new stream from this connection * @param {string[]} protocols intended protocol for the stream - * @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol + * @return {Promise<{stream: DuplexIterableStream, protocol: string}>} with muxed+multistream-selected stream and selected protocol */ newStream(protocols: string[]): Promise<{ - stream: any; + stream: DuplexIterableStream; protocol: string; }>; /** * Add a stream when it is opened to the registry. - * @param {*} muxedStream a muxed stream + * @param {DuplexIterableStream} muxedStream a muxed stream * @param {object} properties the stream properties to be registered * @param {string} properties.protocol the protocol used by the stream * @param {object} properties.metadata metadata of the stream * @return {void} */ - addStream(muxedStream: any, { protocol, metadata }: { + addStream(muxedStream: DuplexIterableStream, { protocol, metadata }: { protocol: string; metadata: any; }): void; diff --git a/src/connection/connection.js b/src/connection/connection.js index d1ff248cd..1659d12c6 100644 --- a/src/connection/connection.js +++ b/src/connection/connection.js @@ -1,56 +1,23 @@ 'use strict' -const PeerId = require('peer-id') -const multiaddr = require('multiaddr') const withIs = require('class-is') const errCode = require('err-code') const Status = require('./status') -function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) { - if (localAddr && !multiaddr.isMultiaddr(localAddr)) { - throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS') - } - - if (!PeerId.isPeerId(localPeer)) { - throw errCode(new Error('localPeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS') - } - - if (!PeerId.isPeerId(remotePeer)) { - throw errCode(new Error('remotePeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS') - } - - if (typeof newStream !== 'function') { - throw errCode(new Error('new stream must be a function'), 'ERR_INVALID_PARAMETERS') - } - - if (typeof close !== 'function') { - throw errCode(new Error('close must be a function'), 'ERR_INVALID_PARAMETERS') - } +const { validateArgs } = require('./utils') - if (typeof getStreams !== 'function') { - throw errCode(new Error('getStreams must be a function'), 'ERR_INVALID_PARAMETERS') - } - - if (!stat) { - throw errCode(new Error('connection metadata object must be provided'), 'ERR_INVALID_PARAMETERS') - } - - if (stat.direction !== 'inbound' && stat.direction !== 'outbound') { - throw errCode(new Error('direction must be "inbound" or "outbound"'), 'ERR_INVALID_PARAMETERS') - } - - if (!stat.timeline) { - throw errCode(new Error('connection timeline object must be provided in the stat object'), 'ERR_INVALID_PARAMETERS') - } - - if (!stat.timeline.open) { - throw errCode(new Error('connection open timestamp must be provided'), 'ERR_INVALID_PARAMETERS') - } - - if (!stat.timeline.upgraded) { - throw errCode(new Error('connection upgraded timestamp must be provided'), 'ERR_INVALID_PARAMETERS') - } -} +/** + * @callback Sink + * @param {Uint8Array} source + * @returns {Promise} + * + * @typedef {object} DuplexIterableStream + * @property {Sink} sink + * @property {() AsyncIterator} source + * + * @typedef {import('peer-id')} PeerId + * @typedef {import('multiaddr')} Multiaddr + */ /** * An implementation of the js-libp2p connection. @@ -60,13 +27,13 @@ class Connection { /** * Creates an instance of Connection. * @param {object} properties properties of the connection. - * @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known. - * @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection. + * @param {Multiaddr} [properties.localAddr] local multiaddr of the connection if known. + * @param {Multiaddr} [properties.remoteAddr] remote multiaddr of the connection. * @param {PeerId} properties.localPeer local peer-id. * @param {PeerId} properties.remotePeer remote peer-id. * @param {function} properties.newStream new stream muxer function. * @param {function} properties.close close raw connection function. - * @param {function(): Stream[]} properties.getStreams get streams from muxer function. + * @param {function(): DuplexIterableStream[]} properties.getStreams get streams from muxer function. * @param {object} properties.stat metadata of the connection. * @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound"). * @param {object} properties.stat.timeline connection relevant events timestamp. @@ -157,7 +124,7 @@ class Connection { /** * Create a new stream from this connection * @param {string[]} protocols intended protocol for the stream - * @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol + * @return {Promise<{stream: DuplexIterableStream, protocol: string}>} with muxed+multistream-selected stream and selected protocol */ async newStream (protocols) { if (this.stat.status === Status.CLOSING) { @@ -182,7 +149,7 @@ class Connection { /** * Add a stream when it is opened to the registry. - * @param {*} muxedStream a muxed stream + * @param {DuplexIterableStream} muxedStream a muxed stream * @param {object} properties the stream properties to be registered * @param {string} properties.protocol the protocol used by the stream * @param {object} properties.metadata metadata of the stream diff --git a/src/connection/utils.d.ts b/src/connection/utils.d.ts new file mode 100644 index 000000000..abc4d7a97 --- /dev/null +++ b/src/connection/utils.d.ts @@ -0,0 +1 @@ +export function validateArgs(localAddr: any, localPeer: any, remotePeer: any, newStream: any, close: any, getStreams: any, stat: any): void; diff --git a/src/connection/utils.js b/src/connection/utils.js new file mode 100644 index 000000000..b46baa603 --- /dev/null +++ b/src/connection/utils.js @@ -0,0 +1,55 @@ +'use strict' + +const errCode = require('err-code') +const PeerId = require('peer-id') +const multiaddr = require('multiaddr') + +function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) { + if (localAddr && !multiaddr.isMultiaddr(localAddr)) { + throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS') + } + + if (!PeerId.isPeerId(localPeer)) { + throw errCode(new Error('localPeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS') + } + + if (!PeerId.isPeerId(remotePeer)) { + throw errCode(new Error('remotePeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS') + } + + if (typeof newStream !== 'function') { + throw errCode(new Error('new stream must be a function'), 'ERR_INVALID_PARAMETERS') + } + + if (typeof close !== 'function') { + throw errCode(new Error('close must be a function'), 'ERR_INVALID_PARAMETERS') + } + + if (typeof getStreams !== 'function') { + throw errCode(new Error('getStreams must be a function'), 'ERR_INVALID_PARAMETERS') + } + + if (!stat) { + throw errCode(new Error('connection metadata object must be provided'), 'ERR_INVALID_PARAMETERS') + } + + if (stat.direction !== 'inbound' && stat.direction !== 'outbound') { + throw errCode(new Error('direction must be "inbound" or "outbound"'), 'ERR_INVALID_PARAMETERS') + } + + if (!stat.timeline) { + throw errCode(new Error('connection timeline object must be provided in the stat object'), 'ERR_INVALID_PARAMETERS') + } + + if (!stat.timeline.open) { + throw errCode(new Error('connection open timestamp must be provided'), 'ERR_INVALID_PARAMETERS') + } + + if (!stat.timeline.upgraded) { + throw errCode(new Error('connection upgraded timestamp must be provided'), 'ERR_INVALID_PARAMETERS') + } +} + +module.exports = { + validateArgs +} diff --git a/src/pubsub/peer-streams.d.ts b/src/pubsub/peer-streams.d.ts index 632fe2c84..cb48989ac 100644 --- a/src/pubsub/peer-streams.d.ts +++ b/src/pubsub/peer-streams.d.ts @@ -1,15 +1,8 @@ export = PeerStreams; /** - * @callback Sink - * @param {Uint8Array} source - * @returns {Promise} + * @typedef {import('../connection/connection').DuplexIterableStream} DuplexIterableStream * - * @typedef {object} DuplexIterableStream - * @property {Sink} sink - * @property {() AsyncIterator} source - * - * @typedef PeerId - * @type import('peer-id') + * @typedef {import('peer-id')} PeerId */ /** * Thin wrapper around a peer's inbound / outbound pubsub streams @@ -87,7 +80,7 @@ declare class PeerStreams { * @param {DuplexIterableStream} stream * @returns {void} */ - attachInboundStream(stream: DuplexIterableStream): void; + attachInboundStream(stream: import("../connection/connection").DuplexIterableStream): void; /** * Attach a raw outbound stream and setup a write stream * @@ -102,12 +95,11 @@ declare class PeerStreams { close(): void; } declare namespace PeerStreams { - export { Sink, DuplexIterableStream, PeerId }; + export { DuplexIterableStream, PeerId }; } type DuplexIterableStream = { - sink: Sink; + sink: import("../connection/connection").Sink; source: () => AsyncIterator; }; declare const AbortController: typeof import("abort-controller"); -type Sink = (source: Uint8Array) => Promise; type PeerId = import("peer-id"); diff --git a/src/pubsub/peer-streams.js b/src/pubsub/peer-streams.js index a41e1d882..4f14a73ab 100644 --- a/src/pubsub/peer-streams.js +++ b/src/pubsub/peer-streams.js @@ -13,16 +13,9 @@ const log = debug('libp2p-pubsub:peer-streams') log.error = debug('libp2p-pubsub:peer-streams:error') /** - * @callback Sink - * @param {Uint8Array} source - * @returns {Promise} + * @typedef {import('../connection/connection').DuplexIterableStream} DuplexIterableStream * - * @typedef {object} DuplexIterableStream - * @property {Sink} sink - * @property {() AsyncIterator} source - * - * @typedef PeerId - * @type import('peer-id') + * @typedef {import('peer-id')} PeerId */ /**