@@ -725,6 +725,11 @@ func TestServer_blockFoundCh_triggersCatchupCh(t *testing.T) {
725725 blockFoundCh := make (chan processBlockFound , 10 )
726726 catchupCh := make (chan processBlockCatchup , 10 )
727727
728+ nearForkThreshold := uint32 (tSettings .ChainCfgParams .CoinbaseMaturity / 2 )
729+ if tSettings .BlockValidation .NearForkThreshold > 0 {
730+ nearForkThreshold = uint32 (tSettings .BlockValidation .NearForkThreshold )
731+ }
732+
728733 baseServer := & Server {
729734 logger : ulogger.TestLogger {},
730735 settings : tSettings ,
@@ -738,39 +743,72 @@ func TestServer_blockFoundCh_triggersCatchupCh(t *testing.T) {
738743 txStore : nil ,
739744 utxoStore : nil ,
740745 blockPriorityQueue : NewBlockPriorityQueue (ulogger.TestLogger {}),
746+ blockClassifier : NewBlockClassifier (ulogger.TestLogger {}, nearForkThreshold , mockBlockchain ),
741747 processBlockNotify : ttlcache .New [chainhash.Hash , bool ](),
742748 catchupAlternatives : ttlcache .New [chainhash.Hash , []processBlockCatchup ](),
743749 }
744750
745751 defer baseServer .processBlockNotify .Stop ()
746752
753+ // Prevent worker goroutines started during Init from draining channels we inspect
754+ testPQ := baseServer .blockPriorityQueue
755+ baseServer .blockPriorityQueue = nil
756+ baseServer .forkManager .SetPriorityQueue (nil )
757+
747758 err = baseServer .Init (ctx )
748759 require .NoError (t , err )
749760
750- // Fill blockFoundCh to trigger the catchup path - send enough blocks so that
751- // when workers consume them, len(blockFoundCh) > 3 remains for threshold check
752- // With 10 concurrent workers on CI, need many more blocks to ensure len > 3
753- // when checked. Send 5 blocks to overwhelm the workers.
754- go func () {
755- for i := 0 ; i < 5 ; i ++ {
756- blockFoundCh <- processBlockFound {
757- hash : dummyBlock .Hash (),
758- baseURL : fmt .Sprintf ("http://peer%d" , i ),
759- errCh : make (chan error , 1 ),
760- }
761- }
762- }()
761+ baseServer .blockPriorityQueue = testPQ
762+ baseServer .forkManager .SetPriorityQueue (testPQ )
763+
764+ // Stop background goroutines (legacy workers & catchup processor) for deterministic assertions
765+ cancel ()
766+ processingCtx := context .Background ()
767+
768+ // Pre-fill queue so queueSize > 10, forcing catchup path without relying on len(blockFoundCh)
769+ prefillBlockPriorityQueueForTest (t , testPQ , 12 )
770+
771+ err = baseServer .processBlockFoundChannel (processingCtx , processBlockFound {
772+ hash : dummyBlock .Hash (),
773+ baseURL : "http://peer0" ,
774+ errCh : make (chan error , 1 ),
775+ })
776+ require .NoError (t , err )
763777
764778 select {
765779 case got := <- catchupCh :
766780 assert .NotNil (t , got .block )
767- // With multiple blocks sent, any peer URL is valid
768- assert .Contains (t , got .baseURL , "http://peer" )
781+ assert .Equal (t , "http://peer0" , got .baseURL )
769782 case <- time .After (5 * time .Second ):
770783 t .Fatal ("processBlockFoundChannel did not put anything on catchupCh" )
771784 }
772785}
773786
787+ func prefillBlockPriorityQueueForTest (t * testing.T , pq * BlockPriorityQueue , count int ) {
788+ t .Helper ()
789+
790+ pq .mu .Lock ()
791+ defer pq .mu .Unlock ()
792+
793+ for i := 0 ; i < count ; i ++ {
794+ hash := chainhash .DoubleHashH ([]byte (fmt .Sprintf ("prefill-%d" , i )))
795+ hashCopy := hash
796+
797+ item := & PrioritizedBlock {
798+ blockFound : processBlockFound {
799+ hash : & hashCopy ,
800+ baseURL : fmt .Sprintf ("http://prefill-%d" , i ),
801+ },
802+ priority : PriorityDeepFork ,
803+ height : uint32 (i ),
804+ timestamp : time .Now (),
805+ }
806+
807+ pq .items = append (pq .items , item )
808+ pq .hashIndex [* item .blockFound .hash ] = item
809+ }
810+ }
811+
774812func TestServer_blockFoundCh_triggersCatchupCh_BlockLocator (t * testing.T ) {
775813 t .Skip ("Skipping test that hangs - needs proper cleanup" )
776814 initPrometheusMetrics ()
0 commit comments