Skip to content

Commit 5dfbc51

Browse files
authored
blob-batcher-idle-cpu-optimisation (#217)
1 parent 587626c commit 5dfbc51

File tree

3 files changed

+51
-29
lines changed

3 files changed

+51
-29
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,13 @@ require (
173173
require (
174174
github.com/DATA-DOG/go-sqlmock v1.5.2
175175
github.com/bitcoin-sv/alert-system v0.2.0
176-
github.com/bsv-blockchain/go-batcher v1.1.0
176+
github.com/bsv-blockchain/go-batcher v1.2.5
177177
github.com/bsv-blockchain/go-bn v1.0.3
178178
github.com/bsv-blockchain/go-lockfree-queue v1.0.0
179179
github.com/bsv-blockchain/go-p2p-message-bus v0.1.7
180180
github.com/bsv-blockchain/go-safe-conversion v1.1.0
181181
github.com/bsv-blockchain/go-sdk v1.2.11
182-
github.com/bsv-blockchain/go-tx-map v1.2.0
182+
github.com/bsv-blockchain/go-tx-map v1.2.1
183183
github.com/bsv-blockchain/go-wire v1.0.6
184184
github.com/felixge/fgprof v0.9.5
185185
github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENU
140140
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
141141
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
142142
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
143-
github.com/bsv-blockchain/go-batcher v1.1.0 h1:LntvZyDY/0ySmVaITYtHD6e5ydkcc67WoSqWc5oo4sU=
144-
github.com/bsv-blockchain/go-batcher v1.1.0/go.mod h1:69pxNteTblVliQot7VRzVoPW+5F1NSsXvgdVZw7eUZk=
143+
github.com/bsv-blockchain/go-batcher v1.2.5 h1:zaGQgUV9Pl8nY2pduEsiic7CTOhDDni7S5ZgWK6tRBc=
144+
github.com/bsv-blockchain/go-batcher v1.2.5/go.mod h1:P98X4sdCGGSeBEDfpQE/YzD5tn/nyDXwhyTfcqLpg3w=
145145
github.com/bsv-blockchain/go-bc v1.0.2 h1:D1PIYI6Q3Jwdhkp2cucf91vmsPXgXtbGkFpk+xpsD0Y=
146146
github.com/bsv-blockchain/go-bc v1.0.2/go.mod h1:h3/1KfzgrDos3+bLGWsHkGYoFWcWZ2Vmep2r2AoE3AA=
147147
github.com/bsv-blockchain/go-bn v1.0.3 h1:98+8/mmj8V6nzpvC6e9WvH4MXc6GtkG0JYL/OMPIU3E=
@@ -160,8 +160,8 @@ github.com/bsv-blockchain/go-sdk v1.2.11 h1:SK8kDuDZNP3ubvx0AL0bR/I8tXWljJICyUsi
160160
github.com/bsv-blockchain/go-sdk v1.2.11/go.mod h1:S+8iokWX2la9G4mzwHIeCvYkADRzcdfk1AprN0z5MDI=
161161
github.com/bsv-blockchain/go-subtree v1.1.2 h1:evQ961Cku9wIrKA5qjvn2t0nfUL/7rBPqHth2eZRso4=
162162
github.com/bsv-blockchain/go-subtree v1.1.2/go.mod h1:e+VXWba1DoKZ05LiBU0N6FKZl6cD12yUNE0SrjtJup8=
163-
github.com/bsv-blockchain/go-tx-map v1.2.0 h1:HF8XTLrl5YGpEeWmsoO58w/XSNKJy49DYgrt/0EQHn4=
164-
github.com/bsv-blockchain/go-tx-map v1.2.0/go.mod h1:sjsSHrl5HNT+0p1AeS/6CE7Ds4V4Kjn9PRBcKB3ozMc=
163+
github.com/bsv-blockchain/go-tx-map v1.2.1 h1:uzKryYn4uMCR3ko6N71TyUrKfAnRvwG5Mdm1ufCStyo=
164+
github.com/bsv-blockchain/go-tx-map v1.2.1/go.mod h1:4NsZBCM6bFNwYEc7J4C6jiNdm5aY/rtRVXqpG7PgbX4=
165165
github.com/bsv-blockchain/go-wire v1.0.6 h1:rMVASfuXtrZB1ZaZEl+/tvmXfdPMf4KY8Pew7/VeyQ0=
166166
github.com/bsv-blockchain/go-wire v1.0.6/go.mod h1:Jp6ekSmh/KZL1Gm/OPmbyMspNsficSgjXxlJ6bFD0Hs=
167167
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=

stores/blob/batcher/batcher.go

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/binary"
2020
"encoding/hex"
2121
"io"
22+
"sync"
2223
"time"
2324

2425
"github.com/bsv-blockchain/go-bt/v2/chainhash"
@@ -47,10 +48,12 @@ type Batcher struct {
4748
writeKeys bool
4849
// queue is a lock-free queue for storing batch items to be processed asynchronously
4950
queue *lockfreequeue.LockFreeQ[BatchItem]
50-
// queueCtx is the context for controlling the background batch processing goroutine
51-
queueCtx context.Context
52-
// queueCancel is the function to cancel the queue context and stop background processing
53-
queueCancel context.CancelFunc
51+
// done is the channel for signaling the background batch processing goroutine to stop
52+
done chan struct{}
53+
// notifyCh is used to notify the worker goroutine when new items are enqueued
54+
notifyCh chan struct{}
55+
// wg is used to wait for the background worker goroutine to complete during shutdown
56+
wg sync.WaitGroup
5457
// currentBatch holds the accumulated blob data for the current batch
5558
currentBatch []byte
5659
// currentBatchKeys holds the accumulated key data for the current batch (if writeKeys is true)
@@ -94,28 +97,41 @@ type blobStoreSetter interface {
9497
// Returns:
9598
// - *Batcher: A configured batcher instance ready to accept blob operations
9699
func New(logger ulogger.Logger, blobStore blobStoreSetter, sizeInBytes int, writeKeys bool) *Batcher {
97-
ctx, cancel := context.WithCancel(context.Background())
98100
b := &Batcher{
99101
logger: logger,
100102
blobStore: blobStore,
101103
sizeInBytes: sizeInBytes,
102104
writeKeys: writeKeys,
103105
queue: lockfreequeue.NewLockFreeQ[BatchItem](),
104-
queueCtx: ctx,
105-
queueCancel: cancel,
106+
done: make(chan struct{}),
107+
notifyCh: make(chan struct{}, 1),
106108
currentBatch: make([]byte, 0, sizeInBytes),
107109
currentBatchKeys: make([]byte, 0, sizeInBytes),
108110
}
109111

112+
b.wg.Add(1)
110113
go func() {
114+
defer b.wg.Done()
115+
111116
var (
112117
batchItem *BatchItem
113118
err error
114119
)
115120

116121
for {
122+
// Try immediate dequeue (optimistic fast path)
123+
batchItem = b.queue.Dequeue()
124+
if batchItem != nil {
125+
err = b.processBatchItem(batchItem)
126+
if err != nil {
127+
b.logger.Errorf("error processing batch item: %v", err)
128+
}
129+
continue
130+
}
131+
132+
// Queue is empty - wait for notification or shutdown
117133
select {
118-
case <-b.queueCtx.Done():
134+
case <-b.done:
119135
// Process remaining items before exiting
120136
for {
121137
batchItem = b.queue.Dequeue()
@@ -133,19 +149,11 @@ func New(logger ulogger.Logger, blobStore blobStoreSetter, sizeInBytes int, writ
133149
b.logger.Errorf("error writing final batch during shutdown: %v", err)
134150
}
135151
}
136-
137152
return
138-
default:
139-
batchItem = b.queue.Dequeue()
140-
if batchItem == nil {
141-
time.Sleep(10 * time.Millisecond)
142-
continue
143-
}
144153

145-
err = b.processBatchItem(batchItem)
146-
if err != nil {
147-
b.logger.Errorf("error processing batch item: %v", err)
148-
}
154+
case <-b.notifyCh:
155+
// Item available, loop back to dequeue
156+
continue
149157
}
150158
}
151159
}()
@@ -235,7 +243,9 @@ func (b *Batcher) writeBatch(currentBatch []byte, batchKeys []byte) error {
235243
binary.BigEndian.PutUint32(batchKey, timeUint32)
236244
// add a random string as the next bytes, to prevent conflicting filenames from other pods
237245
randBytes := make([]byte, 4)
238-
_, _ = rand.Read(randBytes)
246+
if _, err := rand.Read(randBytes); err != nil {
247+
return errors.NewStorageError("failed to generate random bytes for batch key", err)
248+
}
239249
batchKey = append(batchKey, randBytes...)
240250

241251
g, gCtx := errgroup.WithContext(context.Background())
@@ -298,10 +308,16 @@ func (b *Batcher) Health(ctx context.Context, checkLiveness bool) (int, string,
298308
// - error: Any error that occurred during shutdown
299309
func (b *Batcher) Close(_ context.Context) error {
300310
// Signal the background goroutine to stop
301-
b.queueCancel()
311+
close(b.done)
302312

303-
// Wait a bit to ensure the goroutine has time to process remaining items
304-
time.Sleep(100 * time.Millisecond)
313+
// Wake up the worker if it's blocked on notifyCh
314+
select {
315+
case b.notifyCh <- struct{}{}:
316+
default:
317+
}
318+
319+
// Wait for the background goroutine to finish processing all remaining items
320+
b.wg.Wait()
305321

306322
return nil
307323
}
@@ -351,6 +367,12 @@ func (b *Batcher) Set(_ context.Context, hash []byte, fileType fileformat.FileTy
351367
value: value,
352368
})
353369

370+
// Notify worker that new item is available (non-blocking)
371+
select {
372+
case b.notifyCh <- struct{}{}:
373+
default: // Already notified, don't block
374+
}
375+
354376
return nil
355377
}
356378

0 commit comments

Comments
 (0)