Changes from all commits (27 commits)
d058c4e
Delete at height safely
freemans13 Nov 19, 2025
e5715ec
Merge branch 'main' of https://github.com/bsv-blockchain/teranode int…
freemans13 Nov 19, 2025
aa71d4a
Merge branch 'main' of https://github.com/bsv-blockchain/teranode int…
freemans13 Nov 27, 2025
8d21c08
settings tweak
freemans13 Nov 27, 2025
10e875d
deduplicate chunk processing
freemans13 Nov 27, 2025
2a067f1
check context while processing
freemans13 Nov 27, 2025
4bab408
remove unused last_spend
freemans13 Nov 27, 2025
387512d
remove last_spender
freemans13 Nov 27, 2025
9365072
move pruner settings from utxostore to pruner
freemans13 Nov 27, 2025
1c0f56e
Merge branch 'main' of https://github.com/bsv-blockchain/teranode int…
freemans13 Nov 27, 2025
f8e708c
fix merge issue on deleteTombstoned()
freemans13 Nov 27, 2025
e9315cd
spelling
freemans13 Nov 27, 2025
08808c2
flaky test tweak
freemans13 Nov 27, 2025
1dbbe26
flaky test fix
freemans13 Nov 27, 2025
4175259
flaky test fix
freemans13 Nov 27, 2025
0512cd8
defensive toggle
freemans13 Nov 27, 2025
91607cb
remove bloated logging
freemans13 Nov 28, 2025
e5600e0
fixes
freemans13 Nov 28, 2025
067cb97
opimisations
freemans13 Nov 28, 2025
abb3e01
Merge branch 'main' of https://github.com/bsv-blockchain/teranode int…
freemans13 Nov 28, 2025
a2d4d8f
test fix
freemans13 Nov 28, 2025
84dda1a
sql utxostore delete transactions tweak
freemans13 Nov 28, 2025
c4b1270
tweaks
freemans13 Dec 1, 2025
346dafc
utxoChunkSize
freemans13 Dec 1, 2025
7e122c2
linting
freemans13 Dec 1, 2025
9ad7e6f
Merge branch 'main' of https://github.com/bsv-blockchain/teranode int…
freemans13 Dec 1, 2025
156df86
Merge branch 'main' of https://github.com/bsv-blockchain/teranode int…
freemans13 Dec 2, 2025
635 changes: 635 additions & 0 deletions docs/utxo-safe-deletion.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions services/blockassembly/BlockAssembler.go
@@ -149,6 +149,7 @@ type BlockAssembler struct {

// unminedCleanupTicker manages periodic cleanup of old unmined transactions
unminedCleanupTicker *time.Ticker

// cachedCandidate stores the cached mining candidate
cachedCandidate *CachedMiningCandidate

4 changes: 2 additions & 2 deletions services/p2p/peer_registry_cache.go
@@ -75,8 +75,8 @@ func getPeerRegistryCacheFilePath(configuredDir string) string {

// SavePeerRegistryCache saves the peer registry data to a JSON file
func (pr *PeerRegistry) SavePeerRegistryCache(cacheDir string) error {
pr.mu.RLock()
defer pr.mu.RUnlock()
pr.mu.Lock()
defer pr.mu.Unlock()

cache := &PeerRegistryCache{
Version: PeerRegistryCacheVersion,
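The hunk above moves SavePeerRegistryCache from the read lock to the exclusive lock. A plausible reading, offered here as an assumption rather than the PR's stated rationale: the write lock blocks mutation while the registry is marshalled and stops two concurrent saves from interleaving writes to the same file. A generic sketch of that pattern, with illustrative names rather than the real PeerRegistry type:

```go
// Generic sketch of the exclusive-lock-around-save pattern; not the Teranode
// PeerRegistry implementation.
package main

import (
	"encoding/json"
	"os"
	"sync"
)

type registry struct {
	mu    sync.RWMutex
	peers map[string]string // peer ID -> address
}

// save takes the exclusive lock so that (a) no writer mutates peers while they
// are being marshalled and (b) two concurrent saves cannot interleave writes
// to the same file.
func (r *registry) save(path string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	data, err := json.MarshalIndent(r.peers, "", "  ")
	if err != nil {
		return err
	}

	return os.WriteFile(path, data, 0o600)
}

func main() {
	r := &registry{peers: map[string]string{"peer-1": "10.0.0.1:8333"}}
	_ = r.save("/tmp/peer_registry_cache.json")
}
```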
3 changes: 0 additions & 3 deletions services/pruner/server.go
@@ -133,7 +133,6 @@ func (s *Server) Init(ctx context.Context) error {
case s.prunerCh <- height32:
s.logger.Debugf("Queued pruning for height %d from BlockPersisted notification", height32)
default:
s.logger.Debugf("Pruning already in progress for height %d", height32)
}
}
}
@@ -145,7 +144,6 @@ func (s *Server) Init(ctx context.Context) error {
persistedHeight := s.lastPersistedHeight.Load()
if persistedHeight > 0 {
// Block persister is running - BlockPersisted notifications will handle pruning
s.logger.Debugf("Block notification received but block persister is active (persisted height: %d), skipping", persistedHeight)
continue
}

@@ -186,7 +184,6 @@ func (s *Server) Init(ctx context.Context) error {
case s.prunerCh <- state.CurrentHeight:
s.logger.Debugf("Queued pruning for height %d from Block notification (mined_set=true)", state.CurrentHeight)
default:
s.logger.Debugf("Pruning already in progress for height %d", state.CurrentHeight)
}
}
}
78 changes: 53 additions & 25 deletions services/pruner/worker.go
@@ -104,37 +104,65 @@ func (s *Server) prunerProcessor(ctx context.Context) {
continue
}

// Wait for pruner to complete with timeout
// Wait for pruner to complete with optional timeout
prunerTimeout := s.settings.Pruner.JobTimeout
timeoutTimer := time.NewTimer(prunerTimeout)
defer timeoutTimer.Stop()

select {
case status := <-doneCh:
if status != "completed" {
s.logger.Warnf("Pruner for height %d finished with status: %s", latestHeight, status)
prunerErrors.WithLabelValues("dah_pruner").Inc()
} else {
s.logger.Infof("Pruner for height %d completed successfully", latestHeight)
prunerDuration.WithLabelValues("dah_pruner").Observe(time.Since(startTime).Seconds())
prunerProcessed.Inc()
if prunerTimeout == 0 {
// Wait indefinitely (no timeout)
select {
case status := <-doneCh:
if status != "completed" {
s.logger.Warnf("Pruner for height %d finished with status: %s", latestHeight, status)
prunerErrors.WithLabelValues("dah_pruner").Inc()
} else {
// Get job info to log records processed
recordsProcessed := int64(0)
if job := s.prunerService.GetJobByHeight(latestHeight); job != nil {
recordsProcessed = job.RecordsProcessed.Load()
}
s.logger.Infof("Pruner for height %d completed successfully, pruned %d records", latestHeight, recordsProcessed)
prunerDuration.WithLabelValues("dah_pruner").Observe(time.Since(startTime).Seconds())
prunerProcessed.Inc()
}
case <-ctx.Done():
return
}
case <-timeoutTimer.C:
s.logger.Infof("Pruner for height %d exceeded coordinator timeout of %v - pruner continues in background, re-queuing immediately", latestHeight, prunerTimeout)
// Note: This is not an error - the pruner job continues processing in the background.
// The coordinator re-queues immediately to check again.
// Very large pruners may take longer than the timeout and require multiple iterations.
} else {
// Use timeout
timeoutTimer := time.NewTimer(prunerTimeout)
defer timeoutTimer.Stop()

// Immediately re-queue to check again (non-blocking)
select {
case s.prunerCh <- latestHeight:
s.logger.Debugf("Re-queued pruner for height %d after timeout", latestHeight)
default:
// Channel full, will be retried when notifications trigger again
s.logger.Debugf("Pruner channel full, will retry on next notification")
case status := <-doneCh:
if status != "completed" {
s.logger.Warnf("Pruner for height %d finished with status: %s", latestHeight, status)
prunerErrors.WithLabelValues("dah_pruner").Inc()
} else {
// Get job info to log records processed
recordsProcessed := int64(0)
if job := s.prunerService.GetJobByHeight(latestHeight); job != nil {
recordsProcessed = job.RecordsProcessed.Load()
}
s.logger.Infof("Pruner for height %d completed successfully, pruned %d records", latestHeight, recordsProcessed)
prunerDuration.WithLabelValues("dah_pruner").Observe(time.Since(startTime).Seconds())
prunerProcessed.Inc()
}
case <-timeoutTimer.C:
s.logger.Infof("Pruner for height %d exceeded coordinator timeout of %v - pruner continues in background", latestHeight, prunerTimeout)
// Note: This is not an error - the pruner job continues processing in the background.
// Only re-queue if channel is empty (next trigger will catch up if channel has pending jobs).

// Only re-queue if channel is empty (non-blocking check)
select {
case s.prunerCh <- latestHeight:
s.logger.Debugf("Re-queued pruner for height %d (channel was empty)", latestHeight)
default:
// Channel has pending jobs, no need to re-queue
s.logger.Debugf("Pruner channel has pending jobs, skipping re-queue")
}
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}

36 changes: 23 additions & 13 deletions settings.conf
@@ -583,6 +583,29 @@ pruner_grpcListenAddress.docker.host = localhost:${PORT_PREFIX}${PRUNER_GRPC_POR
# Default: 10m
pruner_jobTimeout = 10m

# UTXO-specific pruning settings
# Enable defensive checks before deleting UTXO transactions
# When enabled, verifies that all spending children are mined > BlockHeightRetention blocks ago
# Default: false
pruner_utxoDefensiveEnabled = false

Review comment (Contributor):
Root cause identified:

The SQL pruner service does not have a defensiveEnabled field in its struct (stores/utxo/sql/pruner/pruner_service.go:26-34) and does not read the UTXODefensiveEnabled setting during initialization (pruner_service.go:88-94).

The defensive verification is hardcoded into the SQL DELETE query (lines 230-258) and always runs regardless of the flag value.

Fix needed:
Add defensiveEnabled field to SQL Service struct and conditionally build the query based on this flag, similar to Aerospike implementation at line 483.

Reply from @github-actions (bot), Nov 28, 2025:
Verified and resolved - Both implementations now correctly read and respect the defensiveEnabled flag from settings.
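
A minimal sketch of the conditional query construction the thread above describes, gating the defensive child-mined verification behind the settings flag. The Service fields, table and column names, placeholder style, and retention arithmetic are assumptions for illustration, not the actual stores/utxo/sql/pruner code:

```go
// Illustrative only: one way to make the SQL pruner honour the
// pruner_utxoDefensiveEnabled flag by building the DELETE conditionally.
// Table/column names and struct fields are assumed, not the real schema.
package pruner

import (
	"context"
	"database/sql"
)

type Service struct {
	db               *sql.DB
	defensiveEnabled bool   // from settings.Pruner.UTXODefensiveEnabled
	retention        uint32 // BlockHeightRetention
}

func (s *Service) deleteTombstoned(ctx context.Context, height uint32) (int64, error) {
	q := `DELETE FROM transactions WHERE delete_at_height > 0 AND delete_at_height <= $1`
	args := []any{height}

	if s.defensiveEnabled {
		// Defensive mode: keep the parent unless every spending child was
		// mined more than `retention` blocks before the current height.
		minMined := uint32(0)
		if height > s.retention {
			minMined = height - s.retention
		}

		q += ` AND NOT EXISTS (
			SELECT 1
			FROM inputs i
			JOIN transactions child ON child.id = i.tx_id
			WHERE i.previous_tx_hash = transactions.hash
			  AND (child.block_height = 0 OR child.block_height > $2))`
		args = append(args, minMined)
	}

	res, err := s.db.ExecContext(ctx, q, args...)
	if err != nil {
		return 0, err
	}

	return res.RowsAffected()
}
```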


# UTXO batcher settings - optimized for multi-million record pruning operations
# These settings control batching of operations during delete-at-height (DAH) UTXO pruning
# Larger batch sizes = fewer round-trips to storage = faster pruning
pruner_utxoParentUpdateBatcherSize = 2000
pruner_utxoParentUpdateBatcherDurationMillis = 100
pruner_utxoDeleteBatcherSize = 5000
pruner_utxoDeleteBatcherDurationMillis = 100

# Maximum concurrent operations during UTXO pruning
# Increase this if you have a large connection pool and want faster pruning
# Default: 1 (auto-detect from connection pool)
pruner_utxoMaxConcurrentOperations = 1

# Batch size for reading child transactions during defensive UTXO pruning
# Default: 1024
pruner_utxoDefensiveBatchReadSize = 1024
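
The Size/DurationMillis pairs above presumably follow the usual batcher contract: a batch is flushed when it reaches the configured size or after the configured duration, whichever comes first, so larger sizes trade latency for fewer storage round-trips. A minimal, generic sketch of that interaction (illustrative names; not the Teranode batcher implementation):

```go
// Generic size/duration batcher sketch: flush on batch size or on a periodic
// tick, whichever comes first. Illustrative only.
package main

import (
	"fmt"
	"time"
)

type batcher[T any] struct {
	size     int
	interval time.Duration
	in       chan T
	flush    func([]T)
}

func newBatcher[T any](size int, interval time.Duration, flush func([]T)) *batcher[T] {
	b := &batcher[T]{size: size, interval: interval, in: make(chan T, size), flush: flush}
	go b.run()
	return b
}

func (b *batcher[T]) run() {
	ticker := time.NewTicker(b.interval)
	defer ticker.Stop()

	batch := make([]T, 0, b.size)
	emit := func() {
		if len(batch) > 0 {
			b.flush(batch)
			batch = make([]T, 0, b.size)
		}
	}

	for {
		select {
		case item, ok := <-b.in:
			if !ok {
				emit()
				return
			}
			batch = append(batch, item)
			if len(batch) >= b.size {
				emit() // size-based flush: one round-trip per full batch
			}
		case <-ticker.C:
			emit() // duration-based flush: bounds the wait for partial batches
		}
	}
}

func main() {
	// e.g. pruner_utxoDeleteBatcherSize = 5000, ...DurationMillis = 100
	b := newBatcher[string](5000, 100*time.Millisecond, func(keys []string) {
		fmt.Printf("deleting %d records in one round-trip\n", len(keys))
	})
	b.in <- "txid:vout"                // a single queued delete
	time.Sleep(250 * time.Millisecond) // the duration-based flush handles the partial batch
}
```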

# @group: dashboard
# Vite dev server ports (comma-separated)
# dashboard_devServerPorts = 5173,4173
@@ -1493,19 +1516,6 @@ utxostore_storeBatcherDurationMillis = 10

utxostore_storeBatcherSize = 2048

# Pruner batcher settings - optimized for multi-million record pruning operations
# These settings control batching of operations during delete-at-height (DAH) pruning
# Larger batch sizes = fewer round-trips to Aerospike = faster pruning
utxostore_prunerParentUpdateBatcherSize = 2000
utxostore_prunerParentUpdateBatcherDurationMillis = 100
utxostore_prunerDeleteBatcherSize = 5000
utxostore_prunerDeleteBatcherDurationMillis = 100

# Maximum concurrent operations during pruning (0 = use Aerospike connection queue size)
# Increase this if you have a large connection pool and want faster pruning
# Default: 0 (auto-detect from connection pool)
utxostore_prunerMaxConcurrentOperations = 0

# this value determines if a caching mechanism will be used for external transactions
# if you have the memory available, it will speed up your IBD
# and it is required for large blocks which load in the same tx multiple times, e.g. 814337
19 changes: 9 additions & 10 deletions settings/interface.go
@@ -381,12 +381,6 @@ type UtxoStoreSettings struct {
MaxMinedBatchSize int
BlockHeightRetentionAdjustment int32 // Adjustment to GlobalBlockHeightRetention (can be positive or negative)
DisableDAHCleaner bool // Disable the DAH cleaner process completely
// Pruner-specific settings
PrunerParentUpdateBatcherSize int // Batch size for parent record updates during pruning
PrunerParentUpdateBatcherDurationMillis int // Batch duration for parent record updates during pruning (ms)
PrunerDeleteBatcherSize int // Batch size for record deletions during pruning
PrunerDeleteBatcherDurationMillis int // Batch duration for record deletions during pruning (ms)
PrunerMaxConcurrentOperations int // Maximum concurrent operations during pruning (0 = use connection queue size)
}

type P2PSettings struct {
@@ -489,10 +483,15 @@ type CoinbaseSettings struct {
}

type PrunerSettings struct {
GRPCListenAddress string
GRPCAddress string
WorkerCount int
JobTimeout time.Duration // Timeout for waiting for pruner job completion
GRPCListenAddress string
GRPCAddress string
WorkerCount int // Number of worker goroutines (default: 1)
JobTimeout time.Duration // Timeout for waiting for pruner job completion (0 = wait indefinitely)
UTXODefensiveEnabled bool // Enable defensive checks before deleting UTXO transactions (verify children are mined > BlockHeightRetention blocks ago)
UTXODefensiveBatchReadSize int // Batch size for reading child transactions during defensive UTXO pruning (default: 10000)
UTXOChunkSize int // Number of records to process in each chunk before batch flushing (default: 1000)
UTXOChunkGroupLimit int // Maximum parallel chunk processing during UTXO pruning (default: 10)
UTXOProgressLogInterval time.Duration // Interval for logging progress during UTXO pruning (default: 30s)
}

type SubtreeValidationSettings struct {
19 changes: 9 additions & 10 deletions settings/settings.go
@@ -366,12 +366,6 @@ func NewSettings(alternativeContext ...string) *Settings {
MaxMinedBatchSize: getInt("utxostore_maxMinedBatchSize", 1024, alternativeContext...),
BlockHeightRetentionAdjustment: getInt32("utxostore_blockHeightRetentionAdjustment", 0, alternativeContext...),
DisableDAHCleaner: getBool("utxostore_disableDAHCleaner", false, alternativeContext...),
// Pruner-specific settings optimized for multi-million record pruning operations
PrunerParentUpdateBatcherSize: getInt("utxostore_prunerParentUpdateBatcherSize", 2000, alternativeContext...),
PrunerParentUpdateBatcherDurationMillis: getInt("utxostore_prunerParentUpdateBatcherDurationMillis", 100, alternativeContext...),
PrunerDeleteBatcherSize: getInt("utxostore_prunerDeleteBatcherSize", 5000, alternativeContext...),
PrunerDeleteBatcherDurationMillis: getInt("utxostore_prunerDeleteBatcherDurationMillis", 100, alternativeContext...),
PrunerMaxConcurrentOperations: getInt("utxostore_prunerMaxConcurrentOperations", 0, alternativeContext...),
},
P2P: P2PSettings{
BlockTopic: getString("p2p_block_topic", "", alternativeContext...),
@@ -433,10 +427,15 @@
DistributorTimeout: getDuration("distributor_timeout", 30*time.Second, alternativeContext...),
},
Pruner: PrunerSettings{
GRPCAddress: getString("pruner_grpcAddress", "localhost:8096", alternativeContext...),
GRPCListenAddress: getString("pruner_grpcListenAddress", ":8096", alternativeContext...),
WorkerCount: getInt("pruner_workerCount", 4, alternativeContext...), // Default to 4 workers
JobTimeout: getDuration("pruner_jobTimeout", 10*time.Minute, alternativeContext...),
GRPCAddress: getString("pruner_grpcAddress", "localhost:8096", alternativeContext...),
GRPCListenAddress: getString("pruner_grpcListenAddress", ":8096", alternativeContext...),
WorkerCount: getInt("pruner_workerCount", 1, alternativeContext...), // Single job at a time
JobTimeout: getDuration("pruner_jobTimeout", 0, alternativeContext...), // 0 = wait indefinitely
UTXODefensiveEnabled: getBool("pruner_utxoDefensiveEnabled", false, alternativeContext...), // Defensive mode off by default (production)
UTXODefensiveBatchReadSize: getInt("pruner_utxoDefensiveBatchReadSize", 10000, alternativeContext...), // Batch size for child verification
UTXOChunkSize: getInt("pruner_utxoChunkSize", 1000, alternativeContext...), // Chunk size for batch operations
UTXOChunkGroupLimit: getInt("pruner_utxoChunkGroupLimit", 10, alternativeContext...), // Process 10 chunks in parallel
UTXOProgressLogInterval: getDuration("pruner_utxoProgressLogInterval", 30*time.Second, alternativeContext...), // Progress every 30s
},
SubtreeValidation: SubtreeValidationSettings{
QuorumAbsoluteTimeout: getDuration("subtree_quorum_absolute_timeout", 30*time.Second, alternativeContext...),
3 changes: 3 additions & 0 deletions stores/pruner/interfaces.go
@@ -15,6 +15,9 @@ type Service interface {
// SetPersistedHeightGetter sets the function used to get block persister progress.
// This allows pruner to coordinate with block persister to avoid premature deletion.
SetPersistedHeightGetter(getter func() uint32)

// GetJobByHeight returns a job for the specified block height
GetJobByHeight(blockHeight uint32) *Job
}

// PrunerServiceProvider defines an interface for stores that can provide a pruner service.
19 changes: 10 additions & 9 deletions stores/pruner/job.go
@@ -38,15 +38,16 @@ func (s JobStatus) String() string {

// Job represents a pruner job
type Job struct {
BlockHeight uint32
status atomic.Int32 // Using atomic for thread-safe access
Error error
Created time.Time
Started time.Time
Ended time.Time
ctx context.Context
cancel context.CancelFunc
DoneCh chan string // Channel to signal when the job is complete (for testing purposes)
BlockHeight uint32
status atomic.Int32 // Using atomic for thread-safe access
RecordsProcessed atomic.Int64 // Number of records processed/pruned
Error error
Created time.Time
Started time.Time
Ended time.Time
ctx context.Context
cancel context.CancelFunc
DoneCh chan string // Channel to signal when the job is complete (for testing purposes)
}

// GetStatus returns the current status of the job
14 changes: 14 additions & 0 deletions stores/pruner/job_processor.go
@@ -286,6 +286,20 @@ func (m *JobManager) GetJobs() []*Job {
return clone
}

// GetJobByHeight returns a job for the specified block height
func (m *JobManager) GetJobByHeight(blockHeight uint32) *Job {
m.jobsMutex.RLock()
defer m.jobsMutex.RUnlock()

for _, job := range m.jobs {
if job.BlockHeight == blockHeight {
return job
}
}

return nil
}

// worker processes jobs from the queue
func (m *JobManager) worker(workerID int) {
m.logger.Debugf("[JobManager] Worker %d started", workerID)