Skip to content

Commit f71621d

Browse files
committed
feat: add bootstrap peer reconnection and improve disconnect handling for topic peers
1 parent 65072a1 commit f71621d

File tree

1 file changed

+46
-18
lines changed

1 file changed

+46
-18
lines changed

main.go

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import (
3030
)
3131

3232
const (
33-
topicName = "broadcast_p2p_poc"
34-
peerCacheFile = "peer_cache.json"
33+
topicName = "broadcast_p2p_poc"
34+
peerCacheFile = "peer_cache.json"
3535
)
3636

3737
type Message struct {
@@ -215,8 +215,15 @@ func main() {
215215
monitorConnectionUpgrade(conn)
216216
},
217217
DisconnectedF: func(n network.Network, conn network.Conn) {
218-
fmt.Printf("\n[DISCONNECTED] Lost connection to %s\n", conn.RemotePeer().String()[:16])
219-
fmt.Printf(" Will attempt reconnection via cached peers and discovery...\n\n")
218+
peerID := conn.RemotePeer()
219+
topicPeers := topic.ListPeers()
220+
for _, tp := range topicPeers {
221+
if tp == peerID {
222+
fmt.Printf("\n[DISCONNECTED] Lost connection to topic peer %s\n", peerID.String()[:16])
223+
fmt.Printf(" Will attempt reconnection via cached peers and discovery...\n\n")
224+
return
225+
}
226+
}
220227
},
221228
})
222229

@@ -232,6 +239,8 @@ func main() {
232239

233240
go maintainPeerConnections(ctx, h, topic)
234241

242+
go maintainBootstrapConnections(ctx, h, bootstrapPeers)
243+
235244
fmt.Println("Press Ctrl+C to exit")
236245

237246
sigChan := make(chan os.Signal, 1)
@@ -622,7 +631,6 @@ func maintainPeerConnections(ctx context.Context, h host.Host, topic *pubsub.Top
622631
case <-ctx.Done():
623632
return
624633
case <-ticker.C:
625-
topicPeers := topic.ListPeers()
626634
cachedPeers := loadPeerCache()
627635

628636
for _, cp := range cachedPeers {
@@ -631,24 +639,12 @@ func maintainPeerConnections(ctx context.Context, h host.Host, topic *pubsub.Top
631639
continue
632640
}
633641

634-
isTopicPeer := false
635-
for _, tp := range topicPeers {
636-
if tp == peerID {
637-
isTopicPeer = true
638-
break
639-
}
640-
}
641-
642-
if !isTopicPeer {
643-
continue
644-
}
645-
646642
connectedness := h.Network().Connectedness(peerID)
647643
if connectedness == network.Connected {
648644
continue
649645
}
650646

651-
fmt.Printf("\n[RECONNECT] Attempting to reconnect to %s (%s)...\n", cp.Name, peerID.String()[:16])
647+
fmt.Printf("\n[RECONNECT] Attempting to reconnect to topic peer %s (%s)...\n", cp.Name, peerID.String()[:16])
652648

653649
var maddrs []multiaddr.Multiaddr
654650
for _, addrStr := range cp.Addrs {
@@ -675,3 +671,35 @@ func maintainPeerConnections(ctx context.Context, h host.Host, topic *pubsub.Top
675671
}
676672
}
677673
}
674+
675+
func maintainBootstrapConnections(ctx context.Context, h host.Host, bootstrapPeers []string) {
676+
ticker := time.NewTicker(30 * time.Second)
677+
defer ticker.Stop()
678+
679+
for {
680+
select {
681+
case <-ctx.Done():
682+
return
683+
case <-ticker.C:
684+
for _, addr := range bootstrapPeers {
685+
maddr, err := multiaddr.NewMultiaddr(addr)
686+
if err != nil {
687+
continue
688+
}
689+
690+
peerInfo, err := peer.AddrInfoFromP2pAddr(maddr)
691+
if err != nil {
692+
continue
693+
}
694+
695+
if h.Network().Connectedness(peerInfo.ID) != network.Connected {
696+
if err := h.Connect(ctx, *peerInfo); err != nil {
697+
log.Printf("Failed to maintain bootstrap connection to %s: %v", peerInfo.ID.String()[:16], err)
698+
} else {
699+
fmt.Printf("\n[BOOTSTRAP] Reconnected to %s to maintain NAT mapping\n\n", peerInfo.ID.String()[:16])
700+
}
701+
}
702+
}
703+
}
704+
}
705+
}

0 commit comments

Comments
 (0)