Skip to content

Commit 587626c

Browse files
authored
subtree full fix (#237)
1 parent e2636ec commit 587626c

File tree

14 files changed

+470
-322
lines changed

14 files changed

+470
-322
lines changed

services/blockassembly/BlockAssembler.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,9 @@ func (b *BlockAssembler) Start(ctx context.Context) (err error) {
743743
return errors.NewStorageError("[BlockAssembler] failed to load un-mined transactions: %v", err)
744744
}
745745

746+
// Start SubtreeProcessor goroutine after loading unmined transactions to avoid race conditions
747+
b.subtreeProcessor.Start(ctx)
748+
746749
if err = b.startChannelListeners(ctx); err != nil {
747750
return errors.NewProcessingError("[BlockAssembler] failed to start channel listeners: %v", err)
748751
}
@@ -870,12 +873,13 @@ func (b *BlockAssembler) AddTx(node subtree.Node, txInpoints subtree.TxInpoints)
870873
// RemoveTx removes a transaction from the block assembler.
871874
//
872875
// Parameters:
876+
// - ctx: Context for the removal operation
873877
// - hash: Hash of the transaction to remove
874878
//
875879
// Returns:
876880
// - error: Any error encountered during removal
877-
func (b *BlockAssembler) RemoveTx(hash chainhash.Hash) error {
878-
return b.subtreeProcessor.Remove(hash)
881+
func (b *BlockAssembler) RemoveTx(ctx context.Context, hash chainhash.Hash) error {
882+
return b.subtreeProcessor.Remove(ctx, hash)
879883
}
880884

881885
type resetRequest struct {

services/blockassembly/BlockAssembler_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ func setupBlockAssemblyTest(t *testing.T) *baTestItems {
655655

656656
// overwrite default subtree processor with a new one
657657
ba.subtreeProcessor, err = subtreeprocessor.NewSubtreeProcessor(
658-
context.Background(),
658+
t.Context(),
659659
ulogger.TestLogger{},
660660
ba.settings,
661661
nil,
@@ -669,10 +669,13 @@ func setupBlockAssemblyTest(t *testing.T) *baTestItems {
669669
// Ensure SubtreeProcessor is properly cleaned up when test ends
670670
t.Cleanup(func() {
671671
if ba.subtreeProcessor != nil {
672-
ba.subtreeProcessor.Close()
672+
ba.subtreeProcessor.Stop(context.Background())
673673
}
674674
})
675675

676+
// Start the subtree processor
677+
ba.subtreeProcessor.Start(t.Context())
678+
676679
items.blockAssembler = ba
677680

678681
return &items
@@ -1931,7 +1934,7 @@ func TestBlockAssembly_RemoveTx(t *testing.T) {
19311934
txHash := tx.TxIDChainHash()
19321935

19331936
// Since RemoveTx returns an error, we can test it
1934-
err := testItems.blockAssembler.RemoveTx(*txHash)
1937+
err := testItems.blockAssembler.RemoveTx(t.Context(), *txHash)
19351938
// The error might be that the tx doesn't exist, which is fine for this test
19361939
_ = err
19371940
})
@@ -1971,7 +1974,7 @@ func TestBlockAssembly_Start_InitStateFailures(t *testing.T) {
19711974
// Set skip wait for pending blocks
19721975
blockAssembler.SetSkipWaitForPendingBlocks(true)
19731976

1974-
err = blockAssembler.Start(context.Background())
1977+
err = blockAssembler.Start(t.Context())
19751978
require.Error(t, err)
19761979
assert.Contains(t, err.Error(), "failed to initialize state")
19771980
})
@@ -2021,7 +2024,7 @@ func TestBlockAssembly_Start_InitStateFailures(t *testing.T) {
20212024
// Set skip wait for pending blocks
20222025
blockAssembler.SetSkipWaitForPendingBlocks(true)
20232026

2024-
err = blockAssembler.Start(context.Background())
2027+
err = blockAssembler.Start(t.Context())
20252028
require.NoError(t, err)
20262029

20272030
// Verify state was properly initialized

services/blockassembly/Server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -709,13 +709,13 @@ func (ba *BlockAssembly) Start(ctx context.Context, readyCh chan<- struct{}) (er
709709
//
710710
// Returns:
711711
// - error: Any error encountered during shutdown
712-
func (ba *BlockAssembly) Stop(_ context.Context) error {
712+
func (ba *BlockAssembly) Stop(ctx context.Context) error {
713713
ba.stopOnce.Do(func() {
714714
ba.jobStore.Stop()
715715

716-
// Close the subtree processor to stop the announcement ticker and cleanup resources
716+
// Stop the subtree processor to stop the announcement ticker and cleanup resources
717717
if ba.blockAssembler != nil && ba.blockAssembler.subtreeProcessor != nil {
718-
ba.blockAssembler.subtreeProcessor.Close()
718+
ba.blockAssembler.subtreeProcessor.Stop(ctx)
719719
}
720720
})
721721

@@ -816,7 +816,7 @@ func (ba *BlockAssembly) RemoveTx(ctx context.Context, req *blockassembly_api.Re
816816
hash := chainhash.Hash(req.Txid)
817817

818818
if !ba.settings.BlockAssembly.Disabled {
819-
if err := ba.blockAssembler.RemoveTx(hash); err != nil {
819+
if err := ba.blockAssembler.RemoveTx(ctx, hash); err != nil {
820820
return nil, errors.WrapGRPC(err)
821821
}
822822
}

services/blockassembly/server_test.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func TestCheckBlockAssembly(t *testing.T) {
9393
t.Run("success", func(t *testing.T) {
9494
server, _, _, _ := setup(t)
9595

96+
// Start the block assembler so the subtree processor goroutine is running
97+
err := server.blockAssembler.Start(t.Context())
98+
require.NoError(t, err)
99+
96100
resp, err := server.CheckBlockAssembly(t.Context(), &blockassembly_api.EmptyMessage{})
97101
require.NoError(t, err)
98102

@@ -106,7 +110,7 @@ func TestCheckBlockAssembly(t *testing.T) {
106110

107111
mockSubtreeProcessor := &subtreeprocessor.MockSubtreeProcessor{}
108112
mockSubtreeProcessor.On("CheckSubtreeProcessor").Return(errors.NewProcessingError("test error"))
109-
mockSubtreeProcessor.On("Close").Return() // Expect Close() to be called during cleanup
113+
mockSubtreeProcessor.On("Stop", mock.Anything).Return() // Expect Stop() to be called during cleanup
110114

111115
server.blockAssembler.subtreeProcessor = mockSubtreeProcessor
112116

@@ -589,6 +593,10 @@ func TestGetBlockAssemblyStateCoverage(t *testing.T) {
589593
server, _, _, _ := setup(t)
590594
ctx := t.Context()
591595

596+
// Start the block assembler so the subtree processor goroutine is running
597+
err := server.blockAssembler.Start(ctx)
598+
require.NoError(t, err)
599+
592600
t.Run("get block assembly state", func(t *testing.T) {
593601
resp, err := server.GetBlockAssemblyState(ctx, &blockassembly_api.EmptyMessage{})
594602
if err == nil {
@@ -609,6 +617,10 @@ func TestGetBlockAssemblyTxsCoverage(t *testing.T) {
609617
server, _, _, _ := setup(t)
610618
ctx := t.Context()
611619

620+
// Start the block assembler so the subtree processor goroutine is running
621+
err := server.blockAssembler.Start(ctx)
622+
require.NoError(t, err)
623+
612624
t.Run("get block assembly transactions", func(t *testing.T) {
613625
resp, err := server.GetBlockAssemblyTxs(ctx, &blockassembly_api.EmptyMessage{})
614626
if err == nil {
@@ -1082,7 +1094,7 @@ func TestAddTxIntensive(t *testing.T) {
10821094

10831095
t.Run("AddTx increments transaction counter", func(t *testing.T) {
10841096
server, _ := setupServer(t)
1085-
err := server.blockAssembler.Start(context.Background())
1097+
err := server.blockAssembler.Start(t.Context())
10861098
require.NoError(t, err)
10871099

10881100
initialCount := server.TxCount()
@@ -1114,7 +1126,7 @@ func TestAddTxIntensive(t *testing.T) {
11141126

11151127
t.Run("AddTx with various fee and size values", func(t *testing.T) {
11161128
server, _ := setupServer(t)
1117-
err := server.blockAssembler.Start(context.Background())
1129+
err := server.blockAssembler.Start(t.Context())
11181130
require.NoError(t, err)
11191131

11201132
testCases := []struct {
@@ -1165,7 +1177,7 @@ func TestRemoveTxIntensive(t *testing.T) {
11651177

11661178
t.Run("RemoveTx with valid transaction", func(t *testing.T) {
11671179
server, _ := setupServer(t)
1168-
err := server.blockAssembler.Start(context.Background())
1180+
err := server.blockAssembler.Start(t.Context())
11691181
require.NoError(t, err)
11701182

11711183
// First add a transaction
@@ -1234,7 +1246,7 @@ func TestAddTxBatchIntensive(t *testing.T) {
12341246

12351247
t.Run("AddTxBatch with large batch", func(t *testing.T) {
12361248
server, _ := setupServer(t)
1237-
err := server.blockAssembler.Start(context.Background())
1249+
err := server.blockAssembler.Start(t.Context())
12381250
require.NoError(t, err)
12391251

12401252
// Create a large batch
@@ -1312,7 +1324,7 @@ func TestGetMiningCandidateIntensive(t *testing.T) {
13121324

13131325
t.Run("GetMiningCandidate with includeSubtreeHashes", func(t *testing.T) {
13141326
server, _ := setupServer(t)
1315-
err := server.blockAssembler.Start(context.Background())
1327+
err := server.blockAssembler.Start(t.Context())
13161328
require.NoError(t, err)
13171329

13181330
// Add some transactions to create subtrees
@@ -1339,7 +1351,7 @@ func TestGetMiningCandidateIntensive(t *testing.T) {
13391351

13401352
t.Run("GetMiningCandidate creates job in cache", func(t *testing.T) {
13411353
server, _ := setupServer(t)
1342-
err := server.blockAssembler.Start(context.Background())
1354+
err := server.blockAssembler.Start(t.Context())
13431355
require.NoError(t, err)
13441356

13451357
req := &blockassembly_api.GetMiningCandidateRequest{}
@@ -1593,7 +1605,7 @@ func TestRunBackgroundProcessors(t *testing.T) {
15931605
func TestConcurrentOperations(t *testing.T) {
15941606
t.Run("concurrent AddTx operations", func(t *testing.T) {
15951607
server, _ := setupServer(t)
1596-
err := server.blockAssembler.Start(context.Background())
1608+
err := server.blockAssembler.Start(t.Context())
15971609
require.NoError(t, err)
15981610

15991611
const numGoroutines = 10
@@ -1679,7 +1691,7 @@ func TestEdgeCasesAndErrorPaths(t *testing.T) {
16791691
assert.GreaterOrEqual(t, count, 0)
16801692

16811693
// After starting
1682-
err := server.blockAssembler.Start(context.Background())
1694+
err := server.blockAssembler.Start(t.Context())
16831695
require.NoError(t, err)
16841696

16851697
count = server.SubtreeCount()
@@ -1821,7 +1833,7 @@ func TestGetCurrentDifficultyErrors(t *testing.T) {
18211833
func TestRemoveTxEdgeCases(t *testing.T) {
18221834
t.Run("removeTx coverage boost", func(t *testing.T) {
18231835
server, _ := setupServer(t)
1824-
err := server.blockAssembler.Start(context.Background())
1836+
err := server.blockAssembler.Start(t.Context())
18251837
require.NoError(t, err)
18261838

18271839
// Add a transaction first
@@ -2034,7 +2046,7 @@ func TestStoreRetryErrorPaths(t *testing.T) {
20342046
func TestGenerateBlockErrors(t *testing.T) {
20352047
t.Run("generateBlock coverage boost", func(t *testing.T) {
20362048
server, _ := setupServer(t)
2037-
err := server.blockAssembler.Start(context.Background())
2049+
err := server.blockAssembler.Start(t.Context())
20382050
require.NoError(t, err)
20392051

20402052
// Call generateBlock to increase coverage - it will likely fail but that's expected
@@ -2089,7 +2101,7 @@ func TestSubmitMiningSolutionEdgeCases(t *testing.T) {
20892101

20902102
t.Run("submitMiningSolution with invalid coinbase", func(t *testing.T) {
20912103
server, _ := setupServer(t)
2092-
err := server.blockAssembler.Start(context.Background())
2104+
err := server.blockAssembler.Start(t.Context())
20932105
require.NoError(t, err)
20942106

20952107
// Get a mining candidate first to create a job
@@ -2117,7 +2129,7 @@ func TestSubmitMiningSolutionEdgeCases(t *testing.T) {
21172129

21182130
t.Run("submitMiningSolution with mining already on same block", func(t *testing.T) {
21192131
server, _ := setupServer(t)
2120-
err := server.blockAssembler.Start(context.Background())
2132+
err := server.blockAssembler.Start(t.Context())
21212133
require.NoError(t, err)
21222134

21232135
// Get a mining candidate
@@ -2252,7 +2264,7 @@ func TestMoreCoveragePaths(t *testing.T) {
22522264

22532265
t.Run("GetMiningCandidate send notification in background", func(t *testing.T) {
22542266
server, _ := setupServer(t)
2255-
err := server.blockAssembler.Start(context.Background())
2267+
err := server.blockAssembler.Start(t.Context())
22562268
require.NoError(t, err)
22572269

22582270
req := &blockassembly_api.GetMiningCandidateRequest{}

services/blockassembly/subtreeprocessor/ConflictingTransactions_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ func TestProcessConflictingTransactions(t *testing.T) {
3838
newSubtreeChan := make(chan NewSubtreeRequest, 10)
3939

4040
// Create a subtree processor with mocked dependencies
41+
ctx := context.Background()
4142
stp, err := NewSubtreeProcessor(
42-
context.Background(),
43+
ctx,
4344
ulogger.TestLogger{},
4445
settings,
4546
blobStore,
@@ -48,6 +49,7 @@ func TestProcessConflictingTransactions(t *testing.T) {
4849
newSubtreeChan,
4950
)
5051
require.NoError(t, err)
52+
stp.Start(ctx)
5153

5254
// Create test data
5355
block := &model.Block{
@@ -116,8 +118,9 @@ func TestWaitForBlockBeingMined(t *testing.T) {
116118
newSubtreeChan := make(chan NewSubtreeRequest, 10)
117119

118120
// Create a subtree processor with mocked blockchain client
121+
ctx := context.Background()
119122
stp, err := NewSubtreeProcessor(
120-
context.Background(),
123+
ctx,
121124
ulogger.TestLogger{},
122125
settings,
123126
blobStore,
@@ -126,6 +129,7 @@ func TestWaitForBlockBeingMined(t *testing.T) {
126129
newSubtreeChan,
127130
)
128131
require.NoError(t, err)
132+
stp.Start(ctx)
129133

130134
// Create test data
131135
blockHash := chainhash.HashH([]byte("test-block"))
@@ -196,8 +200,9 @@ func TestGetBlockIDsMap(t *testing.T) {
196200
newSubtreeChan := make(chan NewSubtreeRequest, 10)
197201

198202
// Create a subtree processor with mocked dependencies
203+
ctx := context.Background()
199204
stp, err := NewSubtreeProcessor(
200-
context.Background(),
205+
ctx,
201206
ulogger.TestLogger{},
202207
settings,
203208
blobStore,
@@ -206,6 +211,7 @@ func TestGetBlockIDsMap(t *testing.T) {
206211
newSubtreeChan,
207212
)
208213
require.NoError(t, err)
214+
stp.Start(ctx)
209215

210216
// Create test data
211217
tx1Hash := chainhash.HashH([]byte("tx1"))
@@ -266,8 +272,9 @@ func TestGetSubtreeAndConflictingTransactionsMap(t *testing.T) {
266272
newSubtreeChan := make(chan NewSubtreeRequest, 10)
267273

268274
// Create a subtree processor
275+
ctx := context.Background()
269276
stp, err := NewSubtreeProcessor(
270-
context.Background(),
277+
ctx,
271278
ulogger.TestLogger{},
272279
settings,
273280
blobStore,
@@ -276,6 +283,7 @@ func TestGetSubtreeAndConflictingTransactionsMap(t *testing.T) {
276283
newSubtreeChan,
277284
)
278285
require.NoError(t, err)
286+
stp.Start(ctx)
279287

280288
// Create a subtree with some transactions
281289
subtree, err := subtreepkg.NewTreeByLeafCount(4)
@@ -334,8 +342,9 @@ func TestMarkConflictingTxsInSubtrees(t *testing.T) {
334342
newSubtreeChan := make(chan NewSubtreeRequest, 10)
335343

336344
// Create a subtree processor with mocked dependencies
345+
ctx := context.Background()
337346
stp, err := NewSubtreeProcessor(
338-
context.Background(),
347+
ctx,
339348
ulogger.TestLogger{},
340349
settings,
341350
blobStore,
@@ -344,6 +353,7 @@ func TestMarkConflictingTxsInSubtrees(t *testing.T) {
344353
newSubtreeChan,
345354
)
346355
require.NoError(t, err)
356+
stp.Start(ctx)
347357

348358
// Create test data
349359
tx1Hash := chainhash.HashH([]byte("tx1"))

0 commit comments

Comments
 (0)