Skip to content

Commit ebb22d1

Browse files
committed
feat: cookie for discovery
1 parent b080670 commit ebb22d1

File tree

9 files changed

+332
-119
lines changed

9 files changed

+332
-119
lines changed

src/index.js

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ const defaultServerOptions = {
2626
gcInterval: 3e5
2727
}
2828

29+
/**
30+
* Rendezvous point contains the connection to a rendezvous server, as well as,
31+
* the cookies per namespace that the client received.
32+
* @typedef {Object} RendezvousPoint
33+
* @property {Connection} connection
34+
* @property {Map<string, string>} cookies
35+
*/
36+
2937
/**
3038
* Libp2p Rendezvous.
3139
* A lightweight mechanism for generalized peer discovery.
@@ -57,9 +65,15 @@ class Rendezvous {
5765
}
5866

5967
/**
60-
* @type {Map<string, Connection>}
68+
* @type {Map<string, RendezvousPoint>}
6169
*/
62-
this._rendezvousConns = new Map()
70+
this._rendezvousPoints = new Map()
71+
72+
/**
73+
* Client cookies per namespace for own server
74+
* @type {Map<string, string>}
75+
*/
76+
this._cookiesSelf = new Map()
6377

6478
this._server = undefined
6579

@@ -120,12 +134,19 @@ class Rendezvous {
120134
}
121135

122136
this._registrarId = undefined
137+
this._rendezvousPoints.clear()
138+
this._cookiesSelf.clear()
139+
123140
log('stopped')
124141
}
125142

143+
/**
144+
* Keep registrations updated on servers.
145+
* @returns {void}
146+
*/
126147
_keepRegistrations () {
127148
const register = () => {
128-
if (!this._rendezvousConns.size) {
149+
if (!this._rendezvousPoints.size) {
129150
return
130151
}
131152

@@ -152,7 +173,7 @@ class Rendezvous {
152173
const idB58Str = peerId.toB58String()
153174
log('connected', idB58Str)
154175

155-
this._rendezvousConns.set(idB58Str, conn)
176+
this._rendezvousPoints.set(idB58Str, { connection: conn })
156177
}
157178

158179
/**
@@ -164,7 +185,7 @@ class Rendezvous {
164185
const idB58Str = peerId.toB58String()
165186
log('disconnected', idB58Str)
166187

167-
this._rendezvousConns.delete(idB58Str)
188+
this._rendezvousPoints.delete(idB58Str)
168189

169190
if (this._server) {
170191
this._server.removePeerRegistrations(peerId)
@@ -196,7 +217,7 @@ class Rendezvous {
196217
}
197218

198219
// Are there available rendezvous servers?
199-
if (!this._rendezvousConns.size) {
220+
if (!this._rendezvousPoints.size) {
200221
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
201222
}
202223

@@ -214,8 +235,8 @@ class Rendezvous {
214235

215236
const registerTasks = []
216237
const taskFn = async (id) => {
217-
const conn = this._rendezvousConns.get(id)
218-
const { stream } = await conn.newStream(PROTOCOL_MULTICODEC)
238+
const { connection } = this._rendezvousPoints.get(id)
239+
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)
219240

220241
const [response] = await pipe(
221242
[message],
@@ -235,7 +256,7 @@ class Rendezvous {
235256
return recMessage.registerResponse.ttl
236257
}
237258

238-
for (const id of this._rendezvousConns.keys()) {
259+
for (const id of this._rendezvousPoints.keys()) {
239260
registerTasks.push(taskFn(id))
240261
}
241262

@@ -255,7 +276,7 @@ class Rendezvous {
255276
}
256277

257278
// Are there available rendezvous servers?
258-
if (!this._rendezvousConns.size) {
279+
if (!this._rendezvousPoints.size) {
259280
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
260281
}
261282

@@ -269,8 +290,8 @@ class Rendezvous {
269290

270291
const unregisterTasks = []
271292
const taskFn = async (id) => {
272-
const conn = this._rendezvousConns.get(id)
273-
const { stream } = await conn.newStream(PROTOCOL_MULTICODEC)
293+
const { connection } = this._rendezvousPoints.get(id)
294+
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)
274295

275296
await pipe(
276297
[message],
@@ -282,7 +303,7 @@ class Rendezvous {
282303
)
283304
}
284305

285-
for (const id of this._rendezvousConns.keys()) {
306+
for (const id of this._rendezvousPoints.keys()) {
286307
unregisterTasks.push(taskFn(id))
287308
}
288309

@@ -293,12 +314,11 @@ class Rendezvous {
293314
* Discover peers registered under a given namespace
294315
* @param {string} ns
295316
* @param {number} [limit]
296-
* @param {Buffer} [cookie]
297317
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Array<Multiaddr>, ns: string, ttl: number }>}
298318
*/
299-
async * discover (ns, limit, cookie) {
319+
async * discover (ns, limit) {
300320
// Are there available rendezvous servers?
301-
if (!this._rendezvousConns.size) {
321+
if (!this._rendezvousPoints.size) {
302322
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
303323
}
304324

@@ -311,7 +331,9 @@ class Rendezvous {
311331

312332
// Local search if Server
313333
if (this._server) {
314-
const localRegistrations = this._server.getRegistrations(ns, limit)
334+
const cookieSelf = this._cookiesSelf.get(ns)
335+
const { cookie: cookieS, registrations: localRegistrations } = this._server.getRegistrations(ns, { limit, cookie: cookieSelf })
336+
315337
for (const r of localRegistrations) {
316338
yield registrationTransformer(r)
317339

@@ -320,21 +342,28 @@ class Rendezvous {
320342
return
321343
}
322344
}
323-
}
324345

325-
const message = Message.encode({
326-
type: MESSAGE_TYPE.DISCOVER,
327-
discover: {
328-
ns,
329-
limit,
330-
cookie
331-
}
332-
})
346+
// Store cookie self
347+
this._cookiesSelf.set(ns, cookieS)
348+
}
333349

334-
for (const id of this._rendezvousConns.keys()) {
335-
const conn = this._rendezvousConns.get(id)
336-
const { stream } = await conn.newStream(PROTOCOL_MULTICODEC)
350+
// Iterate over all rendezvous points
351+
for (const [id, rp] of this._rendezvousPoints.entries()) {
352+
const rpCookies = rp.cookies || new Map()
353+
354+
// Check if we have a cookie and encode discover message
355+
const cookie = rpCookies.get(ns)
356+
const message = Message.encode({
357+
type: MESSAGE_TYPE.DISCOVER,
358+
discover: {
359+
ns,
360+
limit,
361+
cookie: cookie ? Buffer.from(cookie) : undefined
362+
}
363+
})
337364

365+
// Send discover message and wait for response
366+
const { stream } = await rp.connection.newStream(PROTOCOL_MULTICODEC)
338367
const [response] = await pipe(
339368
[message],
340369
lp.encode(),
@@ -350,10 +379,18 @@ class Rendezvous {
350379
throw new Error('unexpected message received')
351380
}
352381

382+
// Iterate over registrations response
353383
for (const r of recMessage.discoverResponse.registrations) {
354-
// track registrations and check if already provided
384+
// track registrations
355385
yield registrationTransformer(r)
356386

387+
// Store cookie
388+
rpCookies.set(ns, recMessage.discoverResponse.cookie.toString())
389+
this._rendezvousPoints.set(id, {
390+
connection: rp.connection,
391+
cookies: rpCookies
392+
})
393+
357394
limit--
358395
if (limit === 0) {
359396
return

0 commit comments

Comments
 (0)