Skip to content

Commit a2720a2

Browse files
committed
Prevent subtree data race
1 parent 17f17c2 commit a2720a2

File tree

5 files changed

+110
-95
lines changed

5 files changed

+110
-95
lines changed

services/blockassembly/subtreeprocessor/SubtreeProcessor.go

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ type SubtreeProcessor struct {
207207
chainedSubtreeCount atomic.Int32
208208

209209
// currentSubtree represents the subtree currently being built
210-
currentSubtree *subtreepkg.Subtree
210+
// Uses atomic.Pointer for safe concurrent access from external callers (e.g., gRPC handlers)
211+
currentSubtree atomic.Pointer[subtreepkg.Subtree]
211212

212213
// currentBlockHeader stores the current block header being processed
213214
currentBlockHeader *model.BlockHeader
@@ -384,7 +385,6 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings *
384385
newSubtreeChan: newSubtreeChan,
385386
chainedSubtrees: make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees),
386387
chainedSubtreeCount: atomic.Int32{},
387-
currentSubtree: firstSubtree,
388388
batcher: NewTxIDAndFeeBatch(tSettings.BlockAssembly.SubtreeProcessorBatcherSize),
389389
queue: queue,
390390
currentTxMap: txmap.NewSyncedMap[chainhash.Hash, subtreepkg.TxInpoints](),
@@ -397,6 +397,7 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings *
397397
currentRunningState: atomic.Value{},
398398
announcementTicker: time.NewTicker(tSettings.BlockAssembly.SubtreeAnnouncementInterval),
399399
}
400+
stp.currentSubtree.Store(firstSubtree)
400401
stp.setCurrentRunningState(StateStarting)
401402

402403
// need to make sure first coinbase tx is counted when we start
@@ -457,7 +458,7 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) {
457458
completeSubtrees = append(completeSubtrees, stp.chainedSubtrees...)
458459

459460
// incomplete subtrees ?
460-
if chainedCount == 0 && stp.currentSubtree.Length() > 1 {
461+
if chainedCount == 0 && stp.currentSubtree.Load().Length() > 1 {
461462
incompleteSubtree, err := stp.createIncompleteSubtreeCopy()
462463
if err != nil {
463464
logger.Errorf("[SubtreeProcessor] error creating incomplete subtree: %s", err.Error())
@@ -513,8 +514,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) {
513514
subtreeHashes = append(subtreeHashes, *subtree.RootHash())
514515
}
515516

516-
if stp.currentSubtree.Length() > 0 {
517-
subtreeHashes = append(subtreeHashes, *stp.currentSubtree.RootHash())
517+
if stp.currentSubtree.Load().Length() > 0 {
518+
subtreeHashes = append(subtreeHashes, *stp.currentSubtree.Load().RootHash())
518519
}
519520

520521
getSubtreeHashesChan <- subtreeHashes
@@ -530,8 +531,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) {
530531
transactionHashes = append(transactionHashes, node.Hash)
531532
}
532533
}
533-
if stp.currentSubtree.Length() > 0 {
534-
for _, node := range stp.currentSubtree.Nodes {
534+
if stp.currentSubtree.Load().Length() > 0 {
535+
for _, node := range stp.currentSubtree.Load().Nodes {
535536
transactionHashes = append(transactionHashes, node.Hash)
536537
}
537538
}
@@ -560,14 +561,14 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) {
560561

561562
// store current state before attempting to move forward the block
562563
originalChainedSubtrees := stp.chainedSubtrees
563-
originalCurrentSubtree := stp.currentSubtree
564+
originalCurrentSubtree := stp.currentSubtree.Load()
564565
originalCurrentTxMap := stp.currentTxMap
565566
currentBlockHeader := stp.currentBlockHeader
566567

567568
if _, err = stp.moveForwardBlock(processorCtx, moveForwardReq.block, false, processedConflictingHashesMap, false, true); err != nil {
568569
// rollback to previous state
569570
stp.chainedSubtrees = originalChainedSubtrees
570-
stp.currentSubtree = originalCurrentSubtree
571+
stp.currentSubtree.Store(originalCurrentSubtree)
571572
stp.currentTxMap = originalCurrentTxMap
572573
stp.currentBlockHeader = currentBlockHeader
573574

@@ -608,7 +609,7 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) {
608609

609610
case lengthCh := <-stp.lengthCh:
610611
// return the length of the current subtree
611-
lengthCh <- stp.currentSubtree.Length()
612+
lengthCh <- stp.currentSubtree.Load().Length()
612613

613614
case errCh := <-stp.checkSubtreeProcessorCh:
614615
stp.setCurrentRunningState(StateCheckSubtreeProcessor)
@@ -619,8 +620,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) {
619620

620621
case <-stp.announcementTicker.C:
621622
// Periodically announce the current subtree if it has transactions
622-
if stp.currentSubtree.Length() > 1 {
623-
logger.Debugf("[SubtreeProcessor] periodic announcement of current subtree with %d transactions", stp.currentSubtree.Length()-1)
623+
if stp.currentSubtree.Load().Length() > 1 {
624+
logger.Debugf("[SubtreeProcessor] periodic announcement of current subtree with %d transactions", stp.currentSubtree.Load().Length()-1)
624625

625626
incompleteSubtree, err := stp.createIncompleteSubtreeCopy()
626627
if err != nil {
@@ -757,14 +758,14 @@ func (stp *SubtreeProcessor) createIncompleteSubtreeCopy() (*subtreepkg.Subtree,
757758
}
758759

759760
// Copy all nodes from current subtree (skipping the coinbase placeholder at index 0)
760-
for _, node := range stp.currentSubtree.Nodes[1:] {
761+
for _, node := range stp.currentSubtree.Load().Nodes[1:] {
761762
if err = incompleteSubtree.AddSubtreeNode(node); err != nil {
762763
return nil, err
763764
}
764765
}
765766

766767
// Copy fees
767-
incompleteSubtree.Fees = stp.currentSubtree.Fees
768+
incompleteSubtree.Fees = stp.currentSubtree.Load().Fees
768769

769770
return incompleteSubtree, nil
770771
}
@@ -827,8 +828,9 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock
827828
stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees)
828829
stp.chainedSubtreeCount.Store(0)
829830

830-
stp.currentSubtree, _ = subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile)
831-
if err := stp.currentSubtree.AddCoinbaseNode(); err != nil {
831+
newSubtree, _ := subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile)
832+
stp.currentSubtree.Store(newSubtree)
833+
if err := stp.currentSubtree.Load().AddCoinbaseNode(); err != nil {
832834
return errors.NewProcessingError("[SubtreeProcessor][Reset] error adding coinbase placeholder to new current subtree", err)
833835
}
834836

@@ -1042,11 +1044,12 @@ func (stp *SubtreeProcessor) SetCurrentBlockHeader(blockHeader *model.BlockHeade
10421044
}
10431045

10441046
// GetCurrentSubtree returns the subtree currently being built.
1047+
// This method is safe for concurrent access.
10451048
//
10461049
// Returns:
10471050
// - *util.Subtree: Current subtree
10481051
func (stp *SubtreeProcessor) GetCurrentSubtree() *subtreepkg.Subtree {
1049-
return stp.currentSubtree
1052+
return stp.currentSubtree.Load()
10501053
}
10511054

10521055
// GetCurrentTxMap returns the map of transactions currently held in the subtree processor.
@@ -1348,27 +1351,28 @@ func (stp *SubtreeProcessor) addNode(node subtreepkg.Node, parents *subtreepkg.T
13481351
}
13491352
}
13501353

1351-
if stp.currentSubtree == nil {
1352-
stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile)
1354+
if stp.currentSubtree.Load() == nil {
1355+
newSubtree, err := subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile)
13531356
if err != nil {
13541357
return err
13551358
}
1359+
stp.currentSubtree.Store(newSubtree)
13561360

13571361
// This is the first subtree for this block - we need a coinbase placeholder
1358-
err = stp.currentSubtree.AddCoinbaseNode()
1362+
err = stp.currentSubtree.Load().AddCoinbaseNode()
13591363
if err != nil {
13601364
return err
13611365
}
13621366

13631367
stp.txCount.Add(1)
13641368
}
13651369

1366-
err = stp.currentSubtree.AddSubtreeNode(node)
1370+
err = stp.currentSubtree.Load().AddSubtreeNode(node)
13671371
if err != nil {
13681372
return errors.NewProcessingError("error adding node to subtree", err)
13691373
}
13701374

1371-
if stp.currentSubtree.IsComplete() {
1375+
if stp.currentSubtree.Load().IsComplete() {
13721376
if err = stp.processCompleteSubtree(skipNotification); err != nil {
13731377
return err
13741378
}
@@ -1378,36 +1382,38 @@ func (stp *SubtreeProcessor) addNode(node subtreepkg.Node, parents *subtreepkg.T
13781382
}
13791383

13801384
func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err error) {
1385+
currentSubtree := stp.currentSubtree.Load()
13811386
if !skipNotification {
1382-
stp.logger.Debugf("[%s] append subtree", stp.currentSubtree.RootHash().String())
1387+
stp.logger.Debugf("[%s] append subtree", currentSubtree.RootHash().String())
13831388
}
13841389

13851390
// Track the actual number of nodes in this subtree
13861391
// We don't exclude coinbase because:
13871392
// 1. Only the first subtree in a block has a coinbase
13881393
// 2. The coinbase is still a transaction that takes space
13891394
// 3. For sizing decisions, we care about total throughput
1390-
actualNodeCount := len(stp.currentSubtree.Nodes)
1395+
actualNodeCount := len(currentSubtree.Nodes)
13911396
if actualNodeCount > 0 {
13921397
// Add to ring buffer (overwrites oldest value automatically)
13931398
stp.subtreeNodeCounts.Value = actualNodeCount
13941399
stp.subtreeNodeCounts = stp.subtreeNodeCounts.Next()
13951400
}
13961401

13971402
// Add the subtree to the chain
1398-
stp.chainedSubtrees = append(stp.chainedSubtrees, stp.currentSubtree)
1403+
stp.chainedSubtrees = append(stp.chainedSubtrees, currentSubtree)
13991404
stp.chainedSubtreeCount.Add(1)
14001405

14011406
stp.subtreesInBlock++ // Track number of subtrees in current block
14021407

1403-
oldSubtree := stp.currentSubtree
1408+
oldSubtree := currentSubtree
14041409
oldSubtreeHash := oldSubtree.RootHash()
14051410

14061411
// create a new subtree with the same height as the previous subtree
1407-
stp.currentSubtree, err = subtreepkg.NewTree(stp.currentSubtree.Height)
1412+
newSubtree, err := subtreepkg.NewTree(oldSubtree.Height)
14081413
if err != nil {
14091414
return errors.NewProcessingError("[%s] error creating new subtree", oldSubtreeHash.String(), err)
14101415
}
1416+
stp.currentSubtree.Store(newSubtree)
14111417

14121418
// Send the subtree to the newSubtreeChan, including a reference to the parent transactions map
14131419
errCh := make(chan error)
@@ -1506,7 +1512,7 @@ func (stp *SubtreeProcessor) removeTxFromSubtrees(ctx context.Context, hash chai
15061512
defer deferFn()
15071513

15081514
// find the transaction in the current and all chained subtrees
1509-
foundIndex := stp.currentSubtree.NodeIndex(hash)
1515+
foundIndex := stp.currentSubtree.Load().NodeIndex(hash)
15101516
foundSubtreeIndex := -1
15111517

15121518
if foundIndex == -1 {
@@ -1528,7 +1534,7 @@ func (stp *SubtreeProcessor) removeTxFromSubtrees(ctx context.Context, hash chai
15281534
if foundSubtreeIndex == -1 {
15291535
// it was found in the current tree, remove it from there
15301536
// further processing is not needed, as the subtrees in the chainedSubtrees are older than the current subtree
1531-
return stp.currentSubtree.RemoveNodeAtIndex(foundIndex)
1537+
return stp.currentSubtree.Load().RemoveNodeAtIndex(foundIndex)
15321538
}
15331539

15341540
// it was found in a chained subtree, remove it from there and chain the subtrees again from the point it was removed
@@ -1570,7 +1576,7 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [
15701576

15711577
for _, hash := range hashes {
15721578
// find the transaction in the current and all chained subtrees
1573-
foundIndex := stp.currentSubtree.NodeIndex(hash)
1579+
foundIndex := stp.currentSubtree.Load().NodeIndex(hash)
15741580
foundSubtreeIndex := -1
15751581

15761582
if foundIndex == -1 {
@@ -1592,7 +1598,7 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [
15921598
if foundSubtreeIndex == -1 {
15931599
// it was found in the current tree, remove it from there
15941600
// further processing is not needed, as the subtrees in the chainedSubtrees are older than the current subtree
1595-
return stp.currentSubtree.RemoveNodeAtIndex(foundIndex)
1601+
return stp.currentSubtree.Load().RemoveNodeAtIndex(foundIndex)
15961602
}
15971603

15981604
// it was found in a chained subtree, remove it from there and chain the subtrees again from the point it was removed
@@ -1625,7 +1631,7 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [
16251631
func (stp *SubtreeProcessor) reChainSubtrees(fromIndex int) error {
16261632
// copy the original subtrees from the given index into a new structure
16271633
originalSubtrees := stp.chainedSubtrees[fromIndex:]
1628-
originalSubtrees = append(originalSubtrees, stp.currentSubtree)
1634+
originalSubtrees = append(originalSubtrees, stp.currentSubtree.Load())
16291635

16301636
// reset the chained subtrees and the current subtree
16311637
stp.chainedSubtrees = stp.chainedSubtrees[:fromIndex]
@@ -1637,14 +1643,15 @@ func (stp *SubtreeProcessor) reChainSubtrees(fromIndex int) error {
16371643

16381644
stp.chainedSubtreeCount.Store(fromIndexInt32)
16391645

1640-
stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile)
1646+
newSubtree, err := subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile)
16411647
if err != nil {
16421648
return errors.NewProcessingError("error creating new current subtree", err)
16431649
}
1650+
stp.currentSubtree.Store(newSubtree)
16441651

16451652
if len(originalSubtrees) == 0 {
16461653
// we must add the coinbase tx if we have no original subtrees
1647-
if err = stp.currentSubtree.AddCoinbaseNode(); err != nil {
1654+
if err = stp.currentSubtree.Load().AddCoinbaseNode(); err != nil {
16481655
return errors.NewProcessingError("error adding coinbase node to new current subtree", err)
16491656
}
16501657
}
@@ -1724,7 +1731,7 @@ func (stp *SubtreeProcessor) checkSubtreeProcessor(errCh chan error) {
17241731
}
17251732

17261733
// check all the transactions in the subtrees are in the currentTxMap
1727-
for nodeIdx, node := range stp.currentSubtree.Nodes {
1734+
for nodeIdx, node := range stp.currentSubtree.Load().Nodes {
17281735
if _, ok := stp.currentTxMap.Get(node.Hash); !ok {
17291736
if node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) {
17301737
// check that the coinbase placeholder is in the first subtree
@@ -1880,15 +1887,15 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []*
18801887

18811888
// store current state for restore in case of error
18821889
originalChainedSubtrees := stp.chainedSubtrees
1883-
originalCurrentSubtree := stp.currentSubtree
1890+
originalCurrentSubtree := stp.currentSubtree.Load()
18841891
originalCurrentTxMap := stp.currentTxMap
18851892
currentBlockHeader := stp.currentBlockHeader
18861893

18871894
defer func() {
18881895
if err != nil {
18891896
// restore the original state
18901897
stp.chainedSubtrees = originalChainedSubtrees
1891-
stp.currentSubtree = originalCurrentSubtree
1898+
stp.currentSubtree.Store(originalCurrentSubtree)
18921899
stp.currentTxMap = originalCurrentTxMap
18931900
stp.currentBlockHeader = currentBlockHeader
18941901

@@ -2007,9 +2014,10 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []*
20072014
}
20082015

20092016
// also for the current subtree
2010-
notOnLongestChain := make([]chainhash.Hash, 0, len(stp.currentSubtree.Nodes))
2017+
currentSubtree := stp.currentSubtree.Load()
2018+
notOnLongestChain := make([]chainhash.Hash, 0, len(currentSubtree.Nodes))
20112019

2012-
for _, node := range stp.currentSubtree.Nodes {
2020+
for _, node := range currentSubtree.Nodes {
20132021
if node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) {
20142022
// skip coinbase placeholder
20152023
continue
@@ -2182,7 +2190,7 @@ func (stp *SubtreeProcessor) setTxCountFromSubtrees() {
21822190
stp.txCount.Add(subtreeLen)
21832191
}
21842192

2185-
currSubtreeLenUint64, err := safeconversion.IntToUint64(stp.currentSubtree.Length())
2193+
currSubtreeLenUint64, err := safeconversion.IntToUint64(stp.currentSubtree.Load().Length())
21862194
if err != nil {
21872195
stp.logger.Errorf("error converting current subtree length: %s", err)
21882196
return
@@ -2220,7 +2228,7 @@ func (stp *SubtreeProcessor) moveBackBlock(ctx context.Context, block *model.Blo
22202228
deferFn()
22212229
}()
22222230

2223-
lastIncompleteSubtree := stp.currentSubtree
2231+
lastIncompleteSubtree := stp.currentSubtree.Load()
22242232
chainedSubtrees := stp.chainedSubtrees
22252233

22262234
// process coinbase utxos
@@ -2308,16 +2316,17 @@ func (stp *SubtreeProcessor) moveBackBlockCreateNewSubtrees(ctx context.Context,
23082316
// we create as few subtrees as possible when moving back, to avoid fragmentation and lots of small writes to disk
23092317
subtreeSize = 1024 * 1024
23102318
}
2311-
stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(subtreeSize)
2319+
newSubtree, err := subtreepkg.NewTreeByLeafCount(subtreeSize)
23122320
if err != nil {
23132321
return nil, nil, errors.NewProcessingError("[moveBackBlock:CreateNewSubtrees][%s] error creating new subtree", block.String(), err)
23142322
}
2323+
stp.currentSubtree.Store(newSubtree)
23152324

23162325
stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees)
23172326
stp.chainedSubtreeCount.Store(0)
23182327

23192328
// add first coinbase placeholder transaction
2320-
_ = stp.currentSubtree.AddCoinbaseNode()
2329+
_ = stp.currentSubtree.Load().AddCoinbaseNode()
23212330

23222331
// run through the nodes of the subtrees in order and add to the new subtrees
23232332
if len(subtreesNodes) > 0 {
@@ -2576,16 +2585,17 @@ func (stp *SubtreeProcessor) resetSubtreeState(createProperlySizedSubtrees bool)
25762585
subtreeSize = 1024 * 1024
25772586
}
25782587

2579-
stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(subtreeSize)
2588+
newSubtree, err := subtreepkg.NewTreeByLeafCount(subtreeSize)
25802589
if err != nil {
25812590
return err
25822591
}
2592+
stp.currentSubtree.Store(newSubtree)
25832593

25842594
stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees)
25852595
stp.chainedSubtreeCount.Store(0)
25862596

25872597
// Add first coinbase placeholder transaction
2588-
_ = stp.currentSubtree.AddCoinbaseNode()
2598+
_ = stp.currentSubtree.Load().AddCoinbaseNode()
25892599

25902600
return nil
25912601
}
@@ -2762,7 +2772,7 @@ func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model.
27622772
return nil, err
27632773
}
27642774

2765-
originalCurrentSubtree := stp.currentSubtree
2775+
originalCurrentSubtree := stp.currentSubtree.Load()
27662776
originalCurrentTxMap := stp.currentTxMap
27672777

27682778
// Reset subtree state

0 commit comments

Comments
 (0)