Skip to content

Commit a052b13

Browse files
committed
test(client): add tests for client publish and subscription behavior
1 parent 3251666 commit a052b13

File tree

1 file changed

+299
-0
lines changed

1 file changed

+299
-0
lines changed

client_messaging_test.go

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
package p2p
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestClientSubscribeMultipleTopicsConcurrently(t *testing.T) {
13+
privKey, err := GeneratePrivateKey()
14+
require.NoError(t, err)
15+
16+
config := Config{
17+
Name: testPeerName,
18+
PrivateKey: privKey,
19+
}
20+
21+
cl, err := NewClient(config)
22+
require.NoError(t, err)
23+
defer func() {
24+
closeErr := cl.Close()
25+
require.NoError(t, closeErr)
26+
}()
27+
28+
// Subscribe to multiple topics concurrently
29+
topics := []string{"topic1", "topic2", "topic3", "topic4", "topic5"}
30+
channels := make([]<-chan Message, len(topics))
31+
32+
for i, topic := range topics {
33+
channels[i] = cl.Subscribe(topic)
34+
require.NotNil(t, channels[i])
35+
}
36+
37+
// Verify all channels are open
38+
for _, ch := range channels {
39+
select {
40+
case _, ok := <-ch:
41+
if !ok {
42+
t.Fatal("channel closed unexpectedly")
43+
}
44+
case <-time.After(100 * time.Millisecond):
45+
// Channel not closed, which is expected
46+
}
47+
}
48+
}
49+
50+
func TestClientPublishToUnsubscribedTopic(t *testing.T) {
51+
privKey, err := GeneratePrivateKey()
52+
require.NoError(t, err)
53+
54+
config := Config{
55+
Name: testPeerName,
56+
PrivateKey: privKey,
57+
}
58+
59+
cl, err := NewClient(config)
60+
require.NoError(t, err)
61+
defer func() {
62+
closeErr := cl.Close()
63+
require.NoError(t, closeErr)
64+
}()
65+
66+
ctx := context.Background()
67+
68+
// Publish to a topic we haven't subscribed to
69+
err = cl.Publish(ctx, "new-topic", []byte("test data"))
70+
require.NoError(t, err)
71+
}
72+
73+
func TestClientPublishWithCanceledContext(t *testing.T) {
74+
privKey, err := GeneratePrivateKey()
75+
require.NoError(t, err)
76+
77+
config := Config{
78+
Name: testPeerName,
79+
PrivateKey: privKey,
80+
}
81+
82+
cl, err := NewClient(config)
83+
require.NoError(t, err)
84+
defer func() {
85+
closeErr := cl.Close()
86+
require.NoError(t, closeErr)
87+
}()
88+
89+
// Create and immediately cancel context
90+
ctx, cancel := context.WithCancel(context.Background())
91+
cancel()
92+
93+
// Try to publish with canceled context
94+
err = cl.Publish(ctx, testTopicName, []byte("test"))
95+
// Should either succeed immediately or fail with context error
96+
// Either is acceptable behavior
97+
if err != nil {
98+
assert.Contains(t, err.Error(), "context")
99+
}
100+
}
101+
102+
func TestClientPublishEmptyData(t *testing.T) {
103+
privKey, err := GeneratePrivateKey()
104+
require.NoError(t, err)
105+
106+
config := Config{
107+
Name: testPeerName,
108+
PrivateKey: privKey,
109+
}
110+
111+
cl, err := NewClient(config)
112+
require.NoError(t, err)
113+
defer func() {
114+
closeErr := cl.Close()
115+
require.NoError(t, closeErr)
116+
}()
117+
118+
ctx := context.Background()
119+
120+
// Publish empty data
121+
err = cl.Publish(ctx, testTopicName, []byte{})
122+
require.NoError(t, err)
123+
124+
// Publish nil data
125+
err = cl.Publish(ctx, testTopicName, nil)
126+
require.NoError(t, err)
127+
}
128+
129+
func TestClientPublishLargeData(t *testing.T) {
130+
privKey, err := GeneratePrivateKey()
131+
require.NoError(t, err)
132+
133+
config := Config{
134+
Name: testPeerName,
135+
PrivateKey: privKey,
136+
}
137+
138+
cl, err := NewClient(config)
139+
require.NoError(t, err)
140+
defer func() {
141+
closeErr := cl.Close()
142+
require.NoError(t, closeErr)
143+
}()
144+
145+
ctx := context.Background()
146+
147+
// Publish large data (1MB)
148+
largeData := make([]byte, 1024*1024)
149+
for i := range largeData {
150+
largeData[i] = byte(i % 256)
151+
}
152+
153+
err = cl.Publish(ctx, testTopicName, largeData)
154+
require.NoError(t, err)
155+
}
156+
157+
func TestClientCloseClosesSubscriptionChannels(t *testing.T) {
158+
privKey, err := GeneratePrivateKey()
159+
require.NoError(t, err)
160+
161+
config := Config{
162+
Name: testPeerName,
163+
PrivateKey: privKey,
164+
}
165+
166+
cl, err := NewClient(config)
167+
require.NoError(t, err)
168+
169+
// Subscribe to multiple topics
170+
ch1 := cl.Subscribe("topic1")
171+
ch2 := cl.Subscribe("topic2")
172+
ch3 := cl.Subscribe("topic3")
173+
174+
// Give subscriptions time to initialize
175+
time.Sleep(200 * time.Millisecond)
176+
177+
// Close client
178+
err = cl.Close()
179+
require.NoError(t, err)
180+
181+
// All channels should be closed
182+
select {
183+
case _, ok := <-ch1:
184+
assert.False(t, ok, "channel 1 should be closed")
185+
case <-time.After(100 * time.Millisecond):
186+
t.Fatal("channel 1 not closed after client Close()")
187+
}
188+
189+
select {
190+
case _, ok := <-ch2:
191+
assert.False(t, ok, "channel 2 should be closed")
192+
case <-time.After(100 * time.Millisecond):
193+
t.Fatal("channel 2 not closed after client Close()")
194+
}
195+
196+
select {
197+
case _, ok := <-ch3:
198+
assert.False(t, ok, "channel 3 should be closed")
199+
case <-time.After(100 * time.Millisecond):
200+
t.Fatal("channel 3 not closed after client Close()")
201+
}
202+
}
203+
204+
func TestClientSubscribeContextCancellation(t *testing.T) {
205+
privKey, err := GeneratePrivateKey()
206+
require.NoError(t, err)
207+
208+
config := Config{
209+
Name: testPeerName,
210+
PrivateKey: privKey,
211+
}
212+
213+
cl, err := NewClient(config)
214+
require.NoError(t, err)
215+
216+
// Subscribe to a topic
217+
msgChan := cl.Subscribe("cancel-test")
218+
require.NotNil(t, msgChan)
219+
220+
// Give subscription time to start
221+
time.Sleep(100 * time.Millisecond)
222+
223+
// Close the client (which cancels the context)
224+
err = cl.Close()
225+
require.NoError(t, err)
226+
227+
// Channel should be closed
228+
select {
229+
case _, ok := <-msgChan:
230+
assert.False(t, ok, "channel should be closed")
231+
case <-time.After(100 * time.Millisecond):
232+
t.Fatal("channel not closed after context cancellation")
233+
}
234+
}
235+
236+
func TestClientPublishConcurrent(t *testing.T) {
237+
privKey, err := GeneratePrivateKey()
238+
require.NoError(t, err)
239+
240+
config := Config{
241+
Name: testPeerName,
242+
PrivateKey: privKey,
243+
}
244+
245+
cl, err := NewClient(config)
246+
require.NoError(t, err)
247+
defer func() {
248+
closeErr := cl.Close()
249+
require.NoError(t, closeErr)
250+
}()
251+
252+
ctx := context.Background()
253+
254+
// Publish concurrently from multiple goroutines to different topics
255+
// to avoid topic join race condition
256+
done := make(chan error, 10)
257+
for i := 0; i < 10; i++ {
258+
go func(n int) {
259+
data := []byte("test message")
260+
topicName := "test-topic-" + string(rune('0'+n))
261+
done <- cl.Publish(ctx, topicName, data)
262+
}(i)
263+
}
264+
265+
// Wait for all publishes
266+
for i := 0; i < 10; i++ {
267+
publishErr := <-done
268+
assert.NoError(t, publishErr)
269+
}
270+
}
271+
272+
func TestMessageStructFields(t *testing.T) {
273+
msg := Message{
274+
Topic: "test-topic",
275+
From: "test-peer",
276+
FromID: "12D3KooTest",
277+
Data: []byte("test data"),
278+
Timestamp: time.Now(),
279+
}
280+
281+
assert.Equal(t, "test-topic", msg.Topic)
282+
assert.Equal(t, "test-peer", msg.From)
283+
assert.Equal(t, "12D3KooTest", msg.FromID)
284+
assert.Equal(t, []byte("test data"), msg.Data)
285+
assert.False(t, msg.Timestamp.IsZero())
286+
}
287+
288+
func TestPeerInfoStructFields(t *testing.T) {
289+
info := PeerInfo{
290+
ID: "peer-id",
291+
Name: "peer-name",
292+
Addrs: []string{"/ip4/127.0.0.1/tcp/4001"},
293+
}
294+
295+
assert.Equal(t, "peer-id", info.ID)
296+
assert.Equal(t, "peer-name", info.Name)
297+
assert.Len(t, info.Addrs, 1)
298+
assert.Equal(t, "/ip4/127.0.0.1/tcp/4001", info.Addrs[0])
299+
}

0 commit comments

Comments
 (0)