Skip to content

Commit 07a7dec

Browse files
committed
Add semaphore to external tx storing in Aerospike to prevent locking timeout issues
1 parent fdbaa27 commit 07a7dec

File tree

4 files changed

+22
-6
lines changed

4 files changed

+22
-6
lines changed

settings/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ type UtxoStoreSettings struct {
372372
DBTimeout time.Duration
373373
UseExternalTxCache bool
374374
ExternalizeAllTransactions bool
375+
ExternalStoreConcurrency int // Maximum concurrent external storage operations (0 = unlimited)
375376
PostgresMaxIdleConns int
376377
PostgresMaxOpenConns int
377378
VerboseDebug bool

settings/settings.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ func NewSettings(alternativeContext ...string) *Settings {
357357
DBTimeout: getDuration("utxostore_dbTimeoutDuration", 5*time.Second, alternativeContext...),
358358
UseExternalTxCache: getBool("utxostore_useExternalTxCache", true, alternativeContext...),
359359
ExternalizeAllTransactions: getBool("utxostore_externalizeAllTransactions", false, alternativeContext...),
360+
ExternalStoreConcurrency: getInt("utxostore_externalStoreConcurrency", 64, alternativeContext...),
360361
PostgresMaxIdleConns: getInt("utxostore_utxo_postgresMaxIdleConns", 10, alternativeContext...),
361362
PostgresMaxOpenConns: getInt("utxostore_utxo_postgresMaxOpenConns", 80, alternativeContext...),
362363
VerboseDebug: getBool("utxostore_verbose_debug", false, alternativeContext...),

stores/utxo/aerospike/aerospike.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,9 @@ type Store struct {
133133
externalStore blob.Store
134134
utxoBatchSize int
135135
externalTxCache *util.ExpiringConcurrentCache[chainhash.Hash, *bt.Tx]
136-
indexMutex sync.Mutex // Mutex for index creation operations
137-
indexOnce sync.Once // Ensures index creation/wait is only done once per process
136+
externalStoreSem chan struct{} // Semaphore to limit concurrent external storage operations
137+
indexMutex sync.Mutex // Mutex for index creation operations
138+
indexOnce sync.Once // Ensures index creation/wait is only done once per process
138139
}
139140

140141
// New creates a new Aerospike-based UTXO store.
@@ -191,6 +192,12 @@ func New(ctx context.Context, logger ulogger.Logger, tSettings *settings.Setting
191192
externalTxCache = util.NewExpiringConcurrentCache[chainhash.Hash, *bt.Tx](10 * time.Second)
192193
}
193194

195+
// Initialize external store semaphore if concurrency limit is set
196+
var externalStoreSem chan struct{}
197+
if tSettings.UtxoStore.ExternalStoreConcurrency > 0 {
198+
externalStoreSem = make(chan struct{}, tSettings.UtxoStore.ExternalStoreConcurrency)
199+
}
200+
194201
s := &Store{
195202
ctx: ctx,
196203
url: aerospikeURL,
@@ -199,10 +206,11 @@ func New(ctx context.Context, logger ulogger.Logger, tSettings *settings.Setting
199206
setName: setName,
200207
logger: logger,
201208

202-
settings: tSettings,
203-
externalStore: externalStore,
204-
utxoBatchSize: utxoBatchSize,
205-
externalTxCache: externalTxCache,
209+
settings: tSettings,
210+
externalStore: externalStore,
211+
utxoBatchSize: utxoBatchSize,
212+
externalTxCache: externalTxCache,
213+
externalStoreSem: externalStoreSem,
206214
}
207215

208216
// Ensure index creation/wait is only done once per process

stores/utxo/aerospike/create.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,12 @@ func (s *Store) storeExternallyWithLock(
833833
fileType fileformat.FileType,
834834
funcName string,
835835
) {
836+
// Acquire semaphore to limit concurrent external storage operations
837+
if s.externalStoreSem != nil {
838+
s.externalStoreSem <- struct{}{}
839+
defer func() { <-s.externalStoreSem }()
840+
}
841+
836842
// Acquire lock FIRST to prevent duplicate work
837843
lockKey, err := s.acquireLock(bItem.txHash, len(binsToStore))
838844
if err != nil {

0 commit comments

Comments
 (0)