@@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
4
4
import { CODE_P2P } from './constants.js'
5
5
import {
6
6
getMultiaddrs ,
7
- multiaddrToNetConfig
7
+ multiaddrToNetConfig ,
8
+ NetConfig
8
9
} from './utils.js'
9
10
import { EventEmitter , CustomEvent } from '@libp2p/interfaces/events'
10
11
import type { MultiaddrConnection , Connection } from '@libp2p/interface-connection'
@@ -25,15 +26,27 @@ async function attemptClose (maConn: MultiaddrConnection) {
25
26
}
26
27
}
27
28
29
+ export interface LimitServerConnectionsOpts {
30
+ acceptBelow : number
31
+ rejectAbove : number
32
+ onListenError ?: ( err : Error ) => void
33
+ }
34
+
28
35
interface Context extends TCPCreateListenerOptions {
29
36
handler ?: ( conn : Connection ) => void
30
37
upgrader : Upgrader
31
38
socketInactivityTimeout ?: number
32
39
socketCloseTimeout ?: number
33
40
maxConnections ?: number
41
+ limitServerConnections ?: LimitServerConnectionsOpts
34
42
}
35
43
36
- type Status = { started : false } | { started : true , listeningAddr : Multiaddr , peerId : string | null }
44
+ type Status = { started : false } | {
45
+ started : true
46
+ listeningAddr : Multiaddr
47
+ peerId : string | null
48
+ netConfig : NetConfig
49
+ }
37
50
38
51
export class TCPListener extends EventEmitter < ListenerEvents > implements Listener {
39
52
private readonly server : net . Server
@@ -89,12 +102,33 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
89
102
90
103
socket . once ( 'close' , ( ) => {
91
104
this . connections . delete ( maConn )
105
+
106
+ if (
107
+ this . context . limitServerConnections != null &&
108
+ this . connections . size < this . context . limitServerConnections . acceptBelow
109
+ ) {
110
+ // The most likely case of error is if the port taken by this application is binded by
111
+ // another process during the time the server if closed. In that case there's not much
112
+ // we can do. netListen() will be called again every time a connection is dropped, which
113
+ // acts as an eventual retry mechanism. onListenError allows the consumer act on this.
114
+ this . netListen ( ) . catch ( e => {
115
+ log . error ( 'error attempting to listen server once connection count under limit' , e )
116
+ this . context . limitServerConnections ?. onListenError ?.( e as Error )
117
+ } )
118
+ }
92
119
} )
93
120
94
121
if ( this . context . handler != null ) {
95
122
this . context . handler ( conn )
96
123
}
97
124
125
+ if (
126
+ this . context . limitServerConnections != null &&
127
+ this . connections . size >= this . context . limitServerConnections . rejectAbove
128
+ ) {
129
+ this . netClose ( )
130
+ }
131
+
98
132
this . dispatchEvent ( new CustomEvent < Connection > ( 'connection' , { detail : conn } ) )
99
133
} )
100
134
. catch ( async err => {
@@ -148,21 +182,21 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
148
182
}
149
183
150
184
async listen ( ma : Multiaddr ) {
185
+ if ( this . status . started ) {
186
+ throw Error ( 'server is already listening' )
187
+ }
188
+
151
189
const peerId = ma . getPeerId ( )
152
190
const listeningAddr = peerId == null ? ma . decapsulateCode ( CODE_P2P ) : ma
153
191
154
- this . status = { started : true , listeningAddr, peerId }
192
+ this . status = {
193
+ started : true ,
194
+ listeningAddr,
195
+ peerId,
196
+ netConfig : multiaddrToNetConfig ( listeningAddr )
197
+ }
155
198
156
- return await new Promise < void > ( ( resolve , reject ) => {
157
- const options = multiaddrToNetConfig ( listeningAddr )
158
- this . server . listen ( options , ( err ?: any ) => {
159
- if ( err != null ) {
160
- return reject ( err )
161
- }
162
- log ( 'Listening on %s' , this . server . address ( ) )
163
- resolve ( )
164
- } )
165
- } )
199
+ await this . netListen ( )
166
200
}
167
201
168
202
async close ( ) {
@@ -174,8 +208,47 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
174
208
Array . from ( this . connections . values ( ) ) . map ( async maConn => await attemptClose ( maConn ) )
175
209
)
176
210
211
+ await this . netClose ( )
212
+ }
213
+
214
+ private async netListen ( ) : Promise < void > {
215
+ if ( ! this . status . started || this . server . listening ) {
216
+ return
217
+ }
218
+
219
+ const netConfig = this . status . netConfig
220
+
177
221
await new Promise < void > ( ( resolve , reject ) => {
178
- this . server . close ( err => ( err != null ) ? reject ( err ) : resolve ( ) )
222
+ // NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error'
223
+ this . server . once ( 'error' , reject )
224
+ this . server . listen ( netConfig , resolve )
179
225
} )
226
+
227
+ log ( 'Listening on %s' , this . server . address ( ) )
228
+ }
229
+
230
+ private netClose ( ) : void {
231
+ if ( ! this . status . started || ! this . server . listening ) {
232
+ return
233
+ }
234
+
235
+ log ( 'Closing server on %s' , this . server . address ( ) )
236
+
237
+ // NodeJS implementation tracks listening status with `this._handle` property.
238
+ // - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown
239
+ // - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675
240
+ // - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN
241
+ //
242
+ // NOTE: Both listen and close are technically not async actions, so it's not necessary to track
243
+ // states 'pending-close' or 'pending-listen'
244
+
245
+ // From docs https://nodejs.org/api/net.html#serverclosecallback
246
+ // Stops the server from accepting new connections and keeps existing connections.
247
+ // 'close' event is emitted only emitted when all connections are ended.
248
+ // The optional callback will be called once the 'close' event occurs.
249
+ //
250
+ // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
251
+ // to pass a callback to close.
252
+ this . server . close ( )
180
253
}
181
254
}
0 commit comments