Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

chore: add duplex iterable type to connection #71

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions src/connection/connection.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
declare const _exports: typeof Connection;
export = _exports;
export type Sink = (source: Uint8Array) => Promise<Uint8Array>;
export type DuplexIterableStream = {
sink: Sink;
source: () => AsyncIterator<Uint8Array, any, undefined>;
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type here should really just be a Stream, and it should match what's coming from mplex https://github.com/libp2p/js-libp2p-mplex/blob/v0.10.1/src/stream.js#L59

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add the Stream in a type declaration file in the multiplexer interface?

export type PeerId = import("peer-id");
export type Multiaddr = import("multiaddr");
/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef {import('peer-id')} PeerId
* @typedef {import('multiaddr')} Multiaddr
*/
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
Expand All @@ -8,13 +27,13 @@ declare class Connection {
/**
* Creates an instance of Connection.
* @param {object} properties properties of the connection.
* @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {Multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {Multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {PeerId} properties.localPeer local peer-id.
* @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function.
* @param {function(): Stream[]} properties.getStreams get streams from muxer function.
* @param {function(): DuplexIterableStream[]} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp.
Expand All @@ -30,7 +49,7 @@ declare class Connection {
remotePeer: import("peer-id");
newStream: Function;
close: Function;
getStreams: () => any[];
getStreams: () => DuplexIterableStream[];
stat: {
direction: string;
timeline: {
Expand Down Expand Up @@ -85,7 +104,7 @@ declare class Connection {
/**
* Reference to the getStreams function of the muxer
*/
_getStreams: () => any[];
_getStreams: () => DuplexIterableStream[];
/**
* Connection streams registry
*/
Expand Down Expand Up @@ -113,25 +132,25 @@ declare class Connection {
* Get all the streams of the muxer.
* @this {Connection}
*/
get streams(): any[];
get streams(): DuplexIterableStream[];
/**
* Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream
* @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
* @return {Promise<{stream: DuplexIterableStream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
newStream(protocols: string[]): Promise<{
stream: any;
stream: DuplexIterableStream;
protocol: string;
}>;
/**
* Add a stream when it is opened to the registry.
* @param {*} muxedStream a muxed stream
* @param {DuplexIterableStream} muxedStream a muxed stream
* @param {object} properties the stream properties to be registered
* @param {string} properties.protocol the protocol used by the stream
* @param {object} properties.metadata metadata of the stream
* @return {void}
*/
addStream(muxedStream: any, { protocol, metadata }: {
addStream(muxedStream: DuplexIterableStream, { protocol, metadata }: {
protocol: string;
metadata: any;
}): void;
Expand Down
69 changes: 18 additions & 51 deletions src/connection/connection.js
Original file line number Diff line number Diff line change
@@ -1,56 +1,23 @@
'use strict'

const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const withIs = require('class-is')
const errCode = require('err-code')
const Status = require('./status')

function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {
if (localAddr && !multiaddr.isMultiaddr(localAddr)) {
throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS')
}

if (!PeerId.isPeerId(localPeer)) {
throw errCode(new Error('localPeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS')
}

if (!PeerId.isPeerId(remotePeer)) {
throw errCode(new Error('remotePeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS')
}

if (typeof newStream !== 'function') {
throw errCode(new Error('new stream must be a function'), 'ERR_INVALID_PARAMETERS')
}

if (typeof close !== 'function') {
throw errCode(new Error('close must be a function'), 'ERR_INVALID_PARAMETERS')
}
const { validateArgs } = require('./utils')

if (typeof getStreams !== 'function') {
throw errCode(new Error('getStreams must be a function'), 'ERR_INVALID_PARAMETERS')
}

if (!stat) {
throw errCode(new Error('connection metadata object must be provided'), 'ERR_INVALID_PARAMETERS')
}

if (stat.direction !== 'inbound' && stat.direction !== 'outbound') {
throw errCode(new Error('direction must be "inbound" or "outbound"'), 'ERR_INVALID_PARAMETERS')
}

if (!stat.timeline) {
throw errCode(new Error('connection timeline object must be provided in the stat object'), 'ERR_INVALID_PARAMETERS')
}

if (!stat.timeline.open) {
throw errCode(new Error('connection open timestamp must be provided'), 'ERR_INVALID_PARAMETERS')
}

if (!stat.timeline.upgraded) {
throw errCode(new Error('connection upgraded timestamp must be provided'), 'ERR_INVALID_PARAMETERS')
}
}
/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this AsyncIterator or is it meant to be AsyncIterable ?

*
* @typedef {import('peer-id')} PeerId
* @typedef {import('multiaddr')} Multiaddr
*/

/**
* An implementation of the js-libp2p connection.
Expand All @@ -60,13 +27,13 @@ class Connection {
/**
* Creates an instance of Connection.
* @param {object} properties properties of the connection.
* @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {Multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {Multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {PeerId} properties.localPeer local peer-id.
* @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function.
* @param {function(): Stream[]} properties.getStreams get streams from muxer function.
* @param {function(): DuplexIterableStream[]} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp.
Expand Down Expand Up @@ -157,7 +124,7 @@ class Connection {
/**
* Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream
* @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
* @return {Promise<{stream: DuplexIterableStream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
async newStream (protocols) {
if (this.stat.status === Status.CLOSING) {
Expand All @@ -182,7 +149,7 @@ class Connection {

/**
* Add a stream when it is opened to the registry.
* @param {*} muxedStream a muxed stream
* @param {DuplexIterableStream} muxedStream a muxed stream
* @param {object} properties the stream properties to be registered
* @param {string} properties.protocol the protocol used by the stream
* @param {object} properties.metadata metadata of the stream
Expand Down
1 change: 1 addition & 0 deletions src/connection/utils.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export function validateArgs(localAddr: any, localPeer: any, remotePeer: any, newStream: any, close: any, getStreams: any, stat: any): void;
55 changes: 55 additions & 0 deletions src/connection/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
'use strict'

const errCode = require('err-code')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')

function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {
if (localAddr && !multiaddr.isMultiaddr(localAddr)) {
throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS')
}

if (!PeerId.isPeerId(localPeer)) {
throw errCode(new Error('localPeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS')
}

if (!PeerId.isPeerId(remotePeer)) {
throw errCode(new Error('remotePeer must be an instance of peer-id'), 'ERR_INVALID_PARAMETERS')
}

if (typeof newStream !== 'function') {
throw errCode(new Error('new stream must be a function'), 'ERR_INVALID_PARAMETERS')
}

if (typeof close !== 'function') {
throw errCode(new Error('close must be a function'), 'ERR_INVALID_PARAMETERS')
}

if (typeof getStreams !== 'function') {
throw errCode(new Error('getStreams must be a function'), 'ERR_INVALID_PARAMETERS')
}

if (!stat) {
throw errCode(new Error('connection metadata object must be provided'), 'ERR_INVALID_PARAMETERS')
}

if (stat.direction !== 'inbound' && stat.direction !== 'outbound') {
throw errCode(new Error('direction must be "inbound" or "outbound"'), 'ERR_INVALID_PARAMETERS')
}

if (!stat.timeline) {
throw errCode(new Error('connection timeline object must be provided in the stat object'), 'ERR_INVALID_PARAMETERS')
}

if (!stat.timeline.open) {
throw errCode(new Error('connection open timestamp must be provided'), 'ERR_INVALID_PARAMETERS')
}

if (!stat.timeline.upgraded) {
throw errCode(new Error('connection upgraded timestamp must be provided'), 'ERR_INVALID_PARAMETERS')
}
}

module.exports = {
validateArgs
}
18 changes: 5 additions & 13 deletions src/pubsub/peer-streams.d.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
export = PeerStreams;
/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
* @typedef {import('../connection/connection').DuplexIterableStream} DuplexIterableStream
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef PeerId
* @type import('peer-id')
* @typedef {import('peer-id')} PeerId
*/
/**
* Thin wrapper around a peer's inbound / outbound pubsub streams
Expand Down Expand Up @@ -87,7 +80,7 @@ declare class PeerStreams {
* @param {DuplexIterableStream} stream
* @returns {void}
*/
attachInboundStream(stream: DuplexIterableStream): void;
attachInboundStream(stream: import("../connection/connection").DuplexIterableStream): void;
/**
* Attach a raw outbound stream and setup a write stream
*
Expand All @@ -102,12 +95,11 @@ declare class PeerStreams {
close(): void;
}
declare namespace PeerStreams {
export { Sink, DuplexIterableStream, PeerId };
export { DuplexIterableStream, PeerId };
}
type DuplexIterableStream = {
sink: Sink;
sink: import("../connection/connection").Sink;
source: () => AsyncIterator<Uint8Array, any, undefined>;
};
declare const AbortController: typeof import("abort-controller");
type Sink = (source: Uint8Array) => Promise<Uint8Array>;
type PeerId = import("peer-id");
11 changes: 2 additions & 9 deletions src/pubsub/peer-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,9 @@ const log = debug('libp2p-pubsub:peer-streams')
log.error = debug('libp2p-pubsub:peer-streams:error')

/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
* @typedef {import('../connection/connection').DuplexIterableStream} DuplexIterableStream
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef PeerId
* @type import('peer-id')
* @typedef {import('peer-id')} PeerId
*/

/**
Expand Down