Skip to content

Commit 42c8ad9

Browse files
authored
feat: live-to-bulk migration & new-tip/reorg callbacks & more config fields & fixes (#699)
1 parent 806bf33 commit 42c8ad9

File tree

11 files changed

+346
-65
lines changed

11 files changed

+346
-65
lines changed

cmd/chaintracks/main.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,47 @@ func main() {
4949
panic(err)
5050
}
5151

52+
listenForTipHeaders(server, ctx)
53+
listenForReorgs(server, ctx)
54+
5255
if err := server.ListenAndServe(ctx); err != nil {
5356
panic(err)
5457
}
5558
}
59+
60+
func listenForTipHeaders(server *chaintracks.Server, ctx context.Context) {
61+
newTipHeadersChan, unsubscribe := server.Service.SubscribeHeaders()
62+
63+
go func() {
64+
for {
65+
select {
66+
case header := <-newTipHeadersChan:
67+
slog.Default().Info("New tip header received", slog.Uint64("height", uint64(header.Height)), slog.String("hash", header.Hash))
68+
case <-ctx.Done():
69+
unsubscribe()
70+
return
71+
}
72+
}
73+
}()
74+
}
75+
76+
func listenForReorgs(server *chaintracks.Server, ctx context.Context) {
77+
reorgChan, unsubscribe := server.Service.SubscribeReorgs()
78+
79+
go func() {
80+
for {
81+
select {
82+
case reorg := <-reorgChan:
83+
slog.Default().Info("Reorg detected",
84+
slog.Uint64("new_tip_height", uint64(reorg.NewTip.Height)),
85+
slog.String("new_tip_hash", reorg.NewTip.Hash),
86+
slog.Uint64("old_tip_height", uint64(reorg.OldTip.Height)),
87+
slog.String("old_tip_hash", reorg.OldTip.Hash),
88+
)
89+
case <-ctx.Done():
90+
unsubscribe()
91+
return
92+
}
93+
}
94+
}()
95+
}

pkg/defs/chaintracks.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ type ChaintracksServiceConfig struct {
2525
LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"`
2626
BulkIngestors []BulkIngestorConfig `mapstructure:"bulk_ingestors"`
2727
WocAPIKey string `mapstructure:"woc_api_key"`
28+
29+
AddLiveRecursionLimit uint `mapstructure:"add_live_recursion_limit"`
30+
LiveHeightThreshold uint `mapstructure:"live_height_threshold"`
2831
}
2932

3033
// Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type.
@@ -52,6 +55,10 @@ func (c *ChaintracksServiceConfig) Validate() error {
5255
}
5356
}
5457

58+
if c.AddLiveRecursionLimit > 100 || c.AddLiveRecursionLimit > c.LiveHeightThreshold {
59+
return fmt.Errorf("add_live_recursion_limit must be less than or equal to live_height_threshold and not exceed 100")
60+
}
61+
5562
return nil
5663
}
5764

@@ -73,7 +80,9 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig {
7380
Type: WhatsOnChainCDN,
7481
},
7582
},
76-
WocAPIKey: "",
83+
WocAPIKey: "",
84+
AddLiveRecursionLimit: 10,
85+
LiveHeightThreshold: 2000,
7786
}
7887
}
7988

pkg/services/chaintracks/bulk_headers_container.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (b *bulkHeadersContainer) MaxHeightAtChunk(index int) int {
137137
func (b *bulkHeadersContainer) FindHeaderForHeight(height uint) (*wdk.ChainBlockHeader, error) {
138138
chunkIndex := b.getIndexForHeight(must.ConvertToIntFromUnsigned(height))
139139
if chunkIndex >= len(b.chunks) {
140-
return nil, nil
140+
return nil, nil // not found
141141
}
142142

143143
chunk := b.chunks[chunkIndex]
@@ -147,7 +147,7 @@ func (b *bulkHeadersContainer) FindHeaderForHeight(height uint) (*wdk.ChainBlock
147147
headerDataEnd := headerDataStart + 80
148148

149149
if headerDataEnd > len(chunk.data) {
150-
return nil, fmt.Errorf("header data end index %d exceeds chunk data length %d", headerDataEnd, len(chunk.data))
150+
return nil, nil // not found
151151
}
152152

153153
headerData := chunk.data[headerDataStart:headerDataEnd]

pkg/services/chaintracks/bulk_manager.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor, chai
3333
}
3434
}
3535

36-
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error {
36+
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges, liveHeightThreshold uint) error {
3737
if presentHeight <= liveHeightThreshold {
3838
bm.logger.Info("Skipping bulk synchronization - present height below live height threshold", slog.Any("present_height", presentHeight), slog.Any("live_height_threshold", liveHeightThreshold))
3939
return nil
@@ -114,6 +114,30 @@ func (bm *bulkManager) GetFileDataByIndex(fileID int) (*ingest.BulkFileData, err
114114
return bm.container.GetFileDataByIndex(fileID)
115115
}
116116

117+
func (bm *bulkManager) MigrateFromLiveHeaders(ctx context.Context, liveHeaders []*models.LiveBlockHeader) error {
118+
bm.locker.Lock()
119+
defer bm.locker.Unlock()
120+
121+
// create data slice
122+
data := make([]byte, 0, len(liveHeaders)*80)
123+
for _, header := range liveHeaders {
124+
headerBytes, err := header.Bytes()
125+
if err != nil {
126+
return fmt.Errorf("failed to convert live header at height %d to bytes: %w", header.Height, err)
127+
}
128+
data = append(data, headerBytes...)
129+
}
130+
131+
// add to container
132+
heightRange := models.NewHeightRange(liveHeaders[0].Height, liveHeaders[len(liveHeaders)-1].Height)
133+
err := bm.container.Add(ctx, data, heightRange)
134+
if err != nil {
135+
return fmt.Errorf("failed to add live headers to bulk container: %w", err)
136+
}
137+
138+
return nil
139+
}
140+
117141
func (bm *bulkManager) processBulkChunks(ctx context.Context, bulkChunks []ingest.BulkHeaderMinimumInfo, downloader ingest.BulkFileDownloader, maxHeight uint) error {
118142
chunksToLoad := bm.getChunksToLoad(bulkChunks)
119143
type chunkWithInfo struct {
@@ -180,7 +204,7 @@ func (bm *bulkManager) shouldAddNewFile(info *ingest.BulkHeaderMinimumInfo) bool
180204
return !rangeToAdd.IsEmpty()
181205
}
182206

183-
func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange) ([]wdk.ChainBlockHeader, error) {
207+
func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange, addLiveRecursionLimit uint) ([]wdk.ChainBlockHeader, error) {
184208
var newLiveHeaders []wdk.ChainBlockHeader
185209
maxBulkHeight := bm.GetHeightRange().MaxHeight
186210
minLiveHeight := presentHeight

pkg/services/chaintracks/chaintracks_handler.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package chaintracks
33
import (
44
"bytes"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"log/slog"
89
"net/http"
@@ -13,6 +14,7 @@ import (
1314
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
1415
servercommon "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/server"
1516
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
17+
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk"
1618
"github.com/go-softwarelab/common/pkg/to"
1719
)
1820

@@ -119,6 +121,11 @@ func (h *Handler) handleFindChainTipHeader(w http.ResponseWriter, r *http.Reques
119121
w.Header().Set("Content-Type", "application/json")
120122

121123
tipHeader, err := h.service.FindChainTipHeader(r.Context())
124+
if errors.Is(err, wdk.ErrNotFoundError) {
125+
http.Error(w, "Chain tip not found", http.StatusNotFound)
126+
return
127+
}
128+
122129
if err != nil {
123130
h.logger.Error("failed to find chain tip header hex", slog.String("error", err.Error()))
124131
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
@@ -137,6 +144,11 @@ func (h *Handler) handleFindTipHashHex(w http.ResponseWriter, r *http.Request) {
137144
w.Header().Set("Content-Type", "application/json")
138145

139146
tipHash, err := h.service.FindChainTipHeader(r.Context())
147+
if errors.Is(err, wdk.ErrNotFoundError) {
148+
http.Error(w, "Chain tip not found", http.StatusNotFound)
149+
return
150+
}
151+
140152
if err != nil {
141153
h.logger.Error("failed to find chain tip hash hex", slog.String("error", err.Error()))
142154
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
@@ -167,6 +179,11 @@ func (h *Handler) handleFindHeaderHexForHeight(w http.ResponseWriter, r *http.Re
167179
}
168180

169181
header, err := h.service.FindHeaderForHeight(r.Context(), height)
182+
if errors.Is(err, wdk.ErrNotFoundError) {
183+
http.Error(w, "Header not found for the specified height", http.StatusNotFound)
184+
return
185+
}
186+
170187
if err != nil {
171188
h.logger.Error("failed to find header hex for height", slog.String("error", err.Error()))
172189
http.Error(w, "Internal Server Error", http.StatusInternalServerError)

0 commit comments

Comments
 (0)