From e8ffc703e24699fb0892ad6993f5ee6fc61e892e Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 19 Nov 2025 14:17:20 +0000 Subject: [PATCH 1/4] Rename ServerContext databases lock/fields for clarity about mutex scope --- rest/admin_api.go | 32 +++--- rest/api_test.go | 6 +- rest/config.go | 50 ++++---- rest/config_test.go | 6 +- rest/handler_config_database.go | 4 +- rest/server_context.go | 197 ++++++++++++++++---------------- rest/serverless_test.go | 56 ++++----- rest/utilities_testing.go | 18 +-- 8 files changed, 186 insertions(+), 183 deletions(-) diff --git a/rest/admin_api.go b/rest/admin_api.go index 8e841f0608..ae03bc8391 100644 --- a/rest/admin_api.go +++ b/rest/admin_api.go @@ -111,10 +111,10 @@ func (h *handler) handleCreateDB() error { SGVersion: base.ProductVersion.String(), } - h.server.lock.Lock() - defer h.server.lock.Unlock() + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() - if _, exists := h.server.databases_[dbName]; exists { + if _, exists := h.server._databases[dbName]; exists { return base.HTTPErrorf(http.StatusPreconditionFailed, // what CouchDB returns "Duplicate database name %q", dbName) } @@ -154,7 +154,7 @@ func (h *handler) handleCreateDB() error { return base.HTTPErrorf(http.StatusInternalServerError, "couldn't save database config: %v", err) } // store the cas in the loaded config after a successful insert - h.server.dbConfigs[dbName].cfgCas = cas + h.server._dbConfigs[dbName].cfgCas = cas } else { // Intentionally pass in an empty BootstrapConfig to avoid inheriting any credentials or server when running with a legacy config (CBG-1764) if err := config.setup(h.ctx(), dbName, BootstrapConfig{}, nil, nil, false); err != nil { @@ -820,9 +820,9 @@ func (h *handler) handlePutDbConfig() (err error) { auditDbAuditEnabled(h.ctx(), dbName, previousAuditEnabled, updatedAuditEnabled, updatedAuditEvents) // store the cas in the loaded config after a successful update h.setEtag(updatedDbConfig.Version) - h.server.lock.Lock() - defer h.server.lock.Unlock() - h.server.dbConfigs[dbName].cfgCas = cas + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() + h.server._dbConfigs[dbName].cfgCas = cas base.Audit(h.ctx(), base.AuditIDUpdateDatabaseConfig, auditFields) return base.HTTPErrorf(http.StatusCreated, "updated") @@ -1145,8 +1145,8 @@ func (h *handler) handleDeleteCollectionConfigSync() error { return err } - h.server.lock.Lock() - defer h.server.lock.Unlock() + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() // TODO: Dynamic update instead of reload if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { @@ -1214,8 +1214,8 @@ func (h *handler) handlePutCollectionConfigSync() error { return err } - h.server.lock.Lock() - defer h.server.lock.Unlock() + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() // TODO: Dynamic update instead of reload if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { @@ -1314,8 +1314,8 @@ func (h *handler) handleDeleteCollectionConfigImportFilter() error { return err } - h.server.lock.Lock() - defer h.server.lock.Unlock() + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() // TODO: Dynamic update instead of reload if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { @@ -1384,8 +1384,8 @@ func (h *handler) handlePutCollectionConfigImportFilter() error { return err } - h.server.lock.Lock() - defer h.server.lock.Unlock() + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() // TODO: Dynamic update instead of reload if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { @@ -1542,7 +1542,7 @@ func (h *handler) handleGetStatus() error { status.Vendor.Version = base.ProductAPIVersion } - for _, database := range h.server.databases_ { + for _, database := range h.server._databases { lastSeq := uint64(0) runState := db.RunStateString[atomic.LoadUint32(&database.State)] diff --git a/rest/api_test.go b/rest/api_test.go index 7c463f3aa8..17892f47e3 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -2989,7 +2989,7 @@ func TestCreateDBWithXattrsDisabled(t *testing.T) { resp = rt.CreateDatabase(dbName, dbConfig) RequireStatus(t, resp, http.StatusCreated) - rt.RestTesterServerContext.dbConfigs[dbName].DatabaseConfig.EnableXattrs = base.Ptr(false) + rt.RestTesterServerContext._dbConfigs[dbName].DatabaseConfig.EnableXattrs = base.Ptr(false) _, err := rt.RestTesterServerContext.ReloadDatabase(t.Context(), dbName, false) assert.Error(t, err, errResp) @@ -3482,7 +3482,7 @@ func TestAllowConflictsConfig(t *testing.T) { bucketName := rt.GetDatabase().Bucket.GetName() // Attempt to set the `AllowConflicts` property to true in the database configuration. - rt.RestTesterServerContext.dbConfigs[dbName].DatabaseConfig.DbConfig.AllowConflicts = base.Ptr(true) + rt.RestTesterServerContext._dbConfigs[dbName].DatabaseConfig.DbConfig.AllowConflicts = base.Ptr(true) // Reload the database configuration and verify that an error is returned. _, err := rt.RestTesterServerContext.ReloadDatabase(ctx, dbName, false) @@ -3525,7 +3525,7 @@ func TestDisableAllowStarChannel(t *testing.T) { RequireStatus(t, resp, http.StatusCreated) // Attempting to disable `enable_star_channel` - rt.ServerContext().dbConfigs[dbName].DatabaseConfig.CacheConfig.ChannelCacheConfig.EnableStarChannel = base.Ptr(false) + rt.ServerContext()._dbConfigs[dbName].DatabaseConfig.CacheConfig.ChannelCacheConfig.EnableStarChannel = base.Ptr(false) // Reloading the database after updating the config _, err := rt.ServerContext().ReloadDatabase(ctx, dbName, false) diff --git a/rest/config.go b/rest/config.go index 7b67faeb5b..d4bcf28d3f 100644 --- a/rest/config.go +++ b/rest/config.go @@ -1657,21 +1657,21 @@ func (sc *ServerContext) fetchAndLoadConfigs(ctx context.Context, isInitialStart // we don't need to do this two-stage lock on initial startup as the REST APIs aren't even online yet. var deletedDatabases []string if !isInitialStartup { - sc.lock.RLock() - for dbName, _ := range sc.dbRegistry { + sc._databasesLock.RLock() + for dbName, _ := range sc._dbRegistry { if _, foundMatchingDb := fetchedConfigs[dbName]; !foundMatchingDb { deletedDatabases = append(deletedDatabases, dbName) delete(fetchedConfigs, dbName) } } for dbName, fetchedConfig := range fetchedConfigs { - if dbConfig, ok := sc.dbConfigs[dbName]; ok && dbConfig.cfgCas >= fetchedConfig.cfgCas { + if dbConfig, ok := sc._dbConfigs[dbName]; ok && dbConfig.cfgCas >= fetchedConfig.cfgCas { sc.invalidDatabaseConfigTracking.remove(dbName) base.DebugfCtx(ctx, base.KeyConfig, "Database %q bucket %q config has not changed since last update", fetchedConfig.Name, *fetchedConfig.Bucket) delete(fetchedConfigs, dbName) } } - sc.lock.RUnlock() + sc._databasesLock.RUnlock() // nothing to do, we can bail out without needing the write lock if len(deletedDatabases) == 0 && len(fetchedConfigs) == 0 { @@ -1681,10 +1681,10 @@ func (sc *ServerContext) fetchAndLoadConfigs(ctx context.Context, isInitialStart } // we have databases to update/remove - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() for _, dbName := range deletedDatabases { - dbc, ok := sc.databases_[dbName] + dbc, ok := sc._databases[dbName] if !ok { base.DebugfCtx(ctx, base.KeyConfig, "Database %q already removed from server context after acquiring write lock - do not need to remove not removing database", base.MD(dbName)) continue @@ -1723,8 +1723,8 @@ func (sc *ServerContext) fetchAndLoadDatabaseSince(ctx context.Context, dbName s } func (sc *ServerContext) fetchAndLoadDatabase(nonContextStruct base.NonCancellableContext, dbName string, forceReload bool) (found bool, err error) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._fetchAndLoadDatabase(nonContextStruct, dbName, forceReload) } @@ -1809,8 +1809,8 @@ func (sc *ServerContext) findBucketWithCallback(ctx context.Context, callback fu func (sc *ServerContext) fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) { // fetch will update the databses - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._fetchDatabase(ctx, dbName) } @@ -1880,8 +1880,8 @@ func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (fou } func (sc *ServerContext) handleInvalidDatabaseConfig(ctx context.Context, bucket string, cnf DatabaseConfig) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() sc._handleInvalidDatabaseConfig(ctx, bucket, cnf, nil) } @@ -1895,9 +1895,9 @@ func (sc *ServerContext) _handleInvalidDatabaseConfig(ctx context.Context, bucke func (sc *ServerContext) bucketNameFromDbName(ctx context.Context, dbName string) (bucketName string, found bool) { // Minimal representation of config struct to be tolerant of invalid database configurations where we still need to find a database name // see if we find the database in-memory first, otherwise fall back to scanning buckets for db configs - sc.lock.RLock() - dbc, ok := sc.databases_[dbName] - sc.lock.RUnlock() + sc._databasesLock.RLock() + dbc, ok := sc._databases[dbName] + sc._databasesLock.RUnlock() if ok { return dbc.Bucket.GetName(), true @@ -1948,7 +1948,7 @@ func (sc *ServerContext) fetchConfigsSince(ctx context.Context, refreshInterval sc.fetchConfigsLastUpdate = time.Now() } - return sc.dbConfigs, nil + return sc._dbConfigs, nil } // GetBucketNames returns a slice of the bucket names associated with the server context @@ -2064,8 +2064,8 @@ func (sc *ServerContext) _applyConfigs(ctx context.Context, dbNameConfigs map[st } func (sc *ServerContext) applyConfigs(ctx context.Context, dbNameConfigs map[string]DatabaseConfig) (count int) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._applyConfigs(ctx, dbNameConfigs, false, false) } @@ -2104,13 +2104,13 @@ func (sc *ServerContext) _applyConfig(nonContextStruct base.NonCancellableContex } // skip if we already have this config loaded, and we've got a cas value to compare with - _, exists := sc.dbRegistry[cnf.Name] + _, exists := sc._dbRegistry[cnf.Name] if exists { if cnf.cfgCas == 0 { // force an update when the new config's cas was set to zero prior to load base.InfofCtx(ctx, base.KeyConfig, "Forcing update of config for database %q bucket %q", cnf.Name, *cnf.Bucket) } else { - if sc.dbConfigs[cnf.Name].cfgCas >= cnf.cfgCas { + if sc._dbConfigs[cnf.Name].cfgCas >= cnf.cfgCas { base.DebugfCtx(ctx, base.KeyConfig, "Database %q bucket %q config has not changed since last update", cnf.Name, *cnf.Bucket) return false, nil } @@ -2211,8 +2211,8 @@ func StartServer(ctx context.Context, config *StartupConfig, sc *ServerContext) } func sharedBucketDatabaseCheck(ctx context.Context, sc *ServerContext) (errors error) { - bucketUUIDToDBContext := make(map[string][]*db.DatabaseContext, len(sc.databases_)) - for _, dbContext := range sc.databases_ { + bucketUUIDToDBContext := make(map[string][]*db.DatabaseContext, len(sc._databases)) + for _, dbContext := range sc._databases { if uuid, err := dbContext.Bucket.UUID(); err == nil { bucketUUIDToDBContext[uuid] = append(bucketUUIDToDBContext[uuid], dbContext) } @@ -2272,7 +2272,7 @@ func (sc *ServerContext) _findDuplicateCollections(cnf DatabaseConfig) []string // If scopes aren't defined, check the default collection if cnf.Scopes == nil { defaultFQName := base.FullyQualifiedCollectionName(*cnf.Bucket, base.DefaultScope, base.DefaultCollection) - existingDbName, ok := sc.collectionRegistry[defaultFQName] + existingDbName, ok := sc._collectionRegistry[defaultFQName] if ok && existingDbName != cnf.Name { duplicatedCollections = append(duplicatedCollections, defaultFQName) } @@ -2280,7 +2280,7 @@ func (sc *ServerContext) _findDuplicateCollections(cnf DatabaseConfig) []string for scopeName, scope := range cnf.Scopes { for collectionName, _ := range scope.Collections { fqName := base.FullyQualifiedCollectionName(*cnf.Bucket, scopeName, collectionName) - existingDbName, ok := sc.collectionRegistry[fqName] + existingDbName, ok := sc._collectionRegistry[fqName] if ok && existingDbName != cnf.Name { duplicatedCollections = append(duplicatedCollections, fqName) } diff --git a/rest/config_test.go b/rest/config_test.go index 6a9eb089f4..84db83bf2e 100644 --- a/rest/config_test.go +++ b/rest/config_test.go @@ -2978,7 +2978,7 @@ func TestInvalidDbConfigNoLongerPresentInBucket(t *testing.T) { require.EventuallyWithT(t, func(c *assert.CollectT) { invalidDatabases := rt.ServerContext().AllInvalidDatabaseNames(t) assert.Equal(c, 1, len(invalidDatabases)) - assert.Equal(c, 0, len(rt.ServerContext().dbConfigs)) + assert.Equal(c, 0, len(rt.ServerContext()._dbConfigs)) }, time.Second*10, time.Millisecond*100) // remove the invalid config from the bucket @@ -2991,7 +2991,7 @@ func TestInvalidDbConfigNoLongerPresentInBucket(t *testing.T) { require.EventuallyWithT(t, func(c *assert.CollectT) { invalidDatabases := rt.ServerContext().AllInvalidDatabaseNames(t) assert.Equal(c, 0, len(invalidDatabases)) - assert.Equal(c, 0, len(rt.ServerContext().dbConfigs)) + assert.Equal(c, 0, len(rt.ServerContext()._dbConfigs)) }, time.Second*10, time.Millisecond*100) // create db again, should succeed @@ -3064,7 +3064,7 @@ func TestNotFoundOnInvalidDatabase(t *testing.T) { require.EventuallyWithT(t, func(c *assert.CollectT) { invalidDatabases := rt.ServerContext().AllInvalidDatabaseNames(t) assert.Equal(c, 0, len(invalidDatabases)) - assert.Equal(c, 1, len(rt.ServerContext().dbConfigs)) + assert.Equal(c, 1, len(rt.ServerContext()._dbConfigs)) }, time.Second*10, time.Millisecond*100) } diff --git a/rest/handler_config_database.go b/rest/handler_config_database.go index 04693b918c..4e4c37c848 100644 --- a/rest/handler_config_database.go +++ b/rest/handler_config_database.go @@ -83,8 +83,8 @@ func (h *handler) mutateDbConfig(mutator func(*DbConfig) error) error { return err } - h.server.lock.Lock() - defer h.server.lock.Unlock() + h.server._databasesLock.Lock() + defer h.server._databasesLock.Unlock() // TODO: Dynamic update instead of reload if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { diff --git a/rest/server_context.go b/rest/server_context.go index 9e61ffe3e0..1b7d67fac2 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -70,14 +70,16 @@ type serverInfo struct { // This struct is accessed from HTTP handlers running on multiple goroutines, so it needs to // be thread-safe. type ServerContext struct { - Config *StartupConfig // The current runtime configuration of the node - initialStartupConfig *StartupConfig // The configuration at startup of the node. Built from config file + flags - persistentConfig bool - dbRegistry map[string]struct{} // registry of dbNames, used to ensure uniqueness even when db isn't active - collectionRegistry map[string]string // map of fully qualified collection name to db name, used for local uniqueness checks - dbConfigs map[string]*RuntimeDatabaseConfig // dbConfigs is a map of db name to the RuntimeDatabaseConfig - databases_ map[string]*db.DatabaseContext // databases_ is a map of dbname to db.DatabaseContext - lock sync.RWMutex + Config *StartupConfig // The current runtime configuration of the node + initialStartupConfig *StartupConfig // The configuration at startup of the node. Built from config file + flags + persistentConfig bool + + _dbRegistry map[string]struct{} // _dbRegistry is a map of db names, used to ensure uniqueness even when db isn't active + _collectionRegistry map[string]string // _collectionRegistry is a map of fully qualified collection name to db name, used for local uniqueness checks + _dbConfigs map[string]*RuntimeDatabaseConfig // _dbConfigs is a map of db name to the RuntimeDatabaseConfig + _databases map[string]*db.DatabaseContext // _databases is a map of dbname to db.DatabaseContext + _databasesLock sync.RWMutex // Lock for _databases and other db-specific maps above + statsContext *statsContext BootstrapContext *bootstrapContext HTTPClient *http.Client @@ -93,8 +95,7 @@ type ServerContext struct { DatabaseInitManager *DatabaseInitManager // Manages database initialization (index creation and readiness) independent of database stop/start/reload, when using persistent config ActiveReplicationsCounter invalidDatabaseConfigTracking invalidDatabaseConfigs - // handle sgcollect processes for a given Server - SGCollect *sgCollect + SGCollect *sgCollect // singleton instance for this server's sgcollect_info process } type ActiveReplicationsCounter struct { @@ -162,10 +163,10 @@ func NewServerContext(ctx context.Context, config *StartupConfig, persistentConf sc := &ServerContext{ Config: config, persistentConfig: persistentConfig, - dbRegistry: map[string]struct{}{}, - collectionRegistry: map[string]string{}, - dbConfigs: map[string]*RuntimeDatabaseConfig{}, - databases_: map[string]*db.DatabaseContext{}, + _dbRegistry: map[string]struct{}{}, + _collectionRegistry: map[string]string{}, + _dbConfigs: map[string]*RuntimeDatabaseConfig{}, + _databases: map[string]*db.DatabaseContext{}, DatabaseInitManager: &DatabaseInitManager{}, HTTPClient: http.DefaultClient, statsContext: &statsContext{heapProfileEnabled: !config.HeapProfileDisableCollection}, @@ -214,8 +215,8 @@ func (sc *ServerContext) WaitForRESTAPIs(ctx context.Context) error { ctx, cancelFn := context.WithTimeout(ctx, timeout) defer cancelFn() err, _ := base.RetryLoop(ctx, "Wait for REST APIs", func() (shouldRetry bool, err error, value any) { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() if len(sc._httpServers) == len(allServers) { return false, nil, nil } @@ -234,15 +235,15 @@ func (sc *ServerContext) getServerAddr(s serverType) (string, error) { } func (sc *ServerContext) addHTTPServer(t serverType, s *serverInfo) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() sc._httpServers[t] = s } // getHTTPServer returns information about the given HTTP server. func (sc *ServerContext) getHTTPServer(t serverType) (*serverInfo, error) { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() s, ok := sc._httpServers[t] if !ok { return nil, fmt.Errorf("server type %q not found running in server context", t) @@ -275,21 +276,23 @@ func (sc *ServerContext) Close(ctx context.Context) { base.InfofCtx(ctx, base.KeyAll, "Couldn't stop background config update worker: %v", err) } - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() // close cached bootstrap bucket connections if sc.BootstrapContext != nil && sc.BootstrapContext.Connection != nil { sc.BootstrapContext.Connection.Close() } - for _, db := range sc.databases_ { + for _, db := range sc._databases { db.Close(ctx) _ = db.EventMgr.RaiseDBStateChangeEvent(ctx, db.Name, "offline", "Database context closed", &sc.Config.API.AdminInterface) } - sc.databases_ = nil + sc._databases = nil sc.invalidDatabaseConfigTracking.dbNames = nil + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() for _, s := range sc._httpServers { if s.server != nil { base.InfofCtx(ctx, base.KeyHTTP, "Closing HTTP Server: %v", s.addr) @@ -320,9 +323,9 @@ func (sc *ServerContext) GetDatabase(ctx context.Context, name string) (*db.Data // GetActiveDatabase attempts to return the DatabaseContext of a loaded database. If not found, the database name will be // validated to make sure it's valid and then an error returned. func (sc *ServerContext) GetActiveDatabase(name string) (*db.DatabaseContext, error) { - sc.lock.RLock() - dbc := sc.databases_[name] - sc.lock.RUnlock() + sc._databasesLock.RLock() + dbc := sc._databases[name] + sc._databasesLock.RUnlock() if dbc != nil { return dbc, nil } else if db.ValidateDatabaseName(name) != nil { @@ -352,9 +355,9 @@ func (sc *ServerContext) GetInactiveDatabase(ctx context.Context, name string) ( dbConfigFound, _ = sc.fetchAndLoadDatabase(base.NewNonCancelCtx(), name, false) } if dbConfigFound { - sc.lock.RLock() - defer sc.lock.RUnlock() - dbc := sc.databases_[name] + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() + dbc := sc._databases[name] if dbc != nil { return dbc, dbConfigFound, nil } @@ -393,9 +396,9 @@ func (sc *ServerContext) GetDbConfig(name string) *DbConfig { } func (sc *ServerContext) GetDatabaseConfig(name string) *RuntimeDatabaseConfig { - sc.lock.RLock() - config, ok := sc.dbConfigs[name] - sc.lock.RUnlock() + sc._databasesLock.RLock() + config, ok := sc._dbConfigs[name] + sc._databasesLock.RUnlock() if !ok { return nil } @@ -403,11 +406,11 @@ func (sc *ServerContext) GetDatabaseConfig(name string) *RuntimeDatabaseConfig { } func (sc *ServerContext) AllDatabaseNames() []string { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() - names := make([]string, 0, len(sc.databases_)) - for name := range sc.databases_ { + names := make([]string, 0, len(sc._databases)) + for name := range sc._databases { names = append(names, name) } slices.Sort(names) @@ -415,11 +418,11 @@ func (sc *ServerContext) AllDatabaseNames() []string { } func (sc *ServerContext) allDatabaseSummaries() []DbSummary { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() - dbs := make([]DbSummary, 0, len(sc.databases_)) - for name, dbctx := range sc.databases_ { + dbs := make([]DbSummary, 0, len(sc._databases)) + for name, dbctx := range sc._databases { state := db.RunStateString[atomic.LoadUint32(&dbctx.State)] summary := DbSummary{ DBName: name, @@ -441,7 +444,7 @@ func (sc *ServerContext) allDatabaseSummaries() []DbSummary { defer sc.invalidDatabaseConfigTracking.m.RUnlock() for name, invalidConfig := range sc.invalidDatabaseConfigTracking.dbNames { // skip adding any invalid dbs with no error associated with them or that exist in above list - if invalidConfig.databaseError == nil || sc.databases_[name] != nil { + if invalidConfig.databaseError == nil || sc._databases[name] != nil { continue } summary := DbSummary{ @@ -458,14 +461,14 @@ func (sc *ServerContext) allDatabaseSummaries() []DbSummary { return dbs } -// AllDatabases returns a copy of the databases_ map. +// AllDatabases returns a copy of the _databases map. func (sc *ServerContext) AllDatabases() map[string]*db.DatabaseContext { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() - databases := make(map[string]*db.DatabaseContext, len(sc.databases_)) + databases := make(map[string]*db.DatabaseContext, len(sc._databases)) - maps.Copy(databases, sc.databases_) + maps.Copy(databases, sc._databases) return databases } @@ -478,16 +481,16 @@ type PostUpgradeDatabaseResult struct { // PostUpgrade performs post-upgrade processing for each database func (sc *ServerContext) PostUpgrade(ctx context.Context, preview bool) (postUpgradeResults PostUpgradeResult, err error) { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() var errs *base.MultiError buckets := make(map[string]base.Bucket) // map of bucket name to bucket object - dbs := make(map[string]string, len(sc.databases_)) // map of db name to bucket name - dbDesignDocs := make(map[string][]string, len(sc.databases_)) // map of db name to removed design docs + dbs := make(map[string]string, len(sc._databases)) // map of db name to bucket name + dbDesignDocs := make(map[string][]string, len(sc._databases)) // map of db name to removed design docs bucketInUseIndexes := make(map[string]db.CollectionIndexes) // map of buckets to in use index names bucketRemovedIndexes := make(map[string][]string) // map of bucket name to removed index names - for dbName, database := range sc.databases_ { + for dbName, database := range sc._databases { bucketName := database.Bucket.GetName() dbs[dbName] = bucketName buckets[bucketName] = database.Bucket @@ -530,8 +533,8 @@ func (sc *ServerContext) PostUpgrade(ctx context.Context, preview bool) (postUpg } bucketRemovedIndexes[bucketName] = removedIndexes } - postUpgradeResults = make(map[string]PostUpgradeDatabaseResult, len(sc.databases_)) - for dbName, database := range sc.databases_ { + postUpgradeResults = make(map[string]PostUpgradeDatabaseResult, len(sc._databases)) + for dbName, database := range sc._databases { postUpgradeResults[dbName] = PostUpgradeDatabaseResult{ RemovedDDocs: dbDesignDocs[dbName], RemovedIndexes: bucketRemovedIndexes[database.Bucket.GetName()], @@ -544,7 +547,7 @@ func (sc *ServerContext) PostUpgrade(ctx context.Context, preview bool) (postUpg // Removes and re-adds a database to the ServerContext. func (sc *ServerContext) _reloadDatabase(ctx context.Context, reloadDbName string, failFast bool, forceOnline bool) (*db.DatabaseContext, error) { sc._unloadDatabase(ctx, reloadDbName) - config := sc.dbConfigs[reloadDbName] + config := sc._dbConfigs[reloadDbName] return sc._getOrAddDatabaseFromConfig(ctx, config.DatabaseConfig, getOrAddDatabaseConfigOptions{ useExisting: true, failFast: failFast, @@ -555,16 +558,16 @@ func (sc *ServerContext) _reloadDatabase(ctx context.Context, reloadDbName strin // Removes and re-adds a database to the ServerContext. func (sc *ServerContext) ReloadDatabase(ctx context.Context, reloadDbName string, forceOnline bool) (*db.DatabaseContext, error) { // Obtain write lock during add database, to avoid race condition when creating based on ConfigServer - sc.lock.Lock() + sc._databasesLock.Lock() dbContext, err := sc._reloadDatabase(ctx, reloadDbName, false, forceOnline) - sc.lock.Unlock() + sc._databasesLock.Unlock() return dbContext, err } func (sc *ServerContext) ReloadDatabaseWithConfig(nonContextStruct base.NonCancellableContext, config DatabaseConfig) error { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._reloadDatabaseWithConfig(nonContextStruct.Ctx, config, true, false) } @@ -586,8 +589,8 @@ func (sc *ServerContext) _reloadDatabaseWithConfig(ctx context.Context, config D // existing DatabaseContext or an error based on the useExisting flag. func (sc *ServerContext) getOrAddDatabaseFromConfig(ctx context.Context, config DatabaseConfig, options getOrAddDatabaseConfigOptions) (*db.DatabaseContext, error) { // Obtain write lock during add database, to avoid race condition when creating based on ConfigServer - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._getOrAddDatabaseFromConfig(ctx, config, options) } @@ -676,7 +679,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config return } // database exists in global map, management is deferred to REST api - _, dbRegistered := sc.databases_[dbName] + _, dbRegistered := sc._databases[dbName] if dbRegistered { return } @@ -691,7 +694,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config return nil, err } - previousDatabase := sc.databases_[dbName] + previousDatabase := sc._databases[dbName] if previousDatabase != nil { if options.useExisting { return previousDatabase, nil @@ -1032,7 +1035,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config } else { stateChangeMsg = dbLoadedStateChangeMsg + " in offline state" } - // Defer state change event to after the databases are registered. This should not matter because this function holds ServerContext.lock and databases_ can't be read while the lock is present. + // Defer state change event to after the databases are registered. This should not matter because this function holds ServerContext._databasesLock and _databases can't be read while the lock is present. defer func() { atomic.StoreUint32(&dbcontext.State, db.DBOffline) _ = dbcontext.EventMgr.RaiseDBStateChangeEvent(ctx, dbName, "offline", stateChangeMsg, &sc.Config.API.AdminInterface) @@ -1042,11 +1045,11 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config } // Register it so HTTP handlers can find it: - sc.databases_[dbcontext.Name] = dbcontext - sc.dbConfigs[dbcontext.Name] = &RuntimeDatabaseConfig{DatabaseConfig: config} - sc.dbRegistry[dbName] = struct{}{} + sc._databases[dbcontext.Name] = dbcontext + sc._dbConfigs[dbcontext.Name] = &RuntimeDatabaseConfig{DatabaseConfig: config} + sc._dbRegistry[dbName] = struct{}{} for _, name := range fqCollections { - sc.collectionRegistry[name] = dbName + sc._collectionRegistry[name] = dbName } if !startOnlineProcesses { @@ -1131,9 +1134,9 @@ func (sc *ServerContext) asyncDatabaseOnline(nonCancelCtx base.NonCancellableCon } func (sc *ServerContext) GetDbVersion(dbName string) string { - sc.lock.RLock() - defer sc.lock.RUnlock() - currentDbConfig, ok := sc.dbConfigs[dbName] + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() + currentDbConfig, ok := sc._dbConfigs[dbName] if !ok { return "" } @@ -1644,8 +1647,8 @@ func (sc *ServerContext) processEventHandlersForEvent(ctx context.Context, event // RemoveDatabase is called when an external request is made to delete the database func (sc *ServerContext) RemoveDatabase(ctx context.Context, dbName string, reason string) bool { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() // If async init is running for the database, cancel it for an external remove. (cannot be // done in _removeDatabase, as this is called during reload) @@ -1658,19 +1661,19 @@ func (sc *ServerContext) RemoveDatabase(ctx context.Context, dbName string, reas // _unloadDatabase unloads and stops the database, but does not remove the in-memory config. func (sc *ServerContext) _unloadDatabase(ctx context.Context, dbName string) bool { - dbCtx := sc.databases_[dbName] + dbCtx := sc._databases[dbName] if dbCtx == nil { return false } base.InfofCtx(ctx, base.KeyAll, "Closing db /%s (bucket %q)", base.MD(dbCtx.Name), base.MD(dbCtx.Bucket.GetName())) dbCtx.Close(ctx) - delete(sc.databases_, dbName) + delete(sc._databases, dbName) return true } // _removeDatabase unloads and removes all references to the given database. func (sc *ServerContext) _removeDatabase(ctx context.Context, dbName string) bool { - dbCtx := sc.databases_[dbName] + dbCtx := sc._databases[dbName] if dbCtx == nil { return false } @@ -1678,30 +1681,30 @@ func (sc *ServerContext) _removeDatabase(ctx context.Context, dbName string) boo if ok := sc._unloadDatabase(ctx, dbName); !ok { return ok } - delete(sc.dbConfigs, dbName) - delete(sc.dbRegistry, dbName) - for fqCollection, registryDbName := range sc.collectionRegistry { + delete(sc._dbConfigs, dbName) + delete(sc._dbRegistry, dbName) + for fqCollection, registryDbName := range sc._collectionRegistry { if dbName == registryDbName { - delete(sc.collectionRegistry, fqCollection) + delete(sc._collectionRegistry, fqCollection) } } return true } func (sc *ServerContext) _isDatabaseSuspended(dbName string) bool { - if config, loaded := sc.dbConfigs[dbName]; loaded && config.isSuspended { + if config, loaded := sc._dbConfigs[dbName]; loaded && config.isSuspended { return true } return false } func (sc *ServerContext) _suspendDatabase(ctx context.Context, dbName string) error { - dbCtx := sc.databases_[dbName] + dbCtx := sc._databases[dbName] if dbCtx == nil { return base.ErrNotFound } - config := sc.dbConfigs[dbName] + config := sc._dbConfigs[dbName] if config != nil && !base.ValDefault(config.Suspendable, sc.Config.IsServerless()) { return ErrSuspendingDisallowed } @@ -1718,20 +1721,20 @@ func (sc *ServerContext) _suspendDatabase(ctx context.Context, dbName string) er } func (sc *ServerContext) unsuspendDatabase(ctx context.Context, dbName string) (*db.DatabaseContext, error) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._unsuspendDatabase(ctx, dbName) } func (sc *ServerContext) _unsuspendDatabase(ctx context.Context, dbName string) (*db.DatabaseContext, error) { - dbCtx := sc.databases_[dbName] + dbCtx := sc._databases[dbName] if dbCtx != nil { return dbCtx, nil } // Check if database is in dbConfigs so no need to search through buckets - if dbConfig, ok := sc.dbConfigs[dbName]; ok { + if dbConfig, ok := sc._dbConfigs[dbName]; ok { if !dbConfig.isSuspended { base.WarnfCtx(ctx, "attempting to unsuspend database %q that is not suspended", base.MD(dbName)) } @@ -1748,7 +1751,7 @@ func (sc *ServerContext) _unsuspendDatabase(ctx context.Context, dbName string) if err == base.ErrNotFound { // Database no longer exists, so clean up dbConfigs base.InfofCtx(ctx, base.KeyConfig, "Database %q has been removed while suspended from bucket %q", base.MD(dbName), base.MD(bucket)) - delete(sc.dbConfigs, dbName) + delete(sc._dbConfigs, dbName) return nil, err } else if err != nil { return nil, fmt.Errorf("unsuspending db %q failed due to an error while trying to retrieve latest config from bucket %q: %w", base.MD(dbName).Redact(), base.MD(bucket).Redact(), err) @@ -1858,9 +1861,9 @@ func (sc *ServerContext) logNetworkInterfaceStats(ctx context.Context) { // Updates stats that are more efficient to calculate at stats collection time func (sc *ServerContext) updateCalculatedStats(ctx context.Context) { - sc.lock.RLock() - defer sc.lock.RUnlock() - for _, dbContext := range sc.databases_ { + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() + for _, dbContext := range sc._databases { dbState := atomic.LoadUint32(&dbContext.State) if dbState == db.DBOnline { dbContext.UpdateCalculatedStats(ctx) @@ -2334,13 +2337,13 @@ func (sc *ServerContext) getClusterUUID(ctx context.Context) (string, error) { // removeBucketAndRecreateDatabase will flush all data from the backing bucket associated with this database. Note, this will take down // other databases backed by the same bucket. It will recreate an empty database from the existing configuration. func (sc *ServerContext) removeBucketAndRecreateDatabase(ctx context.Context, dbName string, deleteFunc func(base.BucketSpec) error) error { - sc.lock.Lock() - defer sc.lock.Unlock() - config, ok := sc.dbConfigs[dbName] + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() + config, ok := sc._dbConfigs[dbName] if !ok { return fmt.Errorf("no config found for database %q", dbName) } - for otherDBName, dbCtx := range sc.databases_ { + for otherDBName, dbCtx := range sc._databases { if dbCtx.Bucket.GetName() == *config.Bucket { // If async init is running for the database, cancel it for an external remove. (cannot be // done in _removeDatabase, as this is called during reload) @@ -2376,7 +2379,7 @@ func (sc *ServerContext) removeBucketAndRecreateDatabase(ctx context.Context, db return err } // store cas in db config after update - sc.dbConfigs[dbName].cfgCas = cas + sc._dbConfigs[dbName].cfgCas = cas } else { // Re-open database and add to Sync Gateway _, err := sc._getOrAddDatabaseFromConfig(ctx, config.DatabaseConfig, @@ -2393,7 +2396,7 @@ func (sc *ServerContext) removeBucketAndRecreateDatabase(ctx context.Context, db func (sc *ServerContext) getBucketCCVSettings() map[string]bool { bucketCCVSettings := make(map[string]bool) - for _, _db := range sc.databases_ { + for _, _db := range sc._databases { bucketName := _db.BucketSpec.BucketName bucketCCVSettings[bucketName] = _db.CachedCCVEnabled.Load() } diff --git a/rest/serverless_test.go b/rest/serverless_test.go index 95e2c9bfdb..dedf9689c9 100644 --- a/rest/serverless_test.go +++ b/rest/serverless_test.go @@ -293,9 +293,9 @@ func TestServerlessSuspendDatabase(t *testing.T) { RequireStatus(t, resp, http.StatusCreated) assert.False(t, sc.isDatabaseSuspended(t, "db")) - assert.NotNil(t, sc.databases_["db"]) - assert.NotNil(t, sc.dbRegistry["db"]) - assert.NotNil(t, sc.dbConfigs["db"]) + assert.NotNil(t, sc._databases["db"]) + assert.NotNil(t, sc._dbRegistry["db"]) + assert.NotNil(t, sc._dbConfigs["db"]) // Unsuspend db that is not suspended should just return db context dbCtx, err := sc.unsuspendDatabase(rt.Context(), "db") @@ -312,38 +312,38 @@ func TestServerlessSuspendDatabase(t *testing.T) { // Make sure database is suspended assert.True(t, sc.isDatabaseSuspended(t, "db")) - assert.Nil(t, sc.databases_["db"]) - assert.NotNil(t, sc.dbRegistry["db"]) - assert.NotNil(t, sc.dbConfigs["db"]) + assert.Nil(t, sc._databases["db"]) + assert.NotNil(t, sc._dbRegistry["db"]) + assert.NotNil(t, sc._dbConfigs["db"]) // Update config in bucket to see if unsuspending check for updates - sc.dbConfigs["db"].EnableXattrs = base.Ptr(true) // xattrs must be enabled + sc._dbConfigs["db"].EnableXattrs = base.Ptr(true) // xattrs must be enabled cas, err := sc.BootstrapContext.UpdateConfig(base.TestCtx(t), tb.GetName(), sc.Config.Bootstrap.ConfigGroupID, "db", func(bucketDbConfig *DatabaseConfig) (updatedConfig *DatabaseConfig, err error) { - config := sc.dbConfigs["db"].ToDatabaseConfig() + config := sc._dbConfigs["db"].ToDatabaseConfig() config.cfgCas = bucketDbConfig.cfgCas return config, nil }) require.NoError(t, err) - assert.NotEqual(t, cas, sc.dbConfigs["db"].cfgCas) + assert.NotEqual(t, cas, sc._dbConfigs["db"].cfgCas) // Unsuspend db dbCtx, err = sc.unsuspendDatabase(rt.Context(), "db") require.NoError(t, err) assert.NotNil(t, dbCtx) assert.False(t, sc.isDatabaseSuspended(t, "db")) - assert.NotNil(t, sc.databases_["db"]) - assert.NotNil(t, sc.dbRegistry["db"]) - require.NotNil(t, sc.dbConfigs["db"]) + assert.NotNil(t, sc._databases["db"]) + assert.NotNil(t, sc._dbRegistry["db"]) + require.NotNil(t, sc._dbConfigs["db"]) // Make sure updated config is being used - assert.Equal(t, cas, sc.dbConfigs["db"].cfgCas) + assert.Equal(t, cas, sc._dbConfigs["db"].cfgCas) // Attempt unsuspend of invalid db dbCtx, err = sc.unsuspendDatabase(rt.Context(), "invalid") require.Error(t, err) assert.Nil(t, dbCtx) - assert.Nil(t, sc.databases_["invalid"]) - assert.Nil(t, sc.dbConfigs["invalid"]) + assert.Nil(t, sc._databases["invalid"]) + assert.Nil(t, sc._dbConfigs["invalid"]) } // Confirms that when the database config is not in sc.dbConfigs, the fetch callback is check if the config is in a bucket @@ -375,15 +375,15 @@ func TestServerlessUnsuspendFetchFallback(t *testing.T) { // Suspend the database and remove it from dbConfigs, forcing unsuspendDatabase to fetch config from the bucket err := sc.suspendDatabase(t, rt.Context(), "db") assert.NoError(t, err) - delete(sc.dbConfigs, "db") - delete(sc.dbRegistry, "db") - assert.Nil(t, sc.databases_["db"]) + delete(sc._dbConfigs, "db") + delete(sc._dbRegistry, "db") + assert.Nil(t, sc._databases["db"]) // Unsuspend db and confirm unsuspending worked dbCtx, err := sc.GetDatabase(rt.Context(), "db") assert.NoError(t, err) assert.NotNil(t, dbCtx) - assert.NotNil(t, sc.databases_["db"]) + assert.NotNil(t, sc._databases["db"]) // Attempt to get invalid database _, err = sc.GetDatabase(rt.Context(), "invalid") @@ -501,15 +501,15 @@ func TestServerlessUpdateSuspendedDb(t *testing.T) { assert.NoError(t, sc.suspendDatabase(t, rt.Context(), "db")) // Update database config newCas, err := sc.BootstrapContext.UpdateConfig(base.TestCtx(t), tb.GetName(), sc.Config.Bootstrap.ConfigGroupID, "db", func(bucketDbConfig *DatabaseConfig) (updatedConfig *DatabaseConfig, err error) { - config := sc.dbConfigs["db"].ToDatabaseConfig() + config := sc._dbConfigs["db"].ToDatabaseConfig() config.cfgCas = bucketDbConfig.cfgCas return config, nil }) require.NoError(t, err) // Confirm dbConfig cas did not update yet in SG, or get unsuspended - assert.NotEqual(t, sc.dbConfigs["db"].cfgCas, newCas) + assert.NotEqual(t, sc._dbConfigs["db"].cfgCas, newCas) assert.True(t, sc.isDatabaseSuspended(t, "db")) - assert.Nil(t, sc.databases_["db"]) + assert.Nil(t, sc._databases["db"]) // Trigger update frequency (would usually happen every ConfigUpdateFrequency seconds) count, err := sc.fetchAndLoadConfigs(rt.Context(), false) require.NoError(t, err) @@ -517,7 +517,7 @@ func TestServerlessUpdateSuspendedDb(t *testing.T) { // Make sure database is still suspended assert.True(t, sc.isDatabaseSuspended(t, "db")) - assert.Nil(t, sc.databases_["db"]) + assert.Nil(t, sc._databases["db"]) } // Tests scenarios a database is and is not allowed to suspend @@ -626,7 +626,7 @@ func TestServerlessUnsuspendAPI(t *testing.T) { // Confirm db is suspended require.True(t, sc.isDatabaseSuspended(t, "db")) - require.Nil(t, sc.databases_["db"]) + require.Nil(t, sc._databases["db"]) // Attempt to unsuspend using unauthenticated public API request resp = rt.SendRequest(http.MethodGet, "/db/doc", "") @@ -634,7 +634,7 @@ func TestServerlessUnsuspendAPI(t *testing.T) { // Confirm db is unsuspended require.False(t, sc.isDatabaseSuspended(t, "db")) - require.NotNil(t, sc.databases_["db"]) + require.NotNil(t, sc._databases["db"]) } // Makes sure admin API calls do not unsuspend DB if they fail authentication @@ -662,18 +662,18 @@ func TestServerlessUnsuspendAdminAuth(t *testing.T) { // Confirm db is suspended require.True(t, sc.isDatabaseSuspended(t, "db")) - require.Nil(t, sc.databases_["db"]) + require.Nil(t, sc._databases["db"]) // Confirm unauthenticated admin request does not trigger unsuspend resp = rt.SendAdminRequest(http.MethodGet, "/db/doc", "") AssertStatus(t, resp, http.StatusUnauthorized) - require.Nil(t, sc.databases_["db"]) // Confirm suspended + require.Nil(t, sc._databases["db"]) // Confirm suspended require.True(t, sc.isDatabaseSuspended(t, "db")) // Confirm authenticated admin request triggers unsuspend resp = rt.SendAdminRequestWithAuth(http.MethodGet, "/db/doc", "", base.TestClusterUsername(), base.TestClusterPassword()) AssertStatus(t, resp, http.StatusNotFound) - require.NotNil(t, sc.databases_["db"]) // Confirm unsuspended + require.NotNil(t, sc._databases["db"]) // Confirm unsuspended require.False(t, sc.isDatabaseSuspended(t, "db")) // Attempt to get DB that does not exist diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 9dca79b4dc..e460ebcbfc 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -2412,20 +2412,20 @@ func MarshalConfig(t *testing.T, config db.ReplicationConfig) string { } func (sc *ServerContext) isDatabaseSuspended(t *testing.T, dbName string) bool { - sc.lock.RLock() - defer sc.lock.RUnlock() + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() return sc._isDatabaseSuspended(dbName) } func (sc *ServerContext) getBucketSpec(dbName string) base.BucketSpec { - sc.lock.RLock() - defer sc.lock.RUnlock() - return sc.databases_[dbName].BucketSpec + sc._databasesLock.RLock() + defer sc._databasesLock.RUnlock() + return sc._databases[dbName].BucketSpec } func (sc *ServerContext) suspendDatabase(t *testing.T, ctx context.Context, dbName string) error { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._suspendDatabase(ctx, dbName) } @@ -2755,8 +2755,8 @@ func JsonToMap(t *testing.T, jsonStr string) map[string]any { // reloadDatabaseWithConfigLoadFromBucket forces reload of db as if it was being picked up from the bucket func (sc *ServerContext) reloadDatabaseWithConfigLoadFromBucket(nonContextStruct base.NonCancellableContext, config DatabaseConfig) error { - sc.lock.Lock() - defer sc.lock.Unlock() + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() return sc._reloadDatabaseWithConfig(nonContextStruct.Ctx, config, true, true) } From ce2cd1247f889aea0d1b72605c1a94fb03daf546 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 19 Nov 2025 14:19:02 +0000 Subject: [PATCH 2/4] Move ServerContext serverInfo map to newly scoped httpServers mutex --- rest/server_context.go | 45 ++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/rest/server_context.go b/rest/server_context.go index 1b7d67fac2..0d20569d54 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -80,19 +80,22 @@ type ServerContext struct { _databases map[string]*db.DatabaseContext // _databases is a map of dbname to db.DatabaseContext _databasesLock sync.RWMutex // Lock for _databases and other db-specific maps above - statsContext *statsContext - BootstrapContext *bootstrapContext - HTTPClient *http.Client - cpuPprofFileMutex sync.Mutex // Protect cpuPprofFile from concurrent Start and Stop CPU profiling requests - cpuPprofFile *os.File // An open file descriptor holds the reference during CPU profiling - _httpServers map[serverType]*serverInfo // A list of HTTP servers running under the ServerContext - GoCBAgent *gocbcore.Agent // GoCB Agent to use when obtaining management endpoints - NoX509HTTPClient *http.Client // httpClient for the cluster that doesn't include x509 credentials, even if they are configured for the cluster - hasStarted chan struct{} // A channel that is closed via PostStartup once the ServerContext has fully started - LogContextID string // ID to differentiate log messages from different server context - fetchConfigsLastUpdate time.Time // The last time fetchConfigsWithTTL() updated dbConfigs - allowScopesInPersistentConfig bool // Test only backdoor to allow scopes in persistent config, not supported for multiple databases with different collections targeting the same bucket - DatabaseInitManager *DatabaseInitManager // Manages database initialization (index creation and readiness) independent of database stop/start/reload, when using persistent config + statsContext *statsContext + BootstrapContext *bootstrapContext + HTTPClient *http.Client + cpuPprofFileMutex sync.Mutex // Protect cpuPprofFile from concurrent Start and Stop CPU profiling requests + cpuPprofFile *os.File // An open file descriptor holds the reference during CPU profiling + + _httpServers map[serverType]*serverInfo // A list of HTTP servers running under the ServerContext + _httpServersLock sync.RWMutex // Lock for managing access to _httpServers + + GoCBAgent *gocbcore.Agent // GoCB Agent to use when obtaining management endpoints + NoX509HTTPClient *http.Client // httpClient for the cluster that doesn't include x509 credentials, even if they are configured for the cluster + hasStarted chan struct{} // A channel that is closed via PostStartup once the ServerContext has fully started + LogContextID string // ID to differentiate log messages from different server context + fetchConfigsLastUpdate time.Time // The last time fetchConfigsWithTTL() updated dbConfigs + allowScopesInPersistentConfig bool // Test only backdoor to allow scopes in persistent config, not supported for multiple databases with different collections targeting the same bucket + DatabaseInitManager *DatabaseInitManager // Manages database initialization (index creation and readiness) independent of database stop/start/reload, when using persistent config ActiveReplicationsCounter invalidDatabaseConfigTracking invalidDatabaseConfigs SGCollect *sgCollect // singleton instance for this server's sgcollect_info process @@ -215,8 +218,8 @@ func (sc *ServerContext) WaitForRESTAPIs(ctx context.Context) error { ctx, cancelFn := context.WithTimeout(ctx, timeout) defer cancelFn() err, _ := base.RetryLoop(ctx, "Wait for REST APIs", func() (shouldRetry bool, err error, value any) { - sc._databasesLock.RLock() - defer sc._databasesLock.RUnlock() + sc._httpServersLock.RLock() + defer sc._httpServersLock.RUnlock() if len(sc._httpServers) == len(allServers) { return false, nil, nil } @@ -235,15 +238,15 @@ func (sc *ServerContext) getServerAddr(s serverType) (string, error) { } func (sc *ServerContext) addHTTPServer(t serverType, s *serverInfo) { - sc._databasesLock.Lock() - defer sc._databasesLock.Unlock() + sc._httpServersLock.Lock() + defer sc._httpServersLock.Unlock() sc._httpServers[t] = s } // getHTTPServer returns information about the given HTTP server. func (sc *ServerContext) getHTTPServer(t serverType) (*serverInfo, error) { - sc._databasesLock.RLock() - defer sc._databasesLock.RUnlock() + sc._httpServersLock.RLock() + defer sc._httpServersLock.RUnlock() s, ok := sc._httpServers[t] if !ok { return nil, fmt.Errorf("server type %q not found running in server context", t) @@ -291,8 +294,8 @@ func (sc *ServerContext) Close(ctx context.Context) { sc._databases = nil sc.invalidDatabaseConfigTracking.dbNames = nil - sc._databasesLock.Lock() - defer sc._databasesLock.Unlock() + sc._httpServersLock.Lock() + defer sc._httpServersLock.Unlock() for _, s := range sc._httpServers { if s.server != nil { base.InfofCtx(ctx, base.KeyHTTP, "Closing HTTP Server: %v", s.addr) From ca69b405f5f0444789b68f534fb93ca7c138d953 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 19 Nov 2025 18:41:49 +0000 Subject: [PATCH 3/4] Rearrange code in ServerContext Close --- rest/server_context.go | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/rest/server_context.go b/rest/server_context.go index 0d20569d54..df7b43839d 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -267,33 +267,41 @@ func (sc *ServerContext) PostStartup() { const serverContextStopMaxWait = 30 * time.Second func (sc *ServerContext) Close(ctx context.Context) { - - err := base.TerminateAndWaitForClose(sc.statsContext.terminator, sc.statsContext.doneChan, serverContextStopMaxWait) - if err != nil { - base.InfofCtx(ctx, base.KeyAll, "Couldn't stop stats logger: %v", err) - } + // stop HTTP servers - prevents any further requests from coming in before we continue with tearing down everything else + sc.stopHTTPServers(ctx) // stop the config polling - err = base.TerminateAndWaitForClose(sc.BootstrapContext.terminator, sc.BootstrapContext.doneChan, serverContextStopMaxWait) - if err != nil { + if err := base.TerminateAndWaitForClose(sc.BootstrapContext.terminator, sc.BootstrapContext.doneChan, serverContextStopMaxWait); err != nil { base.InfofCtx(ctx, base.KeyAll, "Couldn't stop background config update worker: %v", err) } - sc._databasesLock.Lock() - defer sc._databasesLock.Unlock() - - // close cached bootstrap bucket connections + // close cached bootstrap bucket connections for config polling if sc.BootstrapContext != nil && sc.BootstrapContext.Connection != nil { sc.BootstrapContext.Connection.Close() } + if agent := sc.GoCBAgent; agent != nil { + if err := agent.Close(); err != nil { + base.WarnfCtx(ctx, "Error closing agent connection: %v", err) + } + } + + if err := base.TerminateAndWaitForClose(sc.statsContext.terminator, sc.statsContext.doneChan, serverContextStopMaxWait); err != nil { + base.InfofCtx(ctx, base.KeyAll, "Couldn't stop stats logger: %v", err) + } + + // close all databases + sc._databasesLock.Lock() + defer sc._databasesLock.Unlock() for _, db := range sc._databases { db.Close(ctx) _ = db.EventMgr.RaiseDBStateChangeEvent(ctx, db.Name, "offline", "Database context closed", &sc.Config.API.AdminInterface) } sc._databases = nil sc.invalidDatabaseConfigTracking.dbNames = nil +} +func (sc *ServerContext) stopHTTPServers(ctx context.Context) { sc._httpServersLock.Lock() defer sc._httpServersLock.Unlock() for _, s := range sc._httpServers { @@ -305,12 +313,6 @@ func (sc *ServerContext) Close(ctx context.Context) { } } sc._httpServers = nil - - if agent := sc.GoCBAgent; agent != nil { - if err := agent.Close(); err != nil { - base.WarnfCtx(ctx, "Error closing agent connection: %v", err) - } - } } // GetDatabase attempts to return the DatabaseContext of the database. It will load the database if necessary. From 4d6cdd404da228a9821972202f954e4dce810561 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 26 Nov 2025 12:49:57 +0000 Subject: [PATCH 4/4] Make it obvious that the tests are modifying the stored database config and loading it to simulate upgrade/old valid but now invalid configs --- rest/api_test.go | 14 ++++++++++---- rest/server_context.go | 6 ++---- rest/utilities_testing.go | 9 +++++++++ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/rest/api_test.go b/rest/api_test.go index 17892f47e3..86da06f8d6 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -2989,7 +2989,9 @@ func TestCreateDBWithXattrsDisabled(t *testing.T) { resp = rt.CreateDatabase(dbName, dbConfig) RequireStatus(t, resp, http.StatusCreated) - rt.RestTesterServerContext._dbConfigs[dbName].DatabaseConfig.EnableXattrs = base.Ptr(false) + rt.updatePersistedConfig(dbName, func(config *DatabaseConfig) { + config.EnableXattrs = base.Ptr(false) + }) _, err := rt.RestTesterServerContext.ReloadDatabase(t.Context(), dbName, false) assert.Error(t, err, errResp) @@ -3481,8 +3483,10 @@ func TestAllowConflictsConfig(t *testing.T) { require.Equal(t, fmt.Sprintf(`[{"db_name":"%s","bucket":"%s","state":"Online"}]`, rt.GetDatabase().Name, rt.GetDatabase().Bucket.GetName()), resp.Body.String()) bucketName := rt.GetDatabase().Bucket.GetName() - // Attempt to set the `AllowConflicts` property to true in the database configuration. - rt.RestTesterServerContext._dbConfigs[dbName].DatabaseConfig.DbConfig.AllowConflicts = base.Ptr(true) + // Set the `AllowConflicts` property to true in the persisted database configuration and try to load it (this covers cases where the config value was previously set from an older SGW version that allowed this configuration). + rt.updatePersistedConfig(dbName, func(config *DatabaseConfig) { + config.DbConfig.AllowConflicts = base.Ptr(true) + }) // Reload the database configuration and verify that an error is returned. _, err := rt.RestTesterServerContext.ReloadDatabase(ctx, dbName, false) @@ -3525,7 +3529,9 @@ func TestDisableAllowStarChannel(t *testing.T) { RequireStatus(t, resp, http.StatusCreated) // Attempting to disable `enable_star_channel` - rt.ServerContext()._dbConfigs[dbName].DatabaseConfig.CacheConfig.ChannelCacheConfig.EnableStarChannel = base.Ptr(false) + rt.updatePersistedConfig(dbName, func(config *DatabaseConfig) { + config.CacheConfig.ChannelCacheConfig.EnableStarChannel = base.Ptr(false) + }) // Reloading the database after updating the config _, err := rt.ServerContext().ReloadDatabase(ctx, dbName, false) diff --git a/rest/server_context.go b/rest/server_context.go index df7b43839d..2ab73b411f 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -564,10 +564,8 @@ func (sc *ServerContext) _reloadDatabase(ctx context.Context, reloadDbName strin func (sc *ServerContext) ReloadDatabase(ctx context.Context, reloadDbName string, forceOnline bool) (*db.DatabaseContext, error) { // Obtain write lock during add database, to avoid race condition when creating based on ConfigServer sc._databasesLock.Lock() - dbContext, err := sc._reloadDatabase(ctx, reloadDbName, false, forceOnline) - sc._databasesLock.Unlock() - - return dbContext, err + defer sc._databasesLock.Unlock() + return sc._reloadDatabase(ctx, reloadDbName, false, forceOnline) } func (sc *ServerContext) ReloadDatabaseWithConfig(nonContextStruct base.NonCancellableContext, config DatabaseConfig) error { diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index e460ebcbfc..a1fe04d1fa 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -2615,6 +2615,15 @@ func (rt *RestTester) NewDbConfig() DbConfig { return config } +// updatePersistedConfig is a helper function to update the persisted db config for a given db in the rest tester, bypassing the REST API and not (yet) reloading the database. +// this can be used to test upgrades or changes in behaviour on older configurations (i.e. change a config to an older state that may not pass new validation rules) +func (rt *RestTester) updatePersistedConfig(dbName string, updateFunc func(*DatabaseConfig)) { + // it's safe to just hold the read lock to fetch 'dbName' entry, since we're not modifying the map itself (adding or removing a database by name) + rt.ServerContext()._databasesLock.RLock() + defer rt.ServerContext()._databasesLock.RUnlock() + updateFunc(&rt.ServerContext()._dbConfigs[dbName].DatabaseConfig) +} + func setChannelsAllCollections(dbConfig DbConfig, principal *auth.PrincipalConfig, channels ...string) { if dbConfig.Scopes == nil { principal.ExplicitChannels = base.SetOf(channels...)