Skip to content

Commit 56ea26c

Browse files
authored
feat: fill the gap between bulk & live (#696)
1 parent e2280c3 commit 56ea26c

File tree

6 files changed

+241
-29
lines changed

6 files changed

+241
-29
lines changed

pkg/services/chaintracks/bulk_headers_container.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,3 +250,41 @@ func (b *bulkHeadersContainer) GetFileDataByIndex(fileID int) (*ingest.BulkFileD
250250

251251
return &b.GeneratedFileData[fileID], nil
252252
}
253+
254+
func (b *bulkHeadersContainer) LastHeader() (*wdk.ChainBlockHeader, *internal.ChainWork, error) {
255+
length := len(b.chunks)
256+
if length == 0 {
257+
return nil, nil, fmt.Errorf("no chunks available to retrieve last header")
258+
}
259+
260+
lastChunk := b.chunks[length-1]
261+
headerCount := len(lastChunk.data) / 80
262+
if headerCount == 0 {
263+
return nil, nil, fmt.Errorf("last chunk contains no headers")
264+
}
265+
266+
headerDataStart := (headerCount - 1) * 80
267+
headerDataEnd := headerDataStart + 80
268+
269+
headerData := lastChunk.data[headerDataStart:headerDataEnd]
270+
271+
baseBlockHeader, err := wdk.ChainBaseBlockHeaderFromBytes(headerData)
272+
if err != nil {
273+
return nil, nil, fmt.Errorf("failed to parse last block header: %w", err)
274+
}
275+
276+
blockHash, err := baseBlockHeader.CalculateHash()
277+
if err != nil {
278+
return nil, nil, fmt.Errorf("failed to compute hash for last block header: %w", err)
279+
}
280+
281+
height := b.MaxHeightAtChunk(length - 1)
282+
283+
header := &wdk.ChainBlockHeader{
284+
ChainBaseBlockHeader: *baseBlockHeader,
285+
Height: must.ConvertToUInt(height),
286+
Hash: blockHash.String(),
287+
}
288+
289+
return header, &lastChunk.LastChainWork, nil
290+
}

pkg/services/chaintracks/bulk_manager.go

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs"
1010
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
1111
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/ingest"
12+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/internal"
1213
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
1314
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk"
1415
)
@@ -33,9 +34,18 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor, chai
3334
}
3435

3536
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error {
37+
if presentHeight <= liveHeightThreshold {
38+
bm.logger.Info("Skipping bulk synchronization - present height below live height threshold", slog.Any("present_height", presentHeight), slog.Any("live_height_threshold", liveHeightThreshold))
39+
return nil
40+
}
41+
3642
bm.logger.Info("Starting bulk synchronization", slog.Any("present_height", presentHeight), slog.Any("initial_ranges", initialRanges))
3743

38-
missingRange := models.NewHeightRange(0, presentHeight)
44+
missingRange, err := models.NewHeightRange(0, presentHeight-liveHeightThreshold).Subtract(initialRanges.Bulk)
45+
if err != nil {
46+
return fmt.Errorf("failed to compute missing bulk range: %w", err)
47+
}
48+
3949
for _, ingestor := range bm.bulkIngestors {
4050
if missingRange.IsEmpty() {
4151
break
@@ -47,7 +57,7 @@ func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint,
4757
return fmt.Errorf("bulk synchronization failed for ingestor %s: %w", ingestor.Name, err)
4858
}
4959

50-
if err := bm.processBulkChunks(ctx, bulkChunks, downloader); err != nil {
60+
if err := bm.processBulkChunks(ctx, bulkChunks, downloader, missingRange.MaxHeight); err != nil {
5161
return fmt.Errorf("failed to process bulk chunks from ingestor %s: %w", ingestor.Name, err)
5262
}
5363

@@ -80,6 +90,13 @@ func (bm *bulkManager) FindHeaderForHeight(height uint) (*wdk.ChainBlockHeader,
8090
return bm.container.FindHeaderForHeight(height)
8191
}
8292

93+
func (bm *bulkManager) LastHeader() (*wdk.ChainBlockHeader, *internal.ChainWork, error) {
94+
bm.locker.RLock()
95+
defer bm.locker.RUnlock()
96+
97+
return bm.container.LastHeader()
98+
}
99+
83100
func (bm *bulkManager) FilesInfo() *ingest.BulkHeaderFilesInfo {
84101
bm.locker.RLock()
85102
defer bm.locker.RUnlock()
@@ -97,7 +114,7 @@ func (bm *bulkManager) GetFileDataByIndex(fileID int) (*ingest.BulkFileData, err
97114
return bm.container.GetFileDataByIndex(fileID)
98115
}
99116

100-
func (bm *bulkManager) processBulkChunks(ctx context.Context, bulkChunks []ingest.BulkHeaderMinimumInfo, downloader ingest.BulkFileDownloader) error {
117+
func (bm *bulkManager) processBulkChunks(ctx context.Context, bulkChunks []ingest.BulkHeaderMinimumInfo, downloader ingest.BulkFileDownloader, maxHeight uint) error {
101118
chunksToLoad := bm.getChunksToLoad(bulkChunks)
102119
type chunkWithInfo struct {
103120
data []byte
@@ -130,7 +147,12 @@ func (bm *bulkManager) processBulkChunks(ctx context.Context, bulkChunks []inges
130147
continue
131148
}
132149

133-
if err := bm.container.Add(ctx, fileData.data, fileData.info.ToHeightRange()); err != nil {
150+
dataRange := fileData.info.ToHeightRange()
151+
if dataRange.MaxHeight > maxHeight {
152+
dataRange.MaxHeight = maxHeight
153+
}
154+
155+
if err := bm.container.Add(ctx, fileData.data, dataRange); err != nil {
134156
return fmt.Errorf("failed to add bulk file %v to container: %w", fileData.info, err)
135157
}
136158
}
@@ -157,3 +179,82 @@ func (bm *bulkManager) shouldAddNewFile(info *ingest.BulkHeaderMinimumInfo) bool
157179
rangeToAdd := info.ToHeightRange().Above(currentRange)
158180
return !rangeToAdd.IsEmpty()
159181
}
182+
183+
func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange) ([]wdk.ChainBlockHeader, error) {
184+
var newLiveHeaders []wdk.ChainBlockHeader
185+
maxBulkHeight := bm.GetHeightRange().MaxHeight
186+
minLiveHeight := presentHeight
187+
if liveInitialRange.NotEmpty() {
188+
minLiveHeight = liveInitialRange.MinHeight
189+
}
190+
191+
if minLiveHeight <= maxBulkHeight || minLiveHeight < maxBulkHeight+addLiveRecursionLimit {
192+
// no gap to fill
193+
return nil, nil
194+
}
195+
196+
// use bulk ingestors to fill the gap and treat fetched headers as live headers
197+
missingRange := models.NewHeightRange(maxBulkHeight+1, minLiveHeight-1)
198+
199+
for _, ingestor := range bm.bulkIngestors {
200+
if missingRange.IsEmpty() {
201+
break
202+
}
203+
204+
bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, missingRange)
205+
if err != nil {
206+
bm.logger.Error("Chaintracks service - error during bulk synchronization to fill gap", slog.String("ingestor_name", ingestor.Name), slog.String("error", err.Error()))
207+
return nil, fmt.Errorf("bulk synchronization to fill gap failed for ingestor %s: %w", ingestor.Name, err)
208+
}
209+
210+
for _, chunk := range bulkChunks {
211+
intersection := chunk.ToHeightRange().Intersect(missingRange)
212+
if intersection.IsEmpty() {
213+
continue
214+
}
215+
216+
data, err := downloader(ctx, chunk)
217+
if err != nil {
218+
return nil, fmt.Errorf("failed to download bulk file %v to fill gap: %w", chunk, err)
219+
}
220+
221+
if err := chunk.Validate(data); err != nil {
222+
return nil, fmt.Errorf("downloaded bulk file %v to fill gap is invalid: %w", chunk, err)
223+
}
224+
225+
minIndex := intersection.MinHeight - chunk.FirstHeight
226+
maxIndex := intersection.MaxHeight - chunk.FirstHeight
227+
228+
for i := minIndex; i <= maxIndex; i++ {
229+
startByte := i * 80
230+
endByte := startByte + 80
231+
headerData := data[startByte:endByte]
232+
233+
baseHeader, err := wdk.ChainBaseBlockHeaderFromBytes(headerData)
234+
if err != nil {
235+
return nil, fmt.Errorf("failed to parse block header at height %d from bulk file %v: %w", chunk.FirstHeight+i, chunk, err)
236+
}
237+
238+
blockHash, err := baseHeader.CalculateHash()
239+
if err != nil {
240+
return nil, fmt.Errorf("failed to compute hash for block header at height %d from bulk file %v: %w", chunk.FirstHeight+i, chunk, err)
241+
}
242+
243+
header := wdk.ChainBlockHeader{
244+
ChainBaseBlockHeader: *baseHeader,
245+
Height: chunk.FirstHeight + i,
246+
Hash: blockHash.String(),
247+
}
248+
249+
newLiveHeaders = append(newLiveHeaders, header)
250+
}
251+
252+
missingRange, err = missingRange.Subtract(intersection)
253+
if err != nil {
254+
return nil, fmt.Errorf("failed to compute missing range after filling gap with ingestor %s: %w", ingestor.Name, err)
255+
}
256+
}
257+
}
258+
259+
return newLiveHeaders, nil
260+
}

pkg/services/chaintracks/chaintracks_service.go

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import (
2020
)
2121

2222
const (
23+
// TODO: the constants below could be made configurable if needed.
2324
liveHeadersChanSize = 1000
2425
lastPresentHeightTTL = 60 * time.Second
2526
cdnSyncRepeatDuration = 24 * time.Hour
2627
syncCheckInterval = 1 * time.Second
2728
addLiveRecursionLimit = 11
2829
halfLiveRecursionLimit = addLiveRecursionLimit / 2
30+
liveHeightThreshold = 2000
2931
)
3032

3133
// Service provides core functionality for the Chaintracks service with logging and configuration support.
@@ -367,6 +369,10 @@ func (s *Service) shiftLiveHeaders(ctx context.Context) error {
367369
return fmt.Errorf("bulk synchronization failed during live headers shift: %w", err)
368370
}
369371

372+
if err := s.fillGapLiveHeaders(ctx, presentHeight, before.Live); err != nil {
373+
return fmt.Errorf("failed to fill gap live headers during live headers shift: %w", err)
374+
}
375+
370376
if err := s.processHeaders(ctx); err != nil {
371377
return fmt.Errorf("failed to process live headers during live headers shift: %w", err)
372378
}
@@ -381,6 +387,25 @@ func (s *Service) skipBulkSync(presentHeight uint, ranges models.HeightRanges) b
381387
return ranges.Live.NotEmpty() && ranges.Live.MaxHeight >= presentHeight-halfLiveRecursionLimit
382388
}
383389

390+
func (s *Service) fillGapLiveHeaders(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange) error {
391+
gapHeaders, err := s.bulkMgr.GetGapHeadersAsLive(ctx, presentHeight, liveInitialRange)
392+
if err != nil {
393+
return fmt.Errorf("failed to get gap headers as live during live headers shift: %w", err)
394+
}
395+
396+
if len(gapHeaders) > 0 {
397+
s.logger.Info("Filling gap between bulk and live headers using bulk ingestors", slog.Any("num_gap_headers", len(gapHeaders)))
398+
}
399+
400+
for _, header := range gapHeaders {
401+
if err := s.addLiveHeader(ctx, header); err != nil {
402+
s.logger.Warn("Chaintracks service - failed to add gap header as live", slog.String("header_hash", header.Hash), slog.String("error", err.Error()))
403+
}
404+
}
405+
406+
return nil
407+
}
408+
384409
func (s *Service) processHeaders(ctx context.Context) error {
385410
for {
386411
select {
@@ -441,15 +466,32 @@ func (s *Service) addLiveHeaderRecursive(ctx context.Context, header wdk.ChainBl
441466
var errNoPrev = fmt.Errorf("no previous header found")
442467

443468
func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHeader) (err error) {
444-
// TODO: implement header.Validate() method and uncomment validation check
445-
//if err := header.Validate(); err != nil {
446-
// return fmt.Errorf("invalid block header: %w", err)
447-
//}
469+
if header.Height == 0 {
470+
return fmt.Errorf("handling genesis block header is not supported here")
471+
}
472+
473+
if err := header.Validate(); err != nil {
474+
return fmt.Errorf("invalid block header: %w", err)
475+
}
448476

449477
if IsDirtyHash(header.Hash) {
450478
return fmt.Errorf("cannot add block header with dirty hash: %s", header.Hash)
451479
}
452480

481+
lastBulk, lastBulkChainWork, err := s.bulkMgr.LastHeader()
482+
if err != nil {
483+
return fmt.Errorf("failed to get last bulk header: %w", err)
484+
}
485+
486+
if lastBulk == nil {
487+
return fmt.Errorf("no bulk headers available to validate against")
488+
}
489+
490+
if header.Height <= lastBulk.Height {
491+
s.logger.Info("Chaintracks service - skipping storage of live header already present in bulk storage", slog.String("header_hash", header.Hash), slog.Any("header_height", header.Height))
492+
return nil
493+
}
494+
453495
q := s.storage.Query(ctx)
454496
q.Begin()
455497
defer func() {
@@ -479,28 +521,26 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead
479521
}
480522

481523
if oneBack == nil {
482-
s.logger.Debug("Chaintracks service - previous header not found, cannot add header yet", slog.String("header_hash", header.Hash), slog.String("previous_hash", header.PreviousHash))
483-
484524
if count, err := q.CountLiveHeaders(); err != nil {
485525
return fmt.Errorf("failed to count live headers: %w", err)
486526
} else if count == 0 {
487-
s.logger.Info("Chaintracks service - no live headers present, inserting genesis header", slog.String("header_hash", header.Hash))
488-
489-
// TODO check if this first-live-block-header matches the last bulk header
490-
// TODO: Important: Chainwork from bits should be added to the last ChainWork from the last bulk file
491-
headerChainWork := internal.ChainWorkFromBits(header.Bits)
492-
493-
if err := q.InsertNewLiveHeader(&models.LiveBlockHeader{
494-
ChainBlockHeader: header,
495-
PreviousHeaderID: nil,
496-
ChainWork: headerChainWork.To64PadHex(),
497-
IsActive: true,
498-
IsChainTip: true,
499-
}); err != nil {
500-
return fmt.Errorf("failed to insert genesis live header: %w", err)
527+
// No live headers yet, check if this header connects directly to last bulk
528+
if header.PreviousHash == lastBulk.Hash && header.Height == lastBulk.Height+1 {
529+
headerChainWork := internal.ChainWorkFromBits(header.Bits)
530+
chainWork := headerChainWork.AddChainWork(*lastBulkChainWork)
531+
532+
if err := q.InsertNewLiveHeader(&models.LiveBlockHeader{
533+
ChainBlockHeader: header,
534+
PreviousHeaderID: nil,
535+
ChainWork: chainWork.To64PadHex(),
536+
IsChainTip: true,
537+
IsActive: true,
538+
}); err != nil {
539+
return fmt.Errorf("failed to insert new live header: %w", err)
540+
}
541+
542+
return nil
501543
}
502-
503-
return nil
504544
}
505545

506546
return errNoPrev

pkg/services/chaintracks/chaintracks_storage.interface.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ type NamedLiveIngestor struct {
3434

3535
// BulkIngestor defines an interface for bulk synchronization of block headers within specified height ranges.
3636
// The Synchronize method takes the current presentHeight and the height range to fetch, and returns the metadata of the available bulk header files together with a downloader used to fetch their contents.
37-
// TODO: refine return type from 'any' to a more specific type representing synchronization results.
3837
type BulkIngestor interface {
3938
Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]ingest.BulkHeaderMinimumInfo, ingest.BulkFileDownloader, error)
4039
}

pkg/services/chaintracks/ingest/bulk_ingestor_cdn.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ type BulkFileDownloader = func(ctx context.Context, fileInfo BulkHeaderMinimumIn
4040
// Synchronize retrieves available bulk header files for the configured BSV network and prepares chunks for ingestion.
4141
// It validates file metadata, checks network consistency, and returns a list of chunked header information for sync.
4242
func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderMinimumInfo, BulkFileDownloader, error) {
43-
// TODO: PresentHeight and ranges are not used in TS implementation, consider using them for optimization
44-
4543
filesInfo, err := b.reader.FetchBulkHeaderFilesInfo(ctx, b.chain)
4644
if err != nil {
4745
return nil, nil, fmt.Errorf("failed to fetch bulk header files info: %w", err)
@@ -50,6 +48,15 @@ func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, r
5048
bulkInfo := make([]BulkHeaderMinimumInfo, 0, len(filesInfo.Files))
5149
for i := range filesInfo.Files {
5250
file := &filesInfo.Files[i]
51+
52+
if !file.ToHeightRange().Overlaps(rangeToFetch) {
53+
b.logger.Info("Skipping bulk header file - does not overlap requested range",
54+
slog.String("file_name", file.FileName),
55+
logging.Number("start_height", file.FirstHeight),
56+
logging.Number("end_height", must.ConvertToIntFromUnsigned(file.FirstHeight)+file.Count-1))
57+
continue
58+
}
59+
5360
b.logger.Info("Found bulk header file",
5461
slog.String("file_name", file.FileName),
5562
logging.Number("start_height", file.FirstHeight),

0 commit comments

Comments
 (0)