Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit 46899b6

Browse files
authored
chore: complete pubsub tests (#81)
1 parent 13f6ed6 commit 46899b6

File tree

8 files changed

+414
-168
lines changed

8 files changed

+414
-168
lines changed

src/pubsub/tests/api.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
/* eslint-env mocha */
33
'use strict'
44

5-
const chai = require('chai')
6-
const { expect } = chai
5+
const { expect } = require('aegir/utils/chai')
76
const sinon = require('sinon')
87

98
const pDefer = require('p-defer')
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
// @ts-nocheck interface tests
2+
/* eslint-env mocha */
3+
'use strict'
4+
5+
const { expect } = require('aegir/utils/chai')
6+
const sinon = require('sinon')
7+
const pDefer = require('p-defer')
8+
const pWaitFor = require('p-wait-for')
9+
const uint8ArrayToString = require('uint8arrays/to-string')
10+
11+
const { expectSet } = require('./utils')
12+
13+
module.exports = (common) => {
14+
describe('pubsub connection handlers', () => {
15+
let psA, psB
16+
17+
describe('nodes send state on connection', () => {
18+
// Create pubsub nodes and connect them
19+
before(async () => {
20+
[psA, psB] = await common.setup(2)
21+
22+
expect(psA.peers.size).to.be.eql(0)
23+
expect(psB.peers.size).to.be.eql(0)
24+
25+
// Start pubsub
26+
psA.start()
27+
psB.start()
28+
})
29+
30+
// Make subscriptions prior to nodes connected
31+
before(() => {
32+
psA.subscribe('Za')
33+
psB.subscribe('Zb')
34+
35+
expect(psA.peers.size).to.equal(0)
36+
expectSet(psA.subscriptions, ['Za'])
37+
expect(psB.peers.size).to.equal(0)
38+
expectSet(psB.subscriptions, ['Zb'])
39+
})
40+
41+
after(async () => {
42+
sinon.restore()
43+
await common.teardown()
44+
})
45+
46+
it('existing subscriptions are sent upon peer connection', async function () {
47+
this.timeout(10e3)
48+
49+
await Promise.all([
50+
psA._libp2p.dial(psB.peerId),
51+
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
52+
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
53+
])
54+
55+
expect(psA.peers.size).to.equal(1)
56+
expect(psB.peers.size).to.equal(1)
57+
58+
expectSet(psA.subscriptions, ['Za'])
59+
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
60+
61+
expectSet(psB.subscriptions, ['Zb'])
62+
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
63+
})
64+
})
65+
66+
describe('pubsub started before connect', () => {
67+
// Create pubsub nodes and start them
68+
beforeEach(async () => {
69+
[psA, psB] = await common.setup(2)
70+
71+
psA.start()
72+
psB.start()
73+
})
74+
75+
afterEach(async () => {
76+
sinon.restore()
77+
78+
await common.teardown()
79+
})
80+
81+
it('should get notified of connected peers on dial', async () => {
82+
const connection = await psA._libp2p.dial(psB.peerId)
83+
expect(connection).to.exist()
84+
85+
return Promise.all([
86+
pWaitFor(() => psA.peers.size === 1),
87+
pWaitFor(() => psB.peers.size === 1)
88+
])
89+
})
90+
91+
it('should receive pubsub messages', async () => {
92+
const defer = pDefer()
93+
const topic = 'test-topic'
94+
const data = 'hey!'
95+
96+
await psA._libp2p.dial(psB.peerId)
97+
98+
let subscribedTopics = psA.getTopics()
99+
expect(subscribedTopics).to.not.include(topic)
100+
101+
psA.on(topic, (msg) => {
102+
expect(uint8ArrayToString(msg.data)).to.equal(data)
103+
defer.resolve()
104+
})
105+
psA.subscribe(topic)
106+
107+
subscribedTopics = psA.getTopics()
108+
expect(subscribedTopics).to.include(topic)
109+
110+
// wait for psB to know about psA subscription
111+
await pWaitFor(() => {
112+
const subscribedPeers = psB.getSubscribers(topic)
113+
return subscribedPeers.includes(psA.peerId.toB58String())
114+
})
115+
psB.publish(topic, data)
116+
117+
await defer.promise
118+
})
119+
})
120+
121+
describe('pubsub started after connect', () => {
122+
// Create pubsub nodes
123+
beforeEach(async () => {
124+
[psA, psB] = await common.setup(2)
125+
})
126+
127+
afterEach(async () => {
128+
sinon.restore()
129+
130+
psA && psA.stop()
131+
psB && psB.stop()
132+
133+
await common.teardown()
134+
})
135+
136+
it('should get notified of connected peers after starting', async () => {
137+
const connection = await psA._libp2p.dial(psB.peerId)
138+
expect(connection).to.exist()
139+
expect(psA.peers.size).to.be.eql(0)
140+
expect(psB.peers.size).to.be.eql(0)
141+
142+
psA.start()
143+
psB.start()
144+
145+
return Promise.all([
146+
pWaitFor(() => psA.peers.size === 1),
147+
pWaitFor(() => psB.peers.size === 1)
148+
])
149+
})
150+
151+
it('should receive pubsub messages', async () => {
152+
const defer = pDefer()
153+
const topic = 'test-topic'
154+
const data = 'hey!'
155+
156+
await psA._libp2p.dial(psB.peerId)
157+
158+
psA.start()
159+
psB.start()
160+
161+
await Promise.all([
162+
pWaitFor(() => psA.peers.size === 1),
163+
pWaitFor(() => psB.peers.size === 1)
164+
])
165+
166+
let subscribedTopics = psA.getTopics()
167+
expect(subscribedTopics).to.not.include(topic)
168+
169+
psA.on(topic, (msg) => {
170+
expect(uint8ArrayToString(msg.data)).to.equal(data)
171+
defer.resolve()
172+
})
173+
psA.subscribe(topic)
174+
175+
subscribedTopics = psA.getTopics()
176+
expect(subscribedTopics).to.include(topic)
177+
178+
// wait for psB to know about psA subscription
179+
await pWaitFor(() => {
180+
const subscribedPeers = psB.getSubscribers(topic)
181+
return subscribedPeers.includes(psA.peerId.toB58String())
182+
})
183+
psB.publish(topic, data)
184+
185+
await defer.promise
186+
})
187+
})
188+
189+
describe('pubsub with intermittent connections', () => {
190+
// Create pubsub nodes and start them
191+
beforeEach(async () => {
192+
[psA, psB] = await common.setup(2)
193+
194+
psA.start()
195+
psB.start()
196+
})
197+
198+
afterEach(async () => {
199+
sinon.restore()
200+
201+
psA && psA.stop()
202+
psB && psB.stop()
203+
204+
await common.teardown()
205+
})
206+
207+
it('should receive pubsub messages after a node restart', async function () {
208+
this.timeout(10e3)
209+
const topic = 'test-topic'
210+
const data = 'hey!'
211+
const psAid = psA.peerId.toB58String()
212+
213+
let counter = 0
214+
const defer1 = pDefer()
215+
const defer2 = pDefer()
216+
217+
await psA._libp2p.dial(psB.peerId)
218+
219+
let subscribedTopics = psA.getTopics()
220+
expect(subscribedTopics).to.not.include(topic)
221+
222+
psA.on(topic, (msg) => {
223+
expect(uint8ArrayToString(msg.data)).to.equal(data)
224+
counter++
225+
counter === 1 ? defer1.resolve() : defer2.resolve()
226+
})
227+
psA.subscribe(topic)
228+
229+
subscribedTopics = psA.getTopics()
230+
expect(subscribedTopics).to.include(topic)
231+
232+
// wait for psB to know about psA subscription
233+
await pWaitFor(() => {
234+
const subscribedPeers = psB.getSubscribers(topic)
235+
return subscribedPeers.includes(psAid)
236+
})
237+
psB.publish(topic, data)
238+
239+
await defer1.promise
240+
241+
psB.stop()
242+
await psB._libp2p.stop()
243+
await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId))
244+
await psB._libp2p.start()
245+
psB.start()
246+
247+
psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
248+
await psA._libp2p.dial(psB.peerId)
249+
250+
// wait for remoteLibp2p to know about libp2p subscription
251+
await pWaitFor(() => {
252+
const subscribedPeers = psB.getSubscribers(topic)
253+
return subscribedPeers.includes(psAid)
254+
})
255+
256+
psB.publish(topic, data)
257+
258+
await defer2.promise
259+
})
260+
261+
it('should handle quick reconnects with a delayed disconnect', async () => {
262+
// Subscribe on both
263+
const handlerSpy = sinon.spy()
264+
const topic = 'reconnect-channel'
265+
266+
psA.on(topic, handlerSpy)
267+
psB.on(topic, handlerSpy)
268+
await Promise.all([
269+
psA.subscribe(topic),
270+
psB.subscribe(topic)
271+
])
272+
273+
// Create two connections to the remote peer
274+
const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId)
275+
// second connection
276+
await psA._libp2p.dialer.connectToPeer(psB.peerId)
277+
expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2)
278+
279+
// Wait for subscriptions to occur
280+
await pWaitFor(() => {
281+
return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) &&
282+
psB.getSubscribers(topic).includes(psA.peerId.toB58String())
283+
})
284+
285+
// Verify messages go both ways
286+
psA.publish(topic, 'message1')
287+
psB.publish(topic, 'message2')
288+
await pWaitFor(() => handlerSpy.callCount >= 2)
289+
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])
290+
291+
// Disconnect the first connection (this acts as a delayed reconnect)
292+
const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set')
293+
294+
await originalConnection.close()
295+
await pWaitFor(() => psAConnUpdateSpy.callCount === 1)
296+
297+
// Verify messages go both ways after the disconnect
298+
handlerSpy.resetHistory()
299+
psA.publish(topic, 'message3')
300+
psB.publish(topic, 'message4')
301+
await pWaitFor(() => handlerSpy.callCount >= 2)
302+
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
303+
})
304+
})
305+
})
306+
}

src/pubsub/tests/emit-self.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
/* eslint-env mocha */
33
'use strict'
44

5-
const chai = require('chai')
6-
const { expect } = chai
5+
const { expect } = require('aegir/utils/chai')
76
const sinon = require('sinon')
87

98
const uint8ArrayFromString = require('uint8arrays/from-string')

src/pubsub/tests/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
const apiTest = require('./api')
66
const emitSelfTest = require('./emit-self')
77
const messagesTest = require('./messages')
8+
const connectionHandlersTest = require('./connection-handlers')
89
const twoNodesTest = require('./two-nodes')
910
const multipleNodesTest = require('./multiple-nodes')
1011

@@ -13,6 +14,7 @@ module.exports = (common) => {
1314
apiTest(common)
1415
emitSelfTest(common)
1516
messagesTest(common)
17+
connectionHandlersTest(common)
1618
twoNodesTest(common)
1719
multipleNodesTest(common)
1820
})

src/pubsub/tests/messages.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
/* eslint-env mocha */
33
'use strict'
44

5-
const chai = require('chai')
6-
const { expect } = chai
5+
const { expect } = require('aegir/utils/chai')
76
const sinon = require('sinon')
87

98
const PeerId = require('peer-id')

src/pubsub/tests/multiple-nodes.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
/* eslint max-nested-callbacks: ["error", 6] */
44
'use strict'
55

6-
const chai = require('chai')
7-
const { expect } = chai
6+
const { expect } = require('aegir/utils/chai')
87
const sinon = require('sinon')
98

109
const delay = require('delay')

0 commit comments

Comments
 (0)