Skip to content

Commit 806bf33

Browse files
authored
feat: implement chain reorganization handling in Chaintracks service (#698)
1 parent 56ea26c commit 806bf33

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

pkg/services/chaintracks/chaintracks_service.go

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -579,9 +579,41 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead
579579
}
580580

581581
isActiveTip := chainWork.CmpChainWork(priorTipChainWork) > 0
582-
//if isActiveTip {
583-
// // TODO: handle reorgs if needed
584-
//}
582+
if isActiveTip {
583+
s.logger.Info("Chaintracks service - new active chain tip detected", slog.String("header_hash", header.Hash), slog.Any("header_height", header.Height))
584+
585+
// find newHeader's first active ancestor
586+
activeAncestor := oneBack
587+
for !activeAncestor.IsActive {
588+
previousHash := activeAncestor.PreviousHash
589+
activeAncestor, err = q.GetLiveHeaderByHash(previousHash)
590+
if err != nil {
591+
return fmt.Errorf("failed to get active ancestor header: %w", err)
592+
}
593+
594+
if activeAncestor == nil {
595+
return fmt.Errorf("active ancestor header not found for hash: %s", previousHash)
596+
}
597+
}
598+
599+
// TODO: Calculate reorg depth - but this is not needed yet - used only by reorg callbacks
600+
601+
if activeAncestor.HeaderID != oneBack.HeaderID {
602+
s.logger.Info("Chaintracks service - chain reorganization detected", slog.String("new_tip_hash", header.Hash), slog.Any("new_tip_height", header.Height), slog.String("active_ancestor_hash", activeAncestor.Hash), slog.Any("active_ancestor_height", activeAncestor.Height))
603+
604+
// deactivate headers from the current active chain tip up to but excluding our activeAncestor:
605+
if err := s.setActiveRecursivelyUntilReachAncestor(q, false, priorTip, activeAncestor); err != nil {
606+
return fmt.Errorf("failed to deactivate headers during reorg: %w", err)
607+
}
608+
609+
// the first header to activate is one before the one we are about to insert
610+
// headers are activated until we reach the active ancestor
611+
if err := s.setActiveRecursivelyUntilReachAncestor(q, true, oneBack, activeAncestor); err != nil {
612+
return fmt.Errorf("failed to activate headers during reorg: %w", err)
613+
}
614+
615+
}
616+
}
585617

586618
if oneBack.IsChainTip {
587619
if err := q.SetChainTipByID(oneBack.HeaderID, false); err != nil {
@@ -606,6 +638,28 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead
606638
return nil
607639
}
608640

641+
func (s *Service) setActiveRecursivelyUntilReachAncestor(q models.StorageQueries, isActive bool, first, ancestor *models.LiveBlockHeader) error {
642+
var err error
643+
current := first
644+
for current.HeaderID != ancestor.HeaderID {
645+
if err := q.SetActiveByID(current.HeaderID, isActive); err != nil {
646+
return fmt.Errorf("failed to set active=%t during reorg: %w", isActive, err)
647+
}
648+
649+
previousHash := current.PreviousHash
650+
current, err = q.GetLiveHeaderByHash(previousHash)
651+
if err != nil {
652+
return fmt.Errorf("failed to get previous header during reorg: %w", err)
653+
}
654+
655+
if current == nil {
656+
return fmt.Errorf("previous header not found during reorg for hash: %s", previousHash)
657+
}
658+
}
659+
660+
return nil
661+
}
662+
609663
func cancellableSleep(ctx context.Context, d time.Duration) error {
610664
select {
611665
case <-time.After(d):

pkg/services/chaintracks/gormstorage/storage_queries.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,17 @@ func (i *storageQueries) SetChainTipByID(id uint, isChainTip bool) error {
113113
return nil
114114
}
115115

116+
func (i *storageQueries) SetActiveByID(id uint, isActive bool) error {
117+
table := i.getQuery().ChaintracksLiveHeader
118+
_, err := table.
119+
Where(table.HeaderID.Eq(id)).
120+
UpdateColumn(table.IsActive, isActive)
121+
if err != nil {
122+
return fmt.Errorf("failed to set active by ID: %w", err)
123+
}
124+
return nil
125+
}
126+
116127
func (i *storageQueries) InsertNewLiveHeader(header *models.LiveBlockHeader) error {
117128
table := i.getQuery().ChaintracksLiveHeader
118129
err := table.Create(&dbmodels.ChaintracksLiveHeader{

pkg/services/chaintracks/models/storage_queries.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type StorageQueries interface {
1212
GetLiveHeaderByHash(hash string) (*LiveBlockHeader, error)
1313
GetActiveTipLiveHeader() (*LiveBlockHeader, error)
1414
SetChainTipByID(id uint, isChainTip bool) error
15+
SetActiveByID(id uint, isActive bool) error
1516
InsertNewLiveHeader(header *LiveBlockHeader) error
1617
CountLiveHeaders() (int64, error)
1718
GetLiveHeaderByHeight(height uint) (*LiveBlockHeader, error)

0 commit comments

Comments
 (0)