From e4133dce5443aaa144f093391f85fb37eb7ec03f Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Mon, 1 Dec 2025 16:44:11 +0100 Subject: [PATCH 1/5] Stream legacy blocks straight to legacy without deserialization --- services/legacy/peer_server.go | 11 +-- services/legacy/raw_block_message.go | 59 ++++++++++++++++ services/legacy/raw_block_message_test.go | 82 +++++++++++++++++++++++ 3 files changed, 148 insertions(+), 4 deletions(-) create mode 100644 services/legacy/raw_block_message.go create mode 100644 services/legacy/raw_block_message_test.go diff --git a/services/legacy/peer_server.go b/services/legacy/peer_server.go index 23e2ba6bb..b496b5d70 100644 --- a/services/legacy/peer_server.go +++ b/services/legacy/peer_server.go @@ -1387,9 +1387,12 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha _ = reader.Close() }() - var msgBlock wire.MsgBlock - if err = msgBlock.Deserialize(reader); err != nil { - sp.server.logger.Errorf("Unable to deserialize requested block hash %v: %v", hash, err) + // Use RawBlockMessage to avoid deserialize/serialize overhead for large blocks. + // This reads raw bytes directly and writes them to the wire, bypassing the + // expensive process of creating Go structs for millions of transactions. + rawBlockMsg, err := NewRawBlockMessage(reader) + if err != nil { + sp.server.logger.Errorf("Unable to read requested block hash %v: %v", hash, err) if doneChan != nil { doneChan <- struct{}{} @@ -1414,7 +1417,7 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha dc = doneChan } - sp.QueueMessageWithEncoding(&msgBlock, dc, encoding) + sp.QueueMessageWithEncoding(rawBlockMsg, dc, encoding) // When the peer requests the final block that was advertised in // response to a getblocks message which requested more blocks than diff --git a/services/legacy/raw_block_message.go b/services/legacy/raw_block_message.go new file mode 100644 index 000000000..d20ade468 --- /dev/null +++ b/services/legacy/raw_block_message.go @@ -0,0 +1,59 @@ +package legacy + +import ( + "io" + + "github.com/bsv-blockchain/go-wire" + "github.com/bsv-blockchain/teranode/errors" +) + +// RawBlockMessage implements wire.Message for streaming raw block bytes +// without the overhead of deserializing into wire.MsgBlock structure. +// +// This is significantly more efficient for large blocks (e.g., 4GB+) because: +// - No CPU time spent deserializing millions of transactions into Go structs +// - No CPU time spent re-serializing those structs back to bytes +// - Reduced memory overhead (no Go struct allocation per transaction) +// +// The raw bytes are read from the source and written directly to the wire, +// bypassing the deserialize/serialize roundtrip that would otherwise require +// allocating memory for each transaction struct. +type RawBlockMessage struct { + data []byte +} + +// NewRawBlockMessage creates a RawBlockMessage by reading all bytes from the reader. +// The data must be in wire-format block encoding (header + txcount + transactions). +func NewRawBlockMessage(reader io.Reader) (*RawBlockMessage, error) { + data, err := io.ReadAll(reader) + if err != nil { + return nil, errors.NewProcessingError("failed to read block data", err) + } + + return &RawBlockMessage{data: data}, nil +} + +// Command returns the protocol command string for the message. +// Implements wire.Message interface. 
+func (m *RawBlockMessage) Command() string { + return wire.CmdBlock +} + +// BsvEncode writes the raw block bytes to the writer. +// Implements wire.Message interface. +func (m *RawBlockMessage) BsvEncode(w io.Writer, _ uint32, _ wire.MessageEncoding) error { + _, err := w.Write(m.data) + return err +} + +// Bsvdecode is not supported for RawBlockMessage. +// Implements wire.Message interface. +func (m *RawBlockMessage) Bsvdecode(_ io.Reader, _ uint32, _ wire.MessageEncoding) error { + return errors.NewProcessingError("RawBlockMessage does not support decoding") +} + +// MaxPayloadLength returns the length of the raw block data. +// Implements wire.Message interface. +func (m *RawBlockMessage) MaxPayloadLength(_ uint32) uint64 { + return uint64(len(m.data)) +} diff --git a/services/legacy/raw_block_message_test.go b/services/legacy/raw_block_message_test.go new file mode 100644 index 000000000..73f01695f --- /dev/null +++ b/services/legacy/raw_block_message_test.go @@ -0,0 +1,82 @@ +package legacy + +import ( + "bytes" + "io" + "testing" + + "github.com/bsv-blockchain/go-wire" + "github.com/stretchr/testify/require" +) + +func TestRawBlockMessage_Command(t *testing.T) { + msg := &RawBlockMessage{data: []byte{}} + require.Equal(t, wire.CmdBlock, msg.Command()) +} + +func TestRawBlockMessage_MaxPayloadLength(t *testing.T) { + data := make([]byte, 1000) + msg := &RawBlockMessage{data: data} + require.Equal(t, uint64(1000), msg.MaxPayloadLength(0)) +} + +func TestRawBlockMessage_BsvEncode(t *testing.T) { + testData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} + msg := &RawBlockMessage{data: testData} + + var buf bytes.Buffer + err := msg.BsvEncode(&buf, 0, wire.BaseEncoding) + require.NoError(t, err) + require.Equal(t, testData, buf.Bytes()) +} + +func TestRawBlockMessage_Bsvdecode(t *testing.T) { + msg := &RawBlockMessage{} + err := msg.Bsvdecode(nil, 0, wire.BaseEncoding) + require.Error(t, err) + require.Contains(t, err.Error(), "does not support decoding") +} + +func TestNewRawBlockMessage(t *testing.T) { + testData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} + reader := bytes.NewReader(testData) + + msg, err := NewRawBlockMessage(reader) + require.NoError(t, err) + require.Equal(t, testData, msg.data) +} + +func TestNewRawBlockMessage_ReadError(t *testing.T) { + // Create a reader that will return an error + reader := &errorReader{} + + msg, err := NewRawBlockMessage(reader) + require.Error(t, err) + require.Nil(t, msg) +} + +// errorReader is a test helper that always returns an error +type errorReader struct{} + +func (r *errorReader) Read(_ []byte) (int, error) { + return 0, io.ErrUnexpectedEOF +} + +func TestRawBlockMessage_LargeData(t *testing.T) { + // Test with a larger block-like structure (1MB) + data := make([]byte, 1024*1024) + for i := range data { + data[i] = byte(i % 256) + } + + reader := bytes.NewReader(data) + msg, err := NewRawBlockMessage(reader) + require.NoError(t, err) + require.Equal(t, uint64(1024*1024), msg.MaxPayloadLength(0)) + + // Verify encode produces the same data + var buf bytes.Buffer + err = msg.BsvEncode(&buf, 0, wire.BaseEncoding) + require.NoError(t, err) + require.Equal(t, data, buf.Bytes()) +} From 3a6903d5b133f609db453f77613f7579d83e8b6d Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Mon, 1 Dec 2025 19:56:48 +0100 Subject: [PATCH 2/5] Rename test helper --- services/legacy/raw_block_message_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/services/legacy/raw_block_message_test.go b/services/legacy/raw_block_message_test.go index 73f01695f..89d6bb3d2 100644 --- a/services/legacy/raw_block_message_test.go +++ b/services/legacy/raw_block_message_test.go @@ -48,17 +48,17 @@ func TestNewRawBlockMessage(t *testing.T) { func TestNewRawBlockMessage_ReadError(t *testing.T) { // Create a reader that will return an error - reader := &errorReader{} + reader := &failingReader{} msg, err := NewRawBlockMessage(reader) require.Error(t, err) require.Nil(t, msg) } -// errorReader is a test helper that always returns an error -type errorReader struct{} +// failingReader is a test helper that always fails on Read +type failingReader struct{} -func (r *errorReader) Read(_ []byte) (int, error) { +func (r *failingReader) Read(_ []byte) (int, error) { return 0, io.ErrUnexpectedEOF } From 95e03ed160ebce2394dca401a2e455e247660414 Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Wed, 3 Dec 2025 20:57:29 +0100 Subject: [PATCH 3/5] Stream legacy blocks straight from asset --- services/legacy/peer_server.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/services/legacy/peer_server.go b/services/legacy/peer_server.go index b496b5d70..deb2b0891 100644 --- a/services/legacy/peer_server.go +++ b/services/legacy/peer_server.go @@ -13,7 +13,6 @@ import ( "encoding/binary" "errors" "fmt" - "io" "math" "net" "os" @@ -31,7 +30,6 @@ import ( safeconversion "github.com/bsv-blockchain/go-safe-conversion" txmap "github.com/bsv-blockchain/go-tx-map" "github.com/bsv-blockchain/go-wire" - "github.com/bsv-blockchain/teranode/pkg/fileformat" "github.com/bsv-blockchain/teranode/services/blockassembly" "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/services/blockvalidation" @@ -1367,12 +1365,13 @@ func (s *server) getTxFromStore(hash *chainhash.Hash) (*bsvutil.Tx, int64, error // connected peer. An error is returned if the block hash is not known. func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error { - // use a concurrent store to make sure we do not request the legacy block multiple times - // for different peers. This makes sure we serve the block from a local cache store and not from the utxo store. - reader, err := s.concurrentStore.Get(s.ctx, *hash, fileformat.FileTypeMsgBlock, func() (io.ReadCloser, error) { - url := fmt.Sprintf("%s/block_legacy/%s?wire=1", s.assetHTTPAddress, hash.String()) - return util.DoHTTPRequestBodyReader(s.ctx, url) - }) + // Stream directly from Asset Server without disk caching. + // For large blocks (4GB+), the disk I/O overhead of ConcurrentBlob causes timeouts. + // The Asset Server handles caching internally, so we stream directly to avoid: + // 1. Writing the entire block to disk (slow, causes context deadline exceeded) + // 2. 
Reading it back from disk (additional I/O overhead) + url := fmt.Sprintf("%s/block_legacy/%s?wire=1", s.assetHTTPAddress, hash.String()) + reader, err := util.DoHTTPRequestBodyReader(s.ctx, url) if err != nil { sp.server.logger.Errorf("Unable to fetch requested block %v: %v", hash, err) From 325af92ca86980618552d0d033473406740b504b Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Wed, 3 Dec 2025 21:00:26 +0100 Subject: [PATCH 4/5] Increase http streaming timeout --- settings.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/settings.conf b/settings.conf index ae82e1722..2bb51bee3 100644 --- a/settings.conf +++ b/settings.conf @@ -649,8 +649,8 @@ http_timeout = 30000 # HTTP streaming timeout in milliseconds for large file downloads (subtree data, blocks during catchup) # This is longer than http_timeout to accommodate large subtree data files which can be 100+ MB -# Default: 300000ms (5 minutes) -http_streaming_timeout = 300000 +# Default: 600000ms (10 minutes) +http_streaming_timeout = 600000 # IPV6 Addresses # -------------- From a2720a25efba82073544d21d382d083eb95a7fa5 Mon Sep 17 00:00:00 2001 From: oskarszoon <1449115+oskarszoon@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:50:18 +0100 Subject: [PATCH 5/5] Prevent subtree data race --- .../subtreeprocessor/SubtreeProcessor.go | 102 ++++++++++-------- .../SubtreeProcessor_dynamic_sizing_test.go | 21 ++-- .../subtreeprocessor/SubtreeProcessor_test.go | 74 ++++++------- .../reorg_duplicate_bug_test.go | 4 +- .../subtree_size_benchmark_test.go | 4 +- 5 files changed, 110 insertions(+), 95 deletions(-) diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go index 3af9a8801..f2ce69cb6 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go @@ -207,7 +207,8 @@ type SubtreeProcessor struct { chainedSubtreeCount atomic.Int32 // currentSubtree represents the subtree currently being built - currentSubtree *subtreepkg.Subtree + // Uses atomic.Pointer for safe concurrent access from external callers (e.g., gRPC handlers) + currentSubtree atomic.Pointer[subtreepkg.Subtree] // currentBlockHeader stores the current block header being processed currentBlockHeader *model.BlockHeader @@ -384,7 +385,6 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings * newSubtreeChan: newSubtreeChan, chainedSubtrees: make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees), chainedSubtreeCount: atomic.Int32{}, - currentSubtree: firstSubtree, batcher: NewTxIDAndFeeBatch(tSettings.BlockAssembly.SubtreeProcessorBatcherSize), queue: queue, currentTxMap: txmap.NewSyncedMap[chainhash.Hash, subtreepkg.TxInpoints](), @@ -397,6 +397,7 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings * currentRunningState: atomic.Value{}, announcementTicker: time.NewTicker(tSettings.BlockAssembly.SubtreeAnnouncementInterval), } + stp.currentSubtree.Store(firstSubtree) stp.setCurrentRunningState(StateStarting) // need to make sure first coinbase tx is counted when we start @@ -457,7 +458,7 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { completeSubtrees = append(completeSubtrees, stp.chainedSubtrees...) // incomplete subtrees ? 
- if chainedCount == 0 && stp.currentSubtree.Length() > 1 { + if chainedCount == 0 && stp.currentSubtree.Load().Length() > 1 { incompleteSubtree, err := stp.createIncompleteSubtreeCopy() if err != nil { logger.Errorf("[SubtreeProcessor] error creating incomplete subtree: %s", err.Error()) @@ -513,8 +514,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { subtreeHashes = append(subtreeHashes, *subtree.RootHash()) } - if stp.currentSubtree.Length() > 0 { - subtreeHashes = append(subtreeHashes, *stp.currentSubtree.RootHash()) + if stp.currentSubtree.Load().Length() > 0 { + subtreeHashes = append(subtreeHashes, *stp.currentSubtree.Load().RootHash()) } getSubtreeHashesChan <- subtreeHashes @@ -530,8 +531,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { transactionHashes = append(transactionHashes, node.Hash) } } - if stp.currentSubtree.Length() > 0 { - for _, node := range stp.currentSubtree.Nodes { + if stp.currentSubtree.Load().Length() > 0 { + for _, node := range stp.currentSubtree.Load().Nodes { transactionHashes = append(transactionHashes, node.Hash) } } @@ -560,14 +561,14 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { // store current state before attempting to move forward the block originalChainedSubtrees := stp.chainedSubtrees - originalCurrentSubtree := stp.currentSubtree + originalCurrentSubtree := stp.currentSubtree.Load() originalCurrentTxMap := stp.currentTxMap currentBlockHeader := stp.currentBlockHeader if _, err = stp.moveForwardBlock(processorCtx, moveForwardReq.block, false, processedConflictingHashesMap, false, true); err != nil { // rollback to previous state stp.chainedSubtrees = originalChainedSubtrees - stp.currentSubtree = originalCurrentSubtree + stp.currentSubtree.Store(originalCurrentSubtree) stp.currentTxMap = originalCurrentTxMap stp.currentBlockHeader = currentBlockHeader @@ -608,7 +609,7 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { case lengthCh := <-stp.lengthCh: // return the length of the current subtree - lengthCh <- stp.currentSubtree.Length() + lengthCh <- stp.currentSubtree.Load().Length() case errCh := <-stp.checkSubtreeProcessorCh: stp.setCurrentRunningState(StateCheckSubtreeProcessor) @@ -619,8 +620,8 @@ func (stp *SubtreeProcessor) Start(ctx context.Context) { case <-stp.announcementTicker.C: // Periodically announce the current subtree if it has transactions - if stp.currentSubtree.Length() > 1 { - logger.Debugf("[SubtreeProcessor] periodic announcement of current subtree with %d transactions", stp.currentSubtree.Length()-1) + if stp.currentSubtree.Load().Length() > 1 { + logger.Debugf("[SubtreeProcessor] periodic announcement of current subtree with %d transactions", stp.currentSubtree.Load().Length()-1) incompleteSubtree, err := stp.createIncompleteSubtreeCopy() if err != nil { @@ -757,14 +758,14 @@ func (stp *SubtreeProcessor) createIncompleteSubtreeCopy() (*subtreepkg.Subtree, } // Copy all nodes from current subtree (skipping the coinbase placeholder at index 0) - for _, node := range stp.currentSubtree.Nodes[1:] { + for _, node := range stp.currentSubtree.Load().Nodes[1:] { if err = incompleteSubtree.AddSubtreeNode(node); err != nil { return nil, err } } // Copy fees - incompleteSubtree.Fees = stp.currentSubtree.Fees + incompleteSubtree.Fees = stp.currentSubtree.Load().Fees return incompleteSubtree, nil } @@ -827,8 +828,9 @@ func (stp *SubtreeProcessor) reset(blockHeader *model.BlockHeader, moveBackBlock stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees) 
stp.chainedSubtreeCount.Store(0) - stp.currentSubtree, _ = subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile) - if err := stp.currentSubtree.AddCoinbaseNode(); err != nil { + newSubtree, _ := subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile) + stp.currentSubtree.Store(newSubtree) + if err := stp.currentSubtree.Load().AddCoinbaseNode(); err != nil { return errors.NewProcessingError("[SubtreeProcessor][Reset] error adding coinbase placeholder to new current subtree", err) } @@ -1042,11 +1044,12 @@ func (stp *SubtreeProcessor) SetCurrentBlockHeader(blockHeader *model.BlockHeade } // GetCurrentSubtree returns the subtree currently being built. +// This method is safe for concurrent access. // // Returns: // - *util.Subtree: Current subtree func (stp *SubtreeProcessor) GetCurrentSubtree() *subtreepkg.Subtree { - return stp.currentSubtree + return stp.currentSubtree.Load() } // GetCurrentTxMap returns the map of transactions currently held in the subtree processor. @@ -1348,14 +1351,15 @@ func (stp *SubtreeProcessor) addNode(node subtreepkg.Node, parents *subtreepkg.T } } - if stp.currentSubtree == nil { - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile) + if stp.currentSubtree.Load() == nil { + newSubtree, err := subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile) if err != nil { return err } + stp.currentSubtree.Store(newSubtree) // This is the first subtree for this block - we need a coinbase placeholder - err = stp.currentSubtree.AddCoinbaseNode() + err = stp.currentSubtree.Load().AddCoinbaseNode() if err != nil { return err } @@ -1363,12 +1367,12 @@ func (stp *SubtreeProcessor) addNode(node subtreepkg.Node, parents *subtreepkg.T stp.txCount.Add(1) } - err = stp.currentSubtree.AddSubtreeNode(node) + err = stp.currentSubtree.Load().AddSubtreeNode(node) if err != nil { return errors.NewProcessingError("error adding node to subtree", err) } - if stp.currentSubtree.IsComplete() { + if stp.currentSubtree.Load().IsComplete() { if err = stp.processCompleteSubtree(skipNotification); err != nil { return err } @@ -1378,8 +1382,9 @@ func (stp *SubtreeProcessor) addNode(node subtreepkg.Node, parents *subtreepkg.T } func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err error) { + currentSubtree := stp.currentSubtree.Load() if !skipNotification { - stp.logger.Debugf("[%s] append subtree", stp.currentSubtree.RootHash().String()) + stp.logger.Debugf("[%s] append subtree", currentSubtree.RootHash().String()) } // Track the actual number of nodes in this subtree @@ -1387,7 +1392,7 @@ func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err // 1. Only the first subtree in a block has a coinbase // 2. The coinbase is still a transaction that takes space // 3. 
For sizing decisions, we care about total throughput - actualNodeCount := len(stp.currentSubtree.Nodes) + actualNodeCount := len(currentSubtree.Nodes) if actualNodeCount > 0 { // Add to ring buffer (overwrites oldest value automatically) stp.subtreeNodeCounts.Value = actualNodeCount @@ -1395,19 +1400,20 @@ func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err } // Add the subtree to the chain - stp.chainedSubtrees = append(stp.chainedSubtrees, stp.currentSubtree) + stp.chainedSubtrees = append(stp.chainedSubtrees, currentSubtree) stp.chainedSubtreeCount.Add(1) stp.subtreesInBlock++ // Track number of subtrees in current block - oldSubtree := stp.currentSubtree + oldSubtree := currentSubtree oldSubtreeHash := oldSubtree.RootHash() // create a new subtree with the same height as the previous subtree - stp.currentSubtree, err = subtreepkg.NewTree(stp.currentSubtree.Height) + newSubtree, err := subtreepkg.NewTree(oldSubtree.Height) if err != nil { return errors.NewProcessingError("[%s] error creating new subtree", oldSubtreeHash.String(), err) } + stp.currentSubtree.Store(newSubtree) // Send the subtree to the newSubtreeChan, including a reference to the parent transactions map errCh := make(chan error) @@ -1506,7 +1512,7 @@ func (stp *SubtreeProcessor) removeTxFromSubtrees(ctx context.Context, hash chai defer deferFn() // find the transaction in the current and all chained subtrees - foundIndex := stp.currentSubtree.NodeIndex(hash) + foundIndex := stp.currentSubtree.Load().NodeIndex(hash) foundSubtreeIndex := -1 if foundIndex == -1 { @@ -1528,7 +1534,7 @@ func (stp *SubtreeProcessor) removeTxFromSubtrees(ctx context.Context, hash chai if foundSubtreeIndex == -1 { // it was found in the current tree, remove it from there // further processing is not needed, as the subtrees in the chainedSubtrees are older than the current subtree - return stp.currentSubtree.RemoveNodeAtIndex(foundIndex) + return stp.currentSubtree.Load().RemoveNodeAtIndex(foundIndex) } // it was found in a chained subtree, remove it from there and chain the subtrees again from the point it was removed @@ -1570,7 +1576,7 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [ for _, hash := range hashes { // find the transaction in the current and all chained subtrees - foundIndex := stp.currentSubtree.NodeIndex(hash) + foundIndex := stp.currentSubtree.Load().NodeIndex(hash) foundSubtreeIndex := -1 if foundIndex == -1 { @@ -1592,7 +1598,7 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [ if foundSubtreeIndex == -1 { // it was found in the current tree, remove it from there // further processing is not needed, as the subtrees in the chainedSubtrees are older than the current subtree - return stp.currentSubtree.RemoveNodeAtIndex(foundIndex) + return stp.currentSubtree.Load().RemoveNodeAtIndex(foundIndex) } // it was found in a chained subtree, remove it from there and chain the subtrees again from the point it was removed @@ -1625,7 +1631,7 @@ func (stp *SubtreeProcessor) removeTxsFromSubtrees(ctx context.Context, hashes [ func (stp *SubtreeProcessor) reChainSubtrees(fromIndex int) error { // copy the original subtrees from the given index into a new structure originalSubtrees := stp.chainedSubtrees[fromIndex:] - originalSubtrees = append(originalSubtrees, stp.currentSubtree) + originalSubtrees = append(originalSubtrees, stp.currentSubtree.Load()) // reset the chained subtrees and the current subtree stp.chainedSubtrees = 
stp.chainedSubtrees[:fromIndex] @@ -1637,14 +1643,15 @@ func (stp *SubtreeProcessor) reChainSubtrees(fromIndex int) error { stp.chainedSubtreeCount.Store(fromIndexInt32) - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile) + newSubtree, err := subtreepkg.NewTreeByLeafCount(stp.currentItemsPerFile) if err != nil { return errors.NewProcessingError("error creating new current subtree", err) } + stp.currentSubtree.Store(newSubtree) if len(originalSubtrees) == 0 { // we must add the coinbase tx if we have no original subtrees - if err = stp.currentSubtree.AddCoinbaseNode(); err != nil { + if err = stp.currentSubtree.Load().AddCoinbaseNode(); err != nil { return errors.NewProcessingError("error adding coinbase node to new current subtree", err) } } @@ -1724,7 +1731,7 @@ func (stp *SubtreeProcessor) checkSubtreeProcessor(errCh chan error) { } // check all the transactions in the subtrees are in the currentTxMap - for nodeIdx, node := range stp.currentSubtree.Nodes { + for nodeIdx, node := range stp.currentSubtree.Load().Nodes { if _, ok := stp.currentTxMap.Get(node.Hash); !ok { if node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { // check that the coinbase placeholder is in the first subtree @@ -1880,7 +1887,7 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* // store current state for restore in case of error originalChainedSubtrees := stp.chainedSubtrees - originalCurrentSubtree := stp.currentSubtree + originalCurrentSubtree := stp.currentSubtree.Load() originalCurrentTxMap := stp.currentTxMap currentBlockHeader := stp.currentBlockHeader @@ -1888,7 +1895,7 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* if err != nil { // restore the original state stp.chainedSubtrees = originalChainedSubtrees - stp.currentSubtree = originalCurrentSubtree + stp.currentSubtree.Store(originalCurrentSubtree) stp.currentTxMap = originalCurrentTxMap stp.currentBlockHeader = currentBlockHeader @@ -2007,9 +2014,10 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* } // also for the current subtree - notOnLongestChain := make([]chainhash.Hash, 0, len(stp.currentSubtree.Nodes)) + currentSubtree := stp.currentSubtree.Load() + notOnLongestChain := make([]chainhash.Hash, 0, len(currentSubtree.Nodes)) - for _, node := range stp.currentSubtree.Nodes { + for _, node := range currentSubtree.Nodes { if node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { // skip coinbase placeholder continue @@ -2182,7 +2190,7 @@ func (stp *SubtreeProcessor) setTxCountFromSubtrees() { stp.txCount.Add(subtreeLen) } - currSubtreeLenUint64, err := safeconversion.IntToUint64(stp.currentSubtree.Length()) + currSubtreeLenUint64, err := safeconversion.IntToUint64(stp.currentSubtree.Load().Length()) if err != nil { stp.logger.Errorf("error converting current subtree length: %s", err) return @@ -2220,7 +2228,7 @@ func (stp *SubtreeProcessor) moveBackBlock(ctx context.Context, block *model.Blo deferFn() }() - lastIncompleteSubtree := stp.currentSubtree + lastIncompleteSubtree := stp.currentSubtree.Load() chainedSubtrees := stp.chainedSubtrees // process coinbase utxos @@ -2308,16 +2316,17 @@ func (stp *SubtreeProcessor) moveBackBlockCreateNewSubtrees(ctx context.Context, // we create as few subtrees as possible when moving back, to avoid fragmentation and lots of small writes to disk subtreeSize = 1024 * 1024 } - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(subtreeSize) + newSubtree, err := 
subtreepkg.NewTreeByLeafCount(subtreeSize) if err != nil { return nil, nil, errors.NewProcessingError("[moveBackBlock:CreateNewSubtrees][%s] error creating new subtree", block.String(), err) } + stp.currentSubtree.Store(newSubtree) stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees) stp.chainedSubtreeCount.Store(0) // add first coinbase placeholder transaction - _ = stp.currentSubtree.AddCoinbaseNode() + _ = stp.currentSubtree.Load().AddCoinbaseNode() // run through the nodes of the subtrees in order and add to the new subtrees if len(subtreesNodes) > 0 { @@ -2576,16 +2585,17 @@ func (stp *SubtreeProcessor) resetSubtreeState(createProperlySizedSubtrees bool) subtreeSize = 1024 * 1024 } - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(subtreeSize) + newSubtree, err := subtreepkg.NewTreeByLeafCount(subtreeSize) if err != nil { return err } + stp.currentSubtree.Store(newSubtree) stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees) stp.chainedSubtreeCount.Store(0) // Add first coinbase placeholder transaction - _ = stp.currentSubtree.AddCoinbaseNode() + _ = stp.currentSubtree.Load().AddCoinbaseNode() return nil } @@ -2762,7 +2772,7 @@ func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model. return nil, err } - originalCurrentSubtree := stp.currentSubtree + originalCurrentSubtree := stp.currentSubtree.Load() originalCurrentTxMap := stp.currentTxMap // Reset subtree state diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor_dynamic_sizing_test.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor_dynamic_sizing_test.go index 04c9cb81d..2b2482bfc 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor_dynamic_sizing_test.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor_dynamic_sizing_test.go @@ -395,11 +395,12 @@ func TestSubtreeProcessor_CompleteSubtreeTracking(t *testing.T) { t.Run("tracks node counts correctly", func(t *testing.T) { // Create a subtree with known node count - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(8) + newSubtree, err := subtreepkg.NewTreeByLeafCount(8) require.NoError(t, err) + stp.currentSubtree.Store(newSubtree) // Add some nodes (including coinbase) - err = stp.currentSubtree.AddCoinbaseNode() + err = stp.currentSubtree.Load().AddCoinbaseNode() require.NoError(t, err) // Add 4 more transaction nodes @@ -411,7 +412,7 @@ func TestSubtreeProcessor_CompleteSubtreeTracking(t *testing.T) { Fee: 100, SizeInBytes: 250, } - err = stp.currentSubtree.AddSubtreeNode(node) + err = stp.currentSubtree.Load().AddSubtreeNode(node) require.NoError(t, err) } @@ -444,9 +445,10 @@ func TestSubtreeProcessor_CompleteSubtreeTracking(t *testing.T) { } // Create another subtree that should trigger the limit - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(8) + newSubtree2, err := subtreepkg.NewTreeByLeafCount(8) require.NoError(t, err) - err = stp.currentSubtree.AddCoinbaseNode() + stp.currentSubtree.Store(newSubtree2) + err = stp.currentSubtree.Load().AddCoinbaseNode() require.NoError(t, err) for i := 0; i < 3; i++ { @@ -457,7 +459,7 @@ func TestSubtreeProcessor_CompleteSubtreeTracking(t *testing.T) { Fee: 100, SizeInBytes: 250, } - err = stp.currentSubtree.AddSubtreeNode(node) + err = stp.currentSubtree.Load().AddSubtreeNode(node) require.NoError(t, err) } @@ -475,9 +477,10 @@ func TestSubtreeProcessor_CompleteSubtreeTracking(t *testing.T) { require.Equal(t, 18, count, "Should have 18 samples (full buffer)") // Add one more to test 
that it maintains the limit - stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(8) + newSubtree3, err := subtreepkg.NewTreeByLeafCount(8) require.NoError(t, err) - err = stp.currentSubtree.AddCoinbaseNode() + stp.currentSubtree.Store(newSubtree3) + err = stp.currentSubtree.Load().AddCoinbaseNode() require.NoError(t, err) hash := chainhash.Hash{} @@ -487,7 +490,7 @@ func TestSubtreeProcessor_CompleteSubtreeTracking(t *testing.T) { Fee: 100, SizeInBytes: 250, } - err = stp.currentSubtree.AddSubtreeNode(node) + err = stp.currentSubtree.Load().AddSubtreeNode(node) require.NoError(t, err) err = stp.processCompleteSubtree(false) diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go index 5ca50c0f1..a28bd7b86 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go @@ -47,7 +47,7 @@ type SubtreeProcessorState struct { func captureSubtreeProcessorState(stp *SubtreeProcessor) SubtreeProcessorState { return SubtreeProcessorState{ ChainedSubtreesCount: len(stp.chainedSubtrees), - CurrentSubtreeLength: stp.currentSubtree.Length(), + CurrentSubtreeLength: stp.currentSubtree.Load().Length(), TxCount: stp.TxCount(), CurrentTxMapLength: stp.currentTxMap.Length(), } @@ -263,12 +263,12 @@ func Test_RemoveTxFromSubtrees(t *testing.T) { // Subtrees 2-10: 4 txs each = 36 txs // Remaining in current: 42 - 39 = 3 txs t.Logf("Number of chained subtrees: %d", len(stp.chainedSubtrees)) - t.Logf("Current subtree nodes: %d", len(stp.currentSubtree.Nodes)) + t.Logf("Current subtree nodes: %d", len(stp.currentSubtree.Load().Nodes)) if len(stp.chainedSubtrees) > 5 { t.Logf("Subtree 5 has %d nodes", len(stp.chainedSubtrees[5].Nodes)) } assert.Len(t, stp.chainedSubtrees, 10) - assert.Len(t, stp.currentSubtree.Nodes, 3) + assert.Len(t, stp.currentSubtree.Load().Nodes, 3) // get the middle transaction from the middle subtree txHash := stp.chainedSubtrees[5].Nodes[2].Hash @@ -299,11 +299,11 @@ func Test_RemoveTxFromSubtrees(t *testing.T) { // After removing and rechaining, we may have fewer subtrees due to proper duplicate handling // The rechaining process rebuilds from the removal point, properly detecting duplicates t.Logf("After rechaining - Number of chained subtrees: %d", len(stp.chainedSubtrees)) - t.Logf("After rechaining - Current subtree nodes: %d", len(stp.currentSubtree.Nodes)) + t.Logf("After rechaining - Current subtree nodes: %d", len(stp.currentSubtree.Load().Nodes)) // We should have at least the subtrees before the removal point assert.GreaterOrEqual(t, len(stp.chainedSubtrees), 5) // Current subtree should have some nodes but may vary due to rechaining - assert.GreaterOrEqual(t, len(stp.currentSubtree.Nodes), 0) + assert.GreaterOrEqual(t, len(stp.currentSubtree.Load().Nodes), 0) // check that the txHash node has been removed from the currentTxMap _, ok = stp.currentTxMap.Get(txHash) @@ -360,7 +360,7 @@ func TestReChainSubtrees(t *testing.T) { // With 42 unique transactions and 4 items per subtree: // 10 complete subtrees + 3 remaining in current assert.Len(t, stp.chainedSubtrees, 10) - assert.Len(t, stp.currentSubtree.Nodes, 3) + assert.Len(t, stp.currentSubtree.Load().Nodes, 3) // check the fee in the middle node the middle subtree assert.Len(t, stp.chainedSubtrees[5].Nodes, 4) @@ -394,7 +394,7 @@ func TestReChainSubtrees(t *testing.T) { assert.Len(t, stp.chainedSubtrees[5].Nodes, 4) // currentSubtree should have 2 
nodes - assert.Len(t, stp.currentSubtree.Nodes, 2) + assert.Len(t, stp.currentSubtree.Load().Nodes, 2) // all chainedSubtrees should have 4 nodes for i := 0; i < 10; i++ { @@ -457,7 +457,7 @@ func TestGetMerkleProofForCoinbase(t *testing.T) { require.NoError(t, err) if i == 0 { - stp.currentSubtree.ReplaceRootNode(hash, 0, 0) + stp.currentSubtree.Load().ReplaceRootNode(hash, 0, 0) } else { stp.Add(subtreepkg.Node{Hash: *hash, Fee: 1}, subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{*hash}}) } @@ -499,7 +499,7 @@ func TestGetMerkleProofForCoinbase(t *testing.T) { require.NoError(t, err) if i == 0 { - stp.currentSubtree.ReplaceRootNode(hash, 0, 0) + stp.currentSubtree.Load().ReplaceRootNode(hash, 0, 0) } else { stp.Add(subtreepkg.Node{Hash: *hash, Fee: 1}, subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{*hash}}) } @@ -592,7 +592,7 @@ func TestMoveForwardBlock(t *testing.T) { require.NoError(t, err) if i == 0 { - stp.currentSubtree.ReplaceRootNode(hash, 0, 0) + stp.currentSubtree.Load().ReplaceRootNode(hash, 0, 0) } else { stp.Add(subtreepkg.Node{Hash: *hash, Fee: 1}, subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{*hash}}) } @@ -608,7 +608,7 @@ func TestMoveForwardBlock(t *testing.T) { // there should be 4 chained subtrees assert.Equal(t, 4, len(stp.chainedSubtrees)) assert.Equal(t, 4, stp.chainedSubtrees[0].Size()) - assert.Equal(t, 2, stp.currentSubtree.Length()) + assert.Equal(t, 2, stp.currentSubtree.Load().Length()) assert.Equal(t, int(n-1), stp.currentTxMap.Length()) //nolint:gosec @@ -637,7 +637,7 @@ func TestMoveForwardBlock(t *testing.T) { // we added the coinbase placeholder assert.Equal(t, 5, len(stp.chainedSubtrees)) assert.Equal(t, 2, stp.chainedSubtrees[0].Size()) - assert.Equal(t, 1, stp.currentSubtree.Length()) + assert.Equal(t, 1, stp.currentSubtree.Load().Length()) // check the currentTxMap, it will have 1 less than the tx count, which has the coinbase placeholder assert.Equal(t, int(stp.TxCount()), stp.currentTxMap.Length()+1) // nolint:gosec @@ -700,8 +700,8 @@ func TestMoveForwardBlock_LeftInQueue(t *testing.T) { require.NoError(t, err) assert.Len(t, subtreeProcessor.chainedSubtrees, 0) - assert.Len(t, subtreeProcessor.currentSubtree.Nodes, 1) - assert.Equal(t, *subtreepkg.CoinbasePlaceholderHash, subtreeProcessor.currentSubtree.Nodes[0].Hash) + assert.Len(t, subtreeProcessor.currentSubtree.Load().Nodes, 1) + assert.Equal(t, *subtreepkg.CoinbasePlaceholderHash, subtreeProcessor.currentSubtree.Load().Nodes[0].Hash) } func TestIncompleteSubtreeMoveForwardBlock(t *testing.T) { @@ -759,7 +759,7 @@ func TestIncompleteSubtreeMoveForwardBlock(t *testing.T) { require.NoError(t, err) if i == 0 { - stp.currentSubtree.ReplaceRootNode(hash, 0, 0) + stp.currentSubtree.Load().ReplaceRootNode(hash, 0, 0) } else { stp.Add(subtreepkg.Node{Hash: *hash, Fee: 1}, subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{*hash}}) } @@ -776,7 +776,7 @@ func TestIncompleteSubtreeMoveForwardBlock(t *testing.T) { assert.Equal(t, 4, len(stp.chainedSubtrees)) assert.Equal(t, 4, stp.chainedSubtrees[0].Size()) // and 1 tx in the current subtree - assert.Equal(t, 1, stp.currentSubtree.Length()) + assert.Equal(t, 1, stp.currentSubtree.Load().Length()) stp.currentItemsPerFile = 2 _ = stp.utxoStore.SetBlockHeight(1) @@ -801,7 +801,7 @@ func TestIncompleteSubtreeMoveForwardBlock(t *testing.T) { wg.Wait() assert.Equal(t, 5, len(stp.chainedSubtrees)) - assert.Equal(t, 0, stp.currentSubtree.Length()) + assert.Equal(t, 0, stp.currentSubtree.Load().Length()) } // current subtree should have 
1 tx which due to the new added coinbase placeholder @@ -862,7 +862,7 @@ func TestSubtreeMoveForwardBlockNewCurrent(t *testing.T) { require.NoError(t, err) if i == 0 { - stp.currentSubtree.ReplaceRootNode(hash, 0, 0) + stp.currentSubtree.Load().ReplaceRootNode(hash, 0, 0) } else { stp.Add(subtreepkg.Node{Hash: *hash, Fee: 1}, subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{*hash}}) } @@ -877,7 +877,7 @@ func TestSubtreeMoveForwardBlockNewCurrent(t *testing.T) { assert.Equal(t, 4, len(stp.chainedSubtrees)) assert.Equal(t, 4, stp.chainedSubtrees[0].Size()) // and 0 tx in the current subtree - assert.Equal(t, 0, stp.currentSubtree.Length()) + assert.Equal(t, 0, stp.currentSubtree.Load().Length()) stp.currentItemsPerFile = 2 _ = stp.utxoStore.SetBlockHeight(1) @@ -902,7 +902,7 @@ func TestSubtreeMoveForwardBlockNewCurrent(t *testing.T) { wg.Wait() require.NoError(t, err) assert.Equal(t, 4, len(stp.chainedSubtrees)) - assert.Equal(t, 1, stp.currentSubtree.Length()) + assert.Equal(t, 1, stp.currentSubtree.Load().Length()) } func TestCompareMerkleProofsToSubtrees(t *testing.T) { @@ -948,7 +948,7 @@ func TestCompareMerkleProofsToSubtrees(t *testing.T) { for i, hash := range hashes { if i == 0 { - subtreeProcessor.currentSubtree.ReplaceRootNode(hash, 0, 0) + subtreeProcessor.currentSubtree.Load().ReplaceRootNode(hash, 0, 0) } else { subtreeProcessor.Add(subtreepkg.Node{Hash: *hash, Fee: 111}, subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{*hash}}) } @@ -1081,10 +1081,11 @@ func TestSubtreeProcessor_getRemainderTxHashes(t *testing.T) { chainedSubtrees = append(chainedSubtrees, lastSubtree) // Setup fresh subtree processor state - subtreeProcessor.currentSubtree, err = subtreepkg.NewTree(4) + newSubtree, err := subtreepkg.NewTree(4) require.NoError(t, err) + subtreeProcessor.currentSubtree.Store(newSubtree) subtreeProcessor.chainedSubtrees = make([]*subtreepkg.Subtree, 0) - _ = subtreeProcessor.currentSubtree.AddCoinbaseNode() + _ = subtreeProcessor.currentSubtree.Load().AddCoinbaseNode() // Setup maps transactionMap := txmap.NewSplitSwissMap(4) // Transactions that are in the new block @@ -1108,7 +1109,7 @@ func TestSubtreeProcessor_getRemainderTxHashes(t *testing.T) { for _, subtree := range subtreeProcessor.chainedSubtrees { remainder = append(remainder, subtree.Nodes...) } - remainder = append(remainder, subtreeProcessor.currentSubtree.Nodes...) + remainder = append(remainder, subtreeProcessor.currentSubtree.Load().Nodes...) // With duplicate detection, only the coinbase should remain (no duplicates added) assert.Equal(t, 1, len(remainder)) @@ -1146,12 +1147,13 @@ func TestSubtreeProcessor_getRemainderTxHashes(t *testing.T) { // "f923a14068167a9107a0b7cd6102bfa5c0a4c8a72726a82f12e91009fd7e33be", } - subtreeProcessor.currentSubtree, err = subtreepkg.NewTree(4) + newSubtree2, err := subtreepkg.NewTree(4) require.NoError(t, err) + subtreeProcessor.currentSubtree.Store(newSubtree2) subtreeProcessor.chainedSubtrees = make([]*subtreepkg.Subtree, 0) - _ = subtreeProcessor.currentSubtree.AddCoinbaseNode() + _ = subtreeProcessor.currentSubtree.Load().AddCoinbaseNode() // Test scenario 2: Some transactions are in the new block (transactionMap) // Clear the subtreeProcessor state but keep currentTxMap populated @@ -1180,7 +1182,7 @@ func TestSubtreeProcessor_getRemainderTxHashes(t *testing.T) { remainder = append(remainder, subtree.Nodes...) } - remainder = append(remainder, subtreeProcessor.currentSubtree.Nodes...) 
+ remainder = append(remainder, subtreeProcessor.currentSubtree.Load().Nodes...) // With duplicate detection, only the coinbase remains (no duplicates added) assert.Equal(t, 1, len(remainder)) @@ -1346,7 +1348,7 @@ func TestSubtreeProcessor_moveBackBlock(t *testing.T) { // there should be 4 chained subtrees assert.Equal(t, 4, len(stp.chainedSubtrees)) assert.Equal(t, 4, stp.chainedSubtrees[0].Size()) - assert.Equal(t, 3, stp.currentSubtree.Length()) + assert.Equal(t, 3, stp.currentSubtree.Load().Length()) // create 2 subtrees from the previous block subtree1 := createSubtree(t, 4, true) @@ -1388,7 +1390,7 @@ func TestSubtreeProcessor_moveBackBlock(t *testing.T) { assert.Equal(t, 6, len(stp.chainedSubtrees)) assert.Equal(t, 4, stp.chainedSubtrees[0].Size()) - assert.Equal(t, 2, stp.currentSubtree.Length()) + assert.Equal(t, 2, stp.currentSubtree.Load().Length()) // check that the nodes from subtree1 and subtree2 are the first nodes for i := 0; i < 4; i++ { @@ -1402,7 +1404,7 @@ func TestSubtreeProcessor_moveBackBlock(t *testing.T) { shouldBeInNode := idx % 4 if shouldBeInSubtree > len(stp.chainedSubtrees)-1 { - assert.Equal(t, txHash, stp.currentSubtree.Nodes[shouldBeInNode].Hash) + assert.Equal(t, txHash, stp.currentSubtree.Load().Nodes[shouldBeInNode].Hash) } else { assert.Equal(t, txHash, stp.chainedSubtrees[shouldBeInSubtree].Nodes[shouldBeInNode].Hash) } @@ -1489,7 +1491,7 @@ func TestSubtreeProcessor_moveBackBlock(t *testing.T) { // Verify state after processing empty block assert.Equal(t, 0, len(stp.chainedSubtrees)) - assert.Equal(t, 1, stp.currentSubtree.Length()) // Should only have coinbase placeholder + assert.Equal(t, 1, stp.currentSubtree.Load().Length()) // Should only have coinbase placeholder }) // Test subtree store errors with state reset verification @@ -1595,7 +1597,7 @@ func TestSubtreeProcessor_moveBackBlock(t *testing.T) { // Verify the coinbase placeholder was handled correctly assert.Equal(t, 0, len(stp.chainedSubtrees)) - assert.Equal(t, 1, stp.currentSubtree.Length()) + assert.Equal(t, 1, stp.currentSubtree.Load().Length()) }) // Test SetBlockProcessedAt error (non-critical path) @@ -2879,7 +2881,7 @@ func TestRemoveTxsFromSubtreesBasic(t *testing.T) { // Verify transaction was added _, exists := stp.currentTxMap.Get(txHash) require.True(t, exists, "Transaction should be in currentTxMap") - require.True(t, stp.currentSubtree.NodeIndex(txHash) >= 0, "Transaction should be in current subtree") + require.True(t, stp.currentSubtree.Load().NodeIndex(txHash) >= 0, "Transaction should be in current subtree") initialTxCount := stp.TxCount() @@ -2896,7 +2898,7 @@ func TestRemoveTxsFromSubtreesBasic(t *testing.T) { t.Logf("Transaction still in currentTxMap: %v", stillExists) // Check if transaction was removed from current subtree - indexAfter := stp.currentSubtree.NodeIndex(txHash) + indexAfter := stp.currentSubtree.Load().NodeIndex(txHash) t.Logf("Transaction index in current subtree after removal: %d", indexAfter) }) @@ -2930,7 +2932,7 @@ func TestRemoveTxsFromSubtreesBasic(t *testing.T) { for _, hash := range txHashes { _, stillExists := stp.currentTxMap.Get(hash) - indexAfter := stp.currentSubtree.NodeIndex(hash) + indexAfter := stp.currentSubtree.Load().NodeIndex(hash) t.Logf("Hash %s - still in map: %v, index: %d", hash.String()[:8], stillExists, indexAfter) } }) @@ -2967,7 +2969,7 @@ func TestRemoveTxsFromSubtreesBasic(t *testing.T) { t.Logf("Chained subtrees after: %d", len(stp.chainedSubtrees)) _, stillExists := stp.currentTxMap.Get(targetHash) - currentIndex := 
stp.currentSubtree.NodeIndex(targetHash) + currentIndex := stp.currentSubtree.Load().NodeIndex(targetHash) t.Logf("Target hash still in map: %v, current subtree index: %d", stillExists, currentIndex) // Check chained subtrees diff --git a/services/blockassembly/subtreeprocessor/reorg_duplicate_bug_test.go b/services/blockassembly/subtreeprocessor/reorg_duplicate_bug_test.go index 17cec0dc6..50a6fa684 100644 --- a/services/blockassembly/subtreeprocessor/reorg_duplicate_bug_test.go +++ b/services/blockassembly/subtreeprocessor/reorg_duplicate_bug_test.go @@ -224,8 +224,8 @@ func countTransactionInSubtreesForTest(stp *SubtreeProcessor, txHash chainhash.H } // Check current subtree - if stp.currentSubtree != nil { - for _, node := range stp.currentSubtree.Nodes { + if currentSubtree := stp.currentSubtree.Load(); currentSubtree != nil { + for _, node := range currentSubtree.Nodes { if node.Hash.Equal(txHash) { count++ } diff --git a/services/blockassembly/subtreeprocessor/subtree_size_benchmark_test.go b/services/blockassembly/subtreeprocessor/subtree_size_benchmark_test.go index 7d2946c6c..ebc0c2715 100644 --- a/services/blockassembly/subtreeprocessor/subtree_size_benchmark_test.go +++ b/services/blockassembly/subtreeprocessor/subtree_size_benchmark_test.go @@ -74,7 +74,7 @@ func TestSubtreeProcessorSizePerformance(t *testing.T) { duration := time.Since(start) subtreeCount := len(stp.chainedSubtrees) - if stp.currentSubtree.Length() > 1 { // >1 because of coinbase placeholder + if stp.currentSubtree.Load().Length() > 1 { // >1 because of coinbase placeholder subtreeCount++ } @@ -567,7 +567,7 @@ func BenchmarkSubtreeProcessorOverheadBreakdown(b *testing.B) { b.StopTimer() subtreeCount := len(stp.chainedSubtrees) - if stp.currentSubtree != nil && stp.currentSubtree.Length() > 0 { + if currentSubtree := stp.currentSubtree.Load(); currentSubtree != nil && currentSubtree.Length() > 0 { subtreeCount++ } rotations := len(stp.chainedSubtrees)
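
---

A minimal standalone sketch of the concurrency pattern adopted in PATCH 5, assuming simplified stand-in types: the subtree struct, processor struct, and method names below are hypothetical illustrations, not the real teranode/subtreepkg types. It shows why holding the current subtree behind atomic.Pointer lets an external reader (analogous to GetCurrentSubtree) load the pointer safely while the processor goroutine swaps in a new subtree (analogous to processCompleteSubtree) with a single Store.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// subtree is a simplified stand-in for subtreepkg.Subtree.
type subtree struct {
	nodes []string
}

// processor mirrors the pattern: the subtree currently being built is held
// behind an atomic.Pointer so readers on other goroutines never race on the
// pointer field itself.
type processor struct {
	current atomic.Pointer[subtree]
}

// getCurrentSubtree is the read side: a single atomic Load.
func (p *processor) getCurrentSubtree() *subtree {
	return p.current.Load()
}

// rotate is the write side: build a replacement, then publish it atomically.
func (p *processor) rotate() {
	p.current.Store(&subtree{})
}

func main() {
	p := &processor{}
	p.current.Store(&subtree{})

	var wg sync.WaitGroup

	// One goroutine keeps rotating subtrees...
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			p.rotate()
		}
	}()

	// ...while another concurrently reads the current one without a mutex.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = p.getCurrentSubtree()
		}
	}()

	wg.Wait()
	fmt.Println("done")
}

Running this sketch with `go run -race` reports no race on the pointer swap; the same access done through a plain struct field would be flagged.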