Skip to content

Commit 099810e

Browse files
committed
feat(example): add P2P messaging application with topic subscription
1 parent ed93657 commit 099810e

File tree

1 file changed

+222
-0
lines changed

1 file changed

+222
-0
lines changed

cmd/example/main.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
// Package main provides an example P2P messaging application with topic subscription and broadcasting.
2+
package main
3+
4+
import (
5+
"context"
6+
"encoding/json"
7+
"flag"
8+
"fmt"
9+
"os"
10+
"os/signal"
11+
"strings"
12+
"syscall"
13+
"time"
14+
15+
p2p "github.com/bsv-blockchain/go-p2p-message-bus"
16+
"github.com/libp2p/go-libp2p/core/crypto"
17+
)
18+
19+
func main() {
20+
name := flag.String("name", "", "Your node name")
21+
privateKey := flag.String("key", "", "Private key hex (will generate if not provided)")
22+
topics := flag.String("topics", "broadcast_p2p_poc", "Comma-separated list of topics to subscribe to")
23+
port := flag.Int("port", 0, "port to listen on (0 for random)")
24+
noBroadcast := flag.Bool("no-broadcast", false, "Disable message broadcasting")
25+
prettyJSON := flag.Bool("pretty-json", false, "Pretty print JSON messages")
26+
relays := flag.String("relays", "", "Comma-separated list of relay peer multiaddrs (e.g., /ip4/1.2.3.4/tcp/4001/p2p/PeerID)")
27+
28+
flag.Parse()
29+
30+
logger := &p2p.DefaultLogger{}
31+
32+
if *name == "" {
33+
logger.Errorf("--name flag is required")
34+
return
35+
}
36+
37+
ctx, cancel := context.WithCancel(context.Background())
38+
defer cancel()
39+
40+
// Get or generate private key
41+
privKey, err := getOrGeneratePrivateKey(*privateKey, logger)
42+
if err != nil {
43+
logger.Errorf("Failed to get private key: %v", err)
44+
return
45+
}
46+
47+
// Parse relay peers list
48+
relayPeers := parseRelayPeers(*relays)
49+
50+
// Create P2P client
51+
client, err := p2p.NewClient(p2p.Config{
52+
Name: *name,
53+
Logger: logger,
54+
PrivateKey: privKey,
55+
Port: *port,
56+
PeerCacheFile: "peer_cache.json", // Enable peer persistence
57+
RelayPeers: relayPeers,
58+
})
59+
if err != nil {
60+
logger.Errorf("Failed to create P2P client: %v", err)
61+
return
62+
}
63+
64+
defer func() {
65+
if err := client.Close(); err != nil {
66+
logger.Errorf("Failed to close client: %v", err)
67+
}
68+
}()
69+
70+
// Parse and subscribe to topics
71+
topicList := parseTopics(*topics)
72+
allMsgChan := subscribeToTopics(client, topicList, logger)
73+
74+
// Start message receiver
75+
go receiveMessages(allMsgChan, *prettyJSON, logger)
76+
77+
// Start message broadcaster (publishes to all topics)
78+
if !*noBroadcast {
79+
go broadcastMessages(ctx, client, topicList, *name, logger)
80+
}
81+
82+
// Periodically display peer information
83+
go displayPeers(ctx, client, logger)
84+
85+
logger.Infof("P2P client started. Press Ctrl+C to exit")
86+
87+
// Wait for shutdown signal
88+
sigChan := make(chan os.Signal, 1)
89+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
90+
<-sigChan
91+
92+
logger.Infof("\nShutting down...")
93+
cancel()
94+
}
95+
96+
// Helper functions
97+
98+
func getOrGeneratePrivateKey(keyHex string, logger *p2p.DefaultLogger) (crypto.PrivKey, error) {
99+
if keyHex == "" {
100+
keyHex = os.Getenv("P2P_PRIVATE_KEY")
101+
}
102+
103+
if keyHex == "" {
104+
privKey, err := p2p.GeneratePrivateKey()
105+
if err != nil {
106+
return nil, err
107+
}
108+
newKeyHex, _ := p2p.PrivateKeyToHex(privKey)
109+
logger.Infof("Generated new private key: %s\n", newKeyHex)
110+
logger.Infof("Save this key and use it next time with --key flag or P2P_PRIVATE_KEY env var")
111+
return privKey, nil
112+
}
113+
114+
return p2p.PrivateKeyFromHex(keyHex)
115+
}
116+
117+
func parseRelayPeers(relays string) []string {
118+
if relays == "" {
119+
return nil
120+
}
121+
122+
parts := strings.Split(relays, ",")
123+
relayPeers := make([]string, 0, len(parts))
124+
for _, r := range parts {
125+
relayPeers = append(relayPeers, strings.TrimSpace(r))
126+
}
127+
return relayPeers
128+
}
129+
130+
func parseTopics(topics string) []string {
131+
topicList := strings.Split(topics, ",")
132+
for i, t := range topicList {
133+
topicList[i] = strings.TrimSpace(t)
134+
}
135+
return topicList
136+
}
137+
138+
func subscribeToTopics(client p2p.Client, topics []string, logger *p2p.DefaultLogger) chan p2p.Message {
139+
allMsgChan := make(chan p2p.Message, 100)
140+
141+
for _, topic := range topics {
142+
msgChan := client.Subscribe(topic)
143+
logger.Infof("Subscribed to topic: %s", topic)
144+
145+
go func(ch <-chan p2p.Message) {
146+
for msg := range ch {
147+
allMsgChan <- msg
148+
}
149+
}(msgChan)
150+
}
151+
152+
return allMsgChan
153+
}
154+
155+
func receiveMessages(msgChan <-chan p2p.Message, prettyJSON bool, logger *p2p.DefaultLogger) {
156+
for msg := range msgChan {
157+
data := formatMessageData(msg.Data, prettyJSON)
158+
logger.Infof("[%-52s] %s: (%s)\n%s", msg.FromID, msg.From, msg.Topic, data)
159+
}
160+
}
161+
162+
func formatMessageData(data []byte, prettyJSON bool) string {
163+
if !prettyJSON {
164+
return string(data)
165+
}
166+
167+
var jsonObj interface{}
168+
if err := json.Unmarshal(data, &jsonObj); err == nil {
169+
if jsonBytes, err := json.MarshalIndent(jsonObj, "", " "); err == nil {
170+
return string(jsonBytes)
171+
}
172+
}
173+
return string(data)
174+
}
175+
176+
func broadcastMessages(ctx context.Context, client p2p.Client, topics []string, name string, logger *p2p.DefaultLogger) {
177+
counter := 0
178+
ticker := time.NewTicker(1 * time.Second)
179+
defer ticker.Stop()
180+
181+
for {
182+
select {
183+
case <-ctx.Done():
184+
return
185+
case <-ticker.C:
186+
counter++
187+
data := fmt.Sprintf("Message #%d", counter)
188+
189+
for _, topic := range topics {
190+
if err := client.Publish(ctx, topic, []byte(data)); err != nil {
191+
return
192+
}
193+
}
194+
logger.Infof("[%-52s] %s: %s\n", "local", name, data)
195+
}
196+
}
197+
}
198+
199+
func displayPeers(ctx context.Context, client p2p.Client, logger *p2p.DefaultLogger) {
200+
ticker := time.NewTicker(10 * time.Second)
201+
defer ticker.Stop()
202+
203+
for {
204+
select {
205+
case <-ctx.Done():
206+
return
207+
case <-ticker.C:
208+
peers := client.GetPeers()
209+
if len(peers) > 0 {
210+
sb := strings.Builder{}
211+
sb.WriteString(fmt.Sprintf("\n=== Connected Peers: %d ===\n", len(peers)))
212+
for _, peer := range peers {
213+
sb.WriteString(fmt.Sprintf(" - %s [%s]\n", peer.Name, peer.ID))
214+
for _, addr := range peer.Addrs {
215+
sb.WriteString(fmt.Sprintf(" %s\n", addr))
216+
}
217+
}
218+
logger.Infof("%s", sb.String())
219+
}
220+
}
221+
}
222+
}

0 commit comments

Comments
 (0)