From d38ebee2d0483f58713426d7f80e156e5da909ab Mon Sep 17 00:00:00 2001 From: freemans13 Date: Thu, 6 Nov 2025 14:52:54 +0000 Subject: [PATCH 01/12] cleanup defensive checks --- settings/interface.go | 1 + settings/settings.go | 1 + .../utxo/aerospike/cleanup/cleanup_service.go | 135 ++++++++- stores/utxo/sql/cleanup/cleanup_service.go | 260 +++++++++++++++++- .../utxo/sql/cleanup/cleanup_service_test.go | 58 ++++ 5 files changed, 448 insertions(+), 7 deletions(-) diff --git a/settings/interface.go b/settings/interface.go index 12dd0342d8..d1885b990c 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -365,6 +365,7 @@ type UtxoStoreSettings struct { MaxMinedBatchSize int BlockHeightRetentionAdjustment int32 // Adjustment to GlobalBlockHeightRetention (can be positive or negative) DisableDAHCleaner bool // Disable the DAH cleaner process completely + DefensiveCleanupEnabled bool // Enable defensive checks before deleting transactions (verify children are mined > BlockHeightRetention blocks ago) // Cleanup-specific settings CleanupParentUpdateBatcherSize int // Batch size for parent record updates during cleanup CleanupParentUpdateBatcherDurationMillis int // Batch duration for parent record updates during cleanup (ms) diff --git a/settings/settings.go b/settings/settings.go index 5114266afc..afce9124a2 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -350,6 +350,7 @@ func NewSettings(alternativeContext ...string) *Settings { MaxMinedBatchSize: getInt("utxostore_maxMinedBatchSize", 1024, alternativeContext...), BlockHeightRetentionAdjustment: getInt32("utxostore_blockHeightRetentionAdjustment", 0, alternativeContext...), DisableDAHCleaner: getBool("utxostore_disableDAHCleaner", false, alternativeContext...), + DefensiveCleanupEnabled: getBool("utxostore_defensiveCleanupEnabled", false, alternativeContext...), // Cleanup-specific settings with reasonable defaults CleanupParentUpdateBatcherSize: getInt("utxostore_cleanupParentUpdateBatcherSize", 100, alternativeContext...), CleanupParentUpdateBatcherDurationMillis: getInt("utxostore_cleanupParentUpdateBatcherDurationMillis", 10, alternativeContext...), diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index c3c8b2183a..51b3e099c7 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -3,6 +3,7 @@ package cleanup import ( "bytes" "context" + "strings" "sync" "sync/atomic" "time" @@ -351,7 +352,13 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { // Create a query statement stmt := aerospike.NewStatement(s.namespace, s.set) - stmt.BinNames = []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String()} + + // Add Outputs field if defensive cleanup is enabled + binNames := []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String()} + if s.settings.UtxoStore.DefensiveCleanupEnabled { + binNames = append(binNames, fields.Outputs.String()) + } + stmt.BinNames = binNames // Set the filter to find records with a delete_at_height less than or equal to the safe cleanup height // This will automatically use the index since the filter is on the indexed bin @@ -668,6 +675,123 @@ func (s *Service) ProcessSingleRecord(txid *chainhash.Hash, inputs []*bt.Input) return <-errCh } +// verifyChildrenBeforeDeletion checks if all children of a transaction are mined deep enough +// Returns (canDelete bool, reason string) +func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *chainhash.Hash, currentHeight uint32, workerID int) (bool, string) { + retention := s.settings.GetUtxoStoreBlockHeightRetention() + + // Get outputs from bins + outputsData, ok := bins[fields.Outputs.String()] + if !ok { + // No outputs data, safe to delete + return true, "" + } + + // Parse outputs - the format depends on how outputs are stored in Aerospike + // Outputs are stored as a map[uint32][]byte where key is output index and value is spending data + outputsMap, ok := outputsData.(map[interface{}]interface{}) + if !ok { + // Cannot parse outputs, skip deletion to be safe + return false, "cannot parse outputs data" + } + + problematicChildren := []string{} + + for _, spendingDataInterface := range outputsMap { + if spendingDataInterface == nil { + // Output not spent, continue + continue + } + + // Extract spending transaction hash from spending data + // spending_data format: [32 bytes tx hash][4 bytes output index] + spendingData, ok := spendingDataInterface.([]byte) + if !ok || len(spendingData) < 32 { + continue + } + + // Get the spending transaction hash (child) + childTxHash, err := chainhash.NewHash(spendingData[:32]) + if err != nil { + s.logger.Warnf("Worker %d: failed to parse child tx hash for tx %s: %v", workerID, txHash.String()[:16], err) + continue + } + + // Query the child transaction to check its block height + childKey, err := aerospike.NewKey(s.namespace, s.set, childTxHash.CloneBytes()) + if err != nil { + s.logger.Warnf("Worker %d: failed to create key for child tx %s: %v", workerID, childTxHash.String()[:16], err) + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (key error)") + continue + } + + childRec, err := s.client.Get(nil, childKey, fields.BlockHeights.String()) + if err != nil { + // Child not found or error reading - be conservative and skip deletion + s.logger.Warnf("Worker %d: failed to read child tx %s: %v", workerID, childTxHash.String()[:16], err) + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (read error)") + continue + } + + if childRec == nil || childRec.Bins == nil { + // Child transaction not found, skip deletion to be safe + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (not found)") + continue + } + + // Check block heights + blockHeightsData, ok := childRec.Bins[fields.BlockHeights.String()] + if !ok || blockHeightsData == nil { + // Child is unmined + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (unmined)") + continue + } + + // BlockHeights is stored as []uint32 + blockHeights, ok := blockHeightsData.([]interface{}) + if !ok || len(blockHeights) == 0 { + // No block heights, child is unmined + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (unmined)") + continue + } + + // Get the first (primary) block height + var childBlockHeight uint32 + switch v := blockHeights[0].(type) { + case uint32: + childBlockHeight = v + case int: + childBlockHeight = uint32(v) + case int64: + childBlockHeight = uint32(v) + default: + s.logger.Warnf("Worker %d: unexpected block height type for child tx %s: %T", workerID, childTxHash.String()[:16], v) + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (invalid height type)") + continue + } + + // Check if child is confirmed deep enough + if currentHeight <= childBlockHeight { + // Current height is less than or equal to child height - this shouldn't happen + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (height inconsistency)") + continue + } + + confirmationDepth := currentHeight - childBlockHeight + if confirmationDepth <= retention { + problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (not confirmed deep enough)") + continue + } + } + + if len(problematicChildren) > 0 { + reason := "has children that are not safe to delete: " + strings.Join(problematicChildren, ", ") + return false, reason + } + + return true, "" +} + // processRecordCleanup processes a single record for cleanup using batchers func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aerospike.Result) error { if rec.Err != nil { @@ -690,6 +814,15 @@ func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aero return errors.NewProcessingError("Worker %d: invalid txid bytes for record in cleanup job %d", workerID, job.BlockHeight) } + // Defensive cleanup: verify children before deletion if enabled + if s.settings.UtxoStore.DefensiveCleanupEnabled { + canDelete, reason := s.verifyChildrenBeforeDeletion(bins, txHash, job.BlockHeight, workerID) + if !canDelete { + s.logger.Warnf("Worker %d: skipping deletion of tx %s: %s", workerID, txHash.String()[:16], reason) + return nil // Skip this transaction, don't return error + } + } + inputs, err := s.getTxInputsFromBins(job, workerID, bins, txHash) if err != nil { return err diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index 79638eb3bd..19179bc414 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -2,6 +2,8 @@ package cleanup import ( "context" + "fmt" + "strings" "time" "github.com/bsv-blockchain/teranode/errors" @@ -179,7 +181,7 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { } // Execute the cleanup with safe height - err := deleteTombstoned(s.db, safeCleanupHeight) + err := deleteTombstoned(s.db, s.settings, s.logger, safeCleanupHeight, workerID) if err != nil { job.SetStatus(cleanup.JobStatusFailed) @@ -206,12 +208,258 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { } } +// ChildVerificationResult contains the result of verifying a transaction's children +type ChildVerificationResult struct { + CanDelete bool + Reason string + ProblematicChildren []ChildInfo +} + +// ChildInfo contains information about a child transaction +type ChildInfo struct { + Hash string + IsMined bool + BlockHeight *uint32 +} + +// batchVerifyChildrenBeforeDeletion checks all transactions' children in a single query +// Returns a map of tx hash -> verification result +// NOTE: Transactions with no spent outputs will NOT be in the returned map - caller must handle this +func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retention uint32) (map[string]*ChildVerificationResult, error) { + // Single query to get all transactions with their spent outputs and child transaction info + // This replaces N queries with 1 query, massively improving performance + // + // IMPORTANT: We only return transactions that HAVE spent outputs. Transactions with: + // - No outputs at all + // - Only unspent outputs + // Will NOT appear in the results map. The caller should treat these as safe to delete. + query := ` + WITH transactions_to_check AS ( + SELECT id, hash + FROM transactions + WHERE delete_at_height <= $1 + ) + SELECT + ttc.hash as parent_hash, + o.idx as output_index, + o.spending_data, + t_child.hash as child_hash, + t_child.block_height as child_block_height + FROM transactions_to_check ttc + INNER JOIN outputs o ON ttc.id = o.transaction_id + LEFT JOIN inputs i ON o.spending_data IS NOT NULL + AND i.input_hash = ttc.hash + AND i.input_index = o.idx + LEFT JOIN transactions t_child ON i.transaction_id = t_child.id + WHERE o.spending_data IS NOT NULL + ORDER BY ttc.hash + ` + + rows, err := db.Query(query, currentHeight) + if err != nil { + return nil, errors.NewStorageError("failed to batch query children", err) + } + defer rows.Close() + + // Build results map + results := make(map[string]*ChildVerificationResult) + + for rows.Next() { + var parentHash string + var outputIndex uint32 + var spendingData *[]byte + var childHash *string + var childBlockHeight *uint32 + + if err := rows.Scan(&parentHash, &outputIndex, &spendingData, &childHash, &childBlockHeight); err != nil { + return nil, errors.NewStorageError("failed to scan batch child row", err) + } + + // Initialize result for this parent if not exists + if _, exists := results[parentHash]; !exists { + results[parentHash] = &ChildVerificationResult{ + CanDelete: true, + ProblematicChildren: []ChildInfo{}, + } + } + + result := results[parentHash] + + // Analyze this spent output's child transaction + if spendingData != nil { + // Child transaction should exist if output is spent + if childHash == nil { + // Spent output but no child found - data inconsistency + result.CanDelete = false + result.Reason = "has spent outputs with missing child transactions" + result.ProblematicChildren = append(result.ProblematicChildren, ChildInfo{ + Hash: fmt.Sprintf("output_%d", outputIndex), + IsMined: false, + BlockHeight: nil, + }) + continue + } + + child := ChildInfo{ + Hash: *childHash, + IsMined: childBlockHeight != nil, + BlockHeight: childBlockHeight, + } + + // Check if child is mined + if !child.IsMined { + result.CanDelete = false + result.Reason = "has unmined children" + result.ProblematicChildren = append(result.ProblematicChildren, child) + continue + } + + // Check if child is confirmed deep enough (current height - child height > retention) + confirmationDepth := currentHeight - *childBlockHeight + if confirmationDepth <= retention { + result.CanDelete = false + result.Reason = "has children not confirmed deep enough" + result.ProblematicChildren = append(result.ProblematicChildren, child) + } + } + } + + if err := rows.Err(); err != nil { + return nil, errors.NewStorageError("error iterating batch child rows", err) + } + + return results, nil +} + // deleteTombstoned removes transactions that have passed their expiration time. -func deleteTombstoned(db *usql.DB, blockHeight uint32) error { - // Delete transactions that have passed their expiration time - // this will cascade to inputs, outputs, block_ids and conflicting_children - if _, err := db.Exec("DELETE FROM transactions WHERE delete_at_height <= $1", blockHeight); err != nil { - return errors.NewStorageError("failed to delete transactions", err) +func deleteTombstoned(db *usql.DB, tSettings *settings.Settings, logger ulogger.Logger, blockHeight uint32, workerID int) error { + // If defensive cleanup is disabled, use the simple delete approach + if !tSettings.UtxoStore.DefensiveCleanupEnabled { + // Delete transactions that have passed their expiration time + // this will cascade to inputs, outputs, block_ids and conflicting_children + if _, err := db.Exec("DELETE FROM transactions WHERE delete_at_height <= $1", blockHeight); err != nil { + return errors.NewStorageError("failed to delete transactions", err) + } + return nil + } + + // Defensive cleanup: verify children before deletion + logger.Debugf("[SQLCleanupService %d] defensive cleanup enabled, verifying children before deletion", workerID) + + retention := tSettings.GetUtxoStoreBlockHeightRetention() + + // Batch verify all transactions and their children in a single query + verificationResults, err := batchVerifyChildrenBeforeDeletion(db, blockHeight, retention) + if err != nil { + return errors.NewStorageError("failed to batch verify children", err) + } + + // Query all transactions that would be deleted + query := "SELECT hash FROM transactions WHERE delete_at_height <= $1" + rows, err := db.Query(query, blockHeight) + if err != nil { + return errors.NewStorageError("failed to query transactions for deletion", err) + } + defer rows.Close() + + var txHashesToDelete []string + skippedCount := 0 + + for rows.Next() { + var txHash string + if err := rows.Scan(&txHash); err != nil { + return errors.NewStorageError("failed to scan transaction hash", err) + } + + // Check verification result from batch query + verificationResult, hasSpentOutputs := verificationResults[txHash] + + // DEFENSIVE: If transaction is not in verification results, it means we found no spent outputs + // This could mean: + // 1. Transaction has unspent outputs (should NOT delete - UTXOs still in use!) + // 2. Transaction has no outputs at all (rare edge case) + // 3. Data inconsistency (delete_at_height set but outputs not properly tracked) + // + // In defensive mode, we SKIP deletion if we don't have positive confirmation + // that all children are safe. This is the conservative/safe approach. + if !hasSpentOutputs { + skippedCount++ + logger.Warnf("[SQLCleanupService %d] skipping deletion of tx %s: no spent outputs found (may have unspent UTXOs or data inconsistency)", + workerID, txHash[:16]) + continue + } + + // Transaction has spent outputs, check if all their children are safe + if verificationResult.CanDelete { + txHashesToDelete = append(txHashesToDelete, txHash) + } else { + skippedCount++ + + // Log warning with details + problematicChildrenInfo := "" + for i, child := range verificationResult.ProblematicChildren { + if i > 0 { + problematicChildrenInfo += ", " + } + if child.IsMined { + problematicChildrenInfo += fmt.Sprintf("%s... (mined at height %d)", child.Hash[:8], *child.BlockHeight) + } else { + problematicChildrenInfo += child.Hash[:8] + "... (unmined)" + } + } + + logger.Warnf("[SQLCleanupService %d] skipping deletion of tx %s: %s [children: %s]", + workerID, txHash[:16], verificationResult.Reason, problematicChildrenInfo) + } + } + + if err := rows.Err(); err != nil { + return errors.NewStorageError("error iterating transactions", err) + } + + // Delete transactions that passed verification + if len(txHashesToDelete) > 0 { + logger.Infof("[SQLCleanupService %d] deleting %d transactions (skipped %d)", workerID, len(txHashesToDelete), skippedCount) + + // Delete in batches to avoid exceeding database parameter limits + // PostgreSQL limit is ~32,767 parameters, we use 1000 for safety margin + const maxBatchSize = 1000 + totalDeleted := 0 + + for i := 0; i < len(txHashesToDelete); i += maxBatchSize { + end := i + maxBatchSize + if end > len(txHashesToDelete) { + end = len(txHashesToDelete) + } + batch := txHashesToDelete[i:end] + + // Build parameterized IN clause for cross-database compatibility + placeholders := make([]string, len(batch)) + args := make([]interface{}, len(batch)) + for j, hash := range batch { + placeholders[j] = fmt.Sprintf("$%d", j+1) + args[j] = hash + } + + // This will cascade to inputs, outputs, block_ids and conflicting_children + deleteQuery := fmt.Sprintf("DELETE FROM transactions WHERE hash IN (%s)", strings.Join(placeholders, ",")) + result, err := db.Exec(deleteQuery, args...) + if err != nil { + return errors.NewStorageError(fmt.Sprintf("failed to delete batch of %d transactions", len(batch)), err) + } + + if result != nil { + if rowsAffected, err := result.RowsAffected(); err == nil { + totalDeleted += int(rowsAffected) + } + } + + logger.Debugf("[SQLCleanupService %d] deleted batch %d-%d (%d transactions)", workerID, i, end, len(batch)) + } + + logger.Infof("[SQLCleanupService %d] completed deletion of %d transactions", workerID, totalDeleted) + } else { + logger.Debugf("[SQLCleanupService %d] no transactions to delete (skipped %d)", workerID, skippedCount) } return nil diff --git a/stores/utxo/sql/cleanup/cleanup_service_test.go b/stores/utxo/sql/cleanup/cleanup_service_test.go index 369a0a389d..b15e64ca6f 100644 --- a/stores/utxo/sql/cleanup/cleanup_service_test.go +++ b/stores/utxo/sql/cleanup/cleanup_service_test.go @@ -681,3 +681,61 @@ func TestSQLCleanupWithBlockPersisterCoordination(t *testing.T) { } }) } + +func TestDefensiveCleanup(t *testing.T) { + t.Run("DefensiveCleanupDisabled_DeletesNormally", func(t *testing.T) { + logger := &MockLogger{} + db := NewMockDB() + + deleteCallCount := 0 + db.ExecFunc = func(query string, args ...interface{}) (sql.Result, error) { + deleteCallCount++ + assert.Contains(t, query, "DELETE FROM transactions WHERE delete_at_height") + return &MockResult{rowsAffected: 10}, nil + } + + tSettings := createTestSettings() + tSettings.UtxoStore.DefensiveCleanupEnabled = false + + service, err := NewService(tSettings, Options{ + Logger: logger, + DB: db.DB, + Ctx: context.Background(), + }) + require.NoError(t, err) + + service.Start(context.Background()) + + doneCh := make(chan string, 1) + err = service.UpdateBlockHeight(100, doneCh) + require.NoError(t, err) + + select { + case <-doneCh: + // Success - should have deleted directly + assert.Equal(t, 1, deleteCallCount, "Should have called DELETE once") + case <-time.After(2 * time.Second): + t.Fatal("Cleanup should complete") + } + }) + + t.Run("DefensiveCleanupEnabled_SettingRespected", func(t *testing.T) { + logger := &MockLogger{} + db := NewMockDB() + + tSettings := createTestSettings() + tSettings.UtxoStore.DefensiveCleanupEnabled = true + tSettings.UtxoStore.BlockHeightRetention = 288 + + service, err := NewService(tSettings, Options{ + Logger: logger, + DB: db.DB, + Ctx: context.Background(), + }) + require.NoError(t, err) + + // Verify the setting is properly stored + assert.True(t, service.settings.UtxoStore.DefensiveCleanupEnabled) + assert.Equal(t, uint32(288), service.settings.GetUtxoStoreBlockHeightRetention()) + }) +} From 8cd46bf4e9e8a3ed1a73f8dcdd0cd85eb840bb91 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Thu, 6 Nov 2025 15:02:50 +0000 Subject: [PATCH 02/12] fixes --- stores/utxo/sql/cleanup/cleanup_service.go | 91 ++++++++++--------- .../utxo/sql/cleanup/cleanup_service_test.go | 31 +++---- 2 files changed, 62 insertions(+), 60 deletions(-) diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index 19179bc414..d3b2f9e356 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -224,15 +224,14 @@ type ChildInfo struct { // batchVerifyChildrenBeforeDeletion checks all transactions' children in a single query // Returns a map of tx hash -> verification result -// NOTE: Transactions with no spent outputs will NOT be in the returned map - caller must handle this +// NOTE: Transactions with no outputs at all will NOT be in the returned map - caller must handle this func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retention uint32) (map[string]*ChildVerificationResult, error) { - // Single query to get all transactions with their spent outputs and child transaction info + // Single query to get ALL outputs (spent and unspent) for transactions to be deleted // This replaces N queries with 1 query, massively improving performance // - // IMPORTANT: We only return transactions that HAVE spent outputs. Transactions with: - // - No outputs at all - // - Only unspent outputs - // Will NOT appear in the results map. The caller should treat these as safe to delete. + // IMPORTANT: We return ALL transactions that have outputs. For defensive verification: + // 1. Check that ALL outputs are spent (no unspent UTXOs remaining) + // 2. Check that all children of spent outputs are mined and confirmed deep enough query := ` WITH transactions_to_check AS ( SELECT id, hash @@ -251,8 +250,7 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent AND i.input_hash = ttc.hash AND i.input_index = o.idx LEFT JOIN transactions t_child ON i.transaction_id = t_child.id - WHERE o.spending_data IS NOT NULL - ORDER BY ttc.hash + ORDER BY ttc.hash, o.idx ` rows, err := db.Query(query, currentHeight) @@ -285,42 +283,53 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent result := results[parentHash] - // Analyze this spent output's child transaction - if spendingData != nil { - // Child transaction should exist if output is spent - if childHash == nil { - // Spent output but no child found - data inconsistency - result.CanDelete = false - result.Reason = "has spent outputs with missing child transactions" - result.ProblematicChildren = append(result.ProblematicChildren, ChildInfo{ - Hash: fmt.Sprintf("output_%d", outputIndex), - IsMined: false, - BlockHeight: nil, - }) - continue - } + // First check: Is this an unspent output? + if spendingData == nil { + // UNSPENT OUTPUT - Transaction should NOT be deleted (has UTXOs still in use) + result.CanDelete = false + result.Reason = "has unspent outputs (UTXOs still in use)" + result.ProblematicChildren = append(result.ProblematicChildren, ChildInfo{ + Hash: fmt.Sprintf("output_%d_unspent", outputIndex), + IsMined: false, + BlockHeight: nil, + }) + continue + } - child := ChildInfo{ - Hash: *childHash, - IsMined: childBlockHeight != nil, - BlockHeight: childBlockHeight, - } + // Output is spent - analyze the child transaction + // Child transaction should exist if output is spent + if childHash == nil { + // Spent output but no child found - data inconsistency + result.CanDelete = false + result.Reason = "has spent outputs with missing child transactions" + result.ProblematicChildren = append(result.ProblematicChildren, ChildInfo{ + Hash: fmt.Sprintf("output_%d", outputIndex), + IsMined: false, + BlockHeight: nil, + }) + continue + } - // Check if child is mined - if !child.IsMined { - result.CanDelete = false - result.Reason = "has unmined children" - result.ProblematicChildren = append(result.ProblematicChildren, child) - continue - } + child := ChildInfo{ + Hash: *childHash, + IsMined: childBlockHeight != nil, + BlockHeight: childBlockHeight, + } - // Check if child is confirmed deep enough (current height - child height > retention) - confirmationDepth := currentHeight - *childBlockHeight - if confirmationDepth <= retention { - result.CanDelete = false - result.Reason = "has children not confirmed deep enough" - result.ProblematicChildren = append(result.ProblematicChildren, child) - } + // Check if child is mined + if !child.IsMined { + result.CanDelete = false + result.Reason = "has unmined children" + result.ProblematicChildren = append(result.ProblematicChildren, child) + continue + } + + // Check if child is confirmed deep enough (current height - child height > retention) + confirmationDepth := currentHeight - *childBlockHeight + if confirmationDepth <= retention { + result.CanDelete = false + result.Reason = "has children not confirmed deep enough" + result.ProblematicChildren = append(result.ProblematicChildren, child) } } diff --git a/stores/utxo/sql/cleanup/cleanup_service_test.go b/stores/utxo/sql/cleanup/cleanup_service_test.go index b15e64ca6f..22763a6695 100644 --- a/stores/utxo/sql/cleanup/cleanup_service_test.go +++ b/stores/utxo/sql/cleanup/cleanup_service_test.go @@ -18,6 +18,10 @@ import ( func createTestSettings() *settings.Settings { return &settings.Settings{ GlobalBlockHeightRetention: 288, // Default retention + UtxoStore: settings.UtxoStoreSettings{ + BlockHeightRetention: 288, + DefensiveCleanupEnabled: false, + }, } } @@ -683,17 +687,10 @@ func TestSQLCleanupWithBlockPersisterCoordination(t *testing.T) { } func TestDefensiveCleanup(t *testing.T) { - t.Run("DefensiveCleanupDisabled_DeletesNormally", func(t *testing.T) { + t.Run("DefensiveCleanupDisabled_FastPath", func(t *testing.T) { logger := &MockLogger{} db := NewMockDB() - deleteCallCount := 0 - db.ExecFunc = func(query string, args ...interface{}) (sql.Result, error) { - deleteCallCount++ - assert.Contains(t, query, "DELETE FROM transactions WHERE delete_at_height") - return &MockResult{rowsAffected: 10}, nil - } - tSettings := createTestSettings() tSettings.UtxoStore.DefensiveCleanupEnabled = false @@ -704,19 +701,15 @@ func TestDefensiveCleanup(t *testing.T) { }) require.NoError(t, err) - service.Start(context.Background()) + // Verify setting is stored correctly + assert.False(t, service.settings.UtxoStore.DefensiveCleanupEnabled) - doneCh := make(chan string, 1) - err = service.UpdateBlockHeight(100, doneCh) - require.NoError(t, err) + // Run a cleanup job and verify it completes + job := cleanup.NewJob(100, context.Background()) + service.processCleanupJob(job, 1) - select { - case <-doneCh: - // Success - should have deleted directly - assert.Equal(t, 1, deleteCallCount, "Should have called DELETE once") - case <-time.After(2 * time.Second): - t.Fatal("Cleanup should complete") - } + assert.Equal(t, cleanup.JobStatusCompleted, job.GetStatus()) + assert.Nil(t, job.Error) }) t.Run("DefensiveCleanupEnabled_SettingRespected", func(t *testing.T) { From 9321678d75bcdc155048a331de35faef5606251e Mon Sep 17 00:00:00 2001 From: freemans13 Date: Thu, 6 Nov 2025 15:08:08 +0000 Subject: [PATCH 03/12] fixes --- .../utxo/aerospike/cleanup/cleanup_service.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index 51b3e099c7..db27597275 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -3,6 +3,7 @@ package cleanup import ( "bytes" "context" + "fmt" "strings" "sync" "sync/atomic" @@ -695,11 +696,14 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch return false, "cannot parse outputs data" } + hasUnspentOutputs := false problematicChildren := []string{} - for _, spendingDataInterface := range outputsMap { + for outputIdx, spendingDataInterface := range outputsMap { if spendingDataInterface == nil { - // Output not spent, continue + // UNSPENT OUTPUT - Transaction should NOT be deleted (has UTXOs still in use) + hasUnspentOutputs = true + problematicChildren = append(problematicChildren, fmt.Sprintf("output_%v_unspent", outputIdx)) continue } @@ -784,6 +788,16 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch } } + // Check if transaction has unspent outputs + if hasUnspentOutputs { + reason := "has unspent outputs (UTXOs still in use)" + if len(problematicChildren) > 1 { + // Also report which outputs are unspent + reason = fmt.Sprintf("%s: %s", reason, strings.Join(problematicChildren, ", ")) + } + return false, reason + } + if len(problematicChildren) > 0 { reason := "has children that are not safe to delete: " + strings.Join(problematicChildren, ", ") return false, reason From be318ba50a5bd1ed7e3f63e8f93e06129c1cfa18 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Thu, 6 Nov 2025 15:25:59 +0000 Subject: [PATCH 04/12] fixes --- stores/utxo/aerospike/cleanup/cleanup_service.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index db27597275..a60d7fa2c9 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -711,13 +711,18 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch // spending_data format: [32 bytes tx hash][4 bytes output index] spendingData, ok := spendingDataInterface.([]byte) if !ok || len(spendingData) < 32 { + // Malformed spending data - be conservative and skip deletion + s.logger.Warnf("Worker %d: malformed spending data for output %v of tx %s", workerID, outputIdx, txHash.String()[:16]) + problematicChildren = append(problematicChildren, fmt.Sprintf("output_%v... (malformed spending data)", outputIdx)) continue } // Get the spending transaction hash (child) childTxHash, err := chainhash.NewHash(spendingData[:32]) if err != nil { - s.logger.Warnf("Worker %d: failed to parse child tx hash for tx %s: %v", workerID, txHash.String()[:16], err) + // Corrupt child transaction hash - be conservative and skip deletion + s.logger.Warnf("Worker %d: failed to parse child tx hash for output %v of tx %s: %v", workerID, outputIdx, txHash.String()[:16], err) + problematicChildren = append(problematicChildren, fmt.Sprintf("output_%v... (corrupt child hash)", outputIdx)) continue } @@ -791,7 +796,7 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch // Check if transaction has unspent outputs if hasUnspentOutputs { reason := "has unspent outputs (UTXOs still in use)" - if len(problematicChildren) > 1 { + if len(problematicChildren) > 0 { // Also report which outputs are unspent reason = fmt.Sprintf("%s: %s", reason, strings.Join(problematicChildren, ", ")) } From 33bb2d938148f4e13afbdb489ec57d199efcc120 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Thu, 6 Nov 2025 17:42:05 +0000 Subject: [PATCH 05/12] tweak --- stores/utxo/aerospike/cleanup/cleanup_service.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index a60d7fa2c9..8a31bf6b02 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -355,11 +355,10 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { stmt := aerospike.NewStatement(s.namespace, s.set) // Add Outputs field if defensive cleanup is enabled - binNames := []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String()} + stmt.BinNames = []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String()} if s.settings.UtxoStore.DefensiveCleanupEnabled { - binNames = append(binNames, fields.Outputs.String()) + stmt.BinNames = append(stmt.BinNames, fields.Outputs.String()) } - stmt.BinNames = binNames // Set the filter to find records with a delete_at_height less than or equal to the safe cleanup height // This will automatically use the index since the filter is on the indexed bin From a5eadf09ea5da74f554ba1b4efeb7c5ee8b4bce0 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 09:35:34 +0000 Subject: [PATCH 06/12] fix SQL column names --- stores/utxo/sql/cleanup/cleanup_service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index d3b2f9e356..8a33f8d95f 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -247,8 +247,8 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent FROM transactions_to_check ttc INNER JOIN outputs o ON ttc.id = o.transaction_id LEFT JOIN inputs i ON o.spending_data IS NOT NULL - AND i.input_hash = ttc.hash - AND i.input_index = o.idx + AND i.previous_transaction_hash = ttc.hash + AND i.previous_tx_idx = o.idx LEFT JOIN transactions t_child ON i.transaction_id = t_child.id ORDER BY ttc.hash, o.idx ` From b05f45693a67f695f389e77c201e6ba945f5686c Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 10:01:29 +0000 Subject: [PATCH 07/12] use aerospike batches to fetch child txs --- .../utxo/aerospike/cleanup/cleanup_service.go | 198 ++++++++++++++---- 1 file changed, 158 insertions(+), 40 deletions(-) diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index 8a31bf6b02..c4e13fd62d 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -675,6 +675,99 @@ func (s *Service) ProcessSingleRecord(txid *chainhash.Hash, inputs []*bt.Input) return <-errCh } +// childInfo holds information about a child transaction needed for verification +type childInfo struct { + outputIdx uint32 + childTxHash *chainhash.Hash + blockHeights []interface{} // Block heights from Aerospike +} + +// batchGetChildrenBlockHeights performs batch reads for all child transactions +// Returns a map of child tx hash string -> block heights +func (s *Service) batchGetChildrenBlockHeights(childHashes []*chainhash.Hash) (map[string][]interface{}, error) { + if len(childHashes) == 0 { + return make(map[string][]interface{}), nil + } + + result := make(map[string][]interface{}) + batchSize := 1024 // Process in chunks to avoid memory issues + + // Get batch policy from settings + batchPolicy := util.GetAerospikeBatchPolicy(s.settings) + batchPolicy.ReplicaPolicy = aerospike.SEQUENCE + + readPolicy := util.GetAerospikeBatchReadPolicy(s.settings) + + for i := 0; i < len(childHashes); i += batchSize { + end := i + batchSize + if end > len(childHashes) { + end = len(childHashes) + } + + chunk := childHashes[i:end] + batchRecords := make([]aerospike.BatchRecordIfc, len(chunk)) + + // Create batch read records + for j, hash := range chunk { + key, err := aerospike.NewKey(s.namespace, s.set, hash.CloneBytes()) + if err != nil { + // Log error but continue with other children + s.logger.Warnf("Failed to create key for child tx %s: %v", hash.String()[:16], err) + continue + } + batchRecords[j] = aerospike.NewBatchRead(readPolicy, key, []string{fields.BlockHeights.String()}) + } + + // Execute batch read + err := s.client.BatchOperate(batchPolicy, batchRecords) + if err != nil { + // Log error but continue - we'll handle missing children conservatively + s.logger.Warnf("Batch read error for children: %v", err) + } + + // Process results + for j, batchRecord := range batchRecords { + if chunk[j] == nil { + continue + } + + hashKey := chunk[j].String() + + if batchRecord.BatchRec().Err != nil { + // Child not found or error - will be handled conservatively + if !errors.Is(batchRecord.BatchRec().Err, aerospike.ErrKeyNotFound) { + s.logger.Warnf("Error reading child tx %s: %v", chunk[j].String()[:16], batchRecord.BatchRec().Err) + } + result[hashKey] = nil + continue + } + + if batchRecord.BatchRec().Record == nil || batchRecord.BatchRec().Record.Bins == nil { + // No record found + result[hashKey] = nil + continue + } + + // Extract block heights + blockHeightsData, ok := batchRecord.BatchRec().Record.Bins[fields.BlockHeights.String()] + if !ok || blockHeightsData == nil { + // Child is unmined + result[hashKey] = nil + continue + } + + // Store block heights as interface slice + if blockHeights, ok := blockHeightsData.([]interface{}); ok { + result[hashKey] = blockHeights + } else { + result[hashKey] = nil + } + } + } + + return result, nil +} + // verifyChildrenBeforeDeletion checks if all children of a transaction are mined deep enough // Returns (canDelete bool, reason string) func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *chainhash.Hash, currentHeight uint32, workerID int) (bool, string) { @@ -695,8 +788,11 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch return false, "cannot parse outputs data" } + // Phase 1: Collect all child transaction hashes and metadata hasUnspentOutputs := false problematicChildren := []string{} + childrenToVerify := make([]childInfo, 0, len(outputsMap)) + uniqueChildHashes := make(map[string]*chainhash.Hash) for outputIdx, spendingDataInterface := range outputsMap { if spendingDataInterface == nil { @@ -725,41 +821,73 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch continue } - // Query the child transaction to check its block height - childKey, err := aerospike.NewKey(s.namespace, s.set, childTxHash.CloneBytes()) - if err != nil { - s.logger.Warnf("Worker %d: failed to create key for child tx %s: %v", workerID, childTxHash.String()[:16], err) - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (key error)") - continue + // Convert outputIdx to uint32 + var outputIdxUint32 uint32 + switch v := outputIdx.(type) { + case uint32: + outputIdxUint32 = v + case int: + outputIdxUint32 = uint32(v) + case int64: + outputIdxUint32 = uint32(v) + default: + outputIdxUint32 = 0 } - childRec, err := s.client.Get(nil, childKey, fields.BlockHeights.String()) - if err != nil { - // Child not found or error reading - be conservative and skip deletion - s.logger.Warnf("Worker %d: failed to read child tx %s: %v", workerID, childTxHash.String()[:16], err) - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (read error)") - continue + childrenToVerify = append(childrenToVerify, childInfo{ + outputIdx: outputIdxUint32, + childTxHash: childTxHash, + }) + + // Collect unique child hashes for batch read + uniqueChildHashes[childTxHash.String()] = childTxHash + } + + // Check if transaction has unspent outputs + if hasUnspentOutputs { + reason := "has unspent outputs (UTXOs still in use)" + if len(problematicChildren) > 0 { + // Also report which outputs are unspent + reason = fmt.Sprintf("%s: %s", reason, strings.Join(problematicChildren, ", ")) } + return false, reason + } - if childRec == nil || childRec.Bins == nil { - // Child transaction not found, skip deletion to be safe - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (not found)") - continue + // If no children to verify, we can safely delete + if len(childrenToVerify) == 0 { + if len(problematicChildren) > 0 { + reason := "has children that are not safe to delete: " + strings.Join(problematicChildren, ", ") + return false, reason } + return true, "" + } - // Check block heights - blockHeightsData, ok := childRec.Bins[fields.BlockHeights.String()] - if !ok || blockHeightsData == nil { - // Child is unmined - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (unmined)") + // Phase 2: Batch read all child transactions + childHashSlice := make([]*chainhash.Hash, 0, len(uniqueChildHashes)) + for _, hash := range uniqueChildHashes { + childHashSlice = append(childHashSlice, hash) + } + + childBlockHeightsMap, err := s.batchGetChildrenBlockHeights(childHashSlice) + if err != nil { + // Log error but continue with conservative approach + s.logger.Warnf("Worker %d: error during batch read of children for tx %s: %v", workerID, txHash.String()[:16], err) + } + + // Phase 3: Verify each child using the pre-fetched data + for _, child := range childrenToVerify { + childHashStr := child.childTxHash.String() + blockHeights, exists := childBlockHeightsMap[childHashStr] + + if !exists || blockHeights == nil { + // Child not found or error reading - be conservative and skip deletion + problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (not found)") continue } - // BlockHeights is stored as []uint32 - blockHeights, ok := blockHeightsData.([]interface{}) - if !ok || len(blockHeights) == 0 { - // No block heights, child is unmined - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (unmined)") + if len(blockHeights) == 0 { + // Child is unmined + problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (unmined)") continue } @@ -773,35 +901,25 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch case int64: childBlockHeight = uint32(v) default: - s.logger.Warnf("Worker %d: unexpected block height type for child tx %s: %T", workerID, childTxHash.String()[:16], v) - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (invalid height type)") + s.logger.Warnf("Worker %d: unexpected block height type for child tx %s: %T", workerID, child.childTxHash.String()[:16], v) + problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (invalid height type)") continue } // Check if child is confirmed deep enough if currentHeight <= childBlockHeight { // Current height is less than or equal to child height - this shouldn't happen - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (height inconsistency)") + problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (height inconsistency)") continue } confirmationDepth := currentHeight - childBlockHeight if confirmationDepth <= retention { - problematicChildren = append(problematicChildren, childTxHash.String()[:8]+"... (not confirmed deep enough)") + problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (not confirmed deep enough)") continue } } - // Check if transaction has unspent outputs - if hasUnspentOutputs { - reason := "has unspent outputs (UTXOs still in use)" - if len(problematicChildren) > 0 { - // Also report which outputs are unspent - reason = fmt.Sprintf("%s: %s", reason, strings.Join(problematicChildren, ", ")) - } - return false, reason - } - if len(problematicChildren) > 0 { reason := "has children that are not safe to delete: " + strings.Join(problematicChildren, ", ") return false, reason From ac16c5e31efdbef1837eb4b42e810bd756e4d3ed Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 10:21:59 +0000 Subject: [PATCH 08/12] settings for default, docker and teratestnet --- settings.conf | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/settings.conf b/settings.conf index 3bb3004f97..8e99b448e4 100644 --- a/settings.conf +++ b/settings.conf @@ -1456,6 +1456,10 @@ utxostore_utxoBatchSize = 128 utxostore_utxoBatchSize.docker = 50 utxostore_utxoBatchSize.docker.m = 512 +utxostore_defensiveCleanupEnabled = false +utxostore_defensiveCleanupEnabled.docker = true +utxostore_defensiveCleanupEnabled.teratestnet = true + # delay before sending batch to blockvalidation, use for testing subtree processing retries validator_blockvalidation_delay = 0 @@ -1509,4 +1513,4 @@ COINBASE_GRPC_ADDRESS = COINBASE_HTTP_PORT = COINBASE_WALLET_PRIVATE_KEY.docker.teranode1 = COINBASE_WALLET_PRIVATE_KEY.docker.teranode2 = -COINBASE_WALLET_PRIVATE_KEY.docker.teranode3 = \ No newline at end of file +COINBASE_WALLET_PRIVATE_KEY.docker.teranode3 = From de9afd99bd02e66ffb17dfd87a36f6ec2a6a7a3c Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 10:42:44 +0000 Subject: [PATCH 09/12] fixes --- stores/utxo/aerospike/cleanup/cleanup_service.go | 6 +++++- stores/utxo/sql/cleanup/cleanup_service.go | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index c4e13fd62d..08f804ffce 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -713,6 +713,10 @@ func (s *Service) batchGetChildrenBlockHeights(childHashes []*chainhash.Hash) (m if err != nil { // Log error but continue with other children s.logger.Warnf("Failed to create key for child tx %s: %v", hash.String()[:16], err) + // Set nil in batchRecords to maintain index alignment + batchRecords[j] = nil + // Mark this child as failed in the result map + result[hash.String()] = nil continue } batchRecords[j] = aerospike.NewBatchRead(readPolicy, key, []string{fields.BlockHeights.String()}) @@ -727,7 +731,7 @@ func (s *Service) batchGetChildrenBlockHeights(childHashes []*chainhash.Hash) (m // Process results for j, batchRecord := range batchRecords { - if chunk[j] == nil { + if chunk[j] == nil || batchRecord == nil { continue } diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index 8a33f8d95f..c231878dde 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -246,8 +246,7 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent t_child.block_height as child_block_height FROM transactions_to_check ttc INNER JOIN outputs o ON ttc.id = o.transaction_id - LEFT JOIN inputs i ON o.spending_data IS NOT NULL - AND i.previous_transaction_hash = ttc.hash + LEFT JOIN inputs i ON i.previous_transaction_hash = ttc.hash AND i.previous_tx_idx = o.idx LEFT JOIN transactions t_child ON i.transaction_id = t_child.id ORDER BY ttc.hash, o.idx From 34ee7ea345930552b47810dc6dac66d8582fa95e Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 11:11:29 +0000 Subject: [PATCH 10/12] fixes --- settings/interface.go | 1 + settings/settings.go | 1 + .../utxo/aerospike/cleanup/cleanup_service.go | 91 ++++++++++++++----- 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/settings/interface.go b/settings/interface.go index 14a696679a..7977d8a34a 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -371,6 +371,7 @@ type UtxoStoreSettings struct { CleanupDeleteBatcherSize int // Batch size for record deletions during cleanup CleanupDeleteBatcherDurationMillis int // Batch duration for record deletions during cleanup (ms) CleanupMaxConcurrentOperations int // Maximum concurrent operations during cleanup (0 = use connection queue size) + CleanupDefensiveBatchReadSize int // Batch size for reading child transactions during defensive cleanup (default: 1024) } type P2PSettings struct { diff --git a/settings/settings.go b/settings/settings.go index 4fb09ff895..42bd4a4d1a 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -356,6 +356,7 @@ func NewSettings(alternativeContext ...string) *Settings { CleanupDeleteBatcherSize: getInt("utxostore_cleanupDeleteBatcherSize", 256, alternativeContext...), CleanupDeleteBatcherDurationMillis: getInt("utxostore_cleanupDeleteBatcherDurationMillis", 10, alternativeContext...), CleanupMaxConcurrentOperations: getInt("utxostore_cleanupMaxConcurrentOperations", 0, alternativeContext...), + CleanupDefensiveBatchReadSize: getInt("utxostore_cleanupDefensiveBatchReadSize", 1024, alternativeContext...), }, P2P: P2PSettings{ BlockTopic: getString("p2p_block_topic", "", alternativeContext...), diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index 08f804ffce..5242b34ac1 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -690,7 +690,10 @@ func (s *Service) batchGetChildrenBlockHeights(childHashes []*chainhash.Hash) (m } result := make(map[string][]interface{}) - batchSize := 1024 // Process in chunks to avoid memory issues + batchSize := s.settings.UtxoStore.CleanupDefensiveBatchReadSize + if batchSize <= 0 { + batchSize = 1024 // Default fallback + } // Get batch policy from settings batchPolicy := util.GetAerospikeBatchPolicy(s.settings) @@ -712,7 +715,7 @@ func (s *Service) batchGetChildrenBlockHeights(childHashes []*chainhash.Hash) (m key, err := aerospike.NewKey(s.namespace, s.set, hash.CloneBytes()) if err != nil { // Log error but continue with other children - s.logger.Warnf("Failed to create key for child tx %s: %v", hash.String()[:16], err) + s.logger.Warnf("Failed to create key for child tx %s: %v", hash.String(), err) // Set nil in batchRecords to maintain index alignment batchRecords[j] = nil // Mark this child as failed in the result map @@ -722,25 +725,40 @@ func (s *Service) batchGetChildrenBlockHeights(childHashes []*chainhash.Hash) (m batchRecords[j] = aerospike.NewBatchRead(readPolicy, key, []string{fields.BlockHeights.String()}) } - // Execute batch read - err := s.client.BatchOperate(batchPolicy, batchRecords) - if err != nil { - // Log error but continue - we'll handle missing children conservatively - s.logger.Warnf("Batch read error for children: %v", err) + // Filter out nil entries before batch operation + validBatchRecords := make([]aerospike.BatchRecordIfc, 0, len(batchRecords)) + validIndices := make([]int, 0, len(batchRecords)) + for idx, record := range batchRecords { + if record != nil { + validBatchRecords = append(validBatchRecords, record) + validIndices = append(validIndices, idx) + } + } + + // Execute batch read only if we have valid records + if len(validBatchRecords) > 0 { + err := s.client.BatchOperate(batchPolicy, validBatchRecords) + if err != nil { + // Log error but continue - we'll handle missing children conservatively + s.logger.Warnf("Batch read error for children: %v", err) + } } - // Process results - for j, batchRecord := range batchRecords { - if chunk[j] == nil || batchRecord == nil { + // Process results - map back to original chunk indices + for idx, validIdx := range validIndices { + batchRecord := validBatchRecords[idx] + hash := chunk[validIdx] + + if hash == nil { continue } - hashKey := chunk[j].String() + hashKey := hash.String() if batchRecord.BatchRec().Err != nil { // Child not found or error - will be handled conservatively if !errors.Is(batchRecord.BatchRec().Err, aerospike.ErrKeyNotFound) { - s.logger.Warnf("Error reading child tx %s: %v", chunk[j].String()[:16], batchRecord.BatchRec().Err) + s.logger.Warnf("Error reading child tx %s: %v", hash.String(), batchRecord.BatchRec().Err) } result[hashKey] = nil continue @@ -811,7 +829,7 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch spendingData, ok := spendingDataInterface.([]byte) if !ok || len(spendingData) < 32 { // Malformed spending data - be conservative and skip deletion - s.logger.Warnf("Worker %d: malformed spending data for output %v of tx %s", workerID, outputIdx, txHash.String()[:16]) + s.logger.Warnf("Worker %d: malformed spending data for output %v of tx %s", workerID, outputIdx, txHash.String()) problematicChildren = append(problematicChildren, fmt.Sprintf("output_%v... (malformed spending data)", outputIdx)) continue } @@ -820,7 +838,7 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch childTxHash, err := chainhash.NewHash(spendingData[:32]) if err != nil { // Corrupt child transaction hash - be conservative and skip deletion - s.logger.Warnf("Worker %d: failed to parse child tx hash for output %v of tx %s: %v", workerID, outputIdx, txHash.String()[:16], err) + s.logger.Warnf("Worker %d: failed to parse child tx hash for output %v of tx %s: %v", workerID, outputIdx, txHash.String(), err) problematicChildren = append(problematicChildren, fmt.Sprintf("output_%v... (corrupt child hash)", outputIdx)) continue } @@ -875,7 +893,7 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch childBlockHeightsMap, err := s.batchGetChildrenBlockHeights(childHashSlice) if err != nil { // Log error but continue with conservative approach - s.logger.Warnf("Worker %d: error during batch read of children for tx %s: %v", workerID, txHash.String()[:16], err) + s.logger.Warnf("Worker %d: error during batch read of children for tx %s: %v", workerID, txHash.String(), err) } // Phase 3: Verify each child using the pre-fetched data @@ -885,13 +903,13 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch if !exists || blockHeights == nil { // Child not found or error reading - be conservative and skip deletion - problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (not found)") + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(not found)") continue } if len(blockHeights) == 0 { // Child is unmined - problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (unmined)") + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(unmined)") continue } @@ -900,26 +918,55 @@ func (s *Service) verifyChildrenBeforeDeletion(bins aerospike.BinMap, txHash *ch switch v := blockHeights[0].(type) { case uint32: childBlockHeight = v + case uint: + childBlockHeight = uint32(v) + case uint64: + childBlockHeight = uint32(v) case int: + if v < 0 { + s.logger.Warnf("Worker %d: negative block height for child tx %s: %d", workerID, child.childTxHash.String(), v) + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(negative height)") + continue + } + childBlockHeight = uint32(v) + case int32: + if v < 0 { + s.logger.Warnf("Worker %d: negative block height for child tx %s: %d", workerID, child.childTxHash.String(), v) + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(negative height)") + continue + } childBlockHeight = uint32(v) case int64: + if v < 0 { + s.logger.Warnf("Worker %d: negative block height for child tx %s: %d", workerID, child.childTxHash.String(), v) + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(negative height)") + continue + } + childBlockHeight = uint32(v) + case float64: + // Aerospike sometimes returns numbers as floats + if v < 0 || v != float64(uint32(v)) { + s.logger.Warnf("Worker %d: invalid float block height for child tx %s: %f", workerID, child.childTxHash.String(), v) + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(invalid float height)") + continue + } childBlockHeight = uint32(v) default: - s.logger.Warnf("Worker %d: unexpected block height type for child tx %s: %T", workerID, child.childTxHash.String()[:16], v) - problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (invalid height type)") + s.logger.Warnf("Worker %d: unexpected block height type for child tx %s: %T", workerID, child.childTxHash.String(), v) + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(invalid height type)") continue } // Check if child is confirmed deep enough if currentHeight <= childBlockHeight { // Current height is less than or equal to child height - this shouldn't happen - problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (height inconsistency)") + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(height inconsistency)") continue } confirmationDepth := currentHeight - childBlockHeight if confirmationDepth <= retention { - problematicChildren = append(problematicChildren, child.childTxHash.String()[:8]+"... (not confirmed deep enough)") + problematicChildren = append(problematicChildren, child.childTxHash.String()+"...(not confirmed deep enough)") continue } } @@ -958,7 +1005,7 @@ func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aero if s.settings.UtxoStore.DefensiveCleanupEnabled { canDelete, reason := s.verifyChildrenBeforeDeletion(bins, txHash, job.BlockHeight, workerID) if !canDelete { - s.logger.Warnf("Worker %d: skipping deletion of tx %s: %s", workerID, txHash.String()[:16], reason) + s.logger.Warnf("Worker %d: skipping deletion of tx %s: %s", workerID, txHash.String(), reason) return nil // Skip this transaction, don't return error } } From ececb41f3208cabafd2cf7668f50251e554472be Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 11:47:24 +0000 Subject: [PATCH 11/12] fixes --- stores/utxo/sql/cleanup/cleanup_service.go | 31 +++++++++++++--------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index c231878dde..105264a1f0 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -229,9 +229,10 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent // Single query to get ALL outputs (spent and unspent) for transactions to be deleted // This replaces N queries with 1 query, massively improving performance // - // IMPORTANT: We return ALL transactions that have outputs. For defensive verification: - // 1. Check that ALL outputs are spent (no unspent UTXOs remaining) - // 2. Check that all children of spent outputs are mined and confirmed deep enough + // IMPORTANT: We distinguish between: + // 1. Unspent outputs (spending_data = NULL) → Block deletion (has UTXOs still in use) + // 2. Spent outputs with no child in DB → Allow deletion (external/mempool/already cleaned) + // 3. Spent outputs with child in DB → Check if child is mined deep enough query := ` WITH transactions_to_check AS ( SELECT id, hash @@ -296,17 +297,20 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent } // Output is spent - analyze the child transaction - // Child transaction should exist if output is spent if childHash == nil { - // Spent output but no child found - data inconsistency - result.CanDelete = false - result.Reason = "has spent outputs with missing child transactions" + // Spent output but no child found in our database + // This can happen for: + // 1. External transactions (spending tx not in our database) + // 2. Mempool transactions (not yet stored) + // 3. Already cleaned up transactions + // Since the output is marked as spent, we can allow deletion + // but note it for monitoring result.ProblematicChildren = append(result.ProblematicChildren, ChildInfo{ - Hash: fmt.Sprintf("output_%d", outputIndex), + Hash: fmt.Sprintf("output_%d_external_or_missing", outputIndex), IsMined: false, BlockHeight: nil, }) - continue + continue // Allow deletion to proceed } child := ChildInfo{ @@ -409,10 +413,13 @@ func deleteTombstoned(db *usql.DB, tSettings *settings.Settings, logger ulogger. if i > 0 { problematicChildrenInfo += ", " } - if child.IsMined { - problematicChildrenInfo += fmt.Sprintf("%s... (mined at height %d)", child.Hash[:8], *child.BlockHeight) + // Check if this is a placeholder for external/missing child + if strings.HasPrefix(child.Hash, "output_") { + problematicChildrenInfo += child.Hash + } else if child.IsMined { + problematicChildrenInfo += fmt.Sprintf("%s (mined at height %d)", child.Hash, *child.BlockHeight) } else { - problematicChildrenInfo += child.Hash[:8] + "... (unmined)" + problematicChildrenInfo += child.Hash + " (unmined)" } } From d8f9b299cd53f106f8b5bb8c64cc0dd72dc67985 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 7 Nov 2025 12:38:31 +0000 Subject: [PATCH 12/12] data integrity fix --- stores/utxo/sql/cleanup/cleanup_service.go | 25 ++++++++++++---------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index 105264a1f0..8a7455759d 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -229,9 +229,9 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent // Single query to get ALL outputs (spent and unspent) for transactions to be deleted // This replaces N queries with 1 query, massively improving performance // - // IMPORTANT: We distinguish between: + // IMPORTANT: We enforce data integrity: // 1. Unspent outputs (spending_data = NULL) → Block deletion (has UTXOs still in use) - // 2. Spent outputs with no child in DB → Allow deletion (external/mempool/already cleaned) + // 2. Spent outputs with no child in DB → Block deletion (data integrity issue - child MUST exist) // 3. Spent outputs with child in DB → Check if child is mined deep enough query := ` WITH transactions_to_check AS ( @@ -297,20 +297,23 @@ func batchVerifyChildrenBeforeDeletion(db *usql.DB, currentHeight uint32, retent } // Output is spent - analyze the child transaction + // IMPORTANT: If spending_data is set, the child transaction MUST exist in our database + // If it doesn't, this indicates a data integrity issue that needs investigation if childHash == nil { - // Spent output but no child found in our database - // This can happen for: - // 1. External transactions (spending tx not in our database) - // 2. Mempool transactions (not yet stored) - // 3. Already cleaned up transactions - // Since the output is marked as spent, we can allow deletion - // but note it for monitoring + // DATA INTEGRITY ISSUE: Output marked as spent but spending transaction not found + // This should never happen in normal operation and indicates: + // 1. The spending transaction was incorrectly deleted + // 2. Data corruption or incomplete transaction storage + // 3. A bug in the system + // We must NOT delete the parent transaction until this is resolved + result.CanDelete = false + result.Reason = "data integrity issue: output marked as spent but spending transaction not found" result.ProblematicChildren = append(result.ProblematicChildren, ChildInfo{ - Hash: fmt.Sprintf("output_%d_external_or_missing", outputIndex), + Hash: fmt.Sprintf("output_%d_missing_child", outputIndex), IsMined: false, BlockHeight: nil, }) - continue // Allow deletion to proceed + continue } child := ChildInfo{