Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions settings/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ type UtxoStoreSettings struct {
MaxMinedBatchSize int
BlockHeightRetentionAdjustment int32 // Adjustment to GlobalBlockHeightRetention (can be positive or negative)
DisableDAHCleaner bool // Disable the DAH cleaner process completely
DefensiveCleanupEnabled bool // Enable defensive checks before deleting transactions (verify children are mined > BlockHeightRetention blocks ago)
// Cleanup-specific settings
CleanupParentUpdateBatcherSize int // Batch size for parent record updates during cleanup
CleanupParentUpdateBatcherDurationMillis int // Batch duration for parent record updates during cleanup (ms)
Expand Down
1 change: 1 addition & 0 deletions settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func NewSettings(alternativeContext ...string) *Settings {
MaxMinedBatchSize: getInt("utxostore_maxMinedBatchSize", 1024, alternativeContext...),
BlockHeightRetentionAdjustment: getInt32("utxostore_blockHeightRetentionAdjustment", 0, alternativeContext...),
DisableDAHCleaner: getBool("utxostore_disableDAHCleaner", false, alternativeContext...),
DefensiveCleanupEnabled: getBool("utxostore_defensiveCleanupEnabled", false, alternativeContext...),
// Cleanup-specific settings with reasonable defaults
CleanupParentUpdateBatcherSize: getInt("utxostore_cleanupParentUpdateBatcherSize", 100, alternativeContext...),
CleanupParentUpdateBatcherDurationMillis: getInt("utxostore_cleanupParentUpdateBatcherDurationMillis", 10, alternativeContext...),
Expand Down
135 changes: 134 additions & 1 deletion stores/utxo/aerospike/cleanup/cleanup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cleanup
import (
"bytes"
"context"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -351,7 +352,13 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) {

// Create a query statement
stmt := aerospike.NewStatement(s.namespace, s.set)
stmt.BinNames = []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String()}

// Add Outputs field if defensive cleanup is enabled
binNames := []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String()}
if s.settings.UtxoStore.DefensiveCleanupEnabled {
binNames = append(binNames, fields.Outputs.String())
}
stmt.BinNames = binNames

// Set the filter to find records with a delete_at_height less than or equal to the safe cleanup height
// This will automatically use the index since the filter is on the indexed bin
Expand Down Expand Up @@ -668,6 +675,123 @@ func (s *Service) ProcessSingleRecord(txid *chainhash.Hash, inputs []*bt.Input)
return <-errCh
}

// verifyChildrenBeforeDeletion checks if all children of a transaction are mined deep enough
// Returns (canDelete bool, reason string)
func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *chainhash.Hash, currentHeight uint32, workerID int) (bool, string) {
retention := s.settings.GetUtxoStoreBlockHeightRetention()

// Get outputs from bins
outputsData, ok := bins[fields.Outputs.String()]
if !ok {
// No outputs data, safe to delete
return true, ""
}

// Parse outputs - the format depends on how outputs are stored in Aerospike
// Outputs are stored as a map[uint32][]byte where key is output index and value is spending data
outputsMap, ok := outputsData.(map[interface{}]interface{})
if !ok {
// Cannot parse outputs, skip deletion to be safe
return false, "cannot parse outputs data"
}

problematicChildren := []string{}

for _, spendingDataInterface := range outputsMap {
if spendingDataInterface == nil {
// Output not spent, continue
continue
}

// Extract spending transaction hash from spending data
// spending_data format: [32 bytes tx hash][4 bytes output index]
spendingData, ok := spendingDataInterface.([]byte)
if !ok || len(spendingData) < 32 {
continue
}

// Get the spending transaction hash (child)
childTxHash, err := chainhash.NewHash(spendingData[:32])
if err != nil {
s.logger.Warnf("Worker %d: failed to parse child tx hash for tx %s: %v", workerID, txHash.String()[:16], err)
continue
}

// Query the child transaction to check its block height
childKey, err := aerospike.NewKey(s.namespace, s.set, childTxHash.CloneBytes())
if err != nil {
s.logger.Warnf("Worker %d: failed to create key for child tx %s: %v", workerID, childTxHash.String()[:16], err)
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (key error)")
continue
}

childRec, err := s.client.Get(nil, childKey, fields.BlockHeights.String())
if err != nil {
// Child not found or error reading - be conservative and skip deletion
s.logger.Warnf("Worker %d: failed to read child tx %s: %v", workerID, childTxHash.String()[:16], err)
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (read error)")
continue
}

if childRec == nil || childRec.Bins == nil {
// Child transaction not found, skip deletion to be safe
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (not found)")
continue
}

// Check block heights
blockHeightsData, ok := childRec.Bins[fields.BlockHeights.String()]
if !ok || blockHeightsData == nil {
// Child is unmined
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (unmined)")
continue
}

// BlockHeights is stored as []uint32
blockHeights, ok := blockHeightsData.([]interface{})
if !ok || len(blockHeights) == 0 {
// No block heights, child is unmined
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (unmined)")
continue
}

// Get the first (primary) block height
var childBlockHeight uint32
switch v := blockHeights[0].(type) {
case uint32:
childBlockHeight = v
case int:
childBlockHeight = uint32(v)
case int64:
childBlockHeight = uint32(v)
default:
s.logger.Warnf("Worker %d: unexpected block height type for child tx %s: %T", workerID, childTxHash.String()[:16], v)
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (invalid height type)")
continue
}

// Check if child is confirmed deep enough
if currentHeight <= childBlockHeight {
// Current height is less than or equal to child height - this shouldn't happen
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (height inconsistency)")
continue
}

confirmationDepth := currentHeight - childBlockHeight
if confirmationDepth <= retention {
problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (not confirmed deep enough)")
continue
}
}

if len(problematicChildren) > 0 {
reason := "has children that are not safe to delete: " + strings.Join(problematicChildren, ", ")
return false, reason
}

return true, ""
}

// processRecordCleanup processes a single record for cleanup using batchers
func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aerospike.Result) error {
if rec.Err != nil {
Expand All @@ -690,6 +814,15 @@ func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aero
return errors.NewProcessingError("Worker %d: invalid txid bytes for record in cleanup job %d", workerID, job.BlockHeight)
}

// Defensive cleanup: verify children before deletion if enabled
if s.settings.UtxoStore.DefensiveCleanupEnabled {
canDelete, reason := s.verifyChildrenBeforeDeletion(bins, txHash, job.BlockHeight, workerID)
if !canDelete {
s.logger.Warnf("Worker %d: skipping deletion of tx %s: %s", workerID, txHash.String()[:16], reason)
return nil // Skip this transaction, don't return error
}
}

inputs, err := s.getTxInputsFromBins(job, workerID, bins, txHash)
if err != nil {
return err
Expand Down
Loading
Loading