Commit b0087c7

blockpersister fork recovery (#216)
1 parent a864221 commit b0087c7

File tree

4 files changed: +422 -15 lines

services/blockpersister/Server.go

Lines changed: 52 additions & 8 deletions
@@ -246,10 +246,15 @@ func (u *Server) getNextBlockToProcess(ctx context.Context) (*model.Block, error
     // This detects blockchain reorganizations that may have occurred since the last persistence
     // This check can be disabled via settings for testing or special scenarios
     if u.settings.Block.BlockPersisterEnableDefensiveReorgCheck && lastPersistedHeight > 0 && lastPersistedHash != nil {
+        needsRecovery := false
+        var recoveryReason string
+
         // Get the block to obtain its ID
         lastBlock, err := u.blockchainClient.GetBlock(ctx, lastPersistedHash)
         if err != nil {
-            u.logger.Warnf("[BlockPersister] Could not retrieve last persisted block %s for reorg check: %v. Continuing with next block.", lastPersistedHash.String(), err)
+            // If we can't retrieve the block, it's likely been pruned because it's on an orphaned chain
+            needsRecovery = true
+            recoveryReason = fmt.Sprintf("last persisted block %s not found in blockchain store (likely pruned orphan)", lastPersistedHash.String())
         } else {
             // Check if this block is still on the current chain using its ID
             onCurrentChain, err := u.blockchainClient.CheckBlockIsInCurrentChain(ctx, []uint32{lastBlock.ID})
@@ -258,14 +263,53 @@ func (u *Server) getNextBlockToProcess(ctx context.Context) (*model.Block, error
             }

             if !onCurrentChain {
-                // Reorg detected - last persisted block is no longer on the current chain
-                // We must return nil to trigger recovery logic - continuing would cause an infinite loop
-                // because the parent hash validation will fail (new chain block's parent != orphaned block)
-                u.logger.Infof("[BlockPersister] Detected reorg: last persisted block %s at height %d is no longer on current chain. Returning nil to trigger recovery.",
-                    lastPersistedHash.String(), lastPersistedHeight)
-                // Return nil to prevent infinite loop - manual intervention or proper reorg recovery needed
-                return nil, nil
+                needsRecovery = true
+                recoveryReason = fmt.Sprintf("last persisted block %s at height %d is no longer on current chain", lastPersistedHash.String(), lastPersistedHeight)
+            }
+        }
+
+        if needsRecovery {
+            // Reorg detected - trigger recovery
+            u.logger.Infof("[BlockPersister] Detected reorg: %s. Starting recovery...", recoveryReason)
+
+            // Find common ancestor by walking backward and trying to rollback to each current chain block
+            // This approach works even if the orphaned blocks have been pruned from the blockchain store
+            var commonAncestorHeight uint32
+            var commonAncestorHash *chainhash.Hash
+
+            // Walk backward from last persisted height
+            for height := lastPersistedHeight; height > 0; height-- {
+                // Get the block from the current chain at this height
+                currentChainBlock, err := u.blockchainClient.GetBlockByHeight(ctx, height)
+                if err != nil {
+                    // Blockchain service should be reliable during recovery
+                    // If it's failing, there's likely a real infrastructure problem that needs attention
+                    return nil, errors.NewProcessingError("blockchain service unavailable during reorg recovery at height %d", height, err)
+                }
+
+                // Try to rollback to this block's hash
+                // This will succeed if the hash exists in our state file (meaning it's a common ancestor)
+                if err := u.state.RollbackToHash(currentChainBlock.Hash()); err == nil {
+                    // Success - found common ancestor
+                    commonAncestorHeight = height
+                    commonAncestorHash = currentChainBlock.Hash()
+                    break
+                }
+                // If rollback failed, this block isn't in our state file, continue backward
+            }
+
+            if commonAncestorHash == nil {
+                return nil, errors.NewProcessingError("no common ancestor found during reorg recovery - could not find any current chain block in state file", nil)
             }
+
+            // Calculate how many blocks we rolled back
+            blocksRolledBack := lastPersistedHeight - commonAncestorHeight
+
+            u.logger.Infof("[BlockPersister] Reorg recovery complete: rolled back %d blocks from height %d to common ancestor at height %d (hash: %s). Will resume processing from height %d.",
+                blocksRolledBack, lastPersistedHeight, commonAncestorHeight, commonAncestorHash.String(), commonAncestorHeight+1)
+
+            // Return nil to trigger next iteration, which will start processing from common ancestor + 1
+            return nil, nil
         }
     }

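In plain terms, the recovery path added above performs a backward search for the highest block that both the current chain and the local state file agree on. The sketch below restates that search in isolation; it is illustrative only, and findCommonAncestor, getCanonicalHash, and rollbackTo are hypothetical stand-ins for the blockchainClient.GetBlockByHeight and state.RollbackToHash calls used in the real code.

package recovery

import "fmt"

// findCommonAncestor walks down from tipHeight and returns the first height
// whose canonical hash the local state also knows about. getCanonicalHash and
// rollbackTo are hypothetical callbacks standing in for the blockchain client
// and the persister's state file.
func findCommonAncestor(
    tipHeight uint32,
    getCanonicalHash func(height uint32) (string, error),
    rollbackTo func(hash string) error,
) (uint32, string, error) {
    for height := tipHeight; height > 0; height-- {
        hash, err := getCanonicalHash(height)
        if err != nil {
            // The chain source must stay reachable while recovering.
            return 0, "", fmt.Errorf("chain lookup failed at height %d: %w", height, err)
        }
        if err := rollbackTo(hash); err == nil {
            // First canonical hash that is also in local state: the common ancestor.
            return height, hash, nil
        }
        // Hash not known locally, keep walking toward genesis.
    }
    return 0, "", fmt.Errorf("no common ancestor found below height %d", tipHeight)
}

On success the persister resumes from the returned height plus one, which matches the "Will resume processing from height %d" log line in the diff.
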
services/blockpersister/Server_test.go

Lines changed: 139 additions & 7 deletions
@@ -1231,33 +1231,165 @@ func TestGetNextBlockToProcess_ReorgDetected(t *testing.T) {
     // Create mock blockchain client
     mockClient := &blockchain.Mock{}

-    // Create mock block for the last persisted block (height 100)
-    lastPersistedHash, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000064")
+    // Create common ancestor header first so we can use its computed hash
+    dummyHash, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000001")
+    nBits, _ := model.NewNBitFromString("1d00ffff")
+    commonAncestorHeader := &model.BlockHeader{
+        Version: 1,
+        HashPrevBlock: dummyHash,
+        HashMerkleRoot: dummyHash,
+        Timestamp: 1234567890,
+        Bits: *nBits,
+        Nonce: 1,
+    }
+
+    // Use the computed hash as the last persisted hash
+    lastPersistedHash := commonAncestorHeader.Hash()
     lastPersistedBlock := &model.Block{
         Height: 100,
         ID: 100,
     }

-    // Mock the GetBlock call for reorg detection
+    // Mock the GetBlock call for reorg detection - using computed hash
     mockClient.On("GetBlock", ctx, lastPersistedHash).Return(
         lastPersistedBlock, nil)
     // Mock the CheckBlockIsInCurrentChain call - returning false to simulate reorg
     mockClient.On("CheckBlockIsInCurrentChain", ctx, []uint32{uint32(100)}).Return(
         false, nil) // false indicates block is NOT on current chain (reorg detected)
-    // GetBestBlockHeader should NOT be called because we return early
+
+    // Mock recovery flow - new approach walks backward trying each height
+    // It will try to get the block at height 100 from current chain
+    // and that will match our state file, so recovery succeeds immediately
+    mockClient.On("GetBlockByHeight", ctx, uint32(100)).Return(
+        &model.Block{Height: 100, Header: commonAncestorHeader}, nil)

     server := New(ctx, logger, tSettings, nil, nil, nil, mockClient)

-    // Set initial persisted height with the orphaned hash
+    // Set initial persisted height with the computed hash
+    // This ensures the hash in the state file matches what we'll try to rollback to
     err := server.state.AddBlock(100, lastPersistedHash.String())
     require.NoError(t, err)

     // Call getNextBlockToProcess
     block, err := server.getNextBlockToProcess(ctx)

-    // Verify that we return nil block and nil error (triggering recovery)
+    // Verify that we return nil block and nil error after recovery
+    require.NoError(t, err)
+    assert.Nil(t, block, "Should return nil block after recovery to trigger retry")
+
+    // Verify state wasn't changed since common ancestor is same as last persisted
+    height, hash, err := server.state.GetLastPersistedBlock()
+    require.NoError(t, err)
+    require.Equal(t, uint32(100), height)
+    require.Equal(t, commonAncestorHeader.Hash().String(), hash.String())
+
+    // Verify mock expectations
+    mockClient.AssertExpectations(t)
+}
+
+// TestGetNextBlockToProcess_ReorgRecovery tests the full reorg recovery flow using block locators
+func TestGetNextBlockToProcess_ReorgRecovery(t *testing.T) {
+    ctx := context.Background()
+    logger := ulogger.TestLogger{}
+    tSettings := test.CreateBaseTestSettings(t)
+
+    // Create temp directory for state file
+    tempDir := t.TempDir()
+    tSettings.Block.StateFile = tempDir + "/blocks.dat"
+    tSettings.Block.BlockPersisterPersistAge = 2
+
+    // Create mock blockchain client
+    mockClient := &blockchain.Mock{}
+
+    // Simulate state with blocks up to height 105 (on old chain)
+    // Common ancestor is at height 100
+    // Reorg occurred at height 101
+
+    // Create common ancestor header first so we can use its computed hash everywhere
+    dummyHash, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000001")
+    nBits, _ := model.NewNBitFromString("1d00ffff")
+    commonAncestorHeader := &model.BlockHeader{
+        Version: 1,
+        HashPrevBlock: dummyHash,
+        HashMerkleRoot: dummyHash,
+        Timestamp: 1234567890,
+        Bits: *nBits,
+        Nonce: 1,
+    }
+    commonAncestorHash := commonAncestorHeader.Hash()
+
+    // Old chain hashes (blocks after the fork)
+    oldChainHash101, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000101")
+    oldChainHash102, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000102")
+    oldChainHash103, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000103")
+    oldChainHash104, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000104")
+    oldChainHash105, _ := chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000105")
+
+    server := New(ctx, logger, tSettings, nil, nil, nil, mockClient)
+
+    // Set up state with blocks from old chain using the computed common ancestor hash
+    require.NoError(t, server.state.AddBlock(100, commonAncestorHash.String()))
+    require.NoError(t, server.state.AddBlock(101, oldChainHash101.String()))
+    require.NoError(t, server.state.AddBlock(102, oldChainHash102.String()))
+    require.NoError(t, server.state.AddBlock(103, oldChainHash103.String()))
+    require.NoError(t, server.state.AddBlock(104, oldChainHash104.String()))
+    require.NoError(t, server.state.AddBlock(105, oldChainHash105.String()))
+
+    // Verify initial state
+    height, hash, err := server.state.GetLastPersistedBlock()
+    require.NoError(t, err)
+    require.Equal(t, uint32(105), height)
+    require.Equal(t, oldChainHash105.String(), hash.String())
+
+    // Mock the reorg detection sequence
+    lastPersistedBlock := &model.Block{
+        Height: 105,
+        ID: 105,
+    }
+
+    // 1. GetBlock call for last persisted block
+    mockClient.On("GetBlock", ctx, oldChainHash105).Return(lastPersistedBlock, nil)
+
+    // 2. CheckBlockIsInCurrentChain returns false (reorg detected)
+    mockClient.On("CheckBlockIsInCurrentChain", ctx, []uint32{uint32(105)}).Return(false, nil)
+
+    // 3. Mock recovery flow - new approach walks backward from height 105
+    // trying to find a block from current chain that exists in our state file
+    // Heights 105, 104, 103, 102, 101 won't match (different hashes)
+    // Height 100 will match (common ancestor)
+    newChainHash105, _ := chainhash.NewHashFromStr("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+    newChainHash104, _ := chainhash.NewHashFromStr("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
+    newChainHash103, _ := chainhash.NewHashFromStr("cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
+    newChainHash102, _ := chainhash.NewHashFromStr("dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd")
+    newChainHash101, _ := chainhash.NewHashFromStr("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee")
+
+    // Mock GetBlockByHeight for each height during backward search
+    // Create proper block headers for each height on the new chain
+    mockClient.On("GetBlockByHeight", ctx, uint32(105)).Return(
+        &model.Block{Height: 105, Header: &model.BlockHeader{Version: 1, HashPrevBlock: oldChainHash104, HashMerkleRoot: newChainHash105, Timestamp: 1234567890, Bits: *nBits, Nonce: 1}}, nil)
+    mockClient.On("GetBlockByHeight", ctx, uint32(104)).Return(
+        &model.Block{Height: 104, Header: &model.BlockHeader{Version: 1, HashPrevBlock: oldChainHash103, HashMerkleRoot: newChainHash104, Timestamp: 1234567890, Bits: *nBits, Nonce: 1}}, nil)
+    mockClient.On("GetBlockByHeight", ctx, uint32(103)).Return(
+        &model.Block{Height: 103, Header: &model.BlockHeader{Version: 1, HashPrevBlock: oldChainHash102, HashMerkleRoot: newChainHash103, Timestamp: 1234567890, Bits: *nBits, Nonce: 1}}, nil)
+    mockClient.On("GetBlockByHeight", ctx, uint32(102)).Return(
+        &model.Block{Height: 102, Header: &model.BlockHeader{Version: 1, HashPrevBlock: oldChainHash101, HashMerkleRoot: newChainHash102, Timestamp: 1234567890, Bits: *nBits, Nonce: 1}}, nil)
+    mockClient.On("GetBlockByHeight", ctx, uint32(101)).Return(
+        &model.Block{Height: 101, Header: &model.BlockHeader{Version: 1, HashPrevBlock: commonAncestorHash, HashMerkleRoot: newChainHash101, Timestamp: 1234567890, Bits: *nBits, Nonce: 1}}, nil)
+    mockClient.On("GetBlockByHeight", ctx, uint32(100)).Return(
+        &model.Block{Height: 100, Header: commonAncestorHeader}, nil)
+
+    // Call getNextBlockToProcess - should trigger reorg recovery
+    block, err := server.getNextBlockToProcess(ctx)
+
+    // Should return nil (to retry on next iteration)
+    require.NoError(t, err)
+    assert.Nil(t, block, "Should return nil after recovery to trigger retry")
+
+    // Verify state was rolled back to common ancestor (height 100)
+    height, hash, err = server.state.GetLastPersistedBlock()
     require.NoError(t, err)
-    assert.Nil(t, block, "Should return nil block when reorg is detected")
+    require.Equal(t, uint32(100), height)
+    require.Equal(t, commonAncestorHeader.Hash().String(), hash.String())

     // Verify mock expectations
     mockClient.AssertExpectations(t)
services/blockpersister/state/state.go

Lines changed: 120 additions & 0 deletions
@@ -23,6 +23,7 @@
 package state

 import (
+    "bufio"
     "bytes"
     "fmt"
     "io"
@@ -229,6 +230,125 @@ func (s *State) AddBlock(height uint32, hash string) error {
     return nil
 }

+// RollbackToHash truncates the state file to the specified block hash
+// This removes all blocks after the first occurrence of the target hash
+// Uses streaming to minimize memory usage - only one line in memory at a time
+// Parameters:
+//   - targetHash: the hash of the block to rollback to
+//
+// Returns:
+//   - error: any error encountered (including if hash is not found)
+func (s *State) RollbackToHash(targetHash *chainhash.Hash) (err error) {
+    s.fileLock.Lock()
+    defer s.fileLock.Unlock()
+
+    // Open source file for reading
+    srcFile, err := os.Open(s.filePath)
+    if err != nil {
+        if os.IsNotExist(err) {
+            return errors.NewProcessingError("cannot rollback to hash %s: state file does not exist", targetHash.String())
+        }
+        return errors.NewProcessingError("failed to open blocks file", err)
+    }
+    defer srcFile.Close()
+
+    srcDeferFn, err := s.lockFile(srcFile, syscall.LOCK_SH)
+    if err != nil {
+        return errors.NewProcessingError("could not lock file for reading", err)
+    }
+    defer srcDeferFn()
+
+    // Create temporary file for writing
+    dir := filepath.Dir(s.filePath)
+    tmpFile, err := os.CreateTemp(dir, "blocks.*.tmp")
+    if err != nil {
+        return errors.NewProcessingError("failed to create temporary file", err)
+    }
+    tmpPath := tmpFile.Name()
+    defer os.Remove(tmpPath) // Always cleanup temp file (fails silently if renamed)
+
+    // Defer tmpFile.Close() - will be skipped on success via tmpFileClosed flag
+    tmpFileClosed := false
+    defer func() {
+        if !tmpFileClosed {
+            tmpFile.Close()
+        }
+    }()
+
+    tmpDeferFn, err := s.lockFile(tmpFile, syscall.LOCK_EX)
+    if err != nil {
+        return errors.NewProcessingError("could not lock temporary file", err)
+    }
+    defer tmpDeferFn()
+
+    // Stream copy line by line until we hit the target hash
+    scanner := bufio.NewScanner(srcFile)
+    linesWritten := 0
+    hashFound := false
+
+    for scanner.Scan() {
+        line := scanner.Bytes()
+
+        if len(line) == 0 {
+            continue
+        }
+
+        // Parse hash from the line (format: height,hash)
+        commaIndex := bytes.IndexByte(line, ',')
+        if commaIndex != -1 {
+            // Extract and check hash
+            hashStr := string(bytes.TrimSpace(line[commaIndex+1:]))
+            hash, err := chainhash.NewHashFromStr(hashStr)
+            if err == nil && hash.IsEqual(targetHash) {
+                // Found target hash - write this line and stop
+                if _, err := tmpFile.Write(line); err != nil {
+                    return errors.NewProcessingError("failed to write to temporary file", err)
+                }
+                if _, err := tmpFile.WriteString("\n"); err != nil {
+                    return errors.NewProcessingError("failed to write newline to temporary file", err)
+                }
+                linesWritten++
+                hashFound = true
+                break
+            }
+        }
+
+        // Write line and continue
+        if _, err := tmpFile.Write(line); err != nil {
+            return errors.NewProcessingError("failed to write to temporary file", err)
+        }
+        if _, err := tmpFile.WriteString("\n"); err != nil {
+            return errors.NewProcessingError("failed to write newline to temporary file", err)
+        }
+        linesWritten++
+    }
+
+    if err = scanner.Err(); err != nil {
+        return errors.NewProcessingError("failed to read blocks file", err)
+    }
+
+    if !hashFound {
+        return errors.NewProcessingError("cannot rollback to hash %s: hash not found in state file", targetHash.String())
+    }
+
+    // Sync temp file to disk
+    if err = tmpFile.Sync(); err != nil {
+        return errors.NewProcessingError("failed to sync temporary file", err)
+    }
+
+    // Close temp file before rename (locks will be released by defers)
+    tmpFile.Close()
+    tmpFileClosed = true
+
+    // Atomically replace the original file
+    if err = os.Rename(tmpPath, s.filePath); err != nil {
+        return errors.NewProcessingError("failed to rename temporary file", err)
+    }
+
+    s.logger.Infof("[BlockPersister State] Rolled back to hash %s (%d blocks in final state)", targetHash.String(), linesWritten)
+    return nil
+}
+
 // lockFile implements file locking for concurrent access
 // Parameters:
 //   - file: file to lock

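For a concrete picture of what RollbackToHash does: per the parsing above, the state file is a plain text file with one height,hash record per line, appended by AddBlock. In the reorg scenario from the tests, the file would hold the old-chain entries for heights 100 through 105, and rolling back to the height-100 hash keeps everything up to and including the first line whose hash matches the target. The entries below are illustrative placeholders, not real 64-character block hashes.

Before RollbackToHash(hash of block 100):
    100,<hash of common ancestor>
    101,<old-chain hash 101>
    102,<old-chain hash 102>
    103,<old-chain hash 103>
    104,<old-chain hash 104>
    105,<old-chain hash 105>

After:
    100,<hash of common ancestor>

The rewrite goes through a temp file in the same directory followed by os.Rename, so a crash mid-rollback leaves either the old file or the new file intact, never a partially truncated one.
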