Skip to content

Commit ba8527c

Browse files
authored
fix: improve pubsub example (#1549)
Instead of waiting an arbitrary amount of time for subscriptions to propagate, before sending messages, ensure that node1 has node2's subs and node2 has node3's subs. Closes #1540
1 parent 2dac4be commit ba8527c

File tree

2 files changed

+41
-19
lines changed

2 files changed

+41
-19
lines changed

examples/pubsub/message-filtering/1.js

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ const createNode = async () => {
6868
node3.pubsub.subscribe(topic)
6969

7070
// wait for subscriptions to propagate
71-
await delay(1000)
71+
await hasSubscription(node1, node2, topic)
72+
await hasSubscription(node2, node3, topic)
7273

7374
const validateFruit = (msgTopic, msg) => {
7475
const fruit = uint8ArrayToString(msg.data)
@@ -91,13 +92,27 @@ const createNode = async () => {
9192
await node1.pubsub.publish(topic, uint8ArrayFromString(fruit))
9293
}
9394

94-
// wait a few seconds for messages to be received
95-
await delay(5000)
9695
console.log('############## all messages sent ##############')
9796
})()
9897

9998
async function delay (ms) {
10099
await new Promise((resolve) => {
101100
setTimeout(() => resolve(), ms)
102101
})
103-
}
102+
}
103+
104+
/**
105+
* Wait for node1 to see that node2 has subscribed to the topic
106+
*/
107+
async function hasSubscription (node1, node2, topic) {
108+
while (true) {
109+
const subs = await node1.pubsub.getSubscribers(topic)
110+
111+
if (subs.map(peer => peer.toString()).includes(node2.peerId.toString())) {
112+
return
113+
}
114+
115+
// wait for subscriptions to propagate
116+
await delay(100)
117+
}
118+
}

examples/pubsub/message-filtering/test.js

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,36 @@ export async function test () {
1919
all: true
2020
})
2121

22+
let output = ''
23+
24+
const expected = [
25+
'node2 received: banana',
26+
'node2 received: apple',
27+
'node2 received: orange',
28+
'node3 received: banana',
29+
'node3 received: apple',
30+
'node3 received: orange'
31+
]
32+
2233
proc.all.on('data', async (data) => {
2334
process.stdout.write(data)
24-
const line = uint8ArrayToString(data)
35+
output += uint8ArrayToString(data)
2536

26-
// End
27-
if (line.includes('all messages sent')) {
28-
if (messages.car > 0) {
29-
defer.reject(new Error('Message validation failed - peers failed to filter car messages'))
30-
}
37+
if (output.includes('received: car')) {
38+
defer.reject(new Error('Message validation failed - peers failed to filter car messages'))
39+
}
3140

32-
for (const fruit of ['banana', 'apple', 'orange']) {
33-
if (messages[fruit] !== 2) {
34-
defer.reject(new Error(`Not enough ${fruit} messages - received ${messages[fruit] ?? 0}, expected 2`))
35-
}
41+
let allMessagesReceived = true
42+
43+
expected.forEach(message => {
44+
if (!output.includes(message)) {
45+
allMessagesReceived = false
3646
}
47+
})
3748

49+
if (allMessagesReceived) {
3850
defer.resolve()
3951
}
40-
41-
if (line.includes('received:')) {
42-
const fruit = line.split('received:')[1].trim()
43-
messages[fruit] = (messages[fruit] ?? 0) + 1
44-
}
4552
})
4653

4754
await defer.promise

0 commit comments

Comments
 (0)