Skip to content

Commit d638f27

Browse files
authored
Performance improvements in external files handling (#253)
1 parent a3c3c65 commit d638f27

File tree

6 files changed

+39
-6
lines changed

6 files changed

+39
-6
lines changed

settings.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,6 +1511,12 @@ utxostore_prunerMaxConcurrentOperations = 0
15111511
# and it is required for large blocks which load in the same tx multiple times, e.g. 814337
15121512
utxostore_useExternalTxCache = true
15131513

1514+
# Maximum concurrent external storage operations (multi-record transactions)
1515+
# Set to 0 for unlimited. Lower values prevent overwhelming Aerospike connection pool.
1516+
# Should be less than or equal to Aerospike ConnectionQueueSize.
1517+
utxostore_externalStoreConcurrency = 16
1518+
utxostore_externalStoreConcurrency.docker.m = 4
1519+
15141520
# utxos per record, if larger it will externalize the transaction
15151521
# ! do not change this after starting a node !
15161522
utxostore_utxoBatchSize = 128

settings/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ type UtxoStoreSettings struct {
372372
DBTimeout time.Duration
373373
UseExternalTxCache bool
374374
ExternalizeAllTransactions bool
375+
ExternalStoreConcurrency int // Maximum concurrent external storage operations (0 = unlimited)
375376
PostgresMaxIdleConns int
376377
PostgresMaxOpenConns int
377378
VerboseDebug bool

settings/settings.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ func NewSettings(alternativeContext ...string) *Settings {
357357
DBTimeout: getDuration("utxostore_dbTimeoutDuration", 5*time.Second, alternativeContext...),
358358
UseExternalTxCache: getBool("utxostore_useExternalTxCache", true, alternativeContext...),
359359
ExternalizeAllTransactions: getBool("utxostore_externalizeAllTransactions", false, alternativeContext...),
360+
ExternalStoreConcurrency: getInt("utxostore_externalStoreConcurrency", 16, alternativeContext...),
360361
PostgresMaxIdleConns: getInt("utxostore_utxo_postgresMaxIdleConns", 10, alternativeContext...),
361362
PostgresMaxOpenConns: getInt("utxostore_utxo_postgresMaxOpenConns", 80, alternativeContext...),
362363
VerboseDebug: getBool("utxostore_verbose_debug", false, alternativeContext...),

stores/utxo/aerospike/aerospike.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,9 @@ type Store struct {
133133
externalStore blob.Store
134134
utxoBatchSize int
135135
externalTxCache *util.ExpiringConcurrentCache[chainhash.Hash, *bt.Tx]
136-
indexMutex sync.Mutex // Mutex for index creation operations
137-
indexOnce sync.Once // Ensures index creation/wait is only done once per process
136+
externalStoreSem chan struct{} // Semaphore to limit concurrent external storage operations
137+
indexMutex sync.Mutex // Mutex for index creation operations
138+
indexOnce sync.Once // Ensures index creation/wait is only done once per process
138139
}
139140

140141
// New creates a new Aerospike-based UTXO store.
@@ -191,6 +192,12 @@ func New(ctx context.Context, logger ulogger.Logger, tSettings *settings.Setting
191192
externalTxCache = util.NewExpiringConcurrentCache[chainhash.Hash, *bt.Tx](10 * time.Second)
192193
}
193194

195+
// Initialize external store semaphore if concurrency limit is set
196+
var externalStoreSem chan struct{}
197+
if tSettings.UtxoStore.ExternalStoreConcurrency > 0 {
198+
externalStoreSem = make(chan struct{}, tSettings.UtxoStore.ExternalStoreConcurrency)
199+
}
200+
194201
s := &Store{
195202
ctx: ctx,
196203
url: aerospikeURL,
@@ -199,10 +206,11 @@ func New(ctx context.Context, logger ulogger.Logger, tSettings *settings.Setting
199206
setName: setName,
200207
logger: logger,
201208

202-
settings: tSettings,
203-
externalStore: externalStore,
204-
utxoBatchSize: utxoBatchSize,
205-
externalTxCache: externalTxCache,
209+
settings: tSettings,
210+
externalStore: externalStore,
211+
utxoBatchSize: utxoBatchSize,
212+
externalTxCache: externalTxCache,
213+
externalStoreSem: externalStoreSem,
206214
}
207215

208216
// Ensure index creation/wait is only done once per process

stores/utxo/aerospike/create.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,12 @@ func (s *Store) storeExternallyWithLock(
830830
fileType fileformat.FileType,
831831
funcName string,
832832
) {
833+
// Acquire semaphore to limit concurrent external storage operations
834+
if s.externalStoreSem != nil {
835+
s.externalStoreSem <- struct{}{}
836+
defer func() { <-s.externalStoreSem }()
837+
}
838+
833839
// Acquire lock FIRST to prevent duplicate work
834840
lockKey, err := s.acquireLock(bItem.txHash, len(binsToStore))
835841
if err != nil {

stores/utxo/aerospike/get.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,6 +1316,17 @@ func (s *Store) GetTxFromExternalStore(ctx context.Context, previousTxHash chain
13161316
tracing.WithHistogram(prometheusTxMetaAerospikeMapGetExternal),
13171317
)
13181318

1319+
if s.externalTxCache != nil {
1320+
return s.externalTxCache.GetOrSet(previousTxHash, func() (*bt.Tx, bool, error) {
1321+
tx, err := s.getExternalTransaction(ctx, previousTxHash)
1322+
if err != nil {
1323+
return nil, false, err
1324+
}
1325+
1326+
return tx, true, nil
1327+
})
1328+
}
1329+
13191330
return s.getExternalTransaction(ctx, previousTxHash)
13201331
}
13211332

0 commit comments

Comments
 (0)