From d70c9127e124e6a032b37b2f81de2eb59a6d039f Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 6 Oct 2025 23:02:46 -0700 Subject: [PATCH 1/9] Expose latest version for SS store --- ss/pebbledb/db.go | 75 ++++++++++++++++++++++++---------------- ss/rocksdb/db.go | 87 +++++++++++++++++------------------------------ ss/sqlite/db.go | 23 +++++++++---- ss/types/store.go | 4 +-- 4 files changed, 95 insertions(+), 94 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 14dd454a..801813b5 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -59,6 +59,8 @@ type Database struct { config config.StateStoreConfig // Earliest version for db after pruning earliestVersion int64 + // Latest version for db + latestVersion int64 // Map of module to when each was last updated // Used in pruning to skip over stores that have not been updated recently @@ -120,15 +122,24 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return nil, fmt.Errorf("failed to open PebbleDB: %w", err) } + // Initialize earliest version earliestVersion, err := retrieveEarliestVersion(db) if err != nil { - return nil, fmt.Errorf("failed to open PebbleDB: %w", err) + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + + // Initialize latest version + latestVersion, err := retrieveLatestVersion(db) + if err != nil { + return nil, fmt.Errorf("failed to retrieve latest version: %w", err) } + database := &Database{ storage: db, asyncWriteWG: sync.WaitGroup{}, config: config, earliestVersion: earliestVersion, + latestVersion: latestVersion, pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), } @@ -182,22 +193,26 @@ func (db *Database) SetLatestVersion(version int64) error { return err } -func (db *Database) GetLatestVersion() (int64, error) { - bz, closer, err := db.storage.Get([]byte(latestVersionKey)) - if err != nil { +func (db *Database) GetLatestVersion() int64 { + return db.latestVersion +} + +// Retrieve latestVersion from db, if not found, return 0. +func retrieveLatestVersion(db *pebble.DB) (int64, error) { + bz, closer, err := db.Get([]byte(latestVersionKey)) + defer func() { + if closer != nil { + _ = closer.Close() + } + }() + if err != nil || len(bz) == 0 { if errors.Is(err, pebble.ErrNotFound) { - // in case of a fresh database return 0, nil } - return 0, err } - if len(bz) == 0 { - return 0, closer.Close() - } - - return int64(binary.LittleEndian.Uint64(bz)), closer.Close() + return int64(binary.LittleEndian.Uint64(bz)), nil } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { @@ -211,8 +226,8 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error return nil } -func (db *Database) GetEarliestVersion() (int64, error) { - return db.earliestVersion, nil +func (db *Database) GetEarliestVersion() int64 { + return db.earliestVersion } func (db *Database) SetLastRangeHashed(latestHashed int64) error { @@ -237,23 +252,22 @@ func (db *Database) GetLastRangeHashed() (int64, error) { return cachedValue, nil } -// Retrieves earliest version from db +// Retrieves earliest version from db, if not found, return 0 func retrieveEarliestVersion(db *pebble.DB) (int64, error) { bz, closer, err := db.Get([]byte(earliestVersionKey)) - if err != nil { + defer func() { + if closer != nil { + _ = closer.Close() + } + }() + if err != nil || len(bz) == 0 { if errors.Is(err, pebble.ErrNotFound) { - // in case of a fresh database return 0, nil } - return 0, err } - if len(bz) == 0 { - return 0, closer.Close() - } - - return int64(binary.LittleEndian.Uint64(bz)), closer.Close() + return int64(binary.LittleEndian.Uint64(bz)), nil } // SetLatestKey sets the latest key processed during migration. @@ -349,6 +363,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro version = 1 } + // Create batch and persist latest version in the batch b, err := NewBatch(db.storage, version) if err != nil { return err @@ -356,11 +371,11 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro for _, kvPair := range cs.Changeset.Pairs { if kvPair.Value == nil { - if err := b.Delete(cs.Name, kvPair.Key); err != nil { + if err = b.Delete(cs.Name, kvPair.Key); err != nil { return err } } else { - if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { + if err = b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { return err } } @@ -369,7 +384,13 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro // Mark the store as updated db.storeKeyDirty.Store(cs.Name, version) - return b.Write() + // Update latest version + err = b.Write() + if err != nil { + return err + } + db.latestVersion = version + return nil } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { @@ -513,10 +534,6 @@ func (db *Database) writeAsyncInBackground() { panic(err) } } - err := db.SetLatestVersion(version) - if err != nil { - panic(err) - } } } diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 6b7c4018..fff3e921 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -58,6 +58,8 @@ type Database struct { // Earliest version for db after pruning earliestVersion int64 + // Latest version for db + latestVersion int64 asyncWriteWG sync.WaitGroup @@ -85,17 +87,24 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) } + // Initialize earliest version earliestVersion, err := retrieveEarliestVersion(storage) if err != nil { return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) } + latestVersion, err := retrieveLatestVersion(storage) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + database := &Database{ storage: storage, config: config, cfHandle: cfHandle, tsLow: tsLow, earliestVersion: earliestVersion, + latestVersion: latestVersion, pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), } @@ -117,31 +126,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return database, nil } -func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Database, error) { - slice, err := storage.GetFullHistoryTsLow(cfHandle) - if err != nil { - return nil, fmt.Errorf("failed to get full_history_ts_low: %w", err) - } - - var tsLow int64 - tsLowBz := copyAndFreeSlice(slice) - if len(tsLowBz) > 0 { - tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) - } - - earliestVersion, err := retrieveEarliestVersion(storage) - if err != nil { - return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) - } - - return &Database{ - storage: storage, - cfHandle: cfHandle, - tsLow: tsLow, - earliestVersion: earliestVersion, - }, nil -} - func (db *Database) Close() error { if db.streamHandler != nil { // Close the changelog stream first @@ -155,7 +139,6 @@ func (db *Database) Close() error { } db.storage.Close() - db.storage = nil db.cfHandle = nil @@ -177,18 +160,8 @@ func (db *Database) SetLatestVersion(version int64) error { return db.storage.Put(defaultWriteOpts, []byte(latestVersionKey), ts[:]) } -func (db *Database) GetLatestVersion() (int64, error) { - bz, err := db.storage.GetBytes(defaultReadOpts, []byte(latestVersionKey)) - if err != nil { - return 0, err - } - - if len(bz) == 0 { - // in case of a fresh database - return 0, nil - } - - return int64(binary.LittleEndian.Uint64(bz)), nil +func (db *Database) GetLatestVersion() int64 { + return db.latestVersion } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { @@ -202,8 +175,8 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error return nil } -func (db *Database) GetEarliestVersion() (int64, error) { - return db.earliestVersion, nil +func (db *Database) GetEarliestVersion() int64 { + return db.earliestVersion } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { @@ -246,6 +219,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro version = 1 } + // Update latest version in batch b := NewBatch(db, version) for _, kvPair := range cs.Changeset.Pairs { @@ -260,7 +234,12 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } } - return b.Write() + err := b.Write() + if err != nil { + return err + } + db.latestVersion = version + return nil } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { @@ -297,10 +276,6 @@ func (db *Database) writeAsyncInBackground() { panic(err) } } - err := db.SetLatestVersion(version) - if err != nil { - panic(err) - } } } } @@ -413,10 +388,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte } start, end := util.IterateWithPrefix(prefix, nil, nil) - latestVersion, err := db.GetLatestVersion() - if err != nil { - return false, err - } + latestVersion := retrieveLatestVersion(db.storage) var startTs [TimestampSize]byte binary.LittleEndian.PutUint64(startTs[:], uint64(0)) @@ -514,17 +486,20 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo panic("implement me") } -// retrieveEarliestVersion retrieves the earliest version from the database +// retrieveEarliestVersion retrieves the earliest version from the database, if not found, return 0. func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) - if err != nil { - fmt.Printf("warning: rocksdb get for earliestVersionKey failed: %v", err) - return 0, nil + if err != nil || len(bz) == 0 { + return 0, err } + return int64(binary.LittleEndian.Uint64(bz)), nil +} - if len(bz) == 0 { - // in case of a fresh database - return 0, nil +// retrieveLatestVersion retrieves the latest version from the database, if not found, return 0. +func retrieveLatestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(latestVersionKey)) + if err != nil || len(bz) == 0 { + return 0, err } return int64(binary.LittleEndian.Uint64(bz)), nil diff --git a/ss/sqlite/db.go b/ss/sqlite/db.go index 11dd0fd2..e90fa4e7 100644 --- a/ss/sqlite/db.go +++ b/ss/sqlite/db.go @@ -108,10 +108,10 @@ func (db *Database) Close() error { return err } -func (db *Database) GetLatestVersion() (int64, error) { +func (db *Database) GetLatestVersion() int64 { stmt, err := db.storage.Prepare("SELECT value FROM state_storage WHERE store_key = ? AND key = ?") if err != nil { - return 0, fmt.Errorf("failed to prepare SQL statement: %w", err) + return 0 } defer stmt.Close() @@ -120,13 +120,12 @@ func (db *Database) GetLatestVersion() (int64, error) { if err := stmt.QueryRow(reservedStoreKey, keyLatestHeight).Scan(&latestHeight); err != nil { if errors.Is(err, sql.ErrNoRows) { // in case of a fresh database - return 0, nil + return 0 } - - return 0, fmt.Errorf("failed to query row: %w", err) + return 0 } - return latestHeight, nil + return latestHeight } func (db *Database) SetLatestVersion(version int64) error { @@ -138,7 +137,7 @@ func (db *Database) SetLatestVersion(version int64) error { return nil } -func (db *Database) GetEarliestVersion() (int64, error) { +func (db *Database) GetEarliestVersion() int64 { panic("not implemented") } @@ -339,3 +338,13 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { panic("implement me") } + +func (db *Database) GetLatestMigratedKey() ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) GetLatestMigratedModule() (string, error) { + //TODO implement me + panic("implement me") +} diff --git a/ss/types/store.go b/ss/types/store.go index 5fa574c5..a85d84a7 100644 --- a/ss/types/store.go +++ b/ss/types/store.go @@ -14,9 +14,9 @@ type StateStore interface { Iterator(storeKey string, version int64, start, end []byte) (DBIterator, error) ReverseIterator(storeKey string, version int64, start, end []byte) (DBIterator, error) RawIterate(storeKey string, fn func([]byte, []byte, int64) bool) (bool, error) - GetLatestVersion() (int64, error) + GetLatestVersion() int64 SetLatestVersion(version int64) error - GetEarliestVersion() (int64, error) + GetEarliestVersion() int64 SetEarliestVersion(version int64, ignoreVersion bool) error GetLatestMigratedKey() ([]byte, error) GetLatestMigratedModule() (string, error) From 6c443576816dbd8ff19b3c73c422837691c1835a Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 6 Oct 2025 23:10:14 -0700 Subject: [PATCH 2/9] Fix unit test for latest version --- ss/rocksdb/db.go | 5 +- ss/test/storage_test_suite.go | 237 ++++++++++++++++++++++++---------- 2 files changed, 172 insertions(+), 70 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index fff3e921..3f24d6f8 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -388,7 +388,10 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte } start, end := util.IterateWithPrefix(prefix, nil, nil) - latestVersion := retrieveLatestVersion(db.storage) + latestVersion, err := retrieveLatestVersion(db.storage) + if err != nil { + return false, err + } var startTs [TimestampSize]byte binary.LittleEndian.PutUint64(startTs[:], uint64(0)) diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index 5b9f9f0b..5ba775ff 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -41,7 +41,7 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { db, err := s.NewDB(tempDir, s.Config) s.Require().NoError(err) - lv, err := db.GetLatestVersion() + lv := db.GetLatestVersion() s.Require().NoError(err) s.Require().Zero(lv) @@ -50,7 +50,7 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { err = db.SetLatestVersion(i) s.Require().NoError(err) - lv, err = db.GetLatestVersion() + lv = db.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(i, lv) } @@ -61,9 +61,11 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { newDB, err := s.NewDB(tempDir, s.Config) s.Require().NoError(err) - defer newDB.Close() + defer func(newDB types.StateStore) { + _ = newDB.Close() + }(newDB) - lv, err = newDB.GetLatestVersion() + lv = newDB.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(i-1, lv) @@ -72,7 +74,9 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { func (s *StorageTestSuite) TestDatabaseVersionedKeys() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 1, 100)) @@ -86,7 +90,9 @@ func (s *StorageTestSuite) TestDatabaseVersionedKeys() { func (s *StorageTestSuite) TestDatabaseGetVersionedKey() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) key := []byte("key") val := []byte("value001") @@ -155,7 +161,9 @@ func (s *StorageTestSuite) TestDatabaseVersionZero() { // Db should write all keys at version 0 at version 1 db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(DBApplyChangeset(db, 0, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) s.Require().NoError(DBApplyChangeset(db, 0, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value002")})) @@ -189,7 +197,9 @@ func (s *StorageTestSuite) TestDatabaseVersionZero() { func (s *StorageTestSuite) TestDatabaseApplyChangeset() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 100, 1)) @@ -197,8 +207,8 @@ func (s *StorageTestSuite) TestDatabaseApplyChangeset() { cs.Pairs = []*iavl.KVPair{} // Deletes - keys := [][]byte{} - vals := [][]byte{} + var keys [][]byte + var vals [][]byte for i := 0; i < 100; i++ { if i%10 == 0 { keys = append(keys, []byte(fmt.Sprintf("key%03d", i))) @@ -207,7 +217,7 @@ func (s *StorageTestSuite) TestDatabaseApplyChangeset() { } s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, keys, vals)) - lv, err := db.GetLatestVersion() + lv := db.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(int64(1), lv) @@ -226,7 +236,9 @@ func (s *StorageTestSuite) TestDatabaseApplyChangeset() { func (s *StorageTestSuite) TestDatabaseIteratorEmptyDomain() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) iter, err := db.Iterator(storeKey1, 1, []byte{}, []byte{}) s.Require().Error(err) @@ -236,20 +248,24 @@ func (s *StorageTestSuite) TestDatabaseIteratorEmptyDomain() { func (s *StorageTestSuite) TestDatabaseIteratorClose() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) iter, err := db.Iterator(storeKey1, 1, []byte("key000"), nil) s.Require().NoError(err) - iter.Close() + _ = iter.Close() s.Require().False(iter.Valid()) - s.Require().Panics(func() { iter.Close() }) + s.Require().Panics(func() { _ = iter.Close() }) } func (s *StorageTestSuite) TestDatabaseIteratorDomain() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) testCases := map[string]struct { start, end []byte @@ -268,7 +284,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorDomain() { iter, err := db.Iterator(storeKey1, 1, tc.start, tc.end) s.Require().NoError(err) - defer iter.Close() + defer func(iter types.DBIterator) { + _ = iter.Close() + }(iter) start, end := iter.Domain() s.Require().Equal(tc.start, start) @@ -280,7 +298,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorDomain() { func (s *StorageTestSuite) TestDatabaseIterator() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 100, 1)) @@ -289,7 +309,9 @@ func (s *StorageTestSuite) TestDatabaseIterator() { itr, err := db.Iterator(storeKey1, v, []byte("key000"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) var i, count int for ; itr.Valid(); itr.Next() { @@ -312,7 +334,9 @@ func (s *StorageTestSuite) TestDatabaseIterator() { itr2, err := db.Iterator(storeKey1, v, []byte("key010"), []byte("key019")) s.Require().NoError(err) - defer itr2.Close() + defer func(itr2 types.DBIterator) { + _ = itr2.Close() + }(itr2) i, count := 10, 0 for ; itr2.Valid(); itr2.Next() { @@ -338,7 +362,9 @@ func (s *StorageTestSuite) TestDatabaseIterator() { func (s *StorageTestSuite) TestDatabaseIteratorRangedDeletes() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value001")})) @@ -348,7 +374,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorRangedDeletes() { itr, err := db.Iterator(storeKey1, 11, []byte("key001"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) // there should only be one valid key in the iterator -- key001 var count int @@ -363,7 +391,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorRangedDeletes() { func (s *StorageTestSuite) TestDatabaseIteratorDeletes() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, [][]byte{[]byte("key002")}, [][]byte{[]byte("value002")})) @@ -380,7 +410,7 @@ func (s *StorageTestSuite) TestDatabaseIteratorDeletes() { } s.Require().Equal(1, count) s.Require().NoError(itr.Error()) - itr.Close() + _ = itr.Close() s.Require().NoError(DBApplyChangeset(db, 10, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) itr, err = db.Iterator(storeKey1, 11, []byte("key001"), nil) @@ -393,20 +423,22 @@ func (s *StorageTestSuite) TestDatabaseIteratorDeletes() { } s.Require().Equal(2, count) s.Require().NoError(itr.Error()) - itr.Close() + _ = itr.Close() } func (s *StorageTestSuite) TestDatabaseIteratorMultiVersion() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 10, 50)) // for versions 50-100, only update even keys for v := int64(51); v <= 100; v++ { - keys := [][]byte{} - vals := [][]byte{} + var keys [][]byte + var vals [][]byte for i := 0; i < 10; i++ { if i%2 == 0 { keys = append(keys, []byte(fmt.Sprintf("key%03d", i))) @@ -419,7 +451,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorMultiVersion() { itr, err := db.Iterator(storeKey1, 69, []byte("key000"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) // All keys should be present; All odd keys should have a value that reflects // version 49, and all even keys should have a value that reflects the desired @@ -445,7 +479,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorMultiVersion() { func (s *StorageTestSuite) TestDatabaseBugInitialReverseIteration() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) // Forward Iteration // Less than iterator version @@ -461,7 +497,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialReverseIteration() { itr, err := db.ReverseIterator(storeKey1, 5, []byte("keyA"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) count := 0 for ; itr.Valid(); itr.Next() { @@ -474,7 +512,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialReverseIteration() { func (s *StorageTestSuite) TestDatabaseBugInitialForwardIteration() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) // Forward Iteration // Less than iterator version @@ -491,7 +531,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialForwardIteration() { itr, err := db.Iterator(storeKey1, 6, nil, []byte("keyZ")) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) count := 0 for ; itr.Valid(); itr.Next() { @@ -504,7 +546,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialForwardIteration() { func (s *StorageTestSuite) TestDatabaseBugInitialForwardIterationHigher() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) // Less than iterator version s.Require().NoError(DBApplyChangeset(db, 9, storeKey1, [][]byte{[]byte("keyB")}, [][]byte{[]byte("value002")})) @@ -519,7 +563,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialForwardIterationHigher() { itr, err := db.Iterator(storeKey1, 6, nil, []byte("keyZ")) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) count := 0 for ; itr.Valid(); itr.Next() { @@ -532,7 +578,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialForwardIterationHigher() { func (s *StorageTestSuite) TestDatabaseBugInitialReverseIterationHigher() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) // Reverse Iteration // Less than iterator version @@ -548,7 +596,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialReverseIterationHigher() { itr, err := db.ReverseIterator(storeKey1, 5, []byte("keyA"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) count := 0 for ; itr.Valid(); itr.Next() { @@ -561,7 +611,9 @@ func (s *StorageTestSuite) TestDatabaseBugInitialReverseIterationHigher() { func (s *StorageTestSuite) TestDatabaseIteratorNoDomain() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 10, 50)) @@ -569,7 +621,9 @@ func (s *StorageTestSuite) TestDatabaseIteratorNoDomain() { itr, err := db.Iterator(storeKey1, 50, nil, nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) var i, count int for ; itr.Valid(); itr.Next() { @@ -590,12 +644,14 @@ func (s *StorageTestSuite) TestDatabasePrune() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 10, 50)) // Verify earliest version is 0 - earliestVersion, err := db.GetEarliestVersion() + earliestVersion := db.GetEarliestVersion() s.Require().NoError(err) s.Require().Equal(int64(0), earliestVersion) @@ -603,11 +659,11 @@ func (s *StorageTestSuite) TestDatabasePrune() { s.Require().NoError(db.Prune(25)) // Verify earliest version is 26 (first 25 pruned) - earliestVersion, err = db.GetEarliestVersion() + earliestVersion = db.GetEarliestVersion() s.Require().NoError(err) s.Require().Equal(int64(26), earliestVersion) - latestVersion, err := db.GetLatestVersion() + latestVersion := db.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(int64(50), latestVersion) @@ -637,7 +693,7 @@ func (s *StorageTestSuite) TestDatabasePrune() { s.Require().NoError(db.Prune(50)) // Verify earliest version is 51 (first 50 pruned) - earliestVersion, err = db.GetEarliestVersion() + earliestVersion = db.GetEarliestVersion() s.Require().NoError(err) s.Require().Equal(int64(51), earliestVersion) @@ -655,7 +711,9 @@ func (s *StorageTestSuite) TestDatabasePrune() { func (s *StorageTestSuite) TestDatabasePruneAndTombstone() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) // write a key at three different versions 1, 100 and 200 s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) @@ -676,7 +734,9 @@ func (s *StorageTestSuite) TestDatabasePruneKeepRecent() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) key := []byte("key000") @@ -697,7 +757,9 @@ func (s *StorageTestSuite) TestDatabasePruneKeepRecent() { s.Require().NoError(err) s.Require().False(itr.Valid()) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) // ensure the value previously at version 1 is still there for queries greater than 50 bz, err = db.Get(storeKey1, 51, key) @@ -726,7 +788,9 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { stateStoreConfig.KeepLastVersion = false db, err := s.NewDB(s.T().TempDir(), stateStoreConfig) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value002")})) @@ -758,7 +822,9 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { // Now reset KeepLastVersion to true and verify latest version of key exists newDB, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer newDB.Close() + defer func(newDB types.StateStore) { + _ = newDB.Close() + }(newDB) s.Require().NoError(DBApplyChangeset(newDB, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) s.Require().NoError(DBApplyChangeset(newDB, 100, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value002")})) @@ -782,7 +848,9 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { func (s *StorageTestSuite) TestDatabaseReverseIterator() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 100, 1)) @@ -790,7 +858,9 @@ func (s *StorageTestSuite) TestDatabaseReverseIterator() { iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil) s.Require().NoError(err) - defer iter.Close() + defer func(iter types.DBIterator) { + _ = iter.Close() + }(iter) i, count := 99, 0 for ; iter.Valid(); iter.Next() { @@ -811,7 +881,9 @@ func (s *StorageTestSuite) TestDatabaseReverseIterator() { iter2, err := db.ReverseIterator(storeKey1, 1, []byte("key010"), []byte("key019")) s.Require().NoError(err) - defer iter2.Close() + defer func(iter2 types.DBIterator) { + _ = iter2.Close() + }(iter2) i, count = 18, 0 for ; iter2.Valid(); iter2.Next() { @@ -837,7 +909,9 @@ func (s *StorageTestSuite) TestDatabaseReverseIterator() { func (s *StorageTestSuite) TestParallelWrites() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) latestVersion := 10 kvCount := 100 @@ -883,7 +957,9 @@ func (s *StorageTestSuite) TestParallelWrites() { func (s *StorageTestSuite) TestParallelWriteAndPruning() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) latestVersion := 100 kvCount := 100 @@ -915,8 +991,7 @@ func (s *StorageTestSuite) TestParallelWriteAndPruning() { defer wg.Done() for i := 10; i < latestVersion; i += prunePeriod { for { - v, err := db.GetLatestVersion() - s.Require().NoError(err) + v := db.GetLatestVersion() if v > int64(i) { s.Require().NoError(db.Prune(v - 1)) break @@ -944,7 +1019,9 @@ func (s *StorageTestSuite) TestParallelWriteAndPruning() { func (s *StorageTestSuite) TestDatabaseParallelDeleteIteration() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 100, 100)) @@ -978,7 +1055,9 @@ func (s *StorageTestSuite) TestDatabaseParallelDeleteIteration() { itr, err := db.Iterator(storeKey1, v, []byte("key000"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) var i, count int for ; itr.Valid(); itr.Next() { @@ -1017,7 +1096,9 @@ func (s *StorageTestSuite) TestDatabaseParallelDeleteIteration() { func (s *StorageTestSuite) TestDatabaseParallelWriteDelete() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 100, 1)) @@ -1080,7 +1161,9 @@ func (s *StorageTestSuite) TestParallelIterationAndPruning() { fmt.Printf("DEBUG - config %+v\n", s.Config) db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 10, 50)) @@ -1111,7 +1194,9 @@ func (s *StorageTestSuite) TestParallelIterationAndPruning() { itr, err := db.Iterator(storeKey1, v, []byte("key000"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) var i, count int for ; itr.Valid(); itr.Next() { @@ -1156,7 +1241,9 @@ func (s *StorageTestSuite) TestParallelIterationAndPruning() { func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) s.Require().NoError(FillData(db, 10, 100)) @@ -1176,7 +1263,9 @@ func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() { itr, err := db.Iterator(storeKey1, int64(v), []byte("key000"), nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) var i, count int for ; itr.Valid(); itr.Next() { @@ -1204,7 +1293,9 @@ func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() { func (s *StorageTestSuite) TestDatabaseImport() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) ch := make(chan types.SnapshotNode, 10) go func() { @@ -1236,7 +1327,9 @@ func (s *StorageTestSuite) TestDatabaseRawImport() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) ch := make(chan types.RawSnapshotNode, 10) var wg sync.WaitGroup @@ -1269,13 +1362,15 @@ func (s *StorageTestSuite) TestDatabaseRawImport() { } } -// Verifies that ReverseIterator(nil, nil) is clamped to the caller's prefix +// TestDatabaseReverseIteratorPrefixIsolation Verifies that ReverseIterator(nil, nil) is clamped to the caller's prefix // via prefixEnd()/UpperBound and does **not** spill into the next module. func (s *StorageTestSuite) TestDatabaseReverseIteratorPrefixIsolation() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) - defer db.Close() + defer func(db types.StateStore) { + _ = db.Close() + }(db) // store1 : key000-key009 // store2 : key000-key009 (different prefix, same suffixes) @@ -1301,7 +1396,9 @@ func (s *StorageTestSuite) TestDatabaseReverseIteratorPrefixIsolation() { // ---------- nil / nil reverse scan on store1 ---------- itr, err := db.ReverseIterator(storeKey1, 1, nil, nil) s.Require().NoError(err) - defer itr.Close() + defer func(itr types.DBIterator) { + _ = itr.Close() + }(itr) // We should see exactly the 10 keys from store1, in reverse order, // and we should *never* see a key that belongs to store2. @@ -1322,7 +1419,9 @@ func (s *StorageTestSuite) TestDatabaseReverseIteratorPrefixIsolation() { itr2, err := db.ReverseIterator(storeKey2, 1, nil, nil) s.Require().NoError(err) - defer itr2.Close() + defer func(itr2 types.DBIterator) { + _ = itr2.Close() + }(itr2) count = 0 for ; itr2.Valid(); itr2.Next() { From d76d6c7c25ad4cc6ce99c917a02acd765672b10a Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 6 Oct 2025 23:12:56 -0700 Subject: [PATCH 3/9] Fix unit test --- ss/pebbledb/db.go | 2 +- ss/rocksdb/db.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 801813b5..f8fe1718 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -187,6 +187,7 @@ func (db *Database) Close() error { } func (db *Database) SetLatestVersion(version int64) error { + db.latestVersion = version var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) @@ -218,7 +219,6 @@ func retrieveLatestVersion(db *pebble.DB) (int64, error) { func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version - var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 3f24d6f8..d000e39b 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -154,9 +154,9 @@ func (db *Database) getSlice(storeKey string, version int64, key []byte) (*grock } func (db *Database) SetLatestVersion(version int64) error { + db.latestVersion = version var ts [TimestampSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) - return db.storage.Put(defaultWriteOpts, []byte(latestVersionKey), ts[:]) } @@ -167,7 +167,6 @@ func (db *Database) GetLatestVersion() int64 { func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version - var ts [TimestampSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) return db.storage.Put(defaultWriteOpts, []byte(earliestVersionKey), ts[:]) From 164054296080fa48393962eff4bd12a28cfe0f85 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 09:14:43 -0700 Subject: [PATCH 4/9] Fix golint --- ss/rocksdb/db.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index a09e1720..2a661bac 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/binary" "fmt" + "math" "sync" "time" @@ -488,7 +489,11 @@ func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { if err != nil || len(bz) == 0 { return 0, err } - return int64(binary.LittleEndian.Uint64(bz)), nil + ubz := binary.LittleEndian.Uint64(bz) + if ubz > math.MaxInt64 { + return 0, fmt.Errorf("earliest version in rocksdb overflows int64: %d", ubz) + } + return int64(ubz), nil } // retrieveLatestVersion retrieves the latest version from the database, if not found, return 0. @@ -497,6 +502,10 @@ func retrieveLatestVersion(storage *grocksdb.DB) (int64, error) { if err != nil || len(bz) == 0 { return 0, err } + uz := binary.LittleEndian.Uint64(bz) + if uz > math.MaxInt64 { + return 0, fmt.Errorf("latest version in rocksdb overflows int64: %d", uz) + } - return int64(binary.LittleEndian.Uint64(bz)), nil + return int64(uz), nil } From 5d87eec83aa56381b574cdc0d206f18032fa9fd6 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 09:21:18 -0700 Subject: [PATCH 5/9] Fix prune manager --- ss/pruning/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ss/pruning/manager.go b/ss/pruning/manager.go index 0fef14f7..b4c53236 100644 --- a/ss/pruning/manager.go +++ b/ss/pruning/manager.go @@ -41,7 +41,7 @@ func (m *Manager) Start() { go func() { for { pruneStartTime := time.Now() - latestVersion, _ := m.stateStore.GetLatestVersion() + latestVersion := m.stateStore.GetLatestVersion() pruneVersion := latestVersion - m.keepRecent if pruneVersion > 0 { // prune all versions up to and including the pruneVersion From 8a98170bba8c616fcf1694bff2451ca63b938616 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 09:24:35 -0700 Subject: [PATCH 6/9] Fix restore --- ss/store.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ss/store.go b/ss/store.go index a472c867..5bfd9019 100644 --- a/ss/store.go +++ b/ss/store.go @@ -63,11 +63,8 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt // RecoverStateStore will be called during initialization to recover the state from rlog func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error { - ssLatestVersion, err := stateStore.GetLatestVersion() - logger.Info(fmt.Sprintf("Recovering from changelog %s at latest SS version %d", changelogPath, ssLatestVersion)) - if err != nil { - return err - } + ssLatestVersion := stateStore.GetLatestVersion() + logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion)) if ssLatestVersion <= 0 { return nil } From 1518f681dfcf4ee7a615bd5448b7f1cf7b11a660 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 09:51:24 -0700 Subject: [PATCH 7/9] Add todo for readonly mode --- ss/pebbledb/db.go | 8 ++--- ss/rocksdb/db.go | 84 ++++++++++++++++++++++++----------------------- 2 files changed, 45 insertions(+), 47 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 0b7bb098..c8519ede 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -117,6 +117,8 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { opts.FlushSplitBytes = opts.Levels[0].TargetFileSize opts = opts.EnsureDefaults() + //TODO: add a new config and check if readonly = true to support readonly mode + db, err := pebble.Open(dataDir, opts) if err != nil { return nil, fmt.Errorf("failed to open PebbleDB: %w", err) @@ -170,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return database, nil } -func NewWithDB(storage *pebble.DB) *Database { - return &Database{ - storage: storage, - } -} - func (db *Database) Close() error { if db.streamHandler != nil { _ = db.streamHandler.Close() diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index 2a661bac..e16c05ab 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -71,6 +71,8 @@ type Database struct { } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { + //TODO: add a new config and check if readonly = true to support readonly mode + storage, cfHandle, err := OpenRocksDB(dataDir) if err != nil { return nil, fmt.Errorf("failed to open RocksDB: %w", err) @@ -126,25 +128,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return database, nil } -func (db *Database) Close() error { - if db.streamHandler != nil { - // Close the changelog stream first - db.streamHandler.Close() - // Close the pending changes channel to signal the background goroutine to stop - close(db.pendingChanges) - // Wait for the async writes to finish processing all buffered items - db.asyncWriteWG.Wait() - // Only set to nil after background goroutine has finished - db.streamHandler = nil - } - - db.storage.Close() - db.storage = nil - db.cfHandle = nil - - return nil -} - func (db *Database) getSlice(storeKey string, version int64, key []byte) (*grocksdb.Slice, error) { return db.storage.GetCF( newTSReadOptions(version), @@ -164,6 +147,20 @@ func (db *Database) GetLatestVersion() int64 { return db.latestVersion } +// retrieveLatestVersion retrieves the latest version from the database, if not found, return 0. +func retrieveLatestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(latestVersionKey)) + if err != nil || len(bz) == 0 { + return 0, err + } + uz := binary.LittleEndian.Uint64(bz) + if uz > math.MaxInt64 { + return 0, fmt.Errorf("latest version in rocksdb overflows int64: %d", uz) + } + + return int64(uz), nil +} + func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version @@ -178,6 +175,19 @@ func (db *Database) GetEarliestVersion() int64 { return db.earliestVersion } +// retrieveEarliestVersion retrieves the earliest version from the database, if not found, return 0. +func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) + if err != nil || len(bz) == 0 { + return 0, err + } + ubz := binary.LittleEndian.Uint64(bz) + if ubz > math.MaxInt64 { + return 0, fmt.Errorf("earliest version in rocksdb overflows int64: %d", ubz) + } + return int64(ubz), nil +} + func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { if version < db.earliestVersion { return false, nil @@ -483,29 +493,21 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo panic("implement me") } -// retrieveEarliestVersion retrieves the earliest version from the database, if not found, return 0. -func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { - bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) - if err != nil || len(bz) == 0 { - return 0, err - } - ubz := binary.LittleEndian.Uint64(bz) - if ubz > math.MaxInt64 { - return 0, fmt.Errorf("earliest version in rocksdb overflows int64: %d", ubz) +func (db *Database) Close() error { + if db.streamHandler != nil { + // Close the changelog stream first + db.streamHandler.Close() + // Close the pending changes channel to signal the background goroutine to stop + close(db.pendingChanges) + // Wait for the async writes to finish processing all buffered items + db.asyncWriteWG.Wait() + // Only set to nil after background goroutine has finished + db.streamHandler = nil } - return int64(ubz), nil -} -// retrieveLatestVersion retrieves the latest version from the database, if not found, return 0. -func retrieveLatestVersion(storage *grocksdb.DB) (int64, error) { - bz, err := storage.GetBytes(defaultReadOpts, []byte(latestVersionKey)) - if err != nil || len(bz) == 0 { - return 0, err - } - uz := binary.LittleEndian.Uint64(bz) - if uz > math.MaxInt64 { - return 0, fmt.Errorf("latest version in rocksdb overflows int64: %d", uz) - } + db.storage.Close() + db.storage = nil + db.cfHandle = nil - return int64(uz), nil + return nil } From 6b7dab2980887217fff6b94ec5ddf253456bbbd5 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 10:37:32 -0700 Subject: [PATCH 8/9] Fix duplicate close --- ss/pebbledb/db.go | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index c8519ede..11c90c98 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -219,7 +219,7 @@ func retrieveLatestVersion(db *pebble.DB) (int64, error) { if uz > math.MaxInt64 { return 0, fmt.Errorf("latest version in database overflows int64: %d", uz) } - return int64(uz), closer.Close() + return int64(uz), nil } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { @@ -239,6 +239,28 @@ func (db *Database) GetEarliestVersion() int64 { return db.earliestVersion } +// Retrieves earliest version from db, if not found, return 0 +func retrieveEarliestVersion(db *pebble.DB) (int64, error) { + bz, closer, err := db.Get([]byte(earliestVersionKey)) + defer func() { + if closer != nil { + _ = closer.Close() + } + }() + if err != nil || len(bz) == 0 { + if errors.Is(err, pebble.ErrNotFound) { + return 0, nil + } + return 0, err + } + + ubz := binary.LittleEndian.Uint64(bz) + if ubz > math.MaxInt64 { + return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) + } + return int64(ubz), nil +} + func (db *Database) SetLastRangeHashed(latestHashed int64) error { if latestHashed < 0 { return fmt.Errorf("latestHashed must be non-negative") @@ -264,28 +286,6 @@ func (db *Database) GetLastRangeHashed() (int64, error) { return cachedValue, nil } -// Retrieves earliest version from db, if not found, return 0 -func retrieveEarliestVersion(db *pebble.DB) (int64, error) { - bz, closer, err := db.Get([]byte(earliestVersionKey)) - defer func() { - if closer != nil { - _ = closer.Close() - } - }() - if err != nil || len(bz) == 0 { - if errors.Is(err, pebble.ErrNotFound) { - return 0, nil - } - return 0, err - } - - ubz := binary.LittleEndian.Uint64(bz) - if ubz > math.MaxInt64 { - return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) - } - return int64(ubz), closer.Close() -} - // SetLatestKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) From 10e313a562672387a9f53501e14062cb2384bc21 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 8 Oct 2025 12:48:01 -0700 Subject: [PATCH 9/9] Address comments --- ss/pebbledb/db.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 11c90c98..b55fdb40 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -391,24 +391,21 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro for _, kvPair := range cs.Changeset.Pairs { if kvPair.Value == nil { - if err = b.Delete(cs.Name, kvPair.Key); err != nil { - return err - } - } else { - if err = b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { + if err := b.Delete(cs.Name, kvPair.Key); err != nil { return err } + } else if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { + return err } } // Mark the store as updated db.storeKeyDirty.Store(cs.Name, version) - // Update latest version - err = b.Write() - if err != nil { + if err := b.Write(); err != nil { return err } + // Update latest version on write success db.latestVersion = version return nil }