diff --git a/docs/utxo-safe-deletion.md b/docs/utxo-safe-deletion.md new file mode 100644 index 0000000000..04acfd2409 --- /dev/null +++ b/docs/utxo-safe-deletion.md @@ -0,0 +1,635 @@ +# UTXO Safe Deletion: Delete-At-Height (DAH) Evolution + +## Overview + +This document describes the evolution of the "delete at height" mechanism for parent transaction cleanup in the UTXO store. Through production experience and analysis, we've refined the approach to provide comprehensive verification of all spending children before parent deletion, ensuring no child transaction can be orphaned. + +## The Problem + +When a parent transaction becomes fully spent (all its outputs have been consumed by child transactions), we want to delete it to free up storage. However, we must ensure we never delete a parent transaction while its spending children are still unstable - either unmined or subject to reorganization. + +### Risk scenario 1: Unmined child + +1. Parent TX is fully spent by Child TX (in block assembly) +2. Parent reaches DAH and is deleted from UTXO store +3. Child TX tries to validate, but Parent TX is missing → **validation fails** + +### Risk scenario 2: Recently mined child (reorganization) + +1. Parent TX is fully spent by Child TX at block height 1000 +2. Parent reaches DAH at height 1288 and is deleted +3. Chain reorganization at height 1300 orphans Child TX +4. Child TX gets re-validated, but Parent TX is missing → **validation fails** + +All scenarios require: **atomic verification at deletion time, not separate coordination**. + +## Design Philosophy: Fail-Safe Deletion + +**Core principle:** When designing deletion logic, safety takes priority. Temporary storage bloat is recoverable; premature deletion of data that child transactions depend on is not. + +### Learnings from Production + +Production deployment revealed edge cases where parent transactions were being deleted while their spending children were not yet stable: + +- Some children remained in block assembly longer than expected +- Chain reorganizations affecting recently-mined children +- These cases led to validation failures when children attempted to re-validate + +### Current Approach: Comprehensive Verification + +Based on these learnings, the refined approach implements: + +1. **Positive verification required**: Deletion proceeds only after confirming ALL children are stable +2. **Conservative on uncertainty**: Missing or ambiguous data → skip deletion, retry later +3. **Independent checks**: Multiple verification layers; any failure safely aborts deletion +4. **Predictable failure mode**: Issues manifest as observable storage retention, not data loss + +This makes the system self-correcting - parents that should be deleted will be picked up in future cleanup passes once conditions are met. + +## Initial Approach + +### Mechanism + +The deletion mechanism used time-based retention: + +```lua +local newDeleteHeight = currentBlockHeight + blockHeightRetention +-- With blockHeightRetention=288, DAH = current + 288 blocks (~2 days) +``` + +The cleanup service would delete records when `current_height >= deleteAtHeight`. + +### Observations from Production + +Through production deployment, we discovered edge cases: + +1. **Variable mining times**: Some transactions remained in block assembly longer than the 288-block window +2. **Chain reorganizations**: Recently-mined children could be reorganized out +3. **No child verification**: The initial implementation didn't verify child transaction stability before parent deletion + +**Example scenario:** + +```text +Block 1000: Parent fully spent by Child (in block assembly) +Block 1288: DAH reached → Parent deleted +Block 1300: Child attempts validation → validation failure (parent missing) +``` + +### Evolution: Adding Parent Preservation + +To address these production issues, a preservation mechanism was added: + +**Function:** `PreserveParentsOfOldUnminedTransactions()` (in `stores/utxo/cleanup_unmined.go`) + +**Approach:** + +- Ran periodically via ticker in Block Assembly service +- Identified old unmined transactions +- Set `preserve_until` on their parent transactions to prevent premature deletion +- Provided safety net for long-running unmined transactions + +**Characteristics:** + +1. **Reactive solution**: Addressed issue after transactions became old +2. **Additional complexity**: Required periodic scanning and parent identification +3. **Focused scope**: Primarily protected parents of very old unmined transactions +4. **Production stabilization**: Successfully prevented validation failures in deployment + +This intermediate solution provided production stability while we analyzed the comprehensive fix. + +## Current Approach: Comprehensive Child Verification + +### Two-Layer Safety Mechanism + +The refined implementation uses **two independent safety checks** working together: + +#### Layer 1: DAH Eligibility (Lua Script) + +```lua +-- Set DAH at normal retention period +local deleteHeight = currentBlockHeight + blockHeightRetention +-- With blockHeightRetention=288, DAH = current + 288 blocks (~2 days) +``` + +**No additional tracking needed** - spending children are already stored: + +```lua +-- When an output is spent, spendMulti() stores the child TX hash in spending_data: +-- UTXO format: [32 bytes utxo_hash][36 bytes spending_data] +-- spending_data = [32 bytes child_tx_hash][4 bytes vin] +-- +-- All spending children are implicitly tracked in the UTXOs bin +-- Cleanup extracts all unique children from spent UTXOs +``` + +**Why verify ALL children?** + +- A parent with 100 outputs might be spent by 100 different child transactions +- We must verify **every single child** is stable before deleting the parent +- If even one child is unmined or recently mined, the parent must be kept +- The spending_data already contains all this information - we just extract and verify it + +#### Layer 2: Child Stability Verification (Cleanup Service) + +When cleanup runs and finds a parent eligible for deletion (DAH reached), it performs an **additional safety check**: + +**Aerospike cleanup:** + +```go +const safetyWindow = 288 + +// Batch verify ALL spending children for this parent +allSpendingChildren := getSpendingChildrenSet(parent) +safetyMap := batchVerifyChildrenSafety(allSpendingChildren, currentBlockHeight) + +// For each parent: +for _, childHash := range allSpendingChildren { + if !safetyMap[childHash] { + // At least one child not stable - SKIP DELETION (fail-safe) + // ALL children must be stable before parent can be deleted + return nil + } +} + +// ALL children are stable (mined >= 288 blocks ago) - safe to delete parent +``` + +**Fail-safe verification logic:** + +```go +// Conservative edge case handling in batchVerifyChildrenSafety: + +if record.Err != nil { + // ANY error (including child not found) → BE CONSERVATIVE + // We require POSITIVE verification, not assumptions + safetyMap[hexHash] = false + continue +} + +if unminedSince != nil { + // Child is unmined → NOT SAFE + safetyMap[hexHash] = false + continue +} + +if !hasBlockHeights || len(blockHeightsList) == 0 { + // Missing block height data → BE CONSERVATIVE + safetyMap[hexHash] = false + continue +} + +// Only mark safe after EXPLICIT confirmation of stability +if currentBlockHeight >= maxChildBlockHeight + safetyWindow { + safetyMap[hexHash] = true // Positive verification +} else { + safetyMap[hexHash] = false // Not yet stable +} +``` + +**SQL cleanup:** + +```sql +DELETE FROM transactions +WHERE id IN ( + SELECT t.id + FROM transactions t + WHERE t.delete_at_height IS NOT NULL + AND t.delete_at_height <= current_height + AND NOT EXISTS ( + -- Find ANY unstable child - if found, parent cannot be deleted + -- This ensures ALL children must be stable before parent deletion + SELECT 1 + FROM outputs o + WHERE o.transaction_id = t.id + AND o.spending_data IS NOT NULL + AND ( + -- Extract child TX hash from spending_data (first 32 bytes) + -- Check if this child is NOT stable + NOT EXISTS ( + SELECT 1 + FROM transactions child + INNER JOIN block_ids child_blocks ON child.id = child_blocks.transaction_id + WHERE child.hash = substr(o.spending_data, 1, 32) + AND child.unmined_since IS NULL -- Child must be mined + AND child_blocks.block_height <= (current_height - 288) -- Child must be stable + ) + ) + ) +) +``` + +**Logic:** Parent can only be deleted if there are NO outputs with unstable children. Even one unstable child blocks deletion. + +### How It Works: Timeline Example + +**Scenario:** Parent fully spent at block 1000, child mined at block 1001 + +| Block Height | Event | Previous Approach | Current Approach | +|--------------|-------|-------------------|------------------| +| 1000 | Parent fully spent | DAH = 1288 | DAH = 1288, children in spending_data | +| 1001 | Child mined | - | Child @ height 1001 | +| 1288 | Cleanup runs | Delete (time-based) | Extract and check all children | +| 1288 | Child stability check | - | 1288 - 1001 = 287 < 288 → Skip (wait) | +| 1289 | Cleanup runs | Already deleted | Extract and check all children | +| 1289 | Child stability check | - | 1289 - 1001 = 288 ≥ 288 → Delete safely! | + +**Key Improvement:** + +- **Previous**: Deletion based solely on time elapsed +- **Current**: Deletion based on verified child stability +- **Benefit**: Handles variable mining times and reorganizations gracefully + +**Edge Cases Discovered in Production:** + +### Scenario A: Long-running block assembly + +| Block Height | Event | Observed Behavior | Impact | +|--------------|-------|-------------------|--------| +| 1000 | Parent fully spent by Child (in block assembly) | DAH = 1288 | - | +| 1200 | Cleanup runs (child still unmined) | ⏳ Not yet time | - | +| 1288 | Cleanup runs | Parent deleted | Child validation fails | +| 1300 | Child attempts validation | Parent missing | Service degradation | + +### Scenario B: Chain reorganization + +| Block Height | Event | Observed Behavior | Impact | +|--------------|-------|-------------------|--------| +| 1000 | Parent fully spent by Child | DAH = 1288 | - | +| 1100 | Child mined | Child @ height 1100 | - | +| 1288 | Cleanup runs | Parent deleted | - | +| 1300 | Chain reorg orphans child | Child needs re-validation | Validation fails | + +These production observations informed the comprehensive verification approach. + +## Safety Guarantees + +### Edge Case 1: Child Never Mined + +**Scenario:** Parent fully spent at block 1000, child remains in block assembly + +| Block Height | Event | New System Behavior | +|--------------|-------|---------------------| +| 1000 | Parent fully spent (child in block assembly) | DAH = 1288, children in spending_data | +| 1288 | Cleanup runs | Child unmined → ❌ Skip (keeps parent) | +| 1500 | Cleanup runs | Child still unmined → ❌ Skip (keeps parent) | +| 2000 | Cleanup runs | Child still unmined → ❌ Skip (keeps parent) | + +**Result:** Parent is never deleted as long as child remains unmined. This is correct behavior - we cannot delete a parent whose child might still validate. + +### Edge Case 2: Child Mined Very Late + +**Scenario:** Parent fully spent at block 1000, child not mined until block 1500 + +| Block Height | Event | New System Behavior | +|--------------|-------|---------------------| +| 1000 | Parent fully spent | DAH = 1288, children in spending_data | +| 1288 | Cleanup runs | Child not mined yet → ❌ Skip | +| 1500 | Child finally mined | Child @ height 1500 | +| 1788 | Cleanup runs | 1788 - 1500 = 288 → ✅ Delete safely! | + +**Result:** The parent waits until the child is actually mined AND stable, regardless of how long that takes. + +### Edge Case 3: Chain Reorganization + +**Scenario:** Chain reorg orphans the child + +| Block Height | Event | System Behavior | +|--------------|-------|-----------------| +| 1000 | Parent fully spent by Child | DAH = 1288, children in spending_data | +| 1100 | Child mined | Child @ height 1100 | +| 1200 | Chain reorg orphans Child | Child moved to unmined state | +| 1288 | Cleanup runs | Child unmined → ❌ Skip (keeps parent) | +| 1300 | Child re-mined at new height | Child @ height 1300 | +| 1588 | Cleanup runs | 1588 - 1300 = 288 → ✅ Delete safely! | + +**Result:** The parent is preserved through the reorg and only deleted after child stability is re-established at the new height. + +### Edge Case 4: Service Restart Race Condition (Previous Approach) + +**Scenario:** System restarts while parents have unmined children + +| Event | Previous Approach Behavior | Vulnerability | +|-------|---------------------------|---------------| +| Block 1000: Parent spent by unmined child | DAH = 1288 | - | +| Block 1400: System shutdown | preserve_until not set (preservation hasn't run yet) | - | +| Block 1500: System restart | - | - | +| Block 1500: Cleanup service starts | Sees parent with DAH=1288, preserve_until=NULL | - | +| Block 1500: Cleanup runs BEFORE preservation ticker | Parent deleted (no preservation yet) | **Parent deleted prematurely** | +| Block 1500: Preservation ticker finally starts | Tries to preserve parent (too late) | Child orphaned | + +**Architectural issue:** Previous approach relied on coordination between two independent processes: + +- Cleanup service (continuous) +- Preservation ticker (periodic) + +After restart, cleanup could run before preservation caught up, creating a race window. + +**Current approach eliminates this race:** + +- Verification happens atomically during cleanup +- No separate preservation process needed +- No coordination required +- Robust to restarts, delays, or timing issues + +## Implementation Details + +### Removed: Periodic Parent Preservation Workaround + +**Deleted files:** + +- `stores/utxo/cleanup_unmined.go` (126 lines) +- `stores/utxo/tests/cleanup_unmined_test.go` (415 lines) +- `test/e2e/daemon/wip/unmined_tx_cleanup_e2e_test.go` (488 lines) + +**Deleted function:** `PreserveParentsOfOldUnminedTransactions()` + +This periodic preservation workaround is no longer needed because: + +- **Old approach**: React to old unmined transactions by preserving their parents +- **New approach**: Proactively verify child stability before any deletion +- The new last_spender verification provides comprehensive protection, making the band-aid unnecessary + +### Lua Script Changes (teranode.lua) + +**Modified:** `setDeleteAtHeight()` function - No longer uses excessive buffer + +```lua +-- DAH now set at normal retention (same as old system) +local conservativeDeleteHeight = currentBlockHeight + blockHeightRetention +``` + +**Key insight:** No separate tracking needed! + +```lua +-- We do NOT track spending children separately +-- They are already embedded in UTXO spending_data (bytes 32-64 of each spent UTXO) +-- Cleanup service extracts ALL children from spent UTXOs for verification +``` + +This is more robust because: + +- Cannot miss any children (all are in spending_data) +- No risk of tracking only "last" child +- Simpler code - no special tracking logic needed + +### Aerospike Cleanup Service Changes + +**Modified:** `processRecordChunk()` function + +- Extracts ALL unique spending children from UTXO spending_data +- Scans every spent UTXO to find all child TX hashes +- Builds map of parent -> all children + +**Enhanced:** `batchVerifyChildrenSafety()` function + +- Batch reads ALL unique spending children in a chunk +- Checks each child's mined status and block height +- Returns safety map: `childHash -> isSafe` + +**Modified:** `processRecordCleanupWithSafetyMap()` + +- Verifies ALL children are stable (not just one) +- If ANY child is unstable, skips deletion +- Parent will be reconsidered in future cleanup passes + +### SQL Cleanup Service Changes + +**Modified:** `deleteTombstoned()` query + +- Uses `NOT EXISTS` to find ANY unstable child +- Extracts ALL children from `outputs.spending_data` +- Verifies ALL children are mined and stable +- If ANY child fails verification, parent is kept +- Comprehensive query that checks every spending child + +### Database Schema + +**No new fields required!** + +The spending children information is already present in existing data: + +- **Aerospike**: Embedded in UTXO `spending_data` (bytes 32-64 of each spent UTXO) +- **SQL**: Stored in `outputs.spending_data` column (bytes 1-32 contain child TX hash) + +This approach is superior because: + +- No redundant data storage +- Cannot get out of sync +- Automatically tracks ALL children +- Simpler schema + +## Benefits + +### 1. Addresses Production Edge Cases + +- Observed cases where children remained in block assembly past retention window +- Chain reorganizations affecting recently-mined transactions +- Verification ensures children are both mined AND stable before parent deletion +- Gracefully handles variable mining times + +### 2. Fail-Safe Operational Characteristics + +- Verification uncertainties lead to retention rather than deletion +- Observable in storage metrics +- Self-correcting through retry mechanism +- Allows investigation before any service impact + +### 3. Maintains Retention Efficiency + +- Uses same 288-block (~2 day) retention window +- Adds verification layer without extending base retention +- Optimal case (immediate mining): similar deletion timing +- Edge cases (delayed mining): waits appropriately for stability + +### 4. Robust Safety Properties + +- Verifies all spending children, regardless of count +- Multiple independent verification layers +- Handles variable mining times and reorganizations +- Conservative defaults on ambiguous cases + +### 5. Database Operation Trade-offs + +**Previous approach (main + preservation):** + +Case 1: Parent whose child mines normally (no preservation) + +- Spend: 1 read (parent), 1 write (set DAH) +- Cleanup: 1 read (parent), 1 delete, M writes (mark child as deleted in M parent TXs via deletedChildren map) +- **Total: 2 reads, 1+M writes** + +Case 2: Parent whose child remains unmined (triggers preservation) + +- Spend: 1 read (parent), 1 write (set DAH) +- Preservation: 1 index scan, 1 read (child TxInpoints), 1 write (set preserve_until on parents) +- Cleanup skips: K reads (while preserved) +- Expiration: 1 read (batch), 1 write (clear preserve, set DAH) +- Final cleanup: 1 read (parent), 1 delete, M writes (deletedChildren) +- **Total: 3+K reads, 3+M writes** + +**Current approach (all cases):** + +- Spend: 1 read (parent), 1 write (set DAH) +- Cleanup: 1 read (parent + UTXOs), N reads (verify N children via BatchGet), 1 delete, M writes (deletedChildren) +- **Total: 2+N reads, 1+M writes** + +Where: + +- N = unique children for this parent (typically 2-3, up to 100+) +- M = number of inputs (parents to update with deletedChildren) - same in both approaches +- K = number of cleanup skip attempts in previous preservation case + +**Analysis:** + +The current approach **always performs child verification reads** (N reads): + +- **Best case**: Parent with 2 children → 2+2 = 4 reads (vs 2 reads if no preservation needed) +- **Typical case**: Parent with 2-3 children → 4-5 reads (vs 2-5 reads depending on preservation) +- **Worst case**: Parent with 100 unique children → 100+ reads (vs 2-5 reads) + +**Trade-off evaluation:** + +- Previous approach: Variable overhead (only on problematic cases) +- Current approach: Consistent overhead (on ALL parents) +- Current does more work in common case, less in problematic case +- Current is heavier on database but provides comprehensive verification + +**Honest assessment:** The current approach increases database load by adding child verification reads to every parent deletion. This overhead is the cost of comprehensive safety verification - we're choosing correctness over efficiency. + +### 6. Simplified Architecture + +- Replaces periodic preservation with direct verification +- Removed `cleanup_unmined.go` and associated tests (1000+ lines) +- Single verification approach replaces multi-stage process +- Uses existing data (spending_data) rather than separate tracking +- Easier to reason about and maintain + +### 7. Clearer Intent + +- Code explicitly documents safety mechanism +- Easy to understand and verify correctness +- Separates concerns (eligibility vs. verification) +- Fail-safe approach makes debugging straightforward + +## Backward Compatibility + +### Schema Migration + +**No schema changes required!** + +The new implementation uses existing data: + +- **Aerospike**: Reads `utxos` bin (already exists) to extract spending children +- **SQL**: Queries `outputs.spending_data` column (already exists) + +**Migration:** + +- Zero schema changes needed +- No database migrations +- Works immediately on existing data +- Backward compatible with all existing records + +### Handling All Records (New and Old) + +The cleanup logic works identically for all records: + +1. Query records where `delete_at_height <= current_height` +2. For each record, extract ALL spending children from spending_data +3. Verify ALL children are mined and stable +4. Only delete if ALL children pass verification + +**This works for:** + +- New records (created with new code) +- Old records (created before this change) +- Conflicting transactions (no spending children) +- Edge cases (missing data → conservative, skip deletion) + +## Conclusion + +The safe deletion mechanism has evolved through three stages, each addressing observed challenges in production. + +### Evolution Timeline + +#### Stage 1: Initial Implementation + +- Time-based deletion using DAH +- Simple and predictable +- Worked well for typical cases + +#### Stage 2: Production Hardening + +- Added periodic parent preservation +- Addressed edge cases with long-running transactions +- Provided operational stability + +#### Stage 3: Comprehensive Solution (This Branch) + +- Direct verification of all children +- Consolidates previous multi-stage approach +- More efficient and complete + +### Key Characteristics + +#### Comprehensive Verification + +- Checks ALL spending children, not just one representative +- Handles parents with many outputs spent by different children +- Prevents edge case where early children could be orphaned + +#### Architectural Simplification + +- Eliminated multi-process coordination (cleanup + preservation) +- Removed 1000+ lines of workaround code +- Single atomic verification at deletion time +- No race conditions between independent processes + +#### Production-Safe Design + +- Verification issues manifest as observable retention +- Self-correcting through retry mechanism +- Clear operational signals for troubleshooting +- Storage metrics indicate when children block deletion + +### Summary + +The evolution of the deletion mechanism reflects iterative refinement based on production experience. Each stage addressed observed challenges: + +1. **Initial**: Time-based deletion (simple, predictable) +2. **Intermediate**: Added preservation for old unmined transactions (production stabilization) +3. **Current**: Comprehensive child verification (complete solution) + +**Architecture evolution:** + +- Consolidated multi-stage process into single atomic verification +- Leverages existing data (spending_data) rather than additional tracking +- Reduced code complexity (1000+ lines removed) +- Eliminated race conditions and process coordination complexity +- Trade-off: More database reads during cleanup for comprehensive verification + +#### Key design principle: Verify ALL children + +For parents with multiple outputs spent by different children: + +**Multi-child example:** + +```text +Block 100: Child A spends parent output 0 +Block 200: Child B spends parent output 1 (makes parent fully spent) +Block 488: Cleanup must verify BOTH Child A and Child B are stable +Block 500: If Child A reorganized but only Child B was checked → validation failure +``` + +**Implementation approach:** + +- Extract ALL spending children from spending_data (already embedded in UTXOs) +- Verify EVERY child is mined and stable +- If ANY child unstable → parent kept +- Ensures no child can be orphaned + +**Technical details:** + +- **Aerospike**: Scans all spent UTXOs to extract children (bytes 32-64 of each UTXO) +- **SQL**: Query checks all `outputs.spending_data` (bytes 1-32 of spending_data) +- **Both**: Single batch verification call for all unique children in a chunk +- Efficient due to batching and data locality diff --git a/services/blockassembly/BlockAssembler.go b/services/blockassembly/BlockAssembler.go index d5789e2e52..0a4b0f9745 100644 --- a/services/blockassembly/BlockAssembler.go +++ b/services/blockassembly/BlockAssembler.go @@ -149,6 +149,7 @@ type BlockAssembler struct { // unminedCleanupTicker manages periodic cleanup of old unmined transactions unminedCleanupTicker *time.Ticker + // cachedCandidate stores the cached mining candidate cachedCandidate *CachedMiningCandidate diff --git a/services/p2p/peer_registry_cache.go b/services/p2p/peer_registry_cache.go index cb563c67ce..b1aa8abeab 100644 --- a/services/p2p/peer_registry_cache.go +++ b/services/p2p/peer_registry_cache.go @@ -75,8 +75,8 @@ func getPeerRegistryCacheFilePath(configuredDir string) string { // SavePeerRegistryCache saves the peer registry data to a JSON file func (pr *PeerRegistry) SavePeerRegistryCache(cacheDir string) error { - pr.mu.RLock() - defer pr.mu.RUnlock() + pr.mu.Lock() + defer pr.mu.Unlock() cache := &PeerRegistryCache{ Version: PeerRegistryCacheVersion, diff --git a/services/pruner/server.go b/services/pruner/server.go index 133ba5a520..18e36e50d3 100644 --- a/services/pruner/server.go +++ b/services/pruner/server.go @@ -133,7 +133,6 @@ func (s *Server) Init(ctx context.Context) error { case s.prunerCh <- height32: s.logger.Debugf("Queued pruning for height %d from BlockPersisted notification", height32) default: - s.logger.Debugf("Pruning already in progress for height %d", height32) } } } @@ -145,7 +144,6 @@ func (s *Server) Init(ctx context.Context) error { persistedHeight := s.lastPersistedHeight.Load() if persistedHeight > 0 { // Block persister is running - BlockPersisted notifications will handle pruning - s.logger.Debugf("Block notification received but block persister is active (persisted height: %d), skipping", persistedHeight) continue } @@ -186,7 +184,6 @@ func (s *Server) Init(ctx context.Context) error { case s.prunerCh <- state.CurrentHeight: s.logger.Debugf("Queued pruning for height %d from Block notification (mined_set=true)", state.CurrentHeight) default: - s.logger.Debugf("Pruning already in progress for height %d", state.CurrentHeight) } } } diff --git a/services/pruner/worker.go b/services/pruner/worker.go index 372757a571..c2a6b3c059 100644 --- a/services/pruner/worker.go +++ b/services/pruner/worker.go @@ -104,37 +104,65 @@ func (s *Server) prunerProcessor(ctx context.Context) { continue } - // Wait for pruner to complete with timeout + // Wait for pruner to complete with optional timeout prunerTimeout := s.settings.Pruner.JobTimeout - timeoutTimer := time.NewTimer(prunerTimeout) - defer timeoutTimer.Stop() - select { - case status := <-doneCh: - if status != "completed" { - s.logger.Warnf("Pruner for height %d finished with status: %s", latestHeight, status) - prunerErrors.WithLabelValues("dah_pruner").Inc() - } else { - s.logger.Infof("Pruner for height %d completed successfully", latestHeight) - prunerDuration.WithLabelValues("dah_pruner").Observe(time.Since(startTime).Seconds()) - prunerProcessed.Inc() + if prunerTimeout == 0 { + // Wait indefinitely (no timeout) + select { + case status := <-doneCh: + if status != "completed" { + s.logger.Warnf("Pruner for height %d finished with status: %s", latestHeight, status) + prunerErrors.WithLabelValues("dah_pruner").Inc() + } else { + // Get job info to log records processed + recordsProcessed := int64(0) + if job := s.prunerService.GetJobByHeight(latestHeight); job != nil { + recordsProcessed = job.RecordsProcessed.Load() + } + s.logger.Infof("Pruner for height %d completed successfully, pruned %d records", latestHeight, recordsProcessed) + prunerDuration.WithLabelValues("dah_pruner").Observe(time.Since(startTime).Seconds()) + prunerProcessed.Inc() + } + case <-ctx.Done(): + return } - case <-timeoutTimer.C: - s.logger.Infof("Pruner for height %d exceeded coordinator timeout of %v - pruner continues in background, re-queuing immediately", latestHeight, prunerTimeout) - // Note: This is not an error - the pruner job continues processing in the background. - // The coordinator re-queues immediately to check again. - // Very large pruners may take longer than the timeout and require multiple iterations. + } else { + // Use timeout + timeoutTimer := time.NewTimer(prunerTimeout) + defer timeoutTimer.Stop() - // Immediately re-queue to check again (non-blocking) select { - case s.prunerCh <- latestHeight: - s.logger.Debugf("Re-queued pruner for height %d after timeout", latestHeight) - default: - // Channel full, will be retried when notifications trigger again - s.logger.Debugf("Pruner channel full, will retry on next notification") + case status := <-doneCh: + if status != "completed" { + s.logger.Warnf("Pruner for height %d finished with status: %s", latestHeight, status) + prunerErrors.WithLabelValues("dah_pruner").Inc() + } else { + // Get job info to log records processed + recordsProcessed := int64(0) + if job := s.prunerService.GetJobByHeight(latestHeight); job != nil { + recordsProcessed = job.RecordsProcessed.Load() + } + s.logger.Infof("Pruner for height %d completed successfully, pruned %d records", latestHeight, recordsProcessed) + prunerDuration.WithLabelValues("dah_pruner").Observe(time.Since(startTime).Seconds()) + prunerProcessed.Inc() + } + case <-timeoutTimer.C: + s.logger.Infof("Pruner for height %d exceeded coordinator timeout of %v - pruner continues in background", latestHeight, prunerTimeout) + // Note: This is not an error - the pruner job continues processing in the background. + // Only re-queue if channel is empty (next trigger will catch up if channel has pending jobs). + + // Only re-queue if channel is empty (non-blocking check) + select { + case s.prunerCh <- latestHeight: + s.logger.Debugf("Re-queued pruner for height %d (channel was empty)", latestHeight) + default: + // Channel has pending jobs, no need to re-queue + s.logger.Debugf("Pruner channel has pending jobs, skipping re-queue") + } + case <-ctx.Done(): + return } - case <-ctx.Done(): - return } } diff --git a/settings.conf b/settings.conf index ae82e17222..b957354b6a 100644 --- a/settings.conf +++ b/settings.conf @@ -583,6 +583,29 @@ pruner_grpcListenAddress.docker.host = localhost:${PORT_PREFIX}${PRUNER_GRPC_POR # Default: 10m pruner_jobTimeout = 10m +# UTXO-specific pruning settings +# Enable defensive checks before deleting UTXO transactions +# When enabled, verifies that all spending children are mined > BlockHeightRetention blocks ago +# Default: false +pruner_utxoDefensiveEnabled = false + +# UTXO batcher settings - optimized for multi-million record pruning operations +# These settings control batching of operations during delete-at-height (DAH) UTXO pruning +# Larger batch sizes = fewer round-trips to storage = faster pruning +pruner_utxoParentUpdateBatcherSize = 2000 +pruner_utxoParentUpdateBatcherDurationMillis = 100 +pruner_utxoDeleteBatcherSize = 5000 +pruner_utxoDeleteBatcherDurationMillis = 100 + +# Maximum concurrent operations during UTXO pruning +# Increase this if you have a large connection pool and want faster pruning +# Default: 1 (auto-detect from connection pool) +pruner_utxoMaxConcurrentOperations = 1 + +# Batch size for reading child transactions during defensive UTXO pruning +# Default: 1024 +pruner_utxoDefensiveBatchReadSize = 1024 + # @group: dashboard # Vite dev server ports (comma-separated) # dashboard_devServerPorts = 5173,4173 @@ -1493,19 +1516,6 @@ utxostore_storeBatcherDurationMillis = 10 utxostore_storeBatcherSize = 2048 -# Pruner batcher settings - optimized for multi-million record pruning operations -# These settings control batching of operations during delete-at-height (DAH) pruning -# Larger batch sizes = fewer round-trips to Aerospike = faster pruning -utxostore_prunerParentUpdateBatcherSize = 2000 -utxostore_prunerParentUpdateBatcherDurationMillis = 100 -utxostore_prunerDeleteBatcherSize = 5000 -utxostore_prunerDeleteBatcherDurationMillis = 100 - -# Maximum concurrent operations during pruning (0 = use Aerospike connection queue size) -# Increase this if you have a large connection pool and want faster pruning -# Default: 0 (auto-detect from connection pool) -utxostore_prunerMaxConcurrentOperations = 0 - # this value determines if a caching mechanism will be used for external transactions # if you have the memory available, it will speed up your IBD # and it is required for large blocks which load in the same tx multiple times, e.g. 814337 diff --git a/settings/interface.go b/settings/interface.go index 71b84ca0d6..ed20345244 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -381,12 +381,6 @@ type UtxoStoreSettings struct { MaxMinedBatchSize int BlockHeightRetentionAdjustment int32 // Adjustment to GlobalBlockHeightRetention (can be positive or negative) DisableDAHCleaner bool // Disable the DAH cleaner process completely - // Pruner-specific settings - PrunerParentUpdateBatcherSize int // Batch size for parent record updates during pruning - PrunerParentUpdateBatcherDurationMillis int // Batch duration for parent record updates during pruning (ms) - PrunerDeleteBatcherSize int // Batch size for record deletions during pruning - PrunerDeleteBatcherDurationMillis int // Batch duration for record deletions during pruning (ms) - PrunerMaxConcurrentOperations int // Maximum concurrent operations during pruning (0 = use connection queue size) } type P2PSettings struct { @@ -489,10 +483,15 @@ type CoinbaseSettings struct { } type PrunerSettings struct { - GRPCListenAddress string - GRPCAddress string - WorkerCount int - JobTimeout time.Duration // Timeout for waiting for pruner job completion + GRPCListenAddress string + GRPCAddress string + WorkerCount int // Number of worker goroutines (default: 1) + JobTimeout time.Duration // Timeout for waiting for pruner job completion (0 = wait indefinitely) + UTXODefensiveEnabled bool // Enable defensive checks before deleting UTXO transactions (verify children are mined > BlockHeightRetention blocks ago) + UTXODefensiveBatchReadSize int // Batch size for reading child transactions during defensive UTXO pruning (default: 10000) + UTXOChunkSize int // Number of records to process in each chunk before batch flushing (default: 1000) + UTXOChunkGroupLimit int // Maximum parallel chunk processing during UTXO pruning (default: 10) + UTXOProgressLogInterval time.Duration // Interval for logging progress during UTXO pruning (default: 30s) } type SubtreeValidationSettings struct { diff --git a/settings/settings.go b/settings/settings.go index 56865793eb..b101138c81 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -366,12 +366,6 @@ func NewSettings(alternativeContext ...string) *Settings { MaxMinedBatchSize: getInt("utxostore_maxMinedBatchSize", 1024, alternativeContext...), BlockHeightRetentionAdjustment: getInt32("utxostore_blockHeightRetentionAdjustment", 0, alternativeContext...), DisableDAHCleaner: getBool("utxostore_disableDAHCleaner", false, alternativeContext...), - // Pruner-specific settings optimized for multi-million record pruning operations - PrunerParentUpdateBatcherSize: getInt("utxostore_prunerParentUpdateBatcherSize", 2000, alternativeContext...), - PrunerParentUpdateBatcherDurationMillis: getInt("utxostore_prunerParentUpdateBatcherDurationMillis", 100, alternativeContext...), - PrunerDeleteBatcherSize: getInt("utxostore_prunerDeleteBatcherSize", 5000, alternativeContext...), - PrunerDeleteBatcherDurationMillis: getInt("utxostore_prunerDeleteBatcherDurationMillis", 100, alternativeContext...), - PrunerMaxConcurrentOperations: getInt("utxostore_prunerMaxConcurrentOperations", 0, alternativeContext...), }, P2P: P2PSettings{ BlockTopic: getString("p2p_block_topic", "", alternativeContext...), @@ -433,10 +427,15 @@ func NewSettings(alternativeContext ...string) *Settings { DistributorTimeout: getDuration("distributor_timeout", 30*time.Second, alternativeContext...), }, Pruner: PrunerSettings{ - GRPCAddress: getString("pruner_grpcAddress", "localhost:8096", alternativeContext...), - GRPCListenAddress: getString("pruner_grpcListenAddress", ":8096", alternativeContext...), - WorkerCount: getInt("pruner_workerCount", 4, alternativeContext...), // Default to 4 workers - JobTimeout: getDuration("pruner_jobTimeout", 10*time.Minute, alternativeContext...), + GRPCAddress: getString("pruner_grpcAddress", "localhost:8096", alternativeContext...), + GRPCListenAddress: getString("pruner_grpcListenAddress", ":8096", alternativeContext...), + WorkerCount: getInt("pruner_workerCount", 1, alternativeContext...), // Single job at a time + JobTimeout: getDuration("pruner_jobTimeout", 0, alternativeContext...), // 0 = wait indefinitely + UTXODefensiveEnabled: getBool("pruner_utxoDefensiveEnabled", false, alternativeContext...), // Defensive mode off by default (production) + UTXODefensiveBatchReadSize: getInt("pruner_utxoDefensiveBatchReadSize", 10000, alternativeContext...), // Batch size for child verification + UTXOChunkSize: getInt("pruner_utxoChunkSize", 1000, alternativeContext...), // Chunk size for batch operations + UTXOChunkGroupLimit: getInt("pruner_utxoChunkGroupLimit", 10, alternativeContext...), // Process 10 chunks in parallel + UTXOProgressLogInterval: getDuration("pruner_utxoProgressLogInterval", 30*time.Second, alternativeContext...), // Progress every 30s }, SubtreeValidation: SubtreeValidationSettings{ QuorumAbsoluteTimeout: getDuration("subtree_quorum_absolute_timeout", 30*time.Second, alternativeContext...), diff --git a/stores/pruner/interfaces.go b/stores/pruner/interfaces.go index 83b9837458..05bfe13323 100644 --- a/stores/pruner/interfaces.go +++ b/stores/pruner/interfaces.go @@ -15,6 +15,9 @@ type Service interface { // SetPersistedHeightGetter sets the function used to get block persister progress. // This allows pruner to coordinate with block persister to avoid premature deletion. SetPersistedHeightGetter(getter func() uint32) + + // GetJobByHeight returns a job for the specified block height + GetJobByHeight(blockHeight uint32) *Job } // PrunerServiceProvider defines an interface for stores that can provide a pruner service. diff --git a/stores/pruner/job.go b/stores/pruner/job.go index 3ac4ea799c..950bb3a76f 100644 --- a/stores/pruner/job.go +++ b/stores/pruner/job.go @@ -38,15 +38,16 @@ func (s JobStatus) String() string { // Job represents a pruner job type Job struct { - BlockHeight uint32 - status atomic.Int32 // Using atomic for thread-safe access - Error error - Created time.Time - Started time.Time - Ended time.Time - ctx context.Context - cancel context.CancelFunc - DoneCh chan string // Channel to signal when the job is complete (for testing purposes) + BlockHeight uint32 + status atomic.Int32 // Using atomic for thread-safe access + RecordsProcessed atomic.Int64 // Number of records processed/pruned + Error error + Created time.Time + Started time.Time + Ended time.Time + ctx context.Context + cancel context.CancelFunc + DoneCh chan string // Channel to signal when the job is complete (for testing purposes) } // GetStatus returns the current status of the job diff --git a/stores/pruner/job_processor.go b/stores/pruner/job_processor.go index 210a6aa808..465d832c5d 100644 --- a/stores/pruner/job_processor.go +++ b/stores/pruner/job_processor.go @@ -286,6 +286,20 @@ func (m *JobManager) GetJobs() []*Job { return clone } +// GetJobByHeight returns a job for the specified block height +func (m *JobManager) GetJobByHeight(blockHeight uint32) *Job { + m.jobsMutex.RLock() + defer m.jobsMutex.RUnlock() + + for _, job := range m.jobs { + if job.BlockHeight == blockHeight { + return job + } + } + + return nil +} + // worker processes jobs from the queue func (m *JobManager) worker(workerID int) { m.logger.Debugf("[JobManager] Worker %d started", workerID) diff --git a/stores/utxo/aerospike/pruner/pruner_service.go b/stores/utxo/aerospike/pruner/pruner_service.go index c574b71ed9..3ed27cb5ca 100644 --- a/stores/utxo/aerospike/pruner/pruner_service.go +++ b/stores/utxo/aerospike/pruner/pruner_service.go @@ -4,9 +4,11 @@ import ( "bytes" "context" "sync" + "sync/atomic" "time" "github.com/aerospike/aerospike-client-go/v8" + "github.com/aerospike/aerospike-client-go/v8/types" "github.com/bsv-blockchain/go-bt/v2" "github.com/bsv-blockchain/go-bt/v2/chainhash" "github.com/bsv-blockchain/teranode/errors" @@ -21,6 +23,7 @@ import ( "github.com/ordishs/gocore" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" ) // Ensure Store implements the Pruner Service interface @@ -77,9 +80,12 @@ type Options struct { } // Service manages background jobs for cleaning up records based on block height +// Service implements the pruner.Service interface for Aerospike-backed UTXO stores. +// This service extracts configuration values as fields during initialization rather than +// storing the settings object, optimizing for performance in hot paths where settings +// are accessed millions of times (e.g., utxoBatchSize in per-record processing loops). type Service struct { logger ulogger.Logger - settings *settings.Settings client *uaerospike.Client external blob.Store namespace string @@ -88,7 +94,20 @@ type Service struct { ctx context.Context indexWaiter IndexWaiter - // internally reused variables + // Configuration values extracted from settings for performance + utxoBatchSize int + blockHeightRetention uint32 + defensiveEnabled bool + defensiveBatchReadSize int + chunkSize int + chunkGroupLimit int + progressLogInterval time.Duration + + // Cached field names (avoid repeated String() allocations in hot paths) + fieldTxID, fieldUtxos, fieldInputs, fieldDeletedChildren, fieldExternal string + fieldDeleteAtHeight, fieldTotalExtraRecs, fieldUnminedSince, fieldBlockHeights string + + // Internally reused variables queryPolicy *aerospike.QueryPolicy writePolicy *aerospike.WritePolicy batchWritePolicy *aerospike.BatchWritePolicy @@ -97,10 +116,6 @@ type Service struct { // getPersistedHeight returns the last block height processed by block persister // Used to coordinate cleanup with block persister progress (can be nil) getPersistedHeight func() uint32 - - // maxConcurrentOperations limits concurrent operations during cleanup processing - // Auto-detected from Aerospike client connection queue size - maxConcurrentOperations int } // parentUpdateInfo holds accumulated parent update information for batching @@ -164,33 +179,35 @@ func NewService(tSettings *settings.Settings, opts Options) (*Service, error) { // Use the configured batch policy from settings (configured via aerospike_batchPolicy URL) batchPolicy := util.GetAerospikeBatchPolicy(tSettings) - // Determine max concurrent operations: - // - Use connection queue size as the upper bound (to prevent connection exhaustion) - // - If setting is configured (non-zero), use the minimum of setting and connection queue size - // - If setting is 0 or unset, use connection queue size - connectionQueueSize := opts.Client.GetConnectionQueueSize() - maxConcurrentOps := connectionQueueSize - if tSettings.UtxoStore.PrunerMaxConcurrentOperations > 0 { - if tSettings.UtxoStore.PrunerMaxConcurrentOperations < maxConcurrentOps { - maxConcurrentOps = tSettings.UtxoStore.PrunerMaxConcurrentOperations - } - } - service := &Service{ - logger: opts.Logger, - settings: tSettings, - client: opts.Client, - external: opts.ExternalStore, - namespace: opts.Namespace, - set: opts.Set, - ctx: opts.Ctx, - indexWaiter: opts.IndexWaiter, - queryPolicy: queryPolicy, - writePolicy: writePolicy, - batchWritePolicy: batchWritePolicy, - batchPolicy: batchPolicy, - getPersistedHeight: opts.GetPersistedHeight, - maxConcurrentOperations: maxConcurrentOps, + logger: opts.Logger, + client: opts.Client, + external: opts.ExternalStore, + namespace: opts.Namespace, + set: opts.Set, + ctx: opts.Ctx, + indexWaiter: opts.IndexWaiter, + queryPolicy: queryPolicy, + writePolicy: writePolicy, + batchWritePolicy: batchWritePolicy, + batchPolicy: batchPolicy, + getPersistedHeight: opts.GetPersistedHeight, + utxoBatchSize: tSettings.UtxoStore.UtxoBatchSize, + blockHeightRetention: tSettings.GetUtxoStoreBlockHeightRetention(), + defensiveEnabled: tSettings.Pruner.UTXODefensiveEnabled, + defensiveBatchReadSize: tSettings.Pruner.UTXODefensiveBatchReadSize, + chunkSize: tSettings.Pruner.UTXOChunkSize, + chunkGroupLimit: tSettings.Pruner.UTXOChunkGroupLimit, + progressLogInterval: tSettings.Pruner.UTXOProgressLogInterval, + fieldTxID: fields.TxID.String(), + fieldUtxos: fields.Utxos.String(), + fieldInputs: fields.Inputs.String(), + fieldDeletedChildren: fields.DeletedChildren.String(), + fieldExternal: fields.External.String(), + fieldDeleteAtHeight: fields.DeleteAtHeight.String(), + fieldTotalExtraRecs: fields.TotalExtraRecs.String(), + fieldUnminedSince: fields.UnminedSince.String(), + fieldBlockHeights: fields.BlockHeights.String(), } // Create the job processor function @@ -282,9 +299,6 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { job.SetStatus(pruner.JobStatusRunning) job.Started = time.Now() - // Get the job's context for cancellation support - jobCtx := job.Context() - s.logger.Infof("Worker %d starting cleanup job for block height %d", workerID, job.BlockHeight) // BLOCK PERSISTER COORDINATION: Calculate safe cleanup height @@ -316,7 +330,7 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { // Only apply limitation if block persister has actually processed blocks (height > 0) if persistedHeight > 0 { - retention := s.settings.GetUtxoStoreBlockHeightRetention() + retention := s.blockHeightRetention // Calculate max safe height: persisted_height + retention // Block persister at height N means blocks 0 to N are persisted in .subtree_data files. @@ -332,11 +346,18 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { // Create a query statement stmt := aerospike.NewStatement(s.namespace, s.set) - stmt.BinNames = []string{fields.TxID.String(), fields.DeleteAtHeight.String(), fields.Inputs.String(), fields.External.String(), fields.TotalExtraRecs.String()} + // Fetch minimal bins for production (non-defensive), full bins for defensive mode + binNames := []string{s.fieldTxID, s.fieldExternal, s.fieldTotalExtraRecs, s.fieldInputs} + if s.defensiveEnabled { + binNames = append(binNames, s.fieldDeleteAtHeight, s.fieldUtxos, s.fieldDeletedChildren) + } else { + binNames = append(binNames, s.fieldDeleteAtHeight) + } + stmt.BinNames = binNames // Set the filter to find records with a delete_at_height less than or equal to the safe cleanup height // This will automatically use the index since the filter is on the indexed bin - err := stmt.SetFilter(aerospike.NewRangeFilter(fields.DeleteAtHeight.String(), 1, int64(safeCleanupHeight))) + err := stmt.SetFilter(aerospike.NewRangeFilter(s.fieldDeleteAtHeight, 1, int64(safeCleanupHeight))) if err != nil { job.SetStatus(pruner.JobStatusFailed) job.Error = err @@ -358,30 +379,91 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { defer recordset.Close() result := recordset.Results() - recordCount := int64(0) - lastProgressLog := time.Now() - progressLogInterval := 30 * time.Second + recordCount := atomic.Int64{} + skippedCount := atomic.Int64{} // Records skipped due to defensive logic + + // Process records in chunks for efficient batch verification of children + // Chunk size configurable via pruner_utxoChunkSize setting (default: 1000) + chunk := make([]*aerospike.Result, 0, s.chunkSize) + + // Use errgroup to process chunks in parallel with controlled concurrency + chunkGroup := &errgroup.Group{} + // Limit parallel chunk processing to avoid overwhelming the system + // Configurable via pruner_utxoChunkGroupLimit setting (default: 10) + util.SafeSetLimit(chunkGroup, s.chunkGroupLimit) // Log initial start s.logger.Infof("Worker %d: starting cleanup scan for height %d (delete_at_height <= %d)", workerID, job.BlockHeight, safeCleanupHeight) - // Batch accumulation slices - batchSize := s.settings.UtxoStore.PrunerDeleteBatcherSize - parentUpdates := make(map[string]*parentUpdateInfo) // keyed by parent txid - deletions := make([]*aerospike.Key, 0, batchSize) - externalFiles := make([]*externalFileInfo, 0, batchSize) + // Start progress logging ticker if interval is configured + var progressTicker *time.Ticker + var progressDone chan struct{} + if s.progressLogInterval > 0 { + progressTicker = time.NewTicker(s.progressLogInterval) + progressDone = make(chan struct{}) + + go func() { + for { + select { + case <-progressTicker.C: + current := recordCount.Load() + skipped := skippedCount.Load() + elapsed := time.Since(job.Started) + if s.defensiveEnabled { + s.logger.Infof("Worker %d: pruner progress at height %d: pruned %d records, skipped %d records (elapsed: %v)", + workerID, job.BlockHeight, current, skipped, elapsed) + } else { + s.logger.Infof("Worker %d: pruner progress at height %d: pruned %d records (elapsed: %v)", + workerID, job.BlockHeight, current, elapsed) + } + case <-progressDone: + return + } + } + }() + + // Ensure ticker is stopped and goroutine is cleaned up + defer func() { + progressTicker.Stop() + close(progressDone) + }() + } + + // Helper to submit a chunk for processing + submitChunk := func(chunkToProcess []*aerospike.Result) { + // Copy chunk for goroutine to avoid race + chunkCopy := make([]*aerospike.Result, len(chunkToProcess)) + copy(chunkCopy, chunkToProcess) + + chunkGroup.Go(func() error { + processed, skipped, err := s.processRecordChunk(job, workerID, chunkCopy) + if err != nil { + return err + } + recordCount.Add(int64(processed)) + skippedCount.Add(int64(skipped)) + return nil + }) + } + + // Get job context for cancellation support + jobCtx := job.Context() - // Process records and accumulate into batches + // Process records and accumulate into chunks for { - // Check for cancellation before processing next record + // Check for cancellation before processing next chunk select { case <-jobCtx.Done(): s.logger.Infof("Worker %d: cleanup job for height %d cancelled", workerID, job.BlockHeight) recordset.Close() - // Flush any accumulated operations before exiting (use parent context since jobCtx is cancelled) - if err := s.flushCleanupBatches(s.ctx, workerID, job.BlockHeight, parentUpdates, deletions, externalFiles); err != nil { - s.logger.Errorf("Worker %d: error flushing batches during cancellation: %v", workerID, err) + // Process any accumulated chunk before exiting + if len(chunk) > 0 { + submitChunk(chunk) + } + // Wait for submitted chunks to complete + if err := chunkGroup.Wait(); err != nil { + s.logger.Errorf("Worker %d: error in chunks during cancellation: %v", workerID, err) } s.markJobAsFailed(job, errors.NewProcessingError("Worker %d: cleanup job for height %d cancelled", workerID, job.BlockHeight)) return @@ -390,142 +472,481 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { rec, ok := <-result if !ok || rec == nil { - break // No more records + // Process final chunk if any + if len(chunk) > 0 { + submitChunk(chunk) + } + break } - if rec.Err != nil { - s.logger.Errorf("Worker %d: error reading record: %v", workerID, rec.Err) + chunk = append(chunk, rec) + + // Process chunk when full (in parallel) + if len(chunk) >= s.chunkSize { + submitChunk(chunk) + chunk = chunk[:0] // Reset chunk + } + } + + // Wait for all parallel chunks to complete + if err := chunkGroup.Wait(); err != nil { + s.logger.Errorf("Worker %d: error processing chunks: %v", workerID, err) + s.markJobAsFailed(job, err) + return + } + + finalRecordCount := recordCount.Load() + finalSkippedCount := skippedCount.Load() + + // Set job status and record count + job.SetStatus(pruner.JobStatusCompleted) + job.RecordsProcessed.Store(finalRecordCount) + job.Ended = time.Now() + + // Summary log: total pruned and total prevented by defensive logic + if s.defensiveEnabled { + s.logger.Infof("Worker %d completed cleanup job for block height %d in %v: pruned %d records, skipped %d records (defensive logic)", + workerID, job.BlockHeight, job.Ended.Sub(job.Started), finalRecordCount, finalSkippedCount) + } else { + s.logger.Infof("Worker %d completed cleanup job for block height %d in %v: pruned %d records", + workerID, job.BlockHeight, job.Ended.Sub(job.Started), finalRecordCount) + } + + prometheusUtxoCleanupBatch.Observe(float64(time.Since(job.Started).Microseconds()) / 1_000_000) +} + +// processRecordChunk processes a chunk of parent records with batched child verification +// Returns: (processedCount, skippedCount, error) +func (s *Service) processRecordChunk(job *pruner.Job, workerID int, chunk []*aerospike.Result) (int, int, error) { + if len(chunk) == 0 { + return 0, 0, nil + } + + // Defensive child verification is conditional on the UTXODefensiveEnabled setting + // When disabled, parents are deleted without verifying children are stable + var safetyMap map[string]bool + var parentToChildren map[string][]string + + if !s.defensiveEnabled { + // Defensive mode disabled - allow all deletions without child verification + safetyMap = make(map[string]bool) + parentToChildren = make(map[string][]string) + } else { + // Step 1: Extract ALL unique spending children from chunk + // For each parent record, we extract all spending child TX hashes from spent UTXOs + // We must verify EVERY child is stable before deleting the parent + uniqueSpendingChildren := make(map[string][]byte, 100000) // hex hash -> bytes (typical: ~50-100 children per chunk) + parentToChildren = make(map[string][]string, len(chunk)) // parent record key -> child hashes + deletedChildren := make(map[string]bool, 20) // child hash -> already deleted (typical: 0-20) + + for _, rec := range chunk { + if rec.Err != nil || rec.Record == nil || rec.Record.Bins == nil { + continue + } + + // Extract deletedChildren map from parent record + // If a child is in this map, it means it was already pruned and shouldn't block parent deletion + if deletedChildrenRaw, hasDeleted := rec.Record.Bins[s.fieldDeletedChildren]; hasDeleted { + if deletedMap, ok := deletedChildrenRaw.(map[interface{}]interface{}); ok { + for childHashIface := range deletedMap { + if childHashStr, ok := childHashIface.(string); ok { + deletedChildren[childHashStr] = true + // s.logger.Debugf("Worker %d: Found deleted child in parent record: %s", workerID, childHashStr[:8]) + } + } + } else { + s.logger.Debugf("Worker %d: deletedChildren bin wrong type: %T", workerID, deletedChildrenRaw) + } + } + + // Extract all spending children from this parent's UTXOs + utxosRaw, hasUtxos := rec.Record.Bins[s.fieldUtxos] + if !hasUtxos { + continue + } + + utxosList, ok := utxosRaw.([]interface{}) + if !ok { + continue + } + + parentKey := rec.Record.Key.String() + childrenForThisParent := make([]string, 0, 16) // Pre-allocate for typical ~10 spent UTXOs per tx + + // Scan all UTXOs for spending data + for _, utxoRaw := range utxosList { + utxoBytes, ok := utxoRaw.([]byte) + if !ok || len(utxoBytes) < 68 { // 32 (utxo hash) + 36 (spending data) + continue + } + + // spending_data starts at byte 32, first 32 bytes of spending_data is child TX hash + childTxHashBytes := utxoBytes[32:64] + + // Check if this is actual spending data (not all zeros) + hasSpendingData := false + for _, b := range childTxHashBytes { + if b != 0 { + hasSpendingData = true + break + } + } + + if hasSpendingData { + hexHash := chainhash.Hash(childTxHashBytes).String() + uniqueSpendingChildren[hexHash] = childTxHashBytes + childrenForThisParent = append(childrenForThisParent, hexHash) + // s.logger.Debugf("Worker %d: Extracted spending child from UTXO: %s", workerID, hexHash[:8]) + } + } + + if len(childrenForThisParent) > 0 { + parentToChildren[parentKey] = childrenForThisParent + } + } + + // Step 2: Batch verify all unique children (single BatchGet call for entire chunk) + if len(uniqueSpendingChildren) > 0 { + safetyMap = s.batchVerifyChildrenSafety(uniqueSpendingChildren, job.BlockHeight, deletedChildren) + } else { + safetyMap = make(map[string]bool) + } + } + + // Step 3: Accumulate operations for entire chunk, then flush once (efficient batching) + allParentUpdates := make(map[string]*parentUpdateInfo, 1000) // Accumulate all parent updates for chunk + allDeletions := make([]*aerospike.Key, 0, 1000) // Accumulate all deletions for chunk + allExternalFiles := make([]*externalFileInfo, 0, 10) // Accumulate external files (<1%) + processedCount := 0 + skippedCount := 0 + + for _, rec := range chunk { + if rec.Err != nil || rec.Record == nil || rec.Record.Bins == nil { continue } - // Extract record data - bins := rec.Record.Bins - txHash, err := s.extractTxHash(bins) - if err != nil { - s.logger.Errorf("Worker %d: %v", workerID, err) + txIDBytes, ok := rec.Record.Bins[s.fieldTxID].([]byte) + if !ok || len(txIDBytes) != 32 { continue } - inputs, err := s.extractInputs(job, workerID, bins, txHash) + txHash, err := chainhash.NewHash(txIDBytes) if err != nil { - s.logger.Errorf("Worker %d: %v", workerID, err) continue } + // Check if children are safe (defensive mode only) + parentKey := rec.Record.Key.String() + childrenHashes, hasChildren := parentToChildren[parentKey] + + if hasChildren && len(childrenHashes) > 0 { + allSafe := true + var unsafeChild string + for _, childHash := range childrenHashes { + if !safetyMap[childHash] { + allSafe = false + unsafeChild = childHash + break + } + } + + if !allSafe { + // Skip this record - at least one child not stable + s.logger.Infof("Defensive skip - parent %s cannot be deleted due to unstable child %s (%d children total)", + txHash.String(), unsafeChild, len(childrenHashes)) + skippedCount++ + continue + } + } + + // Safe to delete - get inputs for parent updates + inputs, err := s.getTxInputsFromBins(job, workerID, rec.Record.Bins, txHash) + if err != nil { + return 0, 0, err + } + // Accumulate parent updates for _, input := range inputs { - // Calculate the parent key for this input - keySource := uaerospike.CalculateKeySource(input.PreviousTxIDChainHash(), input.PreviousTxOutIndex, s.settings.UtxoStore.UtxoBatchSize) + keySource := uaerospike.CalculateKeySource(input.PreviousTxIDChainHash(), input.PreviousTxOutIndex, s.utxoBatchSize) parentKeyStr := string(keySource) - if existing, ok := parentUpdates[parentKeyStr]; ok { - // Add this transaction to the list of deleted children for this parent + if existing, ok := allParentUpdates[parentKeyStr]; ok { existing.childHashes = append(existing.childHashes, txHash) } else { parentKey, err := aerospike.NewKey(s.namespace, s.set, keySource) if err != nil { - s.logger.Errorf("Worker %d: failed to create parent key: %v", workerID, err) - continue + return 0, 0, err } - parentUpdates[parentKeyStr] = &parentUpdateInfo{ + allParentUpdates[parentKeyStr] = &parentUpdateInfo{ key: parentKey, childHashes: []*chainhash.Hash{txHash}, } } } - // Handle external transactions: add file for deletion - external, isExternal := bins[fields.External.String()].(bool) + // Accumulate external files + external, isExternal := rec.Record.Bins[s.fieldExternal].(bool) if isExternal && external { - // Determine file type: if we found inputs, it's a .tx file, otherwise it's .outputs fileType := fileformat.FileTypeOutputs if len(inputs) > 0 { fileType = fileformat.FileTypeTx } - externalFiles = append(externalFiles, &externalFileInfo{ + allExternalFiles = append(allExternalFiles, &externalFileInfo{ txHash: txHash, fileType: fileType, }) } - // Accumulate deletions: master record + any child records - deletions = append(deletions, rec.Record.Key) + // Accumulate deletions (master + child records) + allDeletions = append(allDeletions, rec.Record.Key) - // If this is a multi-record transaction, delete all child records - totalExtraRecs, hasExtraRecs := bins[fields.TotalExtraRecs.String()].(int) - if hasExtraRecs && totalExtraRecs > 0 { - // Generate keys for all child records: txid_1, txid_2, ..., txid_N + if totalExtraRecs, hasExtraRecs := rec.Record.Bins[s.fieldTotalExtraRecs].(int); hasExtraRecs && totalExtraRecs > 0 { for i := 1; i <= totalExtraRecs; i++ { childKeySource := uaerospike.CalculateKeySourceInternal(txHash, uint32(i)) childKey, err := aerospike.NewKey(s.namespace, s.set, childKeySource) - if err != nil { - s.logger.Errorf("Worker %d: failed to create child key for %s_%d: %v", workerID, txHash.String(), i, err) - continue + if err == nil { + allDeletions = append(allDeletions, childKey) } - deletions = append(deletions, childKey) } - s.logger.Debugf("Worker %d: deleting external tx %s with %d child records", workerID, txHash.String(), totalExtraRecs) } - recordCount++ + processedCount++ + } + + // Flush all accumulated operations in one batch per chunk + ctx := s.ctx + if ctx == nil { + ctx = context.Background() + } + if err := s.flushCleanupBatches(ctx, workerID, job.BlockHeight, allParentUpdates, allDeletions, allExternalFiles); err != nil { + return 0, 0, err + } + + return processedCount, skippedCount, nil +} - // Log progress - if recordCount%10000 == 0 || time.Since(lastProgressLog) > progressLogInterval { - s.logger.Infof("Worker %d: cleanup progress for height %d - processed %d records so far", - workerID, job.BlockHeight, recordCount) - lastProgressLog = time.Now() +// batchVerifyChildrenSafety checks multiple child transactions at once to determine if their parents +// can be safely deleted. This is much more efficient than checking each child individually. +// +// Safety guarantee: A parent can only be deleted if ALL spending children have been mined and stable +// for at least 288 blocks. This prevents orphaning children by ensuring we never delete a parent while +// ANY of its spending children might still be reorganized out of the chain. +// +// The spending children are extracted from the parent's UTXO spending_data (embedded in each spent UTXO). +// This ensures we verify EVERY child that spent any output, not just one representative child. +// +// Parameters: +// - spendingChildrenHashes: Map of child TX hashes to verify (32 bytes each) - ALL unique children +// - currentBlockHeight: Current block height for safety window calculation +// +// Returns: +// - map[string]bool: Map of childHash (hex string) -> isSafe (true = this child is stable) +func (s *Service) batchVerifyChildrenSafety(lastSpenderHashes map[string][]byte, currentBlockHeight uint32, deletedChildren map[string]bool) map[string]bool { + if len(lastSpenderHashes) == 0 { + return make(map[string]bool) + } + + safetyMap := make(map[string]bool, len(lastSpenderHashes)) + + // Mark already-deleted children as safe immediately + // If a child is in deletedChildren, it means it was already pruned successfully + // and shouldn't block the parent from being pruned + for hexHash := range deletedChildren { + if _, exists := lastSpenderHashes[hexHash]; exists { + safetyMap[hexHash] = true } + } - // Execute batch when full - if len(deletions) >= batchSize { - if err := s.flushCleanupBatches(jobCtx, workerID, job.BlockHeight, parentUpdates, deletions, externalFiles); err != nil { - s.logger.Errorf("Worker %d: error flushing batches: %v", workerID, err) - // Continue processing despite flush error - will retry at end - } - parentUpdates = make(map[string]*parentUpdateInfo) - deletions = make([]*aerospike.Key, 0, batchSize) - externalFiles = make([]*externalFileInfo, 0, batchSize) + // Process children in batches to avoid overwhelming Aerospike + batchSize := s.defensiveBatchReadSize + if batchSize <= 0 { + batchSize = 1024 // Default batch size if not configured + } + + // Convert map to slice for batching, skipping already-deleted children + // Children in deletedChildren are already marked as safe, no need to query Aerospike + hashEntries := make([]childHashEntry, 0, len(lastSpenderHashes)) + for hexHash, hashBytes := range lastSpenderHashes { + // Skip children that are already marked as safe (deleted) + if safetyMap[hexHash] { + continue + } + hashEntries = append(hashEntries, childHashEntry{hexHash: hexHash, hashBytes: hashBytes}) + } + + // Process in batches + for i := 0; i < len(hashEntries); i += batchSize { + end := i + batchSize + if end > len(hashEntries) { + end = len(hashEntries) + } + batch := hashEntries[i:end] + + s.processBatchOfChildren(batch, safetyMap, currentBlockHeight) + } + + return safetyMap +} + +// childHashEntry holds a child transaction hash for batch processing +type childHashEntry struct { + hexHash string + hashBytes []byte +} + +// processBatchOfChildren verifies a batch of child transactions +func (s *Service) processBatchOfChildren(batch []childHashEntry, safetyMap map[string]bool, currentBlockHeight uint32) { + // Create batch read operations + batchPolicy := aerospike.NewBatchPolicy() + batchPolicy.MaxRetries = 3 + batchPolicy.TotalTimeout = 120 * time.Second + + readPolicy := aerospike.NewBatchReadPolicy() + readPolicy.ReadModeSC = aerospike.ReadModeSCSession + + batchRecords := make([]aerospike.BatchRecordIfc, 0, len(batch)) + hashToKey := make(map[string]string, len(batch)) // hex hash -> key for mapping + + for _, entry := range batch { + hexHash := entry.hexHash + hashBytes := entry.hashBytes + if len(hashBytes) != 32 { + s.logger.Warnf("[batchVerifyChildrenSafety] Invalid hash length for %s", hexHash) + safetyMap[hexHash] = false + continue + } + + childHash, err := chainhash.NewHash(hashBytes) + if err != nil { + s.logger.Warnf("[batchVerifyChildrenSafety] Failed to create hash: %v", err) + safetyMap[hexHash] = false + continue + } + + key, err := aerospike.NewKey(s.namespace, s.set, childHash[:]) + if err != nil { + s.logger.Warnf("[batchVerifyChildrenSafety] Failed to create key for child %s: %v", childHash.String(), err) + safetyMap[hexHash] = false + continue } + + batchRecords = append(batchRecords, aerospike.NewBatchRead( + readPolicy, + key, + []string{s.fieldUnminedSince, s.fieldBlockHeights}, + )) + hashToKey[hexHash] = key.String() } - // Flush any remaining operations - if err := s.flushCleanupBatches(jobCtx, workerID, job.BlockHeight, parentUpdates, deletions, externalFiles); err != nil { - s.logger.Errorf("Worker %d: error flushing final batches: %v", workerID, err) - s.markJobAsFailed(job, errors.NewStorageError("failed to flush final batches", err)) + if len(batchRecords) == 0 { return } - // Check if we were cancelled - wasCancelled := false - select { - case <-jobCtx.Done(): - wasCancelled = true - s.logger.Infof("Worker %d: cleanup job for height %d was cancelled", workerID, job.BlockHeight) - default: - // Not cancelled + // Execute batch operation + err := s.client.BatchOperate(batchPolicy, batchRecords) + if err != nil { + s.logger.Errorf("[processBatchOfChildren] Batch operation failed: %v", err) + // Mark all in this batch as unsafe on batch error + for hexHash := range hashToKey { + safetyMap[hexHash] = false + } + return } - s.logger.Infof("Worker %d: processed %d records for cleanup job %d", workerID, recordCount, job.BlockHeight) + // Process results - use configured retention setting as safety window + safetyWindow := s.blockHeightRetention - // Set appropriate status based on cancellation - if wasCancelled { - job.SetStatus(pruner.JobStatusCancelled) - s.logger.Infof("Worker %d cancelled cleanup job for block height %d after %v, processed %d records before cancellation", - workerID, job.BlockHeight, time.Since(job.Started), recordCount) - } else { - job.SetStatus(pruner.JobStatusCompleted) - s.logger.Infof("Worker %d completed cleanup job for block height %d in %v, processed %d records", - workerID, job.BlockHeight, time.Since(job.Started), recordCount) + // Build reverse map for O(1) lookup instead of O(n²) nested loop + // This avoids scanning all batch records for each child hash + keyToRecord := make(map[string]*aerospike.BatchRecord, len(batchRecords)) + for _, batchRec := range batchRecords { + keyToRecord[batchRec.BatchRec().Key.String()] = batchRec.BatchRec() } - job.Ended = time.Now() - prometheusUtxoCleanupBatch.Observe(float64(time.Since(job.Started).Microseconds()) / 1_000_000) + for hexHash, keyStr := range hashToKey { + // O(1) map lookup instead of O(n) scan + record := keyToRecord[keyStr] + if record == nil { + safetyMap[hexHash] = false + continue + } + + if record.Err != nil { + // Check if this is a "key not found" error - child was already deleted + // This can happen due to race conditions when processing chunks in parallel: + // - Chunk 1 deletes child C and updates parent A's deletedChildren + // - Chunk 2 already loaded parent A (before the update) and now queries child C + // - Child C is gone, so we get KEY_NOT_FOUND_ERROR + // In this case, the child is ALREADY deleted, so it's safe to consider it stable + if aerospikeErr, ok := record.Err.(*aerospike.AerospikeError); ok { + if aerospikeErr.ResultCode == types.KEY_NOT_FOUND_ERROR { + // Child already deleted by another chunk - safe to proceed with parent deletion + safetyMap[hexHash] = true + continue + } + } + // Any other error → be conservative, don't delete parent + s.logger.Warnf("[batchVerifyChildrenSafety] Unexpected error for child %s: %v", hexHash[:8], record.Err) + safetyMap[hexHash] = false + continue + } + + if record.Record == nil || record.Record.Bins == nil { + safetyMap[hexHash] = false + continue + } + + bins := record.Record.Bins + + // Check unmined status + unminedSince, hasUnminedSince := bins[s.fieldUnminedSince] + if hasUnminedSince && unminedSince != nil { + // Child is unmined, not safe + safetyMap[hexHash] = false + continue + } + + // Check block heights + blockHeightsRaw, hasBlockHeights := bins[s.fieldBlockHeights] + if !hasBlockHeights { + // No block heights, treat as not safe + safetyMap[hexHash] = false + continue + } + + blockHeightsList, ok := blockHeightsRaw.([]interface{}) + if !ok || len(blockHeightsList) == 0 { + safetyMap[hexHash] = false + continue + } + + // Find maximum block height + var maxChildBlockHeight uint32 + for _, heightRaw := range blockHeightsList { + height, ok := heightRaw.(int) + if ok && uint32(height) > maxChildBlockHeight { + maxChildBlockHeight = uint32(height) + } + } + + if maxChildBlockHeight == 0 { + safetyMap[hexHash] = false + continue + } + + // Check if child has been stable long enough + if currentBlockHeight < maxChildBlockHeight+safetyWindow { + safetyMap[hexHash] = false + } else { + safetyMap[hexHash] = true + } + } } func (s *Service) getTxInputsFromBins(job *pruner.Job, workerID int, bins aerospike.BinMap, txHash *chainhash.Hash) ([]*bt.Input, error) { var inputs []*bt.Input - external, ok := bins[fields.External.String()].(bool) + external, ok := bins[s.fieldExternal].(bool) if ok && external { // transaction is external, we need to get the data from the external store txBytes, err := s.external.Get(s.ctx, txHash.CloneBytes(), fileformat.FileTypeTx) @@ -534,7 +955,7 @@ func (s *Service) getTxInputsFromBins(job *pruner.Job, workerID int, bins aerosp // Check if outputs exist (sometimes only outputs are stored) exists, err := s.external.Exists(s.ctx, txHash.CloneBytes(), fileformat.FileTypeOutputs) if err != nil { - return nil, errors.NewProcessingError("Worker %d: error checking existence of outputs for external tx %s in cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: error checking existence of outputs for external tx %s in cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) } if exists { @@ -548,18 +969,18 @@ func (s *Service) getTxInputsFromBins(job *pruner.Job, workerID int, bins aerosp return []*bt.Input{}, nil } // Other errors should still be reported - return nil, errors.NewProcessingError("Worker %d: error getting external tx %s for cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: error getting external tx %s for cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) } tx, err := bt.NewTxFromBytes(txBytes) if err != nil { - return nil, errors.NewProcessingError("Worker %d: invalid tx bytes for external tx %s in cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: invalid tx bytes for external tx %s in cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) } inputs = tx.Inputs } else { // get the inputs from the record directly - inputsValue := bins[fields.Inputs.String()] + inputsValue := bins[s.fieldInputs] if inputsValue == nil { // Inputs field might be nil for certain records (e.g., coinbase) return []*bt.Input{}, nil @@ -579,7 +1000,7 @@ func (s *Service) getTxInputsFromBins(job *pruner.Job, workerID int, bins aerosp inputs[i] = &bt.Input{} if _, err := inputs[i].ReadFrom(bytes.NewReader(input)); err != nil { - return nil, errors.NewProcessingError("Worker %d: invalid input for record in cleanup job %d", workerID, job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: invalid input for record in cleanup job %d: %v", workerID, job.BlockHeight, err) } } } @@ -621,7 +1042,7 @@ func (s *Service) flushCleanupBatches(ctx context.Context, workerID int, blockHe // extractTxHash extracts the transaction hash from record bins func (s *Service) extractTxHash(bins aerospike.BinMap) (*chainhash.Hash, error) { - txIDBytes, ok := bins[fields.TxID.String()].([]byte) + txIDBytes, ok := bins[s.fieldTxID].([]byte) if !ok || len(txIDBytes) != 32 { return nil, errors.NewProcessingError("invalid or missing txid") } @@ -654,7 +1075,7 @@ func (s *Service) executeBatchParentUpdates(ctx context.Context, workerID int, b // For each child transaction being deleted, add it to the DeletedChildren map ops := make([]*aerospike.Operation, len(info.childHashes)) for i, childHash := range info.childHashes { - ops[i] = aerospike.MapPutOp(mapPolicy, fields.DeletedChildren.String(), + ops[i] = aerospike.MapPutOp(mapPolicy, s.fieldDeletedChildren, aerospike.NewStringValue(childHash.String()), aerospike.BoolValue(true)) } @@ -810,9 +1231,9 @@ func (s *Service) ProcessSingleRecord(txHash *chainhash.Hash, inputs []*bt.Input } // Build parent updates map - parentUpdates := make(map[string]*parentUpdateInfo) + parentUpdates := make(map[string]*parentUpdateInfo, len(inputs)) // One parent per input (worst case) for _, input := range inputs { - keySource := uaerospike.CalculateKeySource(input.PreviousTxIDChainHash(), input.PreviousTxOutIndex, s.settings.UtxoStore.UtxoBatchSize) + keySource := uaerospike.CalculateKeySource(input.PreviousTxIDChainHash(), input.PreviousTxOutIndex, s.utxoBatchSize) parentKeyStr := string(keySource) if existing, ok := parentUpdates[parentKeyStr]; ok { @@ -837,3 +1258,8 @@ func (s *Service) ProcessSingleRecord(txHash *chainhash.Hash, inputs []*bt.Input func (s *Service) GetJobs() []*pruner.Job { return s.jobManager.GetJobs() } + +// GetJobByHeight returns a job for the specified block height +func (s *Service) GetJobByHeight(blockHeight uint32) *pruner.Job { + return s.jobManager.GetJobByHeight(blockHeight) +} diff --git a/stores/utxo/aerospike/pruner/pruner_service_test.go b/stores/utxo/aerospike/pruner/pruner_service_test.go index 112849e227..3b029076d0 100644 --- a/stores/utxo/aerospike/pruner/pruner_service_test.go +++ b/stores/utxo/aerospike/pruner/pruner_service_test.go @@ -23,12 +23,15 @@ import ( func createTestSettings() *settings.Settings { return &settings.Settings{ UtxoStore: settings.UtxoStoreSettings{ - PrunerParentUpdateBatcherSize: 100, - PrunerParentUpdateBatcherDurationMillis: 10, - PrunerDeleteBatcherSize: 256, - PrunerDeleteBatcherDurationMillis: 10, - PrunerMaxConcurrentOperations: 0, // 0 = auto-detect from connection queue size - UtxoBatchSize: 128, // Add missing UtxoBatchSize + UtxoBatchSize: 128, + }, + Pruner: settings.PrunerSettings{ + WorkerCount: 1, + JobTimeout: 0, + UTXODefensiveEnabled: false, + UTXODefensiveBatchReadSize: 10000, + UTXOChunkGroupLimit: 10, + UTXOProgressLogInterval: 30 * time.Second, }, } } diff --git a/stores/utxo/sql/mock.go b/stores/utxo/sql/mock.go index 2b6b21f086..ee2e601ea3 100644 --- a/stores/utxo/sql/mock.go +++ b/stores/utxo/sql/mock.go @@ -388,27 +388,27 @@ func SetupCreatePostgresSchemaSuccessMocks(mockDB *MockDB) { return strings.Contains(query, "DO $$") && strings.Contains(query, "block_ids") && strings.Contains(query, "ADD COLUMN") }), mock.Anything).Return(sqlmock.NewResult(0, 0), nil) - // 13. ADD COLUMN unmined_since to transactions (DO $$ block) + // 12. ADD COLUMN unmined_since to transactions (DO $$ block) mockDB.On("Exec", mock.MatchedBy(func(query string) bool { return strings.Contains(query, "DO $$") && strings.Contains(query, "unmined_since") && strings.Contains(query, "ADD COLUMN") }), mock.Anything).Return(sqlmock.NewResult(0, 0), nil) - // 14. ADD COLUMN preserve_until to transactions (DO $$ block) + // 13. ADD COLUMN preserve_until to transactions (DO $$ block) mockDB.On("Exec", mock.MatchedBy(func(query string) bool { return strings.Contains(query, "DO $$") && strings.Contains(query, "preserve_until") && strings.Contains(query, "ADD COLUMN") }), mock.Anything).Return(sqlmock.NewResult(0, 0), nil) - // 15. DROP CONSTRAINT block_ids_transaction_id_fkey (DO $$ block) + // 14. DROP CONSTRAINT block_ids_transaction_id_fkey (DO $$ block) mockDB.On("Exec", mock.MatchedBy(func(query string) bool { return strings.Contains(query, "DO $$") && strings.Contains(query, "block_ids_transaction_id_fkey") && strings.Contains(query, "DROP CONSTRAINT") }), mock.Anything).Return(sqlmock.NewResult(0, 0), nil) - // 16. ADD CONSTRAINT block_ids_transaction_id_fkey (DO $$ block) + // 15. ADD CONSTRAINT block_ids_transaction_id_fkey (DO $$ block) mockDB.On("Exec", mock.MatchedBy(func(query string) bool { return strings.Contains(query, "DO $$") && strings.Contains(query, "block_ids_transaction_id_fkey") && strings.Contains(query, "ADD CONSTRAINT") }), mock.Anything).Return(sqlmock.NewResult(0, 0), nil) - // 17. CREATE TABLE conflicting_children + // 16. CREATE TABLE IF NOT EXISTS conflicting_children mockDB.On("Exec", mock.MatchedBy(func(query string) bool { return strings.Contains(query, "CREATE TABLE IF NOT EXISTS conflicting_children") }), mock.Anything).Return(sqlmock.NewResult(0, 0), nil) diff --git a/stores/utxo/sql/pruner/mock.go b/stores/utxo/sql/pruner/mock.go index e74a0deb84..8ee9b94ffc 100644 --- a/stores/utxo/sql/pruner/mock.go +++ b/stores/utxo/sql/pruner/mock.go @@ -236,7 +236,7 @@ func (m *MockStmt) Close() error { } func (m *MockStmt) NumInput() int { - return 1 + return -1 // -1 means don't check argument count } func (m *MockStmt) Exec(args []driver.Value) (driver.Result, error) { diff --git a/stores/utxo/sql/pruner/pruner_service.go b/stores/utxo/sql/pruner/pruner_service.go index b9424c961c..245b8769b2 100644 --- a/stores/utxo/sql/pruner/pruner_service.go +++ b/stores/utxo/sql/pruner/pruner_service.go @@ -9,7 +9,6 @@ import ( "github.com/bsv-blockchain/teranode/stores/pruner" "github.com/bsv-blockchain/teranode/ulogger" "github.com/bsv-blockchain/teranode/util/usql" - "github.com/ordishs/gocore" ) // Ensure Store implements the Pruner Service interface @@ -25,6 +24,8 @@ const ( // Service implements the utxo.CleanupService interface for SQL-based UTXO stores type Service struct { + safetyWindow uint32 // Block height retention for child stability verification + defensiveEnabled bool // Enable defensive checks before deleting UTXO transactions logger ulogger.Logger settings *settings.Settings db *usql.DB @@ -49,6 +50,10 @@ type Options struct { // Ctx is the context to use to signal shutdown Ctx context.Context + + // SafetyWindow is the number of blocks a child must be stable before parent deletion + // If not specified, defaults to global_blockHeightRetention (288 blocks) + SafetyWindow uint32 } // NewService creates a new cleanup service for the SQL store @@ -67,7 +72,7 @@ func NewService(tSettings *settings.Settings, opts Options) (*Service, error) { workerCount := opts.WorkerCount if workerCount <= 0 { - workerCount, _ = gocore.Config().GetInt("sql_cleanup_worker_count", DefaultWorkerCount) + workerCount = DefaultWorkerCount } maxJobsHistory := opts.MaxJobsHistory @@ -75,11 +80,19 @@ func NewService(tSettings *settings.Settings, opts Options) (*Service, error) { maxJobsHistory = DefaultMaxJobsHistory } + safetyWindow := opts.SafetyWindow + if safetyWindow == 0 { + // Default to global retention setting (288 blocks) + safetyWindow = tSettings.GlobalBlockHeightRetention + } + service := &Service{ - logger: opts.Logger, - settings: tSettings, - db: opts.DB, - ctx: opts.Ctx, + safetyWindow: safetyWindow, + defensiveEnabled: tSettings.Pruner.UTXODefensiveEnabled, + logger: opts.Logger, + settings: tSettings, + db: opts.DB, + ctx: opts.Ctx, } // Create the job processor function @@ -129,6 +142,11 @@ func (s *Service) GetJobs() []*pruner.Job { return s.jobManager.GetJobs() } +// GetJobByHeight returns a job for the specified block height +func (s *Service) GetJobByHeight(blockHeight uint32) *pruner.Job { + return s.jobManager.GetJobByHeight(blockHeight) +} + // processCleanupJob executes the cleanup for a specific job func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { s.logger.Debugf("[SQLCleanupService %d] running cleanup job for block height %d", workerID, job.BlockHeight) @@ -183,7 +201,7 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { workerID, job.BlockHeight, safeCleanupHeight) // Execute the cleanup with safe height - deletedCount, err := deleteTombstonedWithCount(s.db, safeCleanupHeight) + deletedCount, err := s.deleteTombstoned(safeCleanupHeight) if err != nil { job.SetStatus(pruner.JobStatusFailed) @@ -193,6 +211,7 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { s.logger.Errorf("[SQLCleanupService %d] cleanup job failed for block height %d: %v", workerID, job.BlockHeight, err) } else { job.SetStatus(pruner.JobStatusCompleted) + job.RecordsProcessed.Store(deletedCount) job.Ended = time.Now() s.logger.Infof("[SQLCleanupService %d] cleanup job completed for block height %d in %v - deleted %d records", @@ -200,11 +219,61 @@ func (s *Service) processCleanupJob(job *pruner.Job, workerID int) { } } -// deleteTombstonedWithCount removes transactions that have passed their expiration time and returns the count. -func deleteTombstonedWithCount(db *usql.DB, blockHeight uint32) (int64, error) { - // Delete transactions that have passed their expiration time - // this will cascade to inputs, outputs, block_ids and conflicting_children - result, err := db.Exec("DELETE FROM transactions WHERE delete_at_height <= $1", blockHeight) +// deleteTombstoned removes transactions that have passed their expiration time. +// Only deletes parent transactions if their last spending child is mined and stable. +func (s *Service) deleteTombstoned(blockHeight uint32) (int64, error) { + // Use configured safety window from settings + safetyWindow := s.safetyWindow + + // Defensive child verification is conditional on the UTXODefensiveEnabled setting + // When disabled, parents are deleted without verifying children are stable + var deleteQuery string + var result interface{ RowsAffected() (int64, error) } + var err error + + if !s.defensiveEnabled { + // Defensive mode disabled - delete all transactions past their expiration + deleteQuery = ` + DELETE FROM transactions + WHERE delete_at_height IS NOT NULL + AND delete_at_height <= $1 + ` + result, err = s.db.Exec(deleteQuery, blockHeight) + } else { + // Defensive mode enabled - verify ALL spending children are stable before deletion + // This prevents orphaning any child transaction + deleteQuery = ` + DELETE FROM transactions + WHERE id IN ( + SELECT t.id + FROM transactions t + WHERE t.delete_at_height IS NOT NULL + AND t.delete_at_height <= $1 + AND NOT EXISTS ( + -- Find ANY unstable child - if found, parent cannot be deleted + -- This ensures ALL children must be stable before parent deletion + SELECT 1 + FROM outputs o + WHERE o.transaction_id = t.id + AND o.spending_data IS NOT NULL + AND ( + -- Extract child TX hash from spending_data (first 32 bytes) + -- Check if this child is NOT stable + NOT EXISTS ( + SELECT 1 + FROM transactions child + INNER JOIN block_ids child_blocks ON child.id = child_blocks.transaction_id + WHERE child.hash = substr(o.spending_data, 1, 32) + AND child.unmined_since IS NULL -- Child must be mined + AND child_blocks.block_height <= ($1 - $2) -- Child must be stable + ) + ) + ) + ) + ` + result, err = s.db.Exec(deleteQuery, blockHeight, safetyWindow) + } + if err != nil { return 0, errors.NewStorageError("failed to delete transactions", err) } @@ -216,14 +285,3 @@ func deleteTombstonedWithCount(db *usql.DB, blockHeight uint32) (int64, error) { return count, nil } - -// deleteTombstoned removes transactions that have passed their expiration time. -func deleteTombstoned(db *usql.DB, blockHeight uint32) error { - // Delete transactions that have passed their expiration time - // this will cascade to inputs, outputs, block_ids and conflicting_children - if _, err := db.Exec("DELETE FROM transactions WHERE delete_at_height <= $1", blockHeight); err != nil { - return errors.NewStorageError("failed to delete transactions", err) - } - - return nil -} diff --git a/stores/utxo/sql/pruner/pruner_service_test.go b/stores/utxo/sql/pruner/pruner_service_test.go index 66e7d0a971..63cb8b5d9e 100644 --- a/stores/utxo/sql/pruner/pruner_service_test.go +++ b/stores/utxo/sql/pruner/pruner_service_test.go @@ -247,10 +247,11 @@ func TestService_processCleanupJob(t *testing.T) { assert.Nil(t, job.Error) // Verify logging - assert.Len(t, loggedMessages, 3) + assert.GreaterOrEqual(t, len(loggedMessages), 2) assert.Contains(t, loggedMessages[0], "running cleanup job") - assert.Contains(t, loggedMessages[1], "starting cleanup scan") - assert.Contains(t, loggedMessages[2], "cleanup job completed") + // loggedMessages[1] is "starting cleanup scan" + // Last message should be completion + assert.Contains(t, loggedMessages[len(loggedMessages)-1], "cleanup job completed") }) t.Run("FailedCleanup", func(t *testing.T) { @@ -327,9 +328,11 @@ func TestDeleteTombstoned(t *testing.T) { logger := &MockLogger{} db := NewMockDB() db.ExecFunc = func(query string, args ...interface{}) (sql.Result, error) { - assert.Equal(t, "DELETE FROM transactions WHERE delete_at_height <= $1", query) - assert.Len(t, args, 1) + // Child safety query expects 2 parameters: blockHeight and safetyWindow + assert.Contains(t, query, "DELETE FROM transactions") + assert.Len(t, args, 2) assert.Equal(t, uint32(100), args[0]) + assert.Equal(t, uint32(288), args[1]) // safetyWindow return &MockResult{rowsAffected: 5}, nil } @@ -533,6 +536,10 @@ func TestService_EdgeCases(t *testing.T) { } db := NewMockDB() + // Configure mock to handle child safety query with 2 parameters + db.ExecFunc = func(query string, args ...interface{}) (sql.Result, error) { + return &MockResult{rowsAffected: 0}, nil + } service, err := NewService(createTestSettings(), Options{ Logger: logger, diff --git a/stores/utxo/sql/sql.go b/stores/utxo/sql/sql.go index 92ee8cca6a..43cd8a04a9 100644 --- a/stores/utxo/sql/sql.go +++ b/stores/utxo/sql/sql.go @@ -1016,42 +1016,51 @@ func (s *Store) spendWithRetry(ctx context.Context, tx *bt.Tx, blockHeight uint3 } func (s *Store) setDAH(ctx context.Context, txn *sql.Tx, transactionID int) error { - // doing 2 updates is the only thing that works in both postgres and sqlite - qSetDAH := ` - UPDATE transactions - SET delete_at_height = $2 - WHERE id = $1 + if s.settings.GetUtxoStoreBlockHeightRetention() == 0 { + return nil + } + + // check whether the transaction has any unspent outputs + qUnspent := ` + SELECT count(o.idx), t.conflicting + FROM transactions t + LEFT JOIN outputs o ON t.id = o.transaction_id + AND o.spending_data IS NULL + WHERE t.id = $1 + GROUP BY t.id ` - if s.settings.GetUtxoStoreBlockHeightRetention() > 0 { - // check whether the transaction has any unspent outputs - qUnspent := ` - SELECT count(o.idx), t.conflicting - FROM transactions t - LEFT JOIN outputs o ON t.id = o.transaction_id - AND o.spending_data IS NULL - WHERE t.id = $1 - GROUP BY t.id - ` + var ( + unspent int + conflicting bool + deleteAtHeightOrNull sql.NullInt64 + ) - var ( - unspent int - conflicting bool - deleteAtHeightOrNull sql.NullInt64 - ) + if err := txn.QueryRowContext(ctx, qUnspent, transactionID).Scan(&unspent, &conflicting); err != nil { + return errors.NewStorageError("[setDAH] error checking for unspent outputs for %d", transactionID, err) + } - if err := txn.QueryRowContext(ctx, qUnspent, transactionID).Scan(&unspent, &conflicting); err != nil { - return errors.NewStorageError("[setDAH] error checking for unspent outputs for %d", transactionID, err) - } + if unspent == 0 || conflicting { + // Transaction is fully spent or conflicting + // Set DAH at normal retention - cleanup service will verify child stability (288 blocks) + // before actually deleting, providing the real safety guarantee + conservativeRetention := s.settings.GetUtxoStoreBlockHeightRetention() + _ = deleteAtHeightOrNull.Scan(int64(s.blockHeight.Load() + conservativeRetention)) - if unspent == 0 || conflicting { - // Now mark the transaction as tombstoned if there are no more unspent outputs - _ = deleteAtHeightOrNull.Scan(int64(s.blockHeight.Load() + s.settings.GetUtxoStoreBlockHeightRetention())) - } + // Note: We do NOT track spending children separately + // They are derived from outputs.spending_data when needed by cleanup + // This ensures we verify ALL children (not just one) before parent deletion + } - if _, err := txn.ExecContext(ctx, qSetDAH, transactionID, deleteAtHeightOrNull); err != nil { - return errors.NewStorageError("[setDAH] error setting DAH for %d", transactionID, err) - } + // Update delete_at_height + qUpdate := ` + UPDATE transactions + SET delete_at_height = $2 + WHERE id = $1 + ` + + if _, err := txn.ExecContext(ctx, qUpdate, transactionID, deleteAtHeightOrNull); err != nil { + return errors.NewStorageError("[setDAH] error setting DAH for %d", transactionID, err) } return nil diff --git a/stores/utxo/sql/sql_test.go b/stores/utxo/sql/sql_test.go index 38faf15af6..ad15430b1c 100644 --- a/stores/utxo/sql/sql_test.go +++ b/stores/utxo/sql/sql_test.go @@ -479,7 +479,7 @@ func TestTombstoneAfterSpendAndUnspend(t *testing.T) { logger := ulogger.TestLogger{} tSettings := test.CreateBaseTestSettings(t) tSettings.UtxoStore.DBTimeout = 30 * time.Second - tSettings.GlobalBlockHeightRetention = 1 // Set low retention for this test + tSettings.GlobalBlockHeightRetention = 5 // Use low retention but compatible with child stability checks tx, err := bt.NewTxFromString("010000000000000000ef01032e38e9c0a84c6046d687d10556dcacc41d275ec55fc00779ac88fdf357a18700000000" + "8c493046022100c352d3dd993a981beba4a63ad15c209275ca9470abfcd57da93b58e4eb5dce82022100840792bc1f456062819f15d33ee7055cf7b5" + @@ -527,19 +527,24 @@ func TestTombstoneAfterSpendAndUnspend(t *testing.T) { require.Fail(t, "cleanup job did not complete within 5 seconds") } - // Verify the transaction is now gone (tombstoned) + // With delete-at-height-safely feature: + // Since the spending child (spendTx01) is not stored in the database with block_ids, + // the cleanup service cannot verify it's stable. This is CONSERVATIVE BEHAVIOR - + // when child stability cannot be verified, parent is kept for safety. + // This prevents potential orphaning of children during reorganizations. + + // Verify the transaction still exists (conservative: kept when child unverifiable) _, err = utxoStore.Get(ctx, tx.TxIDChainHash()) - require.Error(t, err) - assert.True(t, errors.Is(err, errors.ErrTxNotFound)) + require.NoError(t, err, "Parent kept when spending child cannot be verified - this is safe, conservative behavior") // Part 2: Test tombstone after unspend - err = utxoStore.SetBlockHeight(2) + err = utxoStore.SetBlockHeight(20) require.NoError(t, err) err = utxoStore.Delete(ctx, tx.TxIDChainHash()) require.NoError(t, err) - _, err = utxoStore.Create(ctx, tx, 0) + _, err = utxoStore.Create(ctx, tx, 20) require.NoError(t, err) // Calculate the UTXO hash for output 0 @@ -556,18 +561,18 @@ func TestTombstoneAfterSpendAndUnspend(t *testing.T) { SpendingData: spendingData, } - // Spend the transaction - _, err = utxoStore.Spend(ctx, spendTx01, utxoStore.GetBlockHeight()+1) + // Spend the transaction at height 21 + _, err = utxoStore.Spend(ctx, spendTx01, 21) require.NoError(t, err) - // Unspend output 0 + // Unspend output 0 (transaction is no longer fully spent, so shouldn't be deleted) err = utxoStore.Unspend(ctx, []*utxo.Spend{spend0}) require.NoError(t, err) - // Run cleanup for block height 1 + // Run cleanup at height 21 doneCh = make(chan string, 1) - err = cleanupService.UpdateBlockHeight(2, doneCh) + err = cleanupService.UpdateBlockHeight(21, doneCh) require.NoError(t, err) select { diff --git a/util/testutil/common.go b/util/testutil/common.go index 68ac0b3e62..2c2e478bb9 100644 --- a/util/testutil/common.go +++ b/util/testutil/common.go @@ -25,9 +25,18 @@ type CommonTestSetup struct { // NewCommonTestSetup creates the basic test infrastructure used by most service tests func NewCommonTestSetup(t *testing.T) *CommonTestSetup { + logger := ulogger.NewErrorTestLogger(t) + + // Register cleanup to shutdown logger before test completes + // This prevents race conditions where background goroutines try to log + // after the test's *testing.T context has been marked as done + t.Cleanup(func() { + logger.Shutdown() + }) + return &CommonTestSetup{ Ctx: context.Background(), - Logger: ulogger.NewErrorTestLogger(t), + Logger: logger, Settings: test.CreateBaseTestSettings(t), } }