Skip to content

Commit 5fb0df4

Browse files
ordishsoskarszoon
andauthored
refactor(rpc,p2p,daemon): improve peer client clarity and type safety (#201)
Co-authored-by: oskarszoon <1449115+oskarszoon@users.noreply.github.com>
1 parent 2d37b57 commit 5fb0df4

File tree

11 files changed

+373
-203
lines changed

11 files changed

+373
-203
lines changed

daemon/daemon_services.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
452452
return err
453453
}
454454

455-
peerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings)
455+
legacyPeerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings)
456456
if err != nil {
457457
return err
458458
}
@@ -489,7 +489,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
489489
// Create the RPC server with the necessary parts
490490
var rpcServer *rpc.RPCServer
491491

492-
rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, peerClient, p2pClient, txStore, validatorClient)
492+
rpcServer, err = rpc.NewServer(createLogger(loggerRPC), appSettings, blockchainClient, blockValidationClient, utxoStore, blockAssemblyClient, legacyPeerClient, p2pClient, txStore, validatorClient)
493493
if err != nil {
494494
return err
495495
}

daemon/test_daemon.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/bsv-blockchain/teranode/test/utils/wait"
4848
"github.com/bsv-blockchain/teranode/ulogger"
4949
"github.com/bsv-blockchain/teranode/util"
50+
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
5051
"github.com/stretchr/testify/assert"
5152
"github.com/stretchr/testify/require"
5253
tc "github.com/testcontainers/testcontainers-go/modules/compose"
@@ -109,7 +110,7 @@ func (je *JSONError) Error() string {
109110

110111
// NewTestDaemon creates a new TestDaemon instance with the provided options.
111112
func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon {
112-
ctx, cancel := context.WithCancel(context.Background())
113+
ctx, cancel := context.WithCancel(t.Context())
113114

114115
var (
115116
composeDependencies tc.ComposeStack
@@ -460,8 +461,13 @@ func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon {
460461
blockAssembler, ok := blockAssemblyService.(*blockassembly.BlockAssembly)
461462
require.True(t, ok)
462463

464+
assetURL := fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort)
465+
if appSettings.Asset.APIPrefix != "" {
466+
assetURL += appSettings.Asset.APIPrefix
467+
}
468+
463469
return &TestDaemon{
464-
AssetURL: fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort),
470+
AssetURL: assetURL,
465471
BlockAssembler: blockAssembler.GetBlockAssembler(),
466472
BlockAssemblyClient: blockAssemblyClient,
467473
BlockValidationClient: blockValidationClient,
@@ -1294,6 +1300,13 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model
12941300
}
12951301
}
12961302

1303+
func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, timeout time.Duration) {
1304+
require.Eventually(t, func() bool {
1305+
_, err := td.BlockchainClient.GetBlock(td.Ctx, blockHash)
1306+
return err == nil
1307+
}, timeout, 100*time.Millisecond, "Timeout waiting for block with hash %s", blockHash.String())
1308+
}
1309+
12971310
func (td *TestDaemon) WaitForBlock(t *testing.T, expectedBlock *model.Block, timeout time.Duration, skipVerifyChain ...bool) {
12981311
ctx, cancel := context.WithTimeout(td.Ctx, timeout)
12991312
defer cancel()
@@ -1865,6 +1878,25 @@ func (td *TestDaemon) DisconnectFromPeer(t *testing.T, peer *TestDaemon) {
18651878
require.NoError(t, err, "Failed to disconnect from peer")
18661879
}
18671880

1881+
func (td *TestDaemon) InjectPeer(t *testing.T, peer *TestDaemon) {
1882+
peerID, err := libp2pPeer.Decode(peer.Settings.P2P.PeerID)
1883+
require.NoError(t, err, "Failed to decode peer ID")
1884+
1885+
p2pService, err := td.d.ServiceManager.GetService("P2P")
1886+
require.NoError(t, err, "Failed to get P2P service")
1887+
1888+
p2pServer, ok := p2pService.(*p2p.Server)
1889+
require.True(t, ok, "Failed to cast P2P service to Server")
1890+
1891+
// Inject my peer info to other peer...
1892+
header, meta, err := peer.BlockchainClient.GetBestBlockHeader(td.Ctx)
1893+
require.NoError(t, err, "Failed to get best block header")
1894+
1895+
p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash())
1896+
1897+
t.Logf("Injected peer %s into %s's registry (PeerID: %s)", peer.Settings.Context, td.Settings.Context, peerID)
1898+
}
1899+
18681900
func peerAddress(peer *TestDaemon) string {
18691901
return fmt.Sprintf("/dns/127.0.0.1/tcp/%d/p2p/%s", peer.Settings.P2P.Port, peer.Settings.P2P.PeerID)
18701902
}

services/p2p/Server.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,15 +1390,19 @@ func (s *Server) processBlockchainNotification(ctx context.Context, notification
13901390
return errors.NewError(fmt.Sprintf("error getting chainhash from notification hash %s: %%w", notification.Hash), err)
13911391
}
13921392

1393-
s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
1394-
13951393
switch notification.Type {
13961394
case model.NotificationType_Block:
1395+
s.logger.Infof("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
13971396
return s.handleBlockNotification(ctx, hash) // These handlers return wrapped errors
1397+
13981398
case model.NotificationType_Subtree:
1399+
s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
13991400
return s.handleSubtreeNotification(ctx, hash)
1401+
14001402
case model.NotificationType_PeerFailure:
1403+
s.logger.Debugf("[processBlockchainNotification] Processing %s notification: %s", notification.Type, hash.String())
14011404
return s.handlePeerFailureNotification(ctx, notification)
1405+
14021406
default:
14031407
s.logger.Warnf("[processBlockchainNotification] Received unhandled notification type: %s for hash %s", notification.Type, hash.String())
14041408
}

services/p2p/server_helpers.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,18 @@ func (s *Server) addConnectedPeer(peerID peer.ID, clientName string, height uint
480480
}
481481
}
482482

483+
// InjectPeerForTesting directly injects a peer into the registry for testing purposes.
484+
// This method allows deterministic peer setup without requiring actual P2P network connections.
485+
func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash *chainhash.Hash) {
486+
if s.peerRegistry == nil {
487+
return
488+
}
489+
490+
s.peerRegistry.Put(peerID, clientName, height, blockHash, dataHubURL)
491+
492+
s.peerRegistry.UpdateStorage(peerID, "full")
493+
}
494+
483495
func (s *Server) removePeer(peerID peer.ID) {
484496
if s.peerRegistry != nil {
485497
// Mark as disconnected before removing

services/p2p/sync_coordinator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,13 @@ func (sc *SyncCoordinator) logCandidateList(candidates []*PeerInfo) {
503503
func (sc *SyncCoordinator) periodicEvaluation(ctx context.Context) {
504504
defer sc.wg.Done()
505505

506-
ticker := time.NewTicker(30 * time.Second)
506+
interval := sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval
507+
if interval <= 0 {
508+
sc.logger.Warnf("[SyncCoordinator] Invalid periodic evaluation interval %v, using default 30s", interval)
509+
interval = 30 * time.Second
510+
}
511+
512+
ticker := time.NewTicker(interval)
507513
defer ticker.Stop()
508514

509515
for {

services/rpc/Server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -669,9 +669,9 @@ type RPCServer struct {
669669
// Used for mining-related RPC commands like getminingcandidate and generate
670670
blockAssemblyClient blockassembly.ClientI
671671

672-
// peerClient provides access to legacy peer network services
672+
// legacyP2PClient provides access to legacy peer network services
673673
// Used for peer management and information retrieval
674-
peerClient peer.ClientI
674+
legacyP2PClient peer.ClientI
675675

676676
// p2pClient provides access to the P2P network services
677677
// Used for modern peer management and network operations
@@ -1384,7 +1384,7 @@ func (s *RPCServer) Start(ctx context.Context, readyCh chan<- struct{}) error {
13841384
// Returns:
13851385
// - *RPCServer: Configured server instance ready for initialization
13861386
// - error: Any error encountered during configuration
1387-
func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, peerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) {
1387+
func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainClient blockchain.ClientI, blockValidationClient blockvalidation.Interface, utxoStore utxo.Store, blockAssemblyClient blockassembly.ClientI, legacyPeerClient peer.ClientI, p2pClient p2p.ClientI, txStore blob.Store, validatorClient validator.Interface) (*RPCServer, error) {
13881388
initPrometheusMetrics()
13891389

13901390
assetHTTPAddress := tSettings.Asset.HTTPAddress
@@ -1409,7 +1409,7 @@ func NewServer(logger ulogger.Logger, tSettings *settings.Settings, blockchainCl
14091409
helpCacher: newHelpCacher(),
14101410
utxoStore: utxoStore,
14111411
blockAssemblyClient: blockAssemblyClient,
1412-
peerClient: peerClient,
1412+
legacyP2PClient: legacyPeerClient,
14131413
p2pClient: p2pClient,
14141414
txStore: txStore,
14151415
validatorClient: validatorClient,

0 commit comments

Comments
 (0)