diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 7f02a28f..b55fdb40 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -59,6 +59,8 @@ type Database struct { config config.StateStoreConfig // Earliest version for db after pruning earliestVersion int64 + // Latest version for db + latestVersion int64 // Map of module to when each was last updated // Used in pruning to skip over stores that have not been updated recently @@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { opts.FlushSplitBytes = opts.Levels[0].TargetFileSize opts = opts.EnsureDefaults() + //TODO: add a new config and check if readonly = true to support readonly mode + db, err := pebble.Open(dataDir, opts) if err != nil { return nil, fmt.Errorf("failed to open PebbleDB: %w", err) } + // Initialize earliest version earliestVersion, err := retrieveEarliestVersion(db) if err != nil { - return nil, fmt.Errorf("failed to open PebbleDB: %w", err) + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + + // Initialize latest version + latestVersion, err := retrieveLatestVersion(db) + if err != nil { + return nil, fmt.Errorf("failed to retrieve latest version: %w", err) } + database := &Database{ storage: db, asyncWriteWG: sync.WaitGroup{}, config: config, earliestVersion: earliestVersion, + latestVersion: latestVersion, pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), } @@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return database, nil } -func NewWithDB(storage *pebble.DB) *Database { - return &Database{ - storage: storage, - } -} - func (db *Database) Close() error { if db.streamHandler != nil { _ = db.streamHandler.Close() @@ -182,32 +189,37 @@ func (db *Database) SetLatestVersion(version int64) error { if version < 0 { return fmt.Errorf("version must be non-negative") } + db.latestVersion = version var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) return err } -func (db *Database) GetLatestVersion() (int64, error) { - bz, closer, err := db.storage.Get([]byte(latestVersionKey)) - if err != nil { +func (db *Database) GetLatestVersion() int64 { + return db.latestVersion +} + +// Retrieve latestVersion from db, if not found, return 0. +func retrieveLatestVersion(db *pebble.DB) (int64, error) { + bz, closer, err := db.Get([]byte(latestVersionKey)) + defer func() { + if closer != nil { + _ = closer.Close() + } + }() + if err != nil || len(bz) == 0 { if errors.Is(err, pebble.ErrNotFound) { - // in case of a fresh database return 0, nil } - return 0, err } - if len(bz) == 0 { - return 0, closer.Close() - } - uz := binary.LittleEndian.Uint64(bz) if uz > math.MaxInt64 { return 0, fmt.Errorf("latest version in database overflows int64: %d", uz) } - return int64(uz), closer.Close() + return int64(uz), nil } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { @@ -216,7 +228,6 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error } if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version - var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts) @@ -224,8 +235,30 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error return nil } -func (db *Database) GetEarliestVersion() (int64, error) { - return db.earliestVersion, nil +func (db *Database) GetEarliestVersion() int64 { + return db.earliestVersion +} + +// Retrieves earliest version from db, if not found, return 0 +func retrieveEarliestVersion(db *pebble.DB) (int64, error) { + bz, closer, err := db.Get([]byte(earliestVersionKey)) + defer func() { + if closer != nil { + _ = closer.Close() + } + }() + if err != nil || len(bz) == 0 { + if errors.Is(err, pebble.ErrNotFound) { + return 0, nil + } + return 0, err + } + + ubz := binary.LittleEndian.Uint64(bz) + if ubz > math.MaxInt64 { + return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) + } + return int64(ubz), nil } func (db *Database) SetLastRangeHashed(latestHashed int64) error { @@ -253,29 +286,6 @@ func (db *Database) GetLastRangeHashed() (int64, error) { return cachedValue, nil } -// Retrieves earliest version from db -func retrieveEarliestVersion(db *pebble.DB) (int64, error) { - bz, closer, err := db.Get([]byte(earliestVersionKey)) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - // in case of a fresh database - return 0, nil - } - - return 0, err - } - - if len(bz) == 0 { - return 0, closer.Close() - } - - ubz := binary.LittleEndian.Uint64(bz) - if ubz > math.MaxInt64 { - return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) - } - return int64(ubz), closer.Close() -} - // SetLatestKey sets the latest key processed during migration. func (db *Database) SetLatestMigratedKey(key []byte) error { return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) @@ -373,6 +383,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro version = 1 } + // Create batch and persist latest version in the batch b, err := NewBatch(db.storage, version) if err != nil { return err @@ -383,17 +394,20 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro if err := b.Delete(cs.Name, kvPair.Key); err != nil { return err } - } else { - if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { - return err - } + } else if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { + return err } } // Mark the store as updated db.storeKeyDirty.Store(cs.Name, version) - return b.Write() + if err := b.Write(); err != nil { + return err + } + // Update latest version on write success + db.latestVersion = version + return nil } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { @@ -537,10 +551,6 @@ func (db *Database) writeAsyncInBackground() { panic(err) } } - err := db.SetLatestVersion(version) - if err != nil { - panic(err) - } } } diff --git a/ss/pruning/manager.go b/ss/pruning/manager.go index 0fef14f7..b4c53236 100644 --- a/ss/pruning/manager.go +++ b/ss/pruning/manager.go @@ -41,7 +41,7 @@ func (m *Manager) Start() { go func() { for { pruneStartTime := time.Now() - latestVersion, _ := m.stateStore.GetLatestVersion() + latestVersion := m.stateStore.GetLatestVersion() pruneVersion := latestVersion - m.keepRecent if pruneVersion > 0 { // prune all versions up to and including the pruneVersion diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index f171d636..e16c05ab 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/binary" "fmt" + "math" "sync" "time" @@ -57,6 +58,8 @@ type Database struct { // Earliest version for db after pruning earliestVersion int64 + // Latest version for db + latestVersion int64 asyncWriteWG sync.WaitGroup @@ -68,6 +71,8 @@ type Database struct { } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { + //TODO: add a new config and check if readonly = true to support readonly mode + storage, cfHandle, err := OpenRocksDB(dataDir) if err != nil { return nil, fmt.Errorf("failed to open RocksDB: %w", err) @@ -84,17 +89,24 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) } + // Initialize earliest version earliestVersion, err := retrieveEarliestVersion(storage) if err != nil { return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) } + latestVersion, err := retrieveLatestVersion(storage) + if err != nil { + return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) + } + database := &Database{ storage: storage, config: config, cfHandle: cfHandle, tsLow: tsLow, earliestVersion: earliestVersion, + latestVersion: latestVersion, pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), } @@ -116,51 +128,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { return database, nil } -func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Database, error) { - slice, err := storage.GetFullHistoryTsLow(cfHandle) - if err != nil { - return nil, fmt.Errorf("failed to get full_history_ts_low: %w", err) - } - - var tsLow int64 - tsLowBz := copyAndFreeSlice(slice) - if len(tsLowBz) > 0 { - tsLow = int64(binary.LittleEndian.Uint64(tsLowBz)) - } - - earliestVersion, err := retrieveEarliestVersion(storage) - if err != nil { - return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) - } - - return &Database{ - storage: storage, - cfHandle: cfHandle, - tsLow: tsLow, - earliestVersion: earliestVersion, - }, nil -} - -func (db *Database) Close() error { - if db.streamHandler != nil { - // Close the changelog stream first - db.streamHandler.Close() - // Close the pending changes channel to signal the background goroutine to stop - close(db.pendingChanges) - // Wait for the async writes to finish processing all buffered items - db.asyncWriteWG.Wait() - // Only set to nil after background goroutine has finished - db.streamHandler = nil - } - - db.storage.Close() - - db.storage = nil - db.cfHandle = nil - - return nil -} - func (db *Database) getSlice(storeKey string, version int64, key []byte) (*grocksdb.Slice, error) { return db.storage.GetCF( newTSReadOptions(version), @@ -170,30 +137,33 @@ func (db *Database) getSlice(storeKey string, version int64, key []byte) (*grock } func (db *Database) SetLatestVersion(version int64) error { + db.latestVersion = version var ts [TimestampSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) - return db.storage.Put(defaultWriteOpts, []byte(latestVersionKey), ts[:]) } -func (db *Database) GetLatestVersion() (int64, error) { - bz, err := db.storage.GetBytes(defaultReadOpts, []byte(latestVersionKey)) - if err != nil { +func (db *Database) GetLatestVersion() int64 { + return db.latestVersion +} + +// retrieveLatestVersion retrieves the latest version from the database, if not found, return 0. +func retrieveLatestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(latestVersionKey)) + if err != nil || len(bz) == 0 { return 0, err } - - if len(bz) == 0 { - // in case of a fresh database - return 0, nil + uz := binary.LittleEndian.Uint64(bz) + if uz > math.MaxInt64 { + return 0, fmt.Errorf("latest version in rocksdb overflows int64: %d", uz) } - return int64(binary.LittleEndian.Uint64(bz)), nil + return int64(uz), nil } func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { if version > db.earliestVersion || ignoreVersion { db.earliestVersion = version - var ts [TimestampSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) return db.storage.Put(defaultWriteOpts, []byte(earliestVersionKey), ts[:]) @@ -201,8 +171,21 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error return nil } -func (db *Database) GetEarliestVersion() (int64, error) { - return db.earliestVersion, nil +func (db *Database) GetEarliestVersion() int64 { + return db.earliestVersion +} + +// retrieveEarliestVersion retrieves the earliest version from the database, if not found, return 0. +func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { + bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) + if err != nil || len(bz) == 0 { + return 0, err + } + ubz := binary.LittleEndian.Uint64(bz) + if ubz > math.MaxInt64 { + return 0, fmt.Errorf("earliest version in rocksdb overflows int64: %d", ubz) + } + return int64(ubz), nil } func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) { @@ -240,6 +223,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro version = 1 } + // Update latest version in batch b := NewBatch(db, version) for _, kvPair := range cs.Changeset.Pairs { @@ -254,7 +238,12 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro } } - return b.Write() + err := b.Write() + if err != nil { + return err + } + db.latestVersion = version + return nil } func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { @@ -291,10 +280,6 @@ func (db *Database) writeAsyncInBackground() { panic(err) } } - err := db.SetLatestVersion(version) - if err != nil { - panic(err) - } } } } @@ -407,7 +392,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte } start, end := util.IterateWithPrefix(prefix, nil, nil) - latestVersion, err := db.GetLatestVersion() + latestVersion, err := retrieveLatestVersion(db.storage) if err != nil { return false, err } @@ -508,18 +493,21 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo panic("implement me") } -// retrieveEarliestVersion retrieves the earliest version from the database -func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) { - bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey)) - if err != nil { - fmt.Printf("warning: rocksdb get for earliestVersionKey failed: %v", err) - return 0, nil +func (db *Database) Close() error { + if db.streamHandler != nil { + // Close the changelog stream first + db.streamHandler.Close() + // Close the pending changes channel to signal the background goroutine to stop + close(db.pendingChanges) + // Wait for the async writes to finish processing all buffered items + db.asyncWriteWG.Wait() + // Only set to nil after background goroutine has finished + db.streamHandler = nil } - if len(bz) == 0 { - // in case of a fresh database - return 0, nil - } + db.storage.Close() + db.storage = nil + db.cfHandle = nil - return int64(binary.LittleEndian.Uint64(bz)), nil + return nil } diff --git a/ss/sqlite/db.go b/ss/sqlite/db.go index d5278eb9..bc1ccd2a 100644 --- a/ss/sqlite/db.go +++ b/ss/sqlite/db.go @@ -108,10 +108,10 @@ func (db *Database) Close() error { return err } -func (db *Database) GetLatestVersion() (int64, error) { +func (db *Database) GetLatestVersion() int64 { stmt, err := db.storage.Prepare("SELECT value FROM state_storage WHERE store_key = ? AND key = ?") if err != nil { - return 0, fmt.Errorf("failed to prepare SQL statement: %w", err) + return 0 } defer func() { _ = stmt.Close() }() @@ -120,13 +120,12 @@ func (db *Database) GetLatestVersion() (int64, error) { if err := stmt.QueryRow(reservedStoreKey, keyLatestHeight).Scan(&latestHeight); err != nil { if errors.Is(err, sql.ErrNoRows) { // in case of a fresh database - return 0, nil + return 0 } - - return 0, fmt.Errorf("failed to query row: %w", err) + return 0 } - return latestHeight, nil + return latestHeight } func (db *Database) SetLatestVersion(version int64) error { @@ -138,7 +137,7 @@ func (db *Database) SetLatestVersion(version int64) error { return nil } -func (db *Database) GetEarliestVersion() (int64, error) { +func (db *Database) GetEarliestVersion() int64 { panic("not implemented") } @@ -343,3 +342,13 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error { func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { panic("implement me") } + +func (db *Database) GetLatestMigratedKey() ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (db *Database) GetLatestMigratedModule() (string, error) { + //TODO implement me + panic("implement me") +} diff --git a/ss/store.go b/ss/store.go index a472c867..5bfd9019 100644 --- a/ss/store.go +++ b/ss/store.go @@ -63,11 +63,8 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt // RecoverStateStore will be called during initialization to recover the state from rlog func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error { - ssLatestVersion, err := stateStore.GetLatestVersion() - logger.Info(fmt.Sprintf("Recovering from changelog %s at latest SS version %d", changelogPath, ssLatestVersion)) - if err != nil { - return err - } + ssLatestVersion := stateStore.GetLatestVersion() + logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion)) if ssLatestVersion <= 0 { return nil } diff --git a/ss/test/storage_test_suite.go b/ss/test/storage_test_suite.go index 11ebc1e8..c0577815 100644 --- a/ss/test/storage_test_suite.go +++ b/ss/test/storage_test_suite.go @@ -40,7 +40,7 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { db, err := s.NewDB(tempDir, s.Config) s.Require().NoError(err) - lv, err := db.GetLatestVersion() + lv := db.GetLatestVersion() s.Require().NoError(err) s.Require().Zero(lv) @@ -49,7 +49,7 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { err = db.SetLatestVersion(i) s.Require().NoError(err) - lv, err = db.GetLatestVersion() + lv = db.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(i, lv) } @@ -60,9 +60,10 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { newDB, err := s.NewDB(tempDir, s.Config) s.Require().NoError(err) + defer func() { _ = newDB.Close() }() - lv, err = newDB.GetLatestVersion() + lv = newDB.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(i-1, lv) @@ -71,6 +72,7 @@ func (s *StorageTestSuite) TestDatabaseLatestVersion() { func (s *StorageTestSuite) TestDatabaseVersionedKeys() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 1, 100)) @@ -85,6 +87,7 @@ func (s *StorageTestSuite) TestDatabaseVersionedKeys() { func (s *StorageTestSuite) TestDatabaseGetVersionedKey() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() key := []byte("key") @@ -154,6 +157,7 @@ func (s *StorageTestSuite) TestDatabaseVersionZero() { // Db should write all keys at version 0 at version 1 db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(DBApplyChangeset(db, 0, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) @@ -188,6 +192,7 @@ func (s *StorageTestSuite) TestDatabaseVersionZero() { func (s *StorageTestSuite) TestDatabaseApplyChangeset() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 100, 1)) @@ -196,8 +201,8 @@ func (s *StorageTestSuite) TestDatabaseApplyChangeset() { cs.Pairs = []*iavl.KVPair{} // Deletes - keys := [][]byte{} - vals := [][]byte{} + var keys [][]byte + var vals [][]byte for i := 0; i < 100; i++ { if i%10 == 0 { keys = append(keys, []byte(fmt.Sprintf("key%03d", i))) @@ -206,7 +211,7 @@ func (s *StorageTestSuite) TestDatabaseApplyChangeset() { } s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, keys, vals)) - lv, err := db.GetLatestVersion() + lv := db.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(int64(1), lv) @@ -225,6 +230,7 @@ func (s *StorageTestSuite) TestDatabaseApplyChangeset() { func (s *StorageTestSuite) TestDatabaseIteratorEmptyDomain() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() iter, err := db.Iterator(storeKey1, 1, []byte{}, []byte{}) @@ -235,6 +241,7 @@ func (s *StorageTestSuite) TestDatabaseIteratorEmptyDomain() { func (s *StorageTestSuite) TestDatabaseIteratorClose() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() iter, err := db.Iterator(storeKey1, 1, []byte("key000"), nil) @@ -248,6 +255,7 @@ func (s *StorageTestSuite) TestDatabaseIteratorClose() { func (s *StorageTestSuite) TestDatabaseIteratorDomain() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() testCases := map[string]struct { @@ -279,6 +287,7 @@ func (s *StorageTestSuite) TestDatabaseIteratorDomain() { func (s *StorageTestSuite) TestDatabaseIterator() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 100, 1)) @@ -337,6 +346,7 @@ func (s *StorageTestSuite) TestDatabaseIterator() { func (s *StorageTestSuite) TestDatabaseIteratorRangedDeletes() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) @@ -362,6 +372,7 @@ func (s *StorageTestSuite) TestDatabaseIteratorRangedDeletes() { func (s *StorageTestSuite) TestDatabaseIteratorDeletes() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(DBApplyChangeset(db, 1, storeKey1, [][]byte{[]byte("key001")}, [][]byte{[]byte("value001")})) @@ -398,14 +409,15 @@ func (s *StorageTestSuite) TestDatabaseIteratorDeletes() { func (s *StorageTestSuite) TestDatabaseIteratorMultiVersion() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 10, 50)) // for versions 50-100, only update even keys for v := int64(51); v <= 100; v++ { - keys := [][]byte{} - vals := [][]byte{} + var keys [][]byte + var vals [][]byte for i := 0; i < 10; i++ { if i%2 == 0 { keys = append(keys, []byte(fmt.Sprintf("key%03d", i))) @@ -444,6 +456,7 @@ func (s *StorageTestSuite) TestDatabaseIteratorMultiVersion() { func (s *StorageTestSuite) TestDatabaseBugInitialReverseIteration() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() // Forward Iteration @@ -473,6 +486,7 @@ func (s *StorageTestSuite) TestDatabaseBugInitialReverseIteration() { func (s *StorageTestSuite) TestDatabaseBugInitialForwardIteration() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() // Forward Iteration @@ -503,6 +517,7 @@ func (s *StorageTestSuite) TestDatabaseBugInitialForwardIteration() { func (s *StorageTestSuite) TestDatabaseBugInitialForwardIterationHigher() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() // Less than iterator version @@ -531,6 +546,7 @@ func (s *StorageTestSuite) TestDatabaseBugInitialForwardIterationHigher() { func (s *StorageTestSuite) TestDatabaseBugInitialReverseIterationHigher() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() // Reverse Iteration @@ -560,6 +576,7 @@ func (s *StorageTestSuite) TestDatabaseBugInitialReverseIterationHigher() { func (s *StorageTestSuite) TestDatabaseIteratorNoDomain() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 10, 50)) @@ -589,12 +606,13 @@ func (s *StorageTestSuite) TestDatabasePrune() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 10, 50)) // Verify earliest version is 0 - earliestVersion, err := db.GetEarliestVersion() + earliestVersion := db.GetEarliestVersion() s.Require().NoError(err) s.Require().Equal(int64(0), earliestVersion) @@ -602,11 +620,11 @@ func (s *StorageTestSuite) TestDatabasePrune() { s.Require().NoError(db.Prune(25)) // Verify earliest version is 26 (first 25 pruned) - earliestVersion, err = db.GetEarliestVersion() + earliestVersion = db.GetEarliestVersion() s.Require().NoError(err) s.Require().Equal(int64(26), earliestVersion) - latestVersion, err := db.GetLatestVersion() + latestVersion := db.GetLatestVersion() s.Require().NoError(err) s.Require().Equal(int64(50), latestVersion) @@ -635,7 +653,7 @@ func (s *StorageTestSuite) TestDatabasePrune() { s.Require().NoError(db.Prune(50)) // Verify earliest version is 51 (first 50 pruned) - earliestVersion, err = db.GetEarliestVersion() + earliestVersion = db.GetEarliestVersion() s.Require().NoError(err) s.Require().Equal(int64(51), earliestVersion) @@ -653,6 +671,7 @@ func (s *StorageTestSuite) TestDatabasePrune() { func (s *StorageTestSuite) TestDatabasePruneAndTombstone() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() // write a key at three different versions 1, 100 and 200 @@ -674,6 +693,7 @@ func (s *StorageTestSuite) TestDatabasePruneKeepRecent() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() key := []byte("key000") @@ -724,6 +744,7 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { stateStoreConfig.KeepLastVersion = false db, err := s.NewDB(s.T().TempDir(), stateStoreConfig) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(DBApplyChangeset(db, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) @@ -756,6 +777,7 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { // Now reset KeepLastVersion to true and verify latest version of key exists newDB, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = newDB.Close() }() s.Require().NoError(DBApplyChangeset(newDB, 100, storeKey1, [][]byte{[]byte("key000")}, [][]byte{[]byte("value001")})) @@ -780,6 +802,7 @@ func (s *StorageTestSuite) TestDatabasePruneKeepLastVersion() { func (s *StorageTestSuite) TestDatabaseReverseIterator() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 100, 1)) @@ -835,6 +858,7 @@ func (s *StorageTestSuite) TestDatabaseReverseIterator() { func (s *StorageTestSuite) TestParallelWrites() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() latestVersion := 10 @@ -881,6 +905,7 @@ func (s *StorageTestSuite) TestParallelWrites() { func (s *StorageTestSuite) TestParallelWriteAndPruning() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() latestVersion := 100 @@ -913,8 +938,7 @@ func (s *StorageTestSuite) TestParallelWriteAndPruning() { defer wg.Done() for i := 10; i < latestVersion; i += prunePeriod { for { - v, err := db.GetLatestVersion() - s.Require().NoError(err) + v := db.GetLatestVersion() if v > int64(i) { s.Require().NoError(db.Prune(v - 1)) break @@ -942,6 +966,7 @@ func (s *StorageTestSuite) TestParallelWriteAndPruning() { func (s *StorageTestSuite) TestDatabaseParallelDeleteIteration() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 100, 100)) @@ -1015,6 +1040,7 @@ func (s *StorageTestSuite) TestDatabaseParallelDeleteIteration() { func (s *StorageTestSuite) TestDatabaseParallelWriteDelete() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 100, 1)) @@ -1078,6 +1104,7 @@ func (s *StorageTestSuite) TestParallelIterationAndPruning() { fmt.Printf("DEBUG - config %+v\n", s.Config) db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 10, 50)) @@ -1153,6 +1180,7 @@ func (s *StorageTestSuite) TestParallelIterationAndPruning() { func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() s.Require().NoError(FillData(db, 10, 100)) @@ -1201,6 +1229,7 @@ func (s *StorageTestSuite) TestDatabaseParallelIterationVersions() { func (s *StorageTestSuite) TestDatabaseImport() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() ch := make(chan types.SnapshotNode, 10) @@ -1233,6 +1262,7 @@ func (s *StorageTestSuite) TestDatabaseRawImport() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() ch := make(chan types.RawSnapshotNode, 10) @@ -1266,12 +1296,13 @@ func (s *StorageTestSuite) TestDatabaseRawImport() { } } -// Verifies that ReverseIterator(nil, nil) is clamped to the caller's prefix +// TestDatabaseReverseIteratorPrefixIsolation Verifies that ReverseIterator(nil, nil) is clamped to the caller's prefix // via prefixEnd()/UpperBound and does **not** spill into the next module. func (s *StorageTestSuite) TestDatabaseReverseIteratorPrefixIsolation() { db, err := s.NewDB(s.T().TempDir(), s.Config) s.Require().NoError(err) + defer func() { _ = db.Close() }() // store1 : key000-key009 @@ -1298,6 +1329,7 @@ func (s *StorageTestSuite) TestDatabaseReverseIteratorPrefixIsolation() { // ---------- nil / nil reverse scan on store1 ---------- itr, err := db.ReverseIterator(storeKey1, 1, nil, nil) s.Require().NoError(err) + defer func() { _ = itr.Close() }() // We should see exactly the 10 keys from store1, in reverse order, @@ -1319,6 +1351,7 @@ func (s *StorageTestSuite) TestDatabaseReverseIteratorPrefixIsolation() { itr2, err := db.ReverseIterator(storeKey2, 1, nil, nil) s.Require().NoError(err) + defer func() { _ = itr2.Close() }() count = 0 diff --git a/ss/types/store.go b/ss/types/store.go index 5fa574c5..a85d84a7 100644 --- a/ss/types/store.go +++ b/ss/types/store.go @@ -14,9 +14,9 @@ type StateStore interface { Iterator(storeKey string, version int64, start, end []byte) (DBIterator, error) ReverseIterator(storeKey string, version int64, start, end []byte) (DBIterator, error) RawIterate(storeKey string, fn func([]byte, []byte, int64) bool) (bool, error) - GetLatestVersion() (int64, error) + GetLatestVersion() int64 SetLatestVersion(version int64) error - GetEarliestVersion() (int64, error) + GetEarliestVersion() int64 SetEarliestVersion(version int64, ignoreVersion bool) error GetLatestMigratedKey() ([]byte, error) GetLatestMigratedModule() (string, error)