Skip to content

Commit 65072a1

Browse files
committed
feat: implement peer caching and auto-reconnect with hole punching
1 parent ed624c4 commit 65072a1

File tree

2 files changed

+221
-1
lines changed

2 files changed

+221
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
p2p_poc
2+
peer_cache.json

main.go

Lines changed: 220 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"flag"
88
"fmt"
9+
"io"
910
"log"
1011
"os"
1112
"os/signal"
@@ -24,16 +25,26 @@ import (
2425
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
2526
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
2627
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
28+
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
2729
"github.com/multiformats/go-multiaddr"
2830
)
2931

30-
const topicName = "broadcast_p2p_poc"
32+
const (
33+
topicName = "broadcast_p2p_poc"
34+
peerCacheFile = "peer_cache.json"
35+
)
3136

3237
type Message struct {
3338
Name string `json:"name"`
3439
Counter int `json:"counter"`
3540
}
3641

42+
type CachedPeer struct {
43+
ID string `json:"id"`
44+
Name string `json:"name,omitempty"`
45+
Addrs []string `json:"addrs"`
46+
}
47+
3748
type PeerTracker struct {
3849
mu sync.RWMutex
3950
names map[peer.ID]string
@@ -152,6 +163,12 @@ func main() {
152163

153164
connectToBootstrapNodes(ctx, h, bootstrapPeers)
154165

166+
cachedPeers := loadPeerCache()
167+
if len(cachedPeers) > 0 {
168+
fmt.Printf("Connecting to %d cached peers...\n", len(cachedPeers))
169+
connectToCachedPeers(ctx, h, cachedPeers)
170+
}
171+
155172
ps, err := pubsub.NewGossipSub(ctx, h)
156173
if err != nil {
157174
log.Fatalf("Failed to create pubsub: %v", err)
@@ -195,9 +212,16 @@ func main() {
195212
h.Network().Notify(&network.NotifyBundle{
196213
ConnectedF: func(n network.Network, conn network.Conn) {
197214
monitorRelayActivity(conn, peerTracker)
215+
monitorConnectionUpgrade(conn)
216+
},
217+
DisconnectedF: func(n network.Network, conn network.Conn) {
218+
fmt.Printf("\n[DISCONNECTED] Lost connection to %s\n", conn.RemotePeer().String()[:16])
219+
fmt.Printf(" Will attempt reconnection via cached peers and discovery...\n\n")
198220
},
199221
})
200222

223+
subscribeToHolePunchEvents(ctx, h)
224+
201225
go discoverPeers(ctx, h, routingDiscovery)
202226

203227
go receiveMessages(ctx, sub, h.ID(), peerTracker)
@@ -206,6 +230,8 @@ func main() {
206230

207231
go printPeersPeriodically(ctx, h, topic, peerTracker)
208232

233+
go maintainPeerConnections(ctx, h, topic)
234+
209235
fmt.Println("Press Ctrl+C to exit")
210236

211237
sigChan := make(chan os.Signal, 1)
@@ -407,14 +433,19 @@ func printPeersPeriodically(ctx context.Context, h host.Host, topic *pubsub.Topi
407433

408434
relayCount := tracker.GetRelayCount()
409435
fmt.Printf("\n[Total connections: %d | Topic peers: %d | Acting as relay: %d]\n", len(allPeers), len(topicPeers), relayCount)
436+
437+
var cachedPeers []CachedPeer
410438
if len(topicPeers) > 0 {
411439
fmt.Println("Topic peers:")
412440
for _, p := range topicPeers {
413441
name := tracker.GetName(p)
414442
conns := h.Network().ConnsToPeer(p)
415443

444+
var peerAddrs []string
416445
for _, conn := range conns {
417446
addr := conn.RemoteMultiaddr().String()
447+
peerAddrs = append(peerAddrs, addr)
448+
418449
connType := "DIRECT"
419450
if isRelayedConnection(addr) {
420451
connType = "RELAYED"
@@ -425,8 +456,16 @@ func printPeersPeriodically(ctx context.Context, h host.Host, topic *pubsub.Topi
425456

426457
if len(conns) == 0 {
427458
fmt.Printf(" - %s (%s) [NO CONNECTION]\n", p.String(), name)
459+
} else {
460+
cachedPeers = append(cachedPeers, CachedPeer{
461+
ID: p.String(),
462+
Name: name,
463+
Addrs: peerAddrs,
464+
})
428465
}
429466
}
467+
468+
savePeerCache(cachedPeers)
430469
} else {
431470
fmt.Println(" (No peers on topic yet)")
432471
}
@@ -439,6 +478,81 @@ func isRelayedConnection(addr string) bool {
439478
return strings.Contains(addr, "/p2p-circuit/")
440479
}
441480

481+
func loadPeerCache() []CachedPeer {
482+
file, err := os.Open(peerCacheFile)
483+
if err != nil {
484+
if !os.IsNotExist(err) {
485+
log.Printf("Warning: failed to open peer cache: %v", err)
486+
}
487+
return nil
488+
}
489+
defer file.Close()
490+
491+
data, err := io.ReadAll(file)
492+
if err != nil {
493+
log.Printf("Warning: failed to read peer cache: %v", err)
494+
return nil
495+
}
496+
497+
var peers []CachedPeer
498+
if err := json.Unmarshal(data, &peers); err != nil {
499+
log.Printf("Warning: failed to parse peer cache: %v", err)
500+
return nil
501+
}
502+
503+
return peers
504+
}
505+
506+
func savePeerCache(peers []CachedPeer) {
507+
data, err := json.MarshalIndent(peers, "", " ")
508+
if err != nil {
509+
log.Printf("Warning: failed to marshal peer cache: %v", err)
510+
return
511+
}
512+
513+
if err := os.WriteFile(peerCacheFile, data, 0644); err != nil {
514+
log.Printf("Warning: failed to write peer cache: %v", err)
515+
}
516+
}
517+
518+
func connectToCachedPeers(ctx context.Context, h host.Host, cachedPeers []CachedPeer) {
519+
for _, cp := range cachedPeers {
520+
peerID, err := peer.Decode(cp.ID)
521+
if err != nil {
522+
log.Printf("Invalid cached peer ID %s: %v", cp.ID, err)
523+
continue
524+
}
525+
526+
if h.Network().Connectedness(peerID) == network.Connected {
527+
continue
528+
}
529+
530+
var maddrs []multiaddr.Multiaddr
531+
for _, addrStr := range cp.Addrs {
532+
maddr, err := multiaddr.NewMultiaddr(addrStr)
533+
if err != nil {
534+
continue
535+
}
536+
maddrs = append(maddrs, maddr)
537+
}
538+
539+
if len(maddrs) == 0 {
540+
continue
541+
}
542+
543+
addrInfo := peer.AddrInfo{
544+
ID: peerID,
545+
Addrs: maddrs,
546+
}
547+
548+
go func(ai peer.AddrInfo) {
549+
if err := h.Connect(ctx, ai); err == nil {
550+
fmt.Printf("Reconnected to cached peer: %s\n", ai.ID.String())
551+
}
552+
}(addrInfo)
553+
}
554+
}
555+
442556
func monitorRelayActivity(conn network.Conn, tracker *PeerTracker) {
443557
go func() {
444558
streams := conn.GetStreams()
@@ -456,3 +570,108 @@ func monitorRelayActivity(conn network.Conn, tracker *PeerTracker) {
456570
}
457571
}()
458572
}
573+
574+
func monitorConnectionUpgrade(conn network.Conn) {
575+
addr := conn.RemoteMultiaddr().String()
576+
if strings.Contains(addr, "/p2p-circuit/") {
577+
fmt.Printf("\n[RELAY CONNECTION] Connected via relay to %s\n", conn.RemotePeer().String()[:16])
578+
fmt.Printf(" Waiting for hole punch to establish direct connection...\n\n")
579+
}
580+
}
581+
582+
func subscribeToHolePunchEvents(ctx context.Context, h host.Host) {
583+
bus := h.EventBus()
584+
585+
sub, err := bus.Subscribe(new(holepunch.Event))
586+
if err != nil {
587+
log.Printf("Warning: failed to subscribe to hole punch events: %v", err)
588+
return
589+
}
590+
591+
go func() {
592+
defer sub.Close()
593+
for {
594+
select {
595+
case <-ctx.Done():
596+
return
597+
case evt := <-sub.Out():
598+
hpEvt, ok := evt.(holepunch.Event)
599+
if !ok {
600+
continue
601+
}
602+
603+
switch hpEvt.Type {
604+
case "StartHolePunch":
605+
fmt.Printf("\n[HOLE PUNCH] Starting hole punch with %s\n\n", hpEvt.Remote.String()[:16])
606+
case "EndHolePunch":
607+
fmt.Printf("\n[HOLE PUNCH] Completed attempt with %s (check connection type in peer list)\n\n", hpEvt.Remote.String()[:16])
608+
case "HolePunchAttempt":
609+
fmt.Printf("\n[HOLE PUNCH] Attempting direct connection to %s...\n\n", hpEvt.Remote.String()[:16])
610+
}
611+
}
612+
}
613+
}()
614+
}
615+
616+
func maintainPeerConnections(ctx context.Context, h host.Host, topic *pubsub.Topic) {
617+
ticker := time.NewTicker(30 * time.Second)
618+
defer ticker.Stop()
619+
620+
for {
621+
select {
622+
case <-ctx.Done():
623+
return
624+
case <-ticker.C:
625+
topicPeers := topic.ListPeers()
626+
cachedPeers := loadPeerCache()
627+
628+
for _, cp := range cachedPeers {
629+
peerID, err := peer.Decode(cp.ID)
630+
if err != nil {
631+
continue
632+
}
633+
634+
isTopicPeer := false
635+
for _, tp := range topicPeers {
636+
if tp == peerID {
637+
isTopicPeer = true
638+
break
639+
}
640+
}
641+
642+
if !isTopicPeer {
643+
continue
644+
}
645+
646+
connectedness := h.Network().Connectedness(peerID)
647+
if connectedness == network.Connected {
648+
continue
649+
}
650+
651+
fmt.Printf("\n[RECONNECT] Attempting to reconnect to %s (%s)...\n", cp.Name, peerID.String()[:16])
652+
653+
var maddrs []multiaddr.Multiaddr
654+
for _, addrStr := range cp.Addrs {
655+
maddr, err := multiaddr.NewMultiaddr(addrStr)
656+
if err != nil {
657+
continue
658+
}
659+
maddrs = append(maddrs, maddr)
660+
}
661+
662+
if len(maddrs) > 0 {
663+
addrInfo := peer.AddrInfo{
664+
ID: peerID,
665+
Addrs: maddrs,
666+
}
667+
668+
if err := h.Connect(ctx, addrInfo); err != nil {
669+
fmt.Printf(" Reconnection attempt failed: %v\n\n", err)
670+
} else {
671+
fmt.Printf(" Reconnected successfully!\n\n")
672+
}
673+
}
674+
}
675+
}
676+
}
677+
}

0 commit comments

Comments
 (0)