From e83d7799867db63470304c8bdaa1c6cc6ce7b91f Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 1 Dec 2025 14:14:03 -0500 Subject: [PATCH 1/3] Create an abstract DCP client, support rosmar DCP --- base/abstract_dcp_client.go | 109 +++++++ base/collection.go | 3 +- base/collection_common.go | 17 ++ base/dcp_client.go | 105 ++++--- base/dcp_client_metadata.go | 1 - base/dcp_client_stream_observer.go | 28 +- base/dcp_client_test.go | 288 ++++++++---------- base/dcp_common.go | 12 +- base/dcp_common_test.go | 4 +- base/dcp_sharded.go | 6 +- base/gocb_dcp_feed.go | 94 ++---- base/rosmar_dcp_client.go | 80 +++++ base/util_testing.go | 7 + db/attachment_compaction.go | 95 ++---- db/attachment_compaction_test.go | 17 +- db/background_mgr.go | 3 +- db/background_mgr_attachment_migration.go | 73 +++-- ...ackground_mgr_attachment_migration_test.go | 13 - db/background_mgr_resync_dcp.go | 58 ++-- db/background_mgr_resync_dcp_test.go | 28 -- db/change_listener.go | 83 ++--- db/import_listener.go | 34 +-- db/util_testing.go | 49 ++- rest/adminapitest/resync_test.go | 3 +- .../attachment_compaction_api_test.go | 4 +- tools/cache_perf_tool/dcpDataGeneration.go | 7 +- 26 files changed, 609 insertions(+), 612 deletions(-) create mode 100644 base/abstract_dcp_client.go create mode 100644 base/rosmar_dcp_client.go diff --git a/base/abstract_dcp_client.go b/base/abstract_dcp_client.go new file mode 100644 index 0000000000..45449e8e3a --- /dev/null +++ b/base/abstract_dcp_client.go @@ -0,0 +1,109 @@ +// Copyright 2025-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package base + +import ( + "context" + "expvar" + "fmt" + + sgbucket "github.com/couchbase/sg-bucket" + "github.com/couchbaselabs/rosmar" +) + +// DCPClient is an interface for all DCP implementations. +type DCPClient interface { + // Start will start the DCP feed. It returns a channel marking the end of the feed. + Start(ctx context.Context) (chan error, error) + // Close will shut down the DCP feed. + Close() + // GetMetadata returns the current DCP metadata. + GetMetadata() []DCPMetadata + // GetMetadataKeyPrefix returns the key prefix used for storing any persistent data. + GetMetadataKeyPrefix() string +} + +// DCPClientOptions are options for creating a DCPClient. +type DCPClientOptions struct { + FeedPrefix string // name of the DCP feed, used for logging locally and stored by Couchbase Server + Callback sgbucket.FeedEventCallbackFunc // callback function for DCP events + DBStats *expvar.Map // these options are used only for gocbcore implementation, these stats are not shared by prometheus stats + CheckpointPrefix string // start of the checkpoint documents + CollectionNames CollectionNames // scopes and collections to monitor + InitialMetadata []DCPMetadata // initial metadata to seed the DCP client with + MetadataStoreType DCPMetadataStoreType // persistent or in memory storage + OneShot bool // if true, the feed runs to latest document found when the client is started + FailOnRollback bool // if true, fail Start if the current DCP checkpoints encounter a rollback condition + Terminator chan bool // optional channel that can be closed to terminate the DCP feed, this will be replaced with a context option. 
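+	// FromLatestSequence and InitialMetadata are mutually exclusive; NewDCPClient returns an error if both are set.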
+	FromLatestSequence bool                           // If true, start at the latest sequence.
+}
+
+// NewDCPClient creates a new DCPClient to receive events from a bucket.
+func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DCPClient, error) {
+	if opts.FeedPrefix == "" {
+		return nil, fmt.Errorf("DCPClientOptions.FeedPrefix must be provided")
+	} else if bucket == nil {
+		return nil, fmt.Errorf("bucket must be provided")
+	} else if opts.Callback == nil {
+		return nil, fmt.Errorf("DCPClientOptions.Callback must be provided")
+	} else if len(opts.CollectionNames) == 0 {
+		return nil, fmt.Errorf("DCPClientOptions.CollectionNames must be provided")
+	} else if opts.FromLatestSequence && len(opts.InitialMetadata) > 0 {
+		return nil, fmt.Errorf("DCPClientOptions.InitialMetadata cannot be provided when FromLatestSequence is true")
+	} else if opts.MetadataStoreType == DCPMetadataStoreInMemory && opts.CheckpointPrefix != "" {
+		return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory")
+	}
+	underlyingBucket := GetBaseBucket(bucket)
+	if _, ok := underlyingBucket.(*rosmar.Bucket); ok {
+		return NewRosmarDCPClient(bucket, opts)
+	} else if gocbBucket, ok := underlyingBucket.(*GocbV2Bucket); ok {
+		return newGocbDCPClient(ctx, gocbBucket, opts)
+	}
+	return nil, fmt.Errorf("bucket type %T does not have a DCPClient implementation", underlyingBucket)
+}
+
+// StartDCPFeed creates and starts a DCP feed. This function returns as soon as the feed is started. doneChan is
+// sent a single error value when the feed terminates.
+func StartDCPFeed(ctx context.Context, bucket Bucket, opts DCPClientOptions) (doneChan <-chan error, err error) {
+	client, err := NewDCPClient(ctx, bucket, opts)
+	if err != nil {
+		return nil, err
+	}
+	bucketName := bucket.GetName()
+	feedName := opts.FeedPrefix
+
+	doneChan, err = client.Start(ctx)
+	if err != nil {
+		ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
+		client.Close()
+		ErrorfCtx(ctx, "Closed DCP Feed %q for bucket %q after failed start: %v", feedName, MD(bucketName), err)
+		if doneChan != nil {
+			<-doneChan
+		}
+		return nil, err
+	}
+	InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
+	go func() {
+		select {
+		case err := <-doneChan:
+			WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
+			// FIXME: close dbContext here
+			break
+		case <-opts.Terminator:
+			InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
+			client.Close()
+			dcpCloseErr := <-doneChan
+			if dcpCloseErr != nil {
+				WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
+			}
+			break
+		}
+	}()
+	return doneChan, err
+}
diff --git a/base/collection.go b/base/collection.go
index 4994b1acb0..7f3fa05260 100644
--- a/base/collection.go
+++ b/base/collection.go
@@ -263,8 +263,7 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
 }
 
 func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
-	groupID := ""
-	return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
+	return errors.New("GocbV2Bucket does not support StartDCPFeed; use NewDCPClient instead")
 }
 
 func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {
diff --git a/base/collection_common.go b/base/collection_common.go
index 7c4e4a3cf0..1aa0a5e54c 100644
--- a/base/collection_common.go
+++ b/base/collection_common.go
@@ -12,6 +12,7 @@ package base
 
 import (
 	"errors"
+	"slices"
 
 	sgbucket "github.com/couchbase/sg-bucket"
 )
@@ -20,6 +21,9 @@ var ErrCollectionsUnsupported = errors.New("collections not supported")
 
 type ScopeAndCollectionName = sgbucket.DataStoreNameImpl
 
+// CollectionNames is a map of scope name to a slice of collection names.
+type CollectionNames map[string][]string
+
 func DefaultScopeAndCollectionName() ScopeAndCollectionName {
 	return ScopeAndCollectionName{Scope: DefaultScope, Collection: DefaultCollection}
 }
@@ -45,3 +49,16 @@ func (s ScopeAndCollectionNames) ScopeAndCollectionNames() []string {
 func FullyQualifiedCollectionName(bucketName, scopeName, collectionName string) string {
 	return bucketName + "." + scopeName + "." + collectionName
 }
+
+// Add adds the given collections to the set. Duplicates are ignored.
+func (c CollectionNames) Add(ds ...sgbucket.DataStoreName) {
+	for _, d := range ds {
+		if _, ok := c[d.ScopeName()]; !ok {
+			c[d.ScopeName()] = []string{}
+		} else if slices.Contains(c[d.ScopeName()], d.CollectionName()) {
+			// avoid duplicates
+			continue
+		}
+		c[d.ScopeName()] = append(c[d.ScopeName()], d.CollectionName())
+	}
+}
diff --git a/base/dcp_client.go b/base/dcp_client.go
index 029f40e391..41de6f6e02 100644
--- a/base/dcp_client.go
+++ b/base/dcp_client.go
@@ -40,17 +40,17 @@ type endStreamCallbackFunc func(e endStreamEvent)
 
 var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set")
 
-type DCPClient struct {
+type GoCBDCPClient struct {
 	ctx                  context.Context
-	ID                   string                         // unique ID for DCPClient - used for DCP stream name, must be unique
-	agent                *gocbcore.DCPAgent             // SDK DCP agent, manages connections and calls back to DCPClient stream observer implementation
+	ID                   string                         // unique ID for GoCBDCPClient - used for DCP stream name, must be unique
+	agent                *gocbcore.DCPAgent             // SDK DCP agent, manages connections and calls back to GoCBDCPClient stream observer implementation
 	callback             sgbucket.FeedEventCallbackFunc // Callback invoked on DCP mutations/deletions
 	workers              []*DCPWorker                   // Workers for concurrent processing of incoming mutations and callback.
vbuckets are partitioned across workers - workersWg sync.WaitGroup // Active workers WG - used for signaling when the DCPClient workers have all stopped so the doneChannel can be closed + workersWg sync.WaitGroup // Active workers WG - used for signaling when the GoCBDCPClient workers have all stopped so the doneChannel can be closed spec BucketSpec // Bucket spec for the target data store supportsCollections bool // Whether the target data store supports collections numVbuckets uint16 // number of vbuckets on target data store - terminator chan bool // Used to close worker goroutines spawned by the DCPClient + terminator chan bool // Used to close worker goroutines spawned by the GoCBDCPClient doneChannel chan error // Returns nil on successful completion of one-shot feed or external close of feed, error otherwise metadata DCPMetadataStore // Implementation of DCPMetadataStore for metadata persistence activeVbuckets map[uint16]struct{} // vbuckets that have an open stream @@ -67,21 +67,20 @@ type DCPClient struct { collectionIDs []uint32 // collectionIDs used by gocbcore, if empty, uses default collections } -type DCPClientOptions struct { +type GoCBDCPClientOptions struct { NumWorkers int OneShot bool FailOnRollback bool // When true, the DCP client will terminate on DCP rollback InitialMetadata []DCPMetadata // When set, will be used as initial metadata for the DCP feed. Will override any persisted metadata CheckpointPersistFrequency *time.Duration // Overrides metadata persistence frequency - intended for test use MetadataStoreType DCPMetadataStoreType // define storage type for DCPMetadata - GroupID string // specify GroupID, only used when MetadataStoreType is DCPMetadataCS DbStats *expvar.Map // Optional stats AgentPriority gocbcore.DcpAgentPriority // agentPriority specifies the priority level for a dcp stream CollectionIDs []uint32 // CollectionIDs used by gocbcore, if empty, uses default collections CheckpointPrefix string } -func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) { +func NewGocbDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket) (*GoCBDCPClient, error) { numVbuckets, err := bucket.GetMaxVbno() if err != nil { @@ -91,7 +90,7 @@ func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCal return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets) } -func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) { +func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) { numWorkers := DefaultNumWorkers if options.NumWorkers > 0 { @@ -106,21 +105,21 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata") } } - client := &DCPClient{ - ctx: ctx, - workers: make([]*DCPWorker, numWorkers), - numVbuckets: numVbuckets, - callback: callback, - ID: ID, - spec: bucket.GetSpec(), - supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections), - terminator: make(chan bool), - doneChannel: make(chan error, 1), - failOnRollback: 
options.FailOnRollback, - checkpointPrefix: options.CheckpointPrefix, - dbStats: options.DbStats, - agentPriority: options.AgentPriority, - collectionIDs: options.CollectionIDs, + client := &GoCBDCPClient{ + workers: make([]*DCPWorker, numWorkers), + numVbuckets: numVbuckets, + callback: callback, + ID: ID, + spec: bucket.GetSpec(), + supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections), + terminator: make(chan bool), + doneChannel: make(chan error, 1), + failOnRollback: options.FailOnRollback, + checkpointPrefix: options.CheckpointPrefix, + dbStats: options.DbStats, + agentPriority: options.AgentPriority, + collectionIDs: options.CollectionIDs, + checkpointPersistFrequency: options.CheckpointPersistFrequency, } // Initialize active vbuckets @@ -155,7 +154,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke } // getCollectionHighSeqNo returns the highSeqNo for a given KV collection ID. -func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) { +func (dc *GoCBDCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) { vbucketSeqnoOptions := gocbcore.GetVbucketSeqnoOptions{} if dc.supportsCollections { vbucketSeqnoOptions.FilterOptions = &gocbcore.GetVbucketSeqnoFilterOptions{CollectionID: collectionID} @@ -213,7 +212,7 @@ func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, err } // getHighSeqNos returns the maximum sequence number for every collection configured by the DCP agent. -func (dc *DCPClient) getHighSeqNos() ([]uint64, error) { +func (dc *GoCBDCPClient) getHighSeqNos() ([]uint64, error) { highSeqNos := make([]uint64, dc.numVbuckets) // Initialize highSeqNo to the current metadata's StartSeqNo - we don't want to use a value lower than what // we've already processed @@ -235,7 +234,7 @@ func (dc *DCPClient) getHighSeqNos() ([]uint64, error) { } // configureOneShot sets highSeqnos for a one shot feed. -func (dc *DCPClient) configureOneShot() error { +func (dc *GoCBDCPClient) configureOneShot() error { highSeqNos, err := dc.getHighSeqNos() if err != nil { return err @@ -250,8 +249,9 @@ func (dc *DCPClient) configureOneShot() error { return nil } -// Start returns an error and a channel to indicate when the DCPClient is done. If Start returns an error, DCPClient.Close() needs to be called. -func (dc *DCPClient) Start() (doneChan chan error, err error) { +// Start returns an error and a channel to indicate when the GoCBDCPClient is done. If Start returns an error, GoCBDCPClient.Close() needs to be called. +func (dc *GoCBDCPClient) Start(ctx context.Context) (doneChan chan error, err error) { + dc.ctx = ctx err = dc.initAgent(dc.spec) if err != nil { return dc.doneChannel, err @@ -273,14 +273,13 @@ func (dc *DCPClient) Start() (doneChan chan error, err error) { return dc.doneChannel, nil } -// Close is used externally to stop the DCP client. If the client was already closed due to error, returns that error -func (dc *DCPClient) Close() error { +// Close is used externally to stop the DCP client. 
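+// Errors encountered while the feed is running are delivered on the done channel returned by Start.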
+func (dc *GoCBDCPClient) Close() { dc.close() - return dc.getCloseError() } // GetMetadata returns metadata for all vbuckets -func (dc *DCPClient) GetMetadata() []DCPMetadata { +func (dc *GoCBDCPClient) GetMetadata() []DCPMetadata { metadata := make([]DCPMetadata, dc.numVbuckets) for i := uint16(0); i < dc.numVbuckets; i++ { metadata[i] = dc.metadata.GetMeta(i) @@ -290,7 +289,7 @@ func (dc *DCPClient) GetMetadata() []DCPMetadata { // close is used internally to stop the DCP client. Sends any fatal errors to the client's done channel, and // closes that channel. -func (dc *DCPClient) close() { +func (dc *GoCBDCPClient) close() { // set dc.closing to true, avoid re-triggering close if it's already in progress if !dc.closing.CompareAndSwap(false, true) { @@ -316,7 +315,7 @@ func (dc *DCPClient) close() { } // getAgentConfig returns a gocbcore.DCPAgentConfig for the given BucketSpec -func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) { +func (dc *GoCBDCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) { connStr, err := spec.GetGoCBConnStringForDCP() if err != nil { return nil, err @@ -360,7 +359,7 @@ func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, } // initAgent creates a DCP agent and waits for it to be ready -func (dc *DCPClient) initAgent(spec BucketSpec) error { +func (dc *GoCBDCPClient) initAgent(spec BucketSpec) error { agentConfig, err := dc.getAgentConfig(spec) if err != nil { return err @@ -405,13 +404,13 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error { return nil } -func (dc *DCPClient) workerForVbno(vbNo uint16) *DCPWorker { +func (dc *GoCBDCPClient) workerForVbno(vbNo uint16) *DCPWorker { workerIndex := int(vbNo % uint16(len(dc.workers))) return dc.workers[workerIndex] } // startWorkers initializes the DCP workers to receive stream events from eventFeed -func (dc *DCPClient) startWorkers(ctx context.Context) { +func (dc *GoCBDCPClient) startWorkers(ctx context.Context) { // vbuckets are assigned to workers as vbNo % NumWorkers. Create set of assigned vbuckets assignedVbs := make(map[int][]uint16) @@ -434,7 +433,7 @@ func (dc *DCPClient) startWorkers(ctx context.Context) { } } -func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error { +func (dc *GoCBDCPClient) openStream(vbID uint16, maxRetries uint32) error { var openStreamErr error var attempts uint32 @@ -488,7 +487,7 @@ func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error { return fmt.Errorf("openStream failed to complete after %d attempts, last error: %w", attempts, openStreamErr) } -func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) { +func (dc *GoCBDCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) { if dc.dbStats != nil { dc.dbStats.Add("dcp_rollback_count", 1) } @@ -497,7 +496,7 @@ func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.S // openStreamRequest issues the OpenStream request, but doesn't perform any error handling. Callers // should generally use openStream() for error and retry handling -func (dc *DCPClient) openStreamRequest(vbID uint16) error { +func (dc *GoCBDCPClient) openStreamRequest(vbID uint16) error { vbMeta := dc.metadata.GetMeta(vbID) @@ -548,7 +547,7 @@ func (dc *DCPClient) openStreamRequest(vbID uint16) error { // verifyFailoverLog checks for VbUUID changes when failOnRollback is set, and // writes the failover log to the client metadata store. 
If previous VbUUID is zero, it's // not considered a rollback - it's not required to initialize vbUUIDs into meta. -func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error { +func (dc *GoCBDCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error { if dc.failOnRollback { previousMeta := dc.metadata.GetMeta(vbID) @@ -566,7 +565,7 @@ func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) return nil } -func (dc *DCPClient) deactivateVbucket(vbID uint16) { +func (dc *GoCBDCPClient) deactivateVbucket(vbID uint16) { dc.activeVbucketLock.Lock() delete(dc.activeVbuckets, vbID) activeCount := len(dc.activeVbuckets) @@ -580,7 +579,7 @@ func (dc *DCPClient) deactivateVbucket(vbID uint16) { } } -func (dc *DCPClient) onStreamEnd(e endStreamEvent) { +func (dc *GoCBDCPClient) onStreamEnd(e endStreamEvent) { if e.err == nil { DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed, all items streamed", e.vbID) dc.deactivateVbucket(e.vbID) @@ -588,8 +587,8 @@ func (dc *DCPClient) onStreamEnd(e endStreamEvent) { } if errors.Is(e.err, gocbcore.ErrDCPStreamClosed) { - DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed by DCPClient", e.vbID) - dc.fatalError(fmt.Errorf("Stream (vb:%d) closed by DCPClient", e.vbID)) + DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed by GoCBDCPClient", e.vbID) + dc.fatalError(fmt.Errorf("Stream (vb:%d) closed by GoCBDCPClient", e.vbID)) return } @@ -616,15 +615,15 @@ func (dc *DCPClient) onStreamEnd(e endStreamEvent) { }(e.vbID, retries) } -func (dc *DCPClient) fatalError(err error) { +func (dc *GoCBDCPClient) fatalError(err error) { dc.setCloseError(err) dc.close() } -func (dc *DCPClient) setCloseError(err error) { +func (dc *GoCBDCPClient) setCloseError(err error) { dc.closeErrorLock.Lock() defer dc.closeErrorLock.Unlock() - // If the DCPClient is already closing, don't update the error. If an initial error triggered the close, + // If the GoCBDCPClient is already closing, don't update the error. If an initial error triggered the close, // then closeError will already be set. In the event of a requested close, we want to ignore EOF errors associated // with stream close if dc.closing.IsTrue() { @@ -635,7 +634,7 @@ func (dc *DCPClient) setCloseError(err error) { } } -func (dc *DCPClient) getCloseError() error { +func (dc *GoCBDCPClient) getCloseError() error { dc.closeErrorLock.Lock() defer dc.closeErrorLock.Unlock() return dc.closeError @@ -661,16 +660,16 @@ func getLatestVbUUID(failoverLog []gocbcore.FailoverEntry) (vbUUID gocbcore.VbUU return entry.VbUUID } -func (dc *DCPClient) GetMetadataKeyPrefix() string { +func (dc *GoCBDCPClient) GetMetadataKeyPrefix() string { return dc.metadata.GetKeyPrefix() } // StartWorkersForTest will iterate through dcp workers to start them, to be used for caching testing purposes only. -func (dc *DCPClient) StartWorkersForTest(t *testing.T) { +func (dc *GoCBDCPClient) StartWorkersForTest(t *testing.T) { dc.startWorkers(dc.ctx) } // NewDCPClientForTest is a test-only function to create a DCP client with a specific number of vbuckets. 
-func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) { +func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) { return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets) } diff --git a/base/dcp_client_metadata.go b/base/dcp_client_metadata.go index bd1aa261b5..fd06663978 100644 --- a/base/dcp_client_metadata.go +++ b/base/dcp_client_metadata.go @@ -223,7 +223,6 @@ func NewDCPMetadataCS(ctx context.Context, store DataStore, numVbuckets uint16, // avoid read/write races on vbucket data. Calls to persist must be blocking on the worker goroutine, and vbuckets are // only assigned to a single worker func (m *DCPMetadataCS) Persist(ctx context.Context, workerID int, vbIDs []uint16) { - meta := WorkerMetadata{} meta.DCPMeta = make(map[uint16]DCPMetadata) for _, vbID := range vbIDs { diff --git a/base/dcp_client_stream_observer.go b/base/dcp_client_stream_observer.go index da33c5daf1..3266046c76 100644 --- a/base/dcp_client_stream_observer.go +++ b/base/dcp_client_stream_observer.go @@ -16,7 +16,7 @@ import ( // to the DCPClient's workers to be processed, but performs the following additional functionality: // - key-based filtering for document-based events (Deletion, Expiration, Mutation) // - stream End handling, including restart on error -func (dc *DCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) { +func (dc *GoCBDCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) { e := snapshotEvent{ streamEventCommon: streamEventCommon{ @@ -30,7 +30,7 @@ func (dc *DCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) { dc.workerForVbno(snapshotMarker.VbID).Send(dc.ctx, e) } -func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) { +func (dc *GoCBDCPClient) Mutation(mutation gocbcore.DcpMutation) { if dc.filteredKey(mutation.Key) { return @@ -56,7 +56,7 @@ func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) { dc.workerForVbno(mutation.VbID).Send(dc.ctx, e) } -func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) { +func (dc *GoCBDCPClient) Deletion(deletion gocbcore.DcpDeletion) { if dc.filteredKey(deletion.Key) { return @@ -80,7 +80,7 @@ func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) { } -func (dc *DCPClient) End(end gocbcore.DcpStreamEnd, err error) { +func (dc *GoCBDCPClient) End(end gocbcore.DcpStreamEnd, err error) { e := endStreamEvent{ streamEventCommon: streamEventCommon{ @@ -92,41 +92,41 @@ func (dc *DCPClient) End(end gocbcore.DcpStreamEnd, err error) { } -func (dc *DCPClient) Expiration(expiration gocbcore.DcpExpiration) { +func (dc *GoCBDCPClient) Expiration(expiration gocbcore.DcpExpiration) { // SG doesn't opt in to expirations, so they'll come through as deletion events // (cf.https://github.com/couchbase/kv_engine/blob/master/docs/dcp/documentation/expiry-opcode-output.md) WarnfCtx(dc.ctx, "Unexpected DCP expiration event (vb:%d) for key %v", expiration.VbID, UD(string(expiration.Key))) } -func (dc *DCPClient) CreateCollection(creation gocbcore.DcpCollectionCreation) { +func (dc *GoCBDCPClient) CreateCollection(creation gocbcore.DcpCollectionCreation) { // Not used by SG at this time } -func (dc *DCPClient) DeleteCollection(deletion gocbcore.DcpCollectionDeletion) { +func (dc 
*GoCBDCPClient) DeleteCollection(deletion gocbcore.DcpCollectionDeletion) { // Not used by SG at this time } -func (dc *DCPClient) FlushCollection(flush gocbcore.DcpCollectionFlush) { +func (dc *GoCBDCPClient) FlushCollection(flush gocbcore.DcpCollectionFlush) { // Not used by SG at this time } -func (dc *DCPClient) CreateScope(creation gocbcore.DcpScopeCreation) { +func (dc *GoCBDCPClient) CreateScope(creation gocbcore.DcpScopeCreation) { // Not used by SG at this time } -func (dc *DCPClient) DeleteScope(deletion gocbcore.DcpScopeDeletion) { +func (dc *GoCBDCPClient) DeleteScope(deletion gocbcore.DcpScopeDeletion) { // Not used by SG at this time } -func (dc *DCPClient) ModifyCollection(modification gocbcore.DcpCollectionModification) { +func (dc *GoCBDCPClient) ModifyCollection(modification gocbcore.DcpCollectionModification) { // Not used by SG at this time } -func (dc *DCPClient) OSOSnapshot(snapshot gocbcore.DcpOSOSnapshot) { +func (dc *GoCBDCPClient) OSOSnapshot(snapshot gocbcore.DcpOSOSnapshot) { // Not used by SG at this time } -func (dc *DCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) { +func (dc *GoCBDCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) { dc.workerForVbno(seqNoAdvanced.VbID).Send(dc.ctx, seqnoAdvancedEvent{ streamEventCommon: streamEventCommon{ vbID: seqNoAdvanced.VbID, @@ -136,6 +136,6 @@ func (dc *DCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) { }) } -func (dc *DCPClient) filteredKey(key []byte) bool { +func (dc *GoCBDCPClient) filteredKey(key []byte) bool { return false } diff --git a/base/dcp_client_test.go b/base/dcp_client_test.go index 245bb97053..2e54fc9565 100644 --- a/base/dcp_client_test.go +++ b/base/dcp_client_test.go @@ -28,10 +28,6 @@ const oneShotDCPTimeout = 5 * time.Minute func TestOneShotDCP(t *testing.T) { - if UnitTestUrlIsWalrus() { - t.Skip("This test only works against Couchbase Server") - } - ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) @@ -54,33 +50,23 @@ func TestOneShotDCP(t *testing.T) { return false } - // start one shot feed - feedID := t.Name() - - collection, err := AsCollection(dataStore) - require.NoError(t, err) - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } - - clientOptions := DCPClientOptions{ + dcpOptions := DCPClientOptions{ + FeedPrefix: t.Name(), + CollectionNames: CollectionNames{ + dataStore.ScopeName(): {dataStore.CollectionName()}, + }, OneShot: true, - CollectionIDs: collectionIDs, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, } - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) - dcpClient, err := NewDCPClient(TestCtx(t), feedID, counterCallback, clientOptions, gocbv2Bucket) + dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) require.NoError(t, err) - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) - defer func() { - _ = dcpClient.Close() - }() + defer dcpClient.Close() // Add additional documents in a separate goroutine, to verify one-shot behaviour var additionalDocsWg sync.WaitGroup @@ -112,10 +98,6 @@ func TestOneShotDCP(t *testing.T) { func TestTerminateDCPFeed(t *testing.T) { - if UnitTestUrlIsWalrus() { - t.Skip("This test only works against Couchbase Server") - } - ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) @@ -129,15 +111,17 @@ func 
TestTerminateDCPFeed(t *testing.T) { return false } - // start continuous feed with terminator - feedID := t.Name() - - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) - options := DCPClientOptions{ + dcpOptions := DCPClientOptions{ + FeedPrefix: t.Name(), + CollectionNames: map[string][]string{ + dataStore.ScopeName(): {dataStore.CollectionName()}, + }, + OneShot: false, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, } - dcpClient, err := NewDCPClient(TestCtx(t), feedID, counterCallback, options, gocbv2Bucket) + + dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) require.NoError(t, err) // Add documents in a separate goroutine @@ -157,15 +141,14 @@ func TestTerminateDCPFeed(t *testing.T) { } }() - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) // Wait for some processing to complete, then close the feed time.Sleep(10 * time.Millisecond) log.Printf("Closing DCP Client") - err = dcpClient.Close() + dcpClient.Close() log.Printf("DCP Client closed, waiting for feed close notification") - require.NoError(t, err) // wait for done timeout := time.After(oneShotDCPTimeout) @@ -224,8 +207,6 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { return false } - feedID := t.Name() - // Add documents updatedBody := map[string]any{"foo": "bar"} for i := range 10000 { @@ -233,27 +214,22 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { err := dataStore.Set(key, 0, nil, updatedBody) require.NoError(t, err) } - collection, ok := dataStore.(*Collection) - require.True(t, ok) - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } + collectionNames := CollectionNames{dataStore.ScopeName(): []string{dataStore.CollectionName()}} // Perform first one-shot DCP feed - normal one-shot dcpClientOpts := DCPClientOptions{ OneShot: true, FailOnRollback: true, - CollectionIDs: collectionIDs, + CollectionNames: collectionNames, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, + FeedPrefix: t.Name(), } - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) - dcpClient, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket) + dcpClient, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) // Wait for first feed to complete @@ -278,37 +254,43 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { uuidMismatchMetadata[0].SnapEndSeqNo = test.startSeqNo dcpClientOpts = DCPClientOptions{ + FeedPrefix: t.Name(), + CollectionNames: collectionNames, InitialMetadata: uuidMismatchMetadata, FailOnRollback: true, OneShot: true, - CollectionIDs: collectionIDs, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, } - dcpClient2, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket) + dcpClient2, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - doneChan2, startErr2 := dcpClient2.Start() + doneChan2, startErr2 := dcpClient2.Start(ctx) require.Error(t, startErr2) - require.NoError(t, dcpClient2.Close()) + dcpClient2.Close() <-doneChan2 log.Printf("Starting third feed") // Perform a third DCP feed - mismatched VbUUID, failOnRollback=false 
atomic.StoreUint64(&mutationCount, 0) dcpClientOpts = DCPClientOptions{ + FeedPrefix: t.Name(), InitialMetadata: uuidMismatchMetadata, FailOnRollback: false, OneShot: true, - CollectionIDs: collectionIDs, + CollectionNames: collectionNames, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, } - dcpClient3, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket) + dcpClient3, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - doneChan3, startErr3 := dcpClient3.Start() + doneChan3, startErr3 := dcpClient3.Start(ctx) require.NoError(t, startErr3) + client, ok := dcpClient3.(*GoCBDCPClient) + require.True(t, ok) // Wait for third feed to complete feed3Timeout := time.After(oneShotDCPTimeout) select { @@ -317,7 +299,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) { mutationCount := atomic.LoadUint64(&mutationCount) require.Equal(t, int(vbucketZeroExpected), int(mutationCount)) // check the rolled back vBucket has in fact closed the stream after its finished - numVBuckets := len(dcpClient.activeVbuckets) + numVBuckets := len(client.activeVbuckets) require.Equal(t, uint16(0), uint16(numVBuckets)) case <-feed3Timeout: t.Errorf("timeout waiting for first one-shot feed to complete") @@ -352,33 +334,25 @@ func TestContinuousDCPRollback(t *testing.T) { return false } - feedID := t.Name() - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) - - collection, err := AsCollection(dataStore) - require.NoError(t, err) - - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } + collectionNames := CollectionNames{dataStore.ScopeName(): []string{dataStore.CollectionName()}} dcpClientOpts := DCPClientOptions{ FailOnRollback: false, OneShot: false, - CollectionIDs: collectionIDs, + FeedPrefix: t.Name(), + CollectionNames: collectionNames, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), - MetadataStoreType: DCPMetadataStoreInMemory, + MetadataStoreType: DCPMetadataStoreCS, + Callback: counterCallback, } // timeout for feed to complete timeout := time.After(20 * time.Second) - dcpClient, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket) + dcpClient, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - _, startErr := dcpClient.Start() + _, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) // Add documents @@ -404,34 +378,39 @@ func TestContinuousDCPRollback(t *testing.T) { InitialMetadata: dcpClient.GetMetadata(), FailOnRollback: false, OneShot: false, - CollectionIDs: collectionIDs, + CollectionNames: collectionNames, + FeedPrefix: t.Name(), + Callback: counterCallback, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), - MetadataStoreType: DCPMetadataStoreInMemory, + MetadataStoreType: DCPMetadataStoreCS, } - require.NoError(t, dcpClient.Close()) + dcpClient.Close() - dcpClient1, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket) + dcpClient1, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) + client, ok := dcpClient1.(*GoCBDCPClient) + require.True(t, ok) + // function to force the rollback of some vBuckets - dcpClient1.forceRollbackvBucket(vbUUID) + client.forceRollbackvBucket(vbUUID) - _, startErr = dcpClient1.Start() + _, startErr = dcpClient1.Start(ctx) require.NoError(t, startErr) // Assert that the number of vBuckets 
active are the same as the total number of vBuckets on the client. // In continuous rollback the streams should not close after they're finished. - numVBuckets := len(dcpClient1.activeVbuckets) - require.Equal(t, dcpClient1.numVbuckets, uint16(numVBuckets)) + numVBuckets := len(client.activeVbuckets) + require.Equal(t, client.numVbuckets, uint16(numVBuckets)) defer func() { - assert.NoError(t, dcpClient1.Close()) + dcpClient1.Close() }() } // forceRollbackvBucket forces the rollback of vBucket IDs that are even // Test helper function. This should not be used elsewhere. -func (dc *DCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) { +func (dc *GoCBDCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) { metadata := make([]DCPMetadata, dc.numVbuckets) for i := uint16(0); i < dc.numVbuckets; i++ { // rollback roughly half the vBuckets @@ -445,20 +424,14 @@ func (dc *DCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) { // TestResumeInterruptedFeed uses persisted metadata to resume the feed func TestResumeStoppedFeed(t *testing.T) { - - if UnitTestUrlIsWalrus() { - t.Skip("This test only works against Couchbase Server") - } - - SetUpTestLogging(t, LevelDebug, KeyAll) - + //SetUpTestLogging(t, LevelDebug, KeyAll) ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) dataStore := bucket.GetSingleDataStore() - var dcpClient *DCPClient + var dcpClient DCPClient // create callback mutationCount := uint64(0) @@ -466,14 +439,13 @@ func TestResumeStoppedFeed(t *testing.T) { if bytes.HasPrefix(event.Key, []byte(t.Name())) { count := atomic.AddUint64(&mutationCount, 1) if count > 5000 { - err := dcpClient.Close() - assert.NoError(t, err) + dcpClient.Close() } } return false } - feedID := t.Name() + collectionNames := CollectionNames{dataStore.ScopeName(): []string{dataStore.CollectionName()}} // Add documents updatedBody := map[string]any{"foo": "bar"} @@ -483,32 +455,25 @@ func TestResumeStoppedFeed(t *testing.T) { require.NoError(t, err) } - // Start first one-shot DCP feed, will be stopped by callback after processing 5000 records - // Set metadata persistence frequency to zero to force persistence on every mutation - highFrequency := 0 * time.Second - - collection, ok := dataStore.(*Collection) - require.True(t, ok) - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } - dcpClientOpts := DCPClientOptions{ - OneShot: true, - FailOnRollback: false, - CheckpointPersistFrequency: &highFrequency, - CollectionIDs: collectionIDs, - CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + FeedPrefix: t.Name(), + Callback: counterCallback, + CollectionNames: collectionNames, + OneShot: true, + FailOnRollback: false, + CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), } - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) + dcpClient, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - dcpClient, err = NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket) - require.NoError(t, err) + if !UnitTestUrlIsWalrus() { + dc, ok := dcpClient.(*GoCBDCPClient) + require.True(t, ok) + dc.checkpointPersistFrequency = Ptr(0 * time.Second) // disable periodic checkpointing for test + } - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) // Wait for first feed to complete @@ -519,7 +484,7 @@ func TestResumeStoppedFeed(t *testing.T) { require.Greater(t, 
int(mutationCount), 5000) log.Printf("Total processed first feed: %v", mutationCount) case <-timeout: - t.Errorf("timeout waiting for first one-shot feed to complete") + t.Fatalf("timeout waiting for first one-shot feed to complete") } var secondFeedCount uint64 @@ -535,14 +500,22 @@ func TestResumeStoppedFeed(t *testing.T) { dcpClientOpts = DCPClientOptions{ FailOnRollback: false, OneShot: true, - CollectionIDs: collectionIDs, + FeedPrefix: t.Name(), + Callback: secondCallback, + CollectionNames: collectionNames, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), } - dcpClient2, err := NewDCPClient(ctx, feedID, secondCallback, dcpClientOpts, gocbv2Bucket) + dcpClient2, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - doneChan2, startErr2 := dcpClient2.Start() + if !UnitTestUrlIsWalrus() { + dc, ok := dcpClient2.(*GoCBDCPClient) + require.True(t, ok) + dc.checkpointPersistFrequency = Ptr(0 * time.Second) // disable periodic checkpointing for test + } + + doneChan2, startErr2 := dcpClient2.Start(ctx) require.NoError(t, startErr2) // Wait for second feed to complete @@ -576,14 +549,14 @@ func TestBadAgentPriority(t *testing.T) { t.Error(t, "Should not hit this callback") return false } - dcpClientOpts := DCPClientOptions{ + dcpClientOpts := GoCBDCPClientOptions{ AgentPriority: gocbcore.DcpAgentPriorityHigh, } gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) require.NoError(t, err) - dcpClient, err := NewDCPClient(TestCtx(t), feedID, panicCallback, dcpClientOpts, gocbv2Bucket) + dcpClient, err := NewGocbDCPClient(TestCtx(t), feedID, panicCallback, dcpClientOpts, gocbv2Bucket) require.Error(t, err) require.Nil(t, dcpClient) } @@ -604,29 +577,27 @@ func TestDCPOutOfRangeSequence(t *testing.T) { return false } - feedID := t.Name() + dataStore := bucket.GetSingleDataStore() + collectionNames := CollectionNames{dataStore.ScopeName(): []string{dataStore.CollectionName()}} dcpClientOpts := DCPClientOptions{ + FeedPrefix: t.Name(), FailOnRollback: false, OneShot: true, - CollectionIDs: getCollectionIDs(t, bucket), + CollectionNames: collectionNames, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), - MetadataStoreType: DCPMetadataStoreInMemory, + MetadataStoreType: DCPMetadataStoreCS, + Callback: callback, } // timeout for feed to complete timeout := time.After(20 * time.Second) - gocbv2Bucket, err := AsGocbV2Bucket(bucket) - require.NoError(t, err) - - dcpClient, err := NewDCPClient(ctx, feedID, callback, dcpClientOpts, gocbv2Bucket) + dcpClient, err := NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) - defer func() { - assert.NoError(t, dcpClient.Close()) - }() + defer dcpClient.Close() select { case <-doneChan: @@ -640,16 +611,18 @@ func TestDCPOutOfRangeSequence(t *testing.T) { dcpClientOpts = DCPClientOptions{ FailOnRollback: false, OneShot: true, - CollectionIDs: getCollectionIDs(t, bucket), + FeedPrefix: t.Name(), + CollectionNames: collectionNames, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), - MetadataStoreType: DCPMetadataStoreInMemory, + MetadataStoreType: DCPMetadataStoreCS, InitialMetadata: metadata, + Callback: callback, } - dcpClient, err = NewDCPClient(ctx, feedID, callback, dcpClientOpts, gocbv2Bucket) + dcpClient, err = NewDCPClient(ctx, bucket, dcpClientOpts) require.NoError(t, err) - _, startErr = dcpClient.Start() + _, startErr = dcpClient.Start(ctx) 
require.Error(t, startErr) require.Contains(t, startErr.Error(), "out of range") @@ -676,17 +649,13 @@ func TestDCPFeedEventTypes(t *testing.T) { collection := bucket.GetSingleDataStore() + collectionNames := CollectionNames{collection.ScopeName(): []string{collection.CollectionName()}} // start one shot feed var collectionIDs []uint32 if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { collectionIDs = append(collectionIDs, collection.GetCollectionID()) } - clientOptions := DCPClientOptions{ - CollectionIDs: collectionIDs, - CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), - } - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) require.NoError(t, err) @@ -721,16 +690,19 @@ func TestDCPFeedEventTypes(t *testing.T) { } return true } - - dcpClient, err := NewDCPClient(ctx, t.Name(), callback, clientOptions, gocbv2Bucket) + clientOptions := DCPClientOptions{ + FeedPrefix: t.Name(), + CollectionNames: collectionNames, + CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: callback, + } + dcpClient, err := NewDCPClient(ctx, gocbv2Bucket, clientOptions) require.NoError(t, err) - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) - defer func() { - _ = dcpClient.Close() // extra close in case of early exit - }() + defer dcpClient.Close() // extra close in case of early exit xattrName := "_xattr1" xattrBody := []byte(`{"an": "xattr"}`) writeMutationCas, err := collection.WriteWithXattrs(ctx, docID, 0, 0, []byte(`{"foo":"bar"}`), map[string][]byte{xattrName: xattrBody}, nil, nil) @@ -749,7 +721,7 @@ func TestDCPFeedEventTypes(t *testing.T) { select { case <-foundEvent: - require.NoError(t, dcpClient.Close()) + dcpClient.Close() case <-timeout: t.Fatalf("timeout waiting for doc deletion") } @@ -807,17 +779,21 @@ func TestDCPClientAgentConfig(t *testing.T) { oldBucketSpecServer := gocbv2Bucket.Spec.Server defer func() { gocbv2Bucket.Spec.Server = oldBucketSpecServer }() gocbv2Bucket.Spec.Server += tc.serverSuffix - dcpClient, err := NewDCPClient(ctx, - "fakeFeedID", - func(sgbucket.FeedEvent) bool { return true }, - DCPClientOptions{MetadataStoreType: DCPMetadataStoreInMemory}, - gocbv2Bucket) + opts := DCPClientOptions{ + FeedPrefix: t.Name(), + CollectionNames: CollectionNames{ + bucket.GetSingleDataStore().ScopeName(): {bucket.GetSingleDataStore().CollectionName()}, + }, + MetadataStoreType: DCPMetadataStoreInMemory, + Callback: func(sgbucket.FeedEvent) bool { return true }, + } + dcpClient, err := NewDCPClient(ctx, bucket, opts) require.NoError(t, err) - defer func() { - assert.NoError(t, dcpClient.Close()) - }() + defer dcpClient.Close() - config, err := dcpClient.getAgentConfig(gocbv2Bucket.GetSpec()) + client, ok := dcpClient.(*GoCBDCPClient) + require.True(t, ok, "expected GoCBDCPClient type for DCP client") + config, err := client.getAgentConfig(gocbv2Bucket.GetSpec()) require.NoError(t, err) require.Equal(t, tc.networkType, config.IoConfig.NetworkType) diff --git a/base/dcp_common.go b/base/dcp_common.go index 47da59fba2..dde021838c 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -284,10 +284,10 @@ func makeFeedEvent(key []byte, value []byte, dataType uint8, cas uint64, expiry return event } -// Create a prefix that will be used to create the dcp stream name, which must be globally unique +// generateDcpStreamName creates a prefix that will be used to create the dcp stream name, which must be globally unique // in order to avoid 
https://issues.couchbase.com/browse/MB-24237. It's also useful to have the Sync Gateway // version number / commit for debugging purposes -func GenerateDcpStreamName(feedID string) (string, error) { +func generateDcpStreamName(feedID string) (string, error) { // Create a time-based UUID for uniqueness of DCP Stream Names u, err := uuid.NewUUID() @@ -297,12 +297,16 @@ func GenerateDcpStreamName(feedID string) (string, error) { commitTruncated := StringPrefix(GitCommit, 7) - return fmt.Sprintf( + feedName := fmt.Sprintf( "%v-v-%v-commit-%v-uuid-%v", feedID, ProductAPIVersion, commitTruncated, u.String(), - ), nil + ) + if len(feedName) > 200 { + return "", fmt.Errorf("generated DCP feed name exceeds 200 character limit: %s", feedName) + } + return feedName, nil } diff --git a/base/dcp_common_test.go b/base/dcp_common_test.go index c5c8b570ba..cbf3828e56 100644 --- a/base/dcp_common_test.go +++ b/base/dcp_common_test.go @@ -33,7 +33,7 @@ func TestDCPNameLength(t *testing.T) { for _, feedID := range feedIDs { t.Run(feedID, func(t *testing.T) { - dcpStreamName, err := GenerateDcpStreamName(feedID) + dcpStreamName, err := generateDcpStreamName(feedID) require.NoError(t, err) t.Logf("generated name of length %d: %s", len(dcpStreamName), dcpStreamName) @@ -43,7 +43,7 @@ func TestDCPNameLength(t *testing.T) { }) t.Run("cbgt"+feedID, func(t *testing.T) { - dcpStreamName, err := GenerateDcpStreamName(feedID) + dcpStreamName, err := generateDcpStreamName(feedID) require.NoError(t, err) // Format string copied from cbgt's 'NewDCPFeed' diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index ac9ee146b1..545dac7118 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -64,14 +64,18 @@ type CbgtContext struct { // StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket. // dbName is used to define a unique path name for local file storage of pindex files func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) { + fmt.Printf("cfg=%+v\n", cfg) // Ensure we don't try to start collections-enabled feed if there are any pre-collection SG nodes in the cluster. minVersion, err := getMinNodeVersion(cfg) if err != nil { return nil, fmt.Errorf("failed to get minimum node version in cluster: %w", err) } if minVersion.Less(firstVersionToSupportCollections) { + if scope != DefaultScope { + return nil, fmt.Errorf("cannot start DCP feed on non-default scope with legacy nodes present in the cluster") + } // DefaultScope is allowed by older versions of CBGT as long as no collections are specified. - if len(collections) > 0 { + if len(collections) != 1 || collections[0] != DefaultCollection { return nil, fmt.Errorf("cannot start DCP feed on non-default collection with legacy nodes present in the cluster") } } diff --git a/base/gocb_dcp_feed.go b/base/gocb_dcp_feed.go index 60b26e7a4b..e5ab77305c 100644 --- a/base/gocb_dcp_feed.go +++ b/base/gocb_dcp_feed.go @@ -10,7 +10,6 @@ package base import ( "context" - "expvar" "fmt" "github.com/couchbase/gocbcore/v10" @@ -47,23 +46,16 @@ func getHighSeqMetadata(cbstore CouchbaseBucketStore) ([]DCPMetadata, error) { return metadata, nil } -// StartGocbDCPFeed starts a DCP Feed. 
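+// newGocbDCPClient creates a GoCBDCPClient from the generic DCPClientOptions, resolving collection names to KV
+// collection IDs via the bucket's collection manifest.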
-func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName string, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, metadataStoreType DCPMetadataStoreType, groupID string) error { - - feedName, err := GenerateDcpStreamName(args.ID) - if err != nil { - return err - } - +func newGocbDCPClient(ctx context.Context, bucket *GocbV2Bucket, opts DCPClientOptions) (*GoCBDCPClient, error) { var collectionIDs []uint32 if bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) { cm, err := bucket.GetCollectionManifest() if err != nil { - return err + return nil, err } // should only be one args.Scope so cheaper to iterate this way around - for scopeName, collections := range args.Scopes { + for scopeName, collections := range opts.CollectionNames { scopeFound := false for _, manifestScope := range cm.Scopes { if scopeName != manifestScope.Name { @@ -84,87 +76,47 @@ func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName stri if len(collectionsFound) != len(collections) { for _, collectionName := range collections { if _, ok := collectionsFound[collectionName]; !ok { - return RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections) + return nil, RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections) } } } break } if !scopeFound { - return RedactErrorf("scope %s not found", MD(scopeName)) + return nil, RedactErrorf("scope %s not found", MD(scopeName)) } } } - options := DCPClientOptions{ - MetadataStoreType: metadataStoreType, - GroupID: groupID, - DbStats: dbStats, + options := GoCBDCPClientOptions{ + MetadataStoreType: opts.MetadataStoreType, + DbStats: opts.DBStats, CollectionIDs: collectionIDs, AgentPriority: gocbcore.DcpAgentPriorityMed, - CheckpointPrefix: args.CheckpointPrefix, + CheckpointPrefix: opts.CheckpointPrefix, + OneShot: opts.OneShot, + FailOnRollback: opts.FailOnRollback, + InitialMetadata: opts.InitialMetadata, } - if args.Backfill == sgbucket.FeedNoBackfill { + if opts.FromLatestSequence { + if len(opts.InitialMetadata) > 0 { + return nil, fmt.Errorf("DCPClientOptions.InitialMetadata cannot be provided when FromLatestSequence is true") + } metadata, err := getHighSeqMetadata(bucket) if err != nil { - return err + return nil, err } options.InitialMetadata = metadata } - dcpClient, err := NewDCPClient( + feedName, err := generateDcpStreamName(opts.FeedPrefix) + if err != nil { + return nil, err + } + return NewGocbDCPClient( ctx, feedName, - callback, + opts.Callback, options, bucket) - if err != nil { - return err - } - - doneChan, err := dcpClient.Start() - if err != nil { - ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err) - // simplify in CBG-2234 - closeErr := dcpClient.Close() - ErrorfCtx(ctx, "Finished called async close error from DCP Feed %q for bucket %q", feedName, MD(bucketName)) - if closeErr != nil { - ErrorfCtx(ctx, "Close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), closeErr) - } - asyncCloseErr := <-doneChan - ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), asyncCloseErr) - return err - } - InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName)) - go func() { - select { - case dcpCloseError := <-doneChan: - // simplify close in CBG-2234 - // This is a close because DCP client closed on 
its own, which should never happen since once - // DCP feed is started, there is nothing that will close it - InfofCtx(ctx, KeyDCP, "Forced closed DCP Feed %q for %q", feedName, MD(bucketName)) - // wait for channel close - <-doneChan - if dcpCloseError != nil { - WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseError) - } - // FIXME: close dbContext here - break - case <-args.Terminator: - InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName)) - dcpCloseErr := dcpClient.Close() - if dcpCloseErr != nil { - WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) - } - dcpCloseErr = <-doneChan - if dcpCloseErr != nil { - WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr) - } - break - } - if args.DoneChan != nil { - close(args.DoneChan) - } - }() - return err } diff --git a/base/rosmar_dcp_client.go b/base/rosmar_dcp_client.go new file mode 100644 index 0000000000..386d9cc27f --- /dev/null +++ b/base/rosmar_dcp_client.go @@ -0,0 +1,80 @@ +// Copyright 2025-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package base + +import ( + "context" + + sgbucket "github.com/couchbase/sg-bucket" +) + +// RosmarDCPClient implements a DCPClient for rosmar buckets. +type RosmarDCPClient struct { + bucket Bucket + opts DCPClientOptions + doneChan chan struct{} + terminator chan bool +} + +// NewRosmarDCPClient creates a new DCPClient for a rosmar bucket. +func NewRosmarDCPClient(bucket Bucket, opts DCPClientOptions) (*RosmarDCPClient, error) { + return &RosmarDCPClient{ + bucket: bucket, + opts: opts, + }, nil +} + +// Start a DCP feed, returns a channel that will be closed when the feed is done. +func (dc *RosmarDCPClient) Start(ctx context.Context) (chan error, error) { + doneChan := make(chan error) + dc.doneChan = make(chan struct{}) + dc.terminator = make(chan bool) + feedArgs := sgbucket.FeedArguments{ + ID: dc.opts.FeedPrefix, + CheckpointPrefix: dc.opts.CheckpointPrefix, + Dump: dc.opts.OneShot, + DoneChan: dc.doneChan, + Terminator: dc.terminator, + Scopes: dc.opts.CollectionNames, + Backfill: sgbucket.FeedResume, + } + if dc.opts.FromLatestSequence { + feedArgs.Backfill = sgbucket.FeedNoBackfill + } + err := dc.bucket.StartDCPFeed(ctx, feedArgs, dc.opts.Callback, nil) + if err != nil { + close(doneChan) + return nil, err + } + // This extra goroutine can be removed if sgbucket.FeedArguments.DoneChan is changed to chan error + go func() { + <-dc.doneChan + close(doneChan) + }() + return doneChan, nil +} + +// Close the DCP feed. This is a non blocking operation to allow for use in a callback function. +func (dc *RosmarDCPClient) Close() { + if dc.terminator != nil { + close(dc.terminator) + dc.terminator = nil + } +} + +func (dc *RosmarDCPClient) GetMetadata() []DCPMetadata { + // Rosmar DCP client does not support getting metadata yet + return nil +} + +// GetMetadataKeyPrefix returns the document prefix for the checkpoint documents. 
+func (dc *RosmarDCPClient) GetMetadataKeyPrefix() string { + // this value is probably not correct + return dc.opts.CheckpointPrefix +} diff --git a/base/util_testing.go b/base/util_testing.go index 21a4426d5c..6ca3b8e4b1 100644 --- a/base/util_testing.go +++ b/base/util_testing.go @@ -36,6 +36,7 @@ import ( "github.com/couchbase/gocb/v2" sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbaselabs/rosmar" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1023,3 +1024,9 @@ func RequireXattrNotFound(t testing.TB, dataStore sgbucket.DataStore, docID stri func underGoTest() bool { return testing.Testing() } + +func UUID(t testing.TB) string { + id, err := uuid.NewRandom() + require.NoError(t, err) + return id.String() +} diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 6ed3b4daec..da5e8c60d2 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -131,31 +131,21 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c return true } - clientOptions, err := getCompactionDCPClientOptions(collectionID, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - if err != nil { - return 0, nil, "", err - } + clientOptions := getCompactionDCPClientOptions(db, compactionID, MarkPhase, dataStore, callback, nil) base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for mark phase of attachment compaction", compactionLoggingID) - dcpFeedKey := GenerateCompactionDCPStreamName(compactionID, MarkPhase) - - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return 0, nil, "", err - } - - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) return 0, nil, "", err } metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) - _ = dcpClient.Close() + dcpClient.Close() return 0, nil, metadataKeyPrefix, err } base.DebugfCtx(ctx, base.KeyAll, "[%s] DCP feed started.", compactionLoggingID) @@ -163,13 +153,12 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c select { case <-doneChan: base.InfofCtx(ctx, base.KeyAll, "[%s] Mark phase of attachment compaction completed. Marked %d attachments", compactionLoggingID, markedAttachmentCount.Value()) - err = dcpClient.Close() if markProcessFailureErr != nil { return markedAttachmentCount.Value(), nil, metadataKeyPrefix, markProcessFailureErr } case <-terminator.Done(): base.DebugfCtx(ctx, base.KeyAll, "[%s] Terminator closed. 
Stopping mark phase.", compactionLoggingID) - err = dcpClient.Close() + dcpClient.Close() if markProcessFailureErr != nil { return markedAttachmentCount.Value(), nil, metadataKeyPrefix, markProcessFailureErr } @@ -377,30 +366,24 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, return true } - clientOptions, err := getCompactionDCPClientOptions(collectionID, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - if err != nil { - return 0, err - } - clientOptions.InitialMetadata = base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs) - - dcpFeedKey := GenerateCompactionDCPStreamName(compactionID, SweepPhase) + clientOptions := getCompactionDCPClientOptions(db, compactionID, SweepPhase, dataStore, callback, base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs)) bucket, err := base.AsGocbV2Bucket(db.Bucket) if err != nil { return 0, err } - base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, dcpFeedKey) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, clientOptions.FeedPrefix) + dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) return 0, err } - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) - _ = dcpClient.Close() + dcpClient.Close() return 0, err } base.DebugfCtx(ctx, base.KeyAll, "[%s] DCP client started.", compactionLoggingID) @@ -408,15 +391,10 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, select { case <-doneChan: base.InfofCtx(ctx, base.KeyAll, "[%s] Sweep phase of attachment compaction completed. Deleted %d attachments", compactionLoggingID, purgedAttachmentCount.Value()) - err = dcpClient.Close() + dcpClient.Close() case <-terminator.Done(): base.DebugfCtx(ctx, base.KeyAll, "[%s] Terminator closed. Ending sweep phase.", compactionLoggingID) - err = dcpClient.Close() - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to close attachment compaction DCP client! 
%v", compactionLoggingID, err) - return purgedAttachmentCount.Value(), err - } - + dcpClient.Close() err = <-doneChan if err != nil { return purgedAttachmentCount.Value(), err @@ -513,49 +491,34 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore return true } - clientOptions, err := getCompactionDCPClientOptions(collectionID, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - if err != nil { - return "", err - } - clientOptions.InitialMetadata = base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs) + clientOptions := getCompactionDCPClientOptions(db, compactionID, CleanupPhase, dataStore, callback, base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs)) base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for cleanup phase of attachment compaction", compactionLoggingID) - dcpFeedKey := GenerateCompactionDCPStreamName(compactionID, CleanupPhase) - bucket, err := base.AsGocbV2Bucket(db.Bucket) if err != nil { return "", err } - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) return "", err } metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) - // simplify close in CBG-2234 - _ = dcpClient.Close() + dcpClient.Close() return metadataKeyPrefix, err } select { case <-doneChan: base.InfofCtx(ctx, base.KeyAll, "[%s] Cleanup phase of attachment compaction completed", compactionLoggingID) - // simplify close in CBG-2234 - err = dcpClient.Close() case <-terminator.Done(): - // simplify close in CBG-2234 - err = dcpClient.Close() - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to close attachment compaction DCP client! 
%v", compactionLoggingID, err) - return metadataKeyPrefix, err - } - + dcpClient.Close() err = <-doneChan if err != nil { return metadataKeyPrefix, err @@ -574,27 +537,31 @@ func getCompactionIDSubDocPath(compactionID string) string { } // getCompactionDCPClientOptions returns the default set of DCPClientOptions suitable for attachment compaction -func getCompactionDCPClientOptions(collectionID uint32, groupID string, prefix string) (*base.DCPClientOptions, error) { - clientOptions := &base.DCPClientOptions{ +func getCompactionDCPClientOptions(db *Database, compactionID string, compactionAction string, dataStore sgbucket.DataStore, callback sgbucket.FeedEventCallbackFunc, initialMetadata []base.DCPMetadata) base.DCPClientOptions { + return base.DCPClientOptions{ + FeedPrefix: getAttachmentCompactionPrefix(compactionID, compactionAction), OneShot: true, FailOnRollback: true, MetadataStoreType: base.DCPMetadataStoreCS, - GroupID: groupID, - CollectionIDs: []uint32{collectionID}, - CheckpointPrefix: prefix, + CollectionNames: map[string][]string{ + dataStore.ScopeName(): {dataStore.CollectionName()}, + }, + CheckpointPrefix: GetAttachmentCompactionCheckpointPrefix(db.DatabaseContext, compactionID, compactionAction), + Callback: callback, + InitialMetadata: initialMetadata, } - return clientOptions, nil - } -func GenerateCompactionDCPStreamName(compactionID, compactionAction string) string { - return fmt.Sprintf( - "sg-%v:att_compaction:%v_%v", +func getAttachmentCompactionPrefix(compactionID string, compactionAction string) string { + return fmt.Sprintf("sg-%v:att_compaction:%v_%v", base.ProductAPIVersion, compactionID, compactionAction, ) } +func GetAttachmentCompactionCheckpointPrefix(db *DatabaseContext, compactionID string, compactionAction string) string { + return getAttachmentCompactionPrefix(compactionID, compactionAction) + db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID) +} // getAttachmentCompactionXattr returns the value of the attachment compaction xattr from a DCP stream. The value will be nil if the xattr is not found.
func getAttachmentCompactionXattr(data []byte) ([]byte, error) { diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go index 4a86925440..166a5934bd 100644 --- a/db/attachment_compaction_test.go +++ b/db/attachment_compaction_test.go @@ -24,11 +24,12 @@ import ( "github.com/stretchr/testify/require" ) -func TestAttachmentMark(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Requires CBS") - } +func testRequiresRosmarHierachicalSubdocOps(t *testing.T) { + t.Skip("hierachical subdoc operations are not supported by rosmar yet - CBG-4232") +} +func TestAttachmentMark(t *testing.T) { + testRequiresRosmarHierachicalSubdocOps(t) testDb, ctx := setupTestDB(t) defer testDb.Close(ctx) @@ -256,7 +257,6 @@ func TestAttachmentCleanupRollback(t *testing.T) { var garbageVBUUID gocbcore.VbUUID = 1234 collection := GetSingleDatabaseCollection(t, testDb.DatabaseContext) dataStore := collection.dataStore - collectionID := collection.GetCollectionID() makeMarkedDoc := func(docid string, compactID string) { err := dataStore.SetRaw(docid, 0, nil, []byte("{}")) @@ -283,10 +283,9 @@ func TestAttachmentCleanupRollback(t *testing.T) { bucket, err := base.AsGocbV2Bucket(testDb.Bucket) require.NoError(t, err) - dcpFeedKey := GenerateCompactionDCPStreamName(t.Name(), CleanupPhase) - clientOptions, err := getCompactionDCPClientOptions(collectionID, testDb.Options.GroupID, testDb.MetadataKeys.DCPCheckpointPrefix(testDb.Options.GroupID)) - require.NoError(t, err) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, nil, *clientOptions, bucket) + clientOptions := getCompactionDCPClientOptions(testDb, t.Name(), CleanupPhase, dataStore, func(sgbucket.FeedEvent) bool { return true }, nil) + + dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) require.NoError(t, err) // alter dcp metadata to feed into the compaction manager diff --git a/db/background_mgr.go b/db/background_mgr.go index e574d4373c..503cb3a615 100644 --- a/db/background_mgr.go +++ b/db/background_mgr.go @@ -10,6 +10,7 @@ package db import ( "context" + "fmt" "net/http" "sync" "time" @@ -199,7 +200,7 @@ func (b *BackgroundManager) markStart(ctx context.Context) error { if err == nil && status.ShouldStop { return base.HTTPErrorf(http.StatusServiceUnavailable, "Process stop still in progress - please wait before restarting") } - return processAlreadyRunningErr + return fmt.Errorf("cas mismatch on heartbeat document: %w", processAlreadyRunningErr) } // Now we know that we're the only running process we should instantiate these values diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index afd4f55afd..36f868d01f 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -158,48 +158,36 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string return true } - bucket, err := base.AsGocbV2Bucket(db.Bucket) + scopes, currCollectionIDs, err := getCollectionsForAttachmentMigration(db) if err != nil { return err } - - currCollectionIDs, err := getCollectionIDsForMigration(db) - if err != nil { - return err - } - dcpFeedKey := GenerateAttachmentMigrationDCPStreamName(a.MigrationID) - dcpPrefix := db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID) + dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback) // check for mismatch in collection id's between current collections on the db and prev run - checkpointPrefix := fmt.Sprintf("%s:%v", dcpPrefix, dcpFeedKey) - err = 
a.resetDCPMetadataIfNeeded(ctx, db, checkpointPrefix, currCollectionIDs) + err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, currCollectionIDs) if err != nil { return err } a.SetCollectionIDs(currCollectionIDs) - dcpOptions := getMigrationDCPClientOptions(currCollectionIDs, db.Options.GroupID, dcpPrefix) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *dcpOptions, bucket) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, dcpOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment migration DCP client: %v", migrationLoggingID, err) return err } - base.DebugfCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for attachment migration", migrationLoggingID, dcpFeedKey) + base.DebugfCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for attachment migration", migrationLoggingID) - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment migration DCP feed: %v", migrationLoggingID, err) - _ = dcpClient.Close() + dcpClient.Close() return err } base.TracefCtx(ctx, base.KeyAll, "[%s] DCP client started for Attachment Migration.", migrationLoggingID) select { case <-doneChan: - err = dcpClient.Close() - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to close attachment migration DCP client after attachment migration process was finished %v", migrationLoggingID, err) - } updatedDsNames := make(map[base.ScopeAndCollectionName]struct{}, len(db.CollectionByID)) // set sync info metadata version for _, collectionID := range currCollectionIDs { @@ -225,11 +213,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string } base.InfofCtx(ctx, base.KeyAll, msg) case <-terminator.Done(): - err = dcpClient.Close() - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to close attachment migration DCP client after attachment migration process was terminated %v", migrationLoggingID, err) - return err - } + dcpClient.Close() err = <-doneChan if err != nil { return err @@ -291,16 +275,18 @@ func (a *AttachmentMigrationManager) GetProcessStatus(status BackgroundManagerSt return statusJSON, metaJSON, err } -func getMigrationDCPClientOptions(collectionIDs []uint32, groupID, prefix string) *base.DCPClientOptions { - clientOptions := &base.DCPClientOptions{ +func getMigrationDCPClientOptions(db *DatabaseContext, migrationID string, scopes base.CollectionNames, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions { + prefix := getAttachmentMigrationPrefix(migrationID) + + return base.DCPClientOptions{ + FeedPrefix: prefix, OneShot: true, FailOnRollback: false, MetadataStoreType: base.DCPMetadataStoreCS, - GroupID: groupID, - CollectionIDs: collectionIDs, - CheckpointPrefix: prefix, + CollectionNames: scopes, + CheckpointPrefix: fmt.Sprintf("%s:%v", db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID), prefix), + Callback: callback, } - return clientOptions } type AttachmentMigrationManagerResponse struct { @@ -320,8 +306,8 @@ type AttachmentMigrationManagerStatusDoc struct { AttachmentMigrationMeta `json:"meta"` } -// GenerateAttachmentMigrationDCPStreamName returns the DCP stream name for a resync. -func GenerateAttachmentMigrationDCPStreamName(migrationID string) string { +// getAttachmentMigrationPrefix returns a prefix for identifying attachment migration dcp feed and checkpoints. 
+func getAttachmentMigrationPrefix(migrationID string) string { return fmt.Sprintf( "sg-%v:att_migration:%v", base.ProductAPIVersion, @@ -355,25 +341,34 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex return nil } -// getCollectionIDsForMigration will get all collection IDs required for DCP client on migration run -func getCollectionIDsForMigration(db *DatabaseContext) ([]uint32, error) { +// getCollectionsForAttachmentMigration will get all datastores. +func getCollectionsForAttachmentMigration(db *DatabaseContext) (scopes map[string][]string, ids []uint32, err error) { + collections := make(map[string][]string, 1) // one scope always collectionIDs := make([]uint32, 0) - // if all collections are included in RequireAttachmentMigration then we need to run against all collections, // if no collections are specified in RequireAttachmentMigration, run against all collections. This is to support job // being triggered by rest api (even after job was previously completed) if len(db.RequireAttachmentMigration) == 0 { - // get all collection IDs - collectionIDs = db.GetCollectionIDs() + for _, collection := range db.CollectionByID { + if _, ok := collections[collection.ScopeName]; !ok { + collections[collection.ScopeName] = make([]string, 0) + } + collections[collection.ScopeName] = append(collections[collection.ScopeName], collection.Name) + collectionIDs = append(collectionIDs, collection.GetCollectionID()) + } } else { // iterate through and grab collectionIDs we need for _, v := range db.RequireAttachmentMigration { collection, err := db.GetDatabaseCollection(v.ScopeName(), v.CollectionName()) if err != nil { - return nil, base.RedactErrorf("failed to find ID for collection %s.%s", base.MD(v.ScopeName()), base.MD(v.CollectionName())) + return nil, nil, base.RedactErrorf("failed to find collection %s.%s", base.MD(v.ScopeName()), base.MD(v.CollectionName())) + } + if _, ok := collections[collection.ScopeName]; !ok { + collections[collection.ScopeName] = make([]string, 0) } + collections[collection.ScopeName] = append(collections[collection.ScopeName], collection.Name) collectionIDs = append(collectionIDs, collection.GetCollectionID()) } } - return collectionIDs, nil + return collections, collectionIDs, nil } diff --git a/db/background_mgr_attachment_migration_test.go b/db/background_mgr_attachment_migration_test.go index 8914ad3864..23e15acab7 100644 --- a/db/background_mgr_attachment_migration_test.go +++ b/db/background_mgr_attachment_migration_test.go @@ -20,9 +20,6 @@ import ( ) func TestAttachmentMigrationTaskMixMigratedAndNonMigratedDocs(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) @@ -76,10 +73,6 @@ func getAttachmentMigrationStats(t *testing.T, migrationManager BackgroundManage } func TestAttachmentMigrationManagerResumeStoppedMigration(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } - db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) @@ -142,9 +135,6 @@ func TestAttachmentMigrationManagerResumeStoppedMigration(t *testing.T) { } func TestAttachmentMigrationManagerNoDocsToMigrate(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } db, ctx := setupTestDB(t) defer 
db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) @@ -182,9 +172,6 @@ func TestAttachmentMigrationManagerNoDocsToMigrate(t *testing.T) { } func TestMigrationManagerDocWithSyncAndGlobalAttachmentMetadata(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 643182f553..5db98061f2 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -163,31 +163,24 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers return true } - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return err - } - if r.hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against all collections", resyncLoggingID) } else { base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against specified collections", resyncLoggingID) } - clientOptions := getResyncDCPClientOptions(r.collectionIDs, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - - dcpFeedKey := GenerateResyncDCPStreamName(r.ResyncID) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + clientOptions := getResyncDCPClientOptions(db, r.ResyncID, r.ResyncedCollections, callback) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create resync DCP client! %v", resyncLoggingID, err) return err } - base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for resync", resyncLoggingID, dcpFeedKey) - doneChan, err := dcpClient.Start() + base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for resync", resyncLoggingID, clientOptions.FeedPrefix) + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start resync DCP feed! %v", resyncLoggingID, err) - _ = dcpClient.Close() + dcpClient.Close() return err } base.DebugfCtx(ctx, base.KeyAll, "[%s] DCP client started.", resyncLoggingID) @@ -197,11 +190,6 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers select { case <-doneChan: base.InfofCtx(ctx, base.KeyAll, "[%s] Finished running sync function. %d/%d docs changed", resyncLoggingID, r.DocsChanged.Value(), r.DocsProcessed.Value()) - err = dcpClient.Close() - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to close resync DCP client! %v", resyncLoggingID, err) - return err - } // If the principal docs sequences are regenerated, or the user doc need to be invalidated after a dynamic channel grant, db.QueryPrincipals is called to find the principal docs. // In the case that a database is created with "start_offline": true, it is possible the index needed to create this is not yet ready, so make sure it is ready for use. @@ -265,13 +253,9 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers } case <-terminator.Done(): base.DebugfCtx(ctx, base.KeyAll, "[%s] Terminator closed. Ending Resync process.", resyncLoggingID) - err = dcpClient.Close() - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to close resync DCP client! 
%v", resyncLoggingID, err) - return err - } + dcpClient.Close() - err = <-doneChan + err := <-doneChan if err != nil { return err } @@ -406,22 +390,28 @@ func initializePrincipalDocsIndex(ctx context.Context, db *Database) error { return InitializeIndexes(ctx, n1qlStore, options) } +func getResyncDCPPrefix(resyncID string) string { + return fmt.Sprintf( + "sg-%v:resync:%v", + base.ProductAPIVersion, + resyncID) +} + // getResyncDCPClientOptions returns the default set of DCPClientOptions suitable for resync -func getResyncDCPClientOptions(collectionIDs []uint32, groupID string, prefix string) *base.DCPClientOptions { - return &base.DCPClientOptions{ +func getResyncDCPClientOptions(db *Database, resyncID string, collectionNames base.CollectionNames, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions { + resyncPrefix := getResyncDCPPrefix(resyncID) + return base.DCPClientOptions{ + FeedPrefix: resyncPrefix, OneShot: true, FailOnRollback: false, MetadataStoreType: base.DCPMetadataStoreCS, - GroupID: groupID, - CollectionIDs: collectionIDs, - CheckpointPrefix: prefix, + CollectionNames: collectionNames, + CheckpointPrefix: GenerateResyncCheckpointPrefix(db.DatabaseContext, resyncPrefix), + Callback: callback, } } -// GenerateResyncDCPStreamName returns the DCP stream name for a resync. -func GenerateResyncDCPStreamName(resyncID string) string { - return fmt.Sprintf( - "sg-%v:resync:%v", - base.ProductAPIVersion, - resyncID) +// GenerateResyncCheckpointPrefix returns the prefix for the checkpoint documents +func GenerateResyncCheckpointPrefix(db *DatabaseContext, resyncID string) string { + return db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID) + getResyncDCPPrefix(resyncID) } diff --git a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go index 105e4ecd2a..cd52c74de8 100644 --- a/db/background_mgr_resync_dcp_test.go +++ b/db/background_mgr_resync_dcp_test.go @@ -169,10 +169,6 @@ func TestResyncDCPInit(t *testing.T) { } func TestResyncManagerDCPStopInMidWay(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - docsToCreate := 1000 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, true) defer db.Close(ctx) @@ -204,10 +200,6 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) { func TestResyncManagerDCPStart(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - t.Run("Resync without updating sync function", func(t *testing.T) { docsToCreate := 100 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, false) @@ -284,10 +276,6 @@ func TestResyncManagerDCPStart(t *testing.T) { } func TestResyncManagerDCPRunTwice(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - docsToCreate := 1000 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, false) defer db.Close(ctx) @@ -325,10 +313,6 @@ func TestResyncManagerDCPRunTwice(t *testing.T) { } func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - docsToCreate := 5000 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, true) defer db.Close(ctx) @@ -372,11 +356,6 @@ func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { // TestResyncManagerDCPResumeStoppedProcessChangeCollections starts a resync with a single collection, stops it, and re-runs with an additional collection. 
// Expects the resync process to reset with a new ID, and new checkpoints, and reprocess the full set of documents across both collections. func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - - base.SetUpTestLogging(t, base.LevelDebug) base.TestRequiresCollections(t) docsPerCollection := 5000 @@ -511,13 +490,6 @@ function sync(doc, oldDoc){ // TestResyncMou ensures that resync updates create mou, and preserve pcas in mou in the case where resync is reprocessing an import func TestResyncMou(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - if !base.TestUseXattrs() { - t.Skip("_mou is written to xattrs only") - } - base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport) db, ctx := setupTestDBWithOptionsAndImport(t, nil, DatabaseContextOptions{}) defer db.Close(ctx) diff --git a/db/change_listener.go b/db/change_listener.go index 10a62c6c6b..eaf3c541d6 100644 --- a/db/change_listener.go +++ b/db/change_listener.go @@ -36,7 +36,6 @@ type changeListener struct { dbCtx *DatabaseContext bucket base.Bucket bucketName string // Used for logging - tapFeed base.TapFeed // Observes changes to bucket tapNotifier *sync.Cond // Posts notifications when documents are updated FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc) counter uint64 // Event counter; increments on every doc update @@ -44,6 +43,7 @@ type changeListener struct { keyCounts map[channels.ID]uint64 // Latest count at which each doc key was updated OnChangeCallback DocChangedFunc terminator chan bool // Signal to cause DCP feed to exit + doneChan <-chan error // Channel that's closed when DCP feed has exited broadcastChangesDoneChan chan struct{} // Channel to signal that broadcast changes goroutine has terminated sgCfgPrefix string // SG config key prefix started base.AtomicBool // whether the feed has been started @@ -81,58 +81,32 @@ func (listener *changeListener) Start(ctx context.Context, bucket base.Bucket, d listener.terminator = make(chan bool) listener.bucket = bucket listener.bucketName = bucket.GetName() - listener.FeedArgs = sgbucket.FeedArguments{ - ID: base.DCPCachingFeedID, - Backfill: sgbucket.FeedNoBackfill, - Terminator: listener.terminator, - DoneChan: make(chan struct{}), - } - if len(scopes) > 0 { - // build the set of collections to be requested - - // Add the metadata collection first - metadataStoreFoundInScopes := false - scopeArgs := make(map[string][]string) - for scopeName, scope := range scopes { - collections := make([]string, 0) - for collectionName, _ := range scope.Collections { - collections = append(collections, collectionName) - if scopeName == metadataStore.ScopeName() && collectionName == metadataStore.CollectionName() { - metadataStoreFoundInScopes = true - } - } - scopeArgs[scopeName] = collections - } - // If the metadataStore's collection isn't already present in the list of scopes, add it to the DCP scopes - if !metadataStoreFoundInScopes { - _, ok := scopeArgs[metadataStore.ScopeName()] - if !ok { - scopeArgs[metadataStore.ScopeName()] = []string{metadataStore.CollectionName()} - } else { - scopeArgs[metadataStore.ScopeName()] = append(scopeArgs[metadataStore.ScopeName()], metadataStore.CollectionName()) - } + collectionNames := base.CollectionNames{} + collectionNames.Add(metadataStore) + for scopeName, collections := range scopes { + for collectionName := range collections.Collections { + 
collectionNames.Add(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName}) } - listener.FeedArgs.Scopes = scopeArgs - } listener.StartNotifierBroadcaster(ctx) // start broadcast changes goroutine - return listener.StartMutationFeed(ctx, bucket, dbStats) -} - -func (listener *changeListener) StartMutationFeed(ctx context.Context, bucket base.Bucket, dbStats *expvar.Map) (err error) { - - defer func() { - if err == nil { - listener.started.Set(true) - } - }() - - // DCP Feed - // DCP receiver isn't go-channel based - DCPReceiver calls ProcessEvent directly. - base.InfofCtx(ctx, base.KeyDCP, "Using DCP feed for bucket: %q (based on feed_type specified in config file)", base.MD(bucket.GetName())) - return bucket.StartDCPFeed(ctx, listener.FeedArgs, listener.ProcessFeedEvent, dbStats) + opts := base.DCPClientOptions{ + FeedPrefix: base.DCPCachingFeedID, + Callback: listener.ProcessFeedEvent, + Terminator: listener.terminator, + FromLatestSequence: true, + CollectionNames: collectionNames, + DBStats: dbStats, + MetadataStoreType: base.DCPMetadataStoreInMemory, + } + var err error + listener.doneChan, err = base.StartDCPFeed(ctx, bucket, opts) + if err != nil { + return err + } + listener.started.Set(true) + return nil } // DocumentType returns the type of document received over mutation feed based on its key prefix. @@ -215,17 +189,10 @@ func (listener *changeListener) Stop(ctx context.Context) { listener.tapNotifier.Broadcast() } - if listener.tapFeed != nil { - err := listener.tapFeed.Close() - if err != nil { - base.InfofCtx(ctx, base.KeyChanges, "Error closing listener tap feed: %v", err) - } - } - // Wait for mutation feed worker to terminate. waitTime := MutationFeedStopMaxWait select { - case <-listener.FeedArgs.DoneChan: + case <-listener.doneChan: // Mutation feed worker goroutine is terminated and doneChan is already closed. case <-time.After(waitTime): base.WarnfCtx(ctx, "Timeout after %v of waiting for mutation feed worker to terminate", waitTime) @@ -240,10 +207,6 @@ func (listener *changeListener) Stop(ctx context.Context) { } } -func (listener *changeListener) TapFeed() base.TapFeed { - return listener.tapFeed -} - //////// NOTIFICATIONS: // Changes the counter, notifying waiting clients. 
diff --git a/db/import_listener.go b/db/import_listener.go index e0bd1826f6..3087acc3c7 100644 --- a/db/import_listener.go +++ b/db/import_listener.go @@ -76,9 +76,7 @@ func (il *importListener) StartImportFeed(dbContext *DatabaseContext) (err error DatabaseCollection: collection, user: nil, // admin } - if il.bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) && !dbContext.OnlyDefaultCollection() { - collectionNamesByScope[collection.ScopeName] = append(collectionNamesByScope[collection.ScopeName], collection.Name) - } + collectionNamesByScope[collection.ScopeName] = append(collectionNamesByScope[collection.ScopeName], collection.Name) } sort.Strings(collectionNamesByScope[scopeName]) if dbContext.OnlyDefaultCollection() { @@ -86,14 +84,6 @@ func (il *importListener) StartImportFeed(dbContext *DatabaseContext) (err error } else { il.importDestKey = base.ImportDestKey(il.dbName, scopeName, collectionNamesByScope[scopeName]) } - feedArgs := sgbucket.FeedArguments{ - ID: base.DCPImportFeedID, - Backfill: sgbucket.FeedResume, - Terminator: il.terminator, - DoneChan: make(chan struct{}), - CheckpointPrefix: il.checkpointPrefix, - Scopes: collectionNamesByScope, - } base.InfofCtx(il.loggingCtx, base.KeyDCP, "Attempting to start import DCP feed %v...", base.MD(il.importDestKey)) @@ -105,20 +95,18 @@ func (il *importListener) StartImportFeed(dbContext *DatabaseContext) (err error // Start DCP mutation feed base.InfofCtx(il.loggingCtx, base.KeyImport, "Starting DCP import feed for bucket: %q ", base.UD(il.bucket.GetName())) - // TODO: need to clean up StartDCPFeed to push bucket dependencies down cbStore, ok := base.AsCouchbaseBucketStore(il.bucket) - if !ok { - // walrus is not a couchbasestore - return il.bucket.StartDCPFeed(il.loggingCtx, feedArgs, il.ProcessFeedEvent, importFeedStatsMap.Map) - } - - if !base.IsEnterpriseEdition() { - groupID := "" - gocbv2Bucket, err := base.AsGocbV2Bucket(il.bucket) - if err != nil { - return err + if !base.IsEnterpriseEdition() || !ok { + opts := base.DCPClientOptions{ + FeedPrefix: base.DCPImportFeedID, + Terminator: il.terminator, + CheckpointPrefix: il.checkpointPrefix, + CollectionNames: collectionNamesByScope, + DBStats: importFeedStatsMap.Map, + Callback: il.ProcessFeedEvent, } - return base.StartGocbDCPFeed(il.loggingCtx, gocbv2Bucket, il.bucket.GetName(), feedArgs, il.ProcessFeedEvent, importFeedStatsMap.Map, base.DCPMetadataStoreCS, groupID) + _, err = base.StartDCPFeed(il.loggingCtx, il.bucket, opts) + return err } il.cbgtContext, err = base.StartShardedDCPFeed(il.loggingCtx, dbContext.Name, dbContext.Options.GroupID, dbContext.UUID, dbContext.Heartbeater, diff --git a/db/util_testing.go b/db/util_testing.go index 277c0db9bd..6f127afda1 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -14,7 +14,6 @@ import ( "context" "errors" "fmt" - "maps" "net/http" "slices" "strconv" @@ -187,26 +186,19 @@ func purgeWithDCPFeed(ctx context.Context, bucket base.Bucket, tbp *base.TestBuc var purgeErrors *base.MultiError - collections := make(map[uint32]sgbucket.DataStore) - if bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) { - dataStores, err := bucket.ListDataStores() + dataStores, err := bucket.ListDataStores() + if err != nil { + return err + } + collections := make(map[uint32]sgbucket.DataStore, len(dataStores)) + collectionNames := make(base.CollectionNames) + for _, dataStoreName := range dataStores { + collection, err := bucket.NamedDataStore(dataStoreName) if err != nil { return err } - for _, dataStoreName := range 
dataStores { - collection, err := bucket.NamedDataStore(dataStoreName) - if err != nil { - return err - } - collections[collection.GetCollectionID()] = collection - } - } - - dcpClientOpts := base.DCPClientOptions{ - OneShot: true, - FailOnRollback: false, - CollectionIDs: slices.Collect(maps.Keys(collections)), - MetadataStoreType: base.DCPMetadataStoreInMemory, + collectionNames.Add(dataStoreName) + collections[collection.GetCollectionID()] = collection } purgeCallback := func(event sgbucket.FeedEvent) bool { @@ -273,16 +265,20 @@ func purgeWithDCPFeed(ctx context.Context, bucket base.Bucket, tbp *base.TestBuc } return false } - feedID := "purgeFeed-" + bucket.GetName() - gocbBucket, err := base.AsGocbV2Bucket(bucket) - if err != nil { - return err + dcpClientOpts := base.DCPClientOptions{ + FeedPrefix: "purgeFeed", + OneShot: true, + FailOnRollback: false, + CollectionNames: collectionNames, + MetadataStoreType: base.DCPMetadataStoreInMemory, + Callback: purgeCallback, } - dcpClient, err := base.NewDCPClient(ctx, feedID, purgeCallback, dcpClientOpts, gocbBucket) + + dcpClient, err := base.NewDCPClient(ctx, bucket, dcpClientOpts) if err != nil { return err } - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { return fmt.Errorf("error starting purge DCP feed: %w", err) } @@ -294,12 +290,9 @@ func purgeWithDCPFeed(ctx context.Context, bucket base.Bucket, tbp *base.TestBuc tbp.Logf(ctx, "purgeDCPFeed finished with error: %v", err) } case <-timeout: + dcpClient.Close() return fmt.Errorf("timeout waiting for purge DCP feed to complete") } - closeErr := dcpClient.Close() - if closeErr != nil { - tbp.Logf(ctx, "error closing purge DCP feed: %v", closeErr) - } tbp.Logf(ctx, "Finished purge DCP feed ... Total docs purged: %d", purgedDocCount.Load()) tbp.Logf(ctx, "Finished purge DCP feed ... 
Total docs processed: %d", processedDocCount.Load()) diff --git a/rest/adminapitest/resync_test.go b/rest/adminapitest/resync_test.go index 8e6a5af44d..d8588c6907 100644 --- a/rest/adminapitest/resync_test.go +++ b/rest/adminapitest/resync_test.go @@ -56,8 +56,7 @@ func TestResyncRollback(t *testing.T) { require.Equal(t, db.BackgroundProcessStateStopped, status.State) // alter persisted dcp metadata from the first run to force a rollback - name := db.GenerateResyncDCPStreamName(status.ResyncID) - checkpointPrefix := fmt.Sprintf("%s:%v", rt.GetDatabase().MetadataKeys.DCPCheckpointPrefix(rt.GetDatabase().Options.GroupID), name) + checkpointPrefix := db.GenerateResyncCheckpointPrefix(rt.GetDatabase(), status.ResyncID) meta := base.NewDCPMetadataCS(rt.Context(), rt.Bucket().DefaultDataStore(), 1024, 8, checkpointPrefix) vbMeta := meta.GetMeta(0) var garbageVBUUID gocbcore.VbUUID = 1234 diff --git a/rest/attachmentcompactiontest/attachment_compaction_api_test.go b/rest/attachmentcompactiontest/attachment_compaction_api_test.go index 8a007fb579..22c40703b0 100644 --- a/rest/attachmentcompactiontest/attachment_compaction_api_test.go +++ b/rest/attachmentcompactiontest/attachment_compaction_api_test.go @@ -412,9 +412,7 @@ func TestAttachmentCompactionMarkPhaseRollback(t *testing.T) { require.Equal(t, db.MarkPhase, stat.Phase) // alter persisted dcp metadata from the first run to force a rollback - name := db.GenerateCompactionDCPStreamName(stat.CompactID, "mark") - checkpointPrefix := fmt.Sprintf("%s:%v", "_sync:dcp_ck:", name) - + checkpointPrefix := db.GetAttachmentCompactionCheckpointPrefix(rt.GetDatabase(), stat.CompactID, db.MarkPhase) meta := base.NewDCPMetadataCS(rt.Context(), dataStore, 1024, 8, checkpointPrefix) vbMeta := meta.GetMeta(0) vbMeta.VbUUID = garbageVBUUID diff --git a/tools/cache_perf_tool/dcpDataGeneration.go b/tools/cache_perf_tool/dcpDataGeneration.go index 86c312fa61..b152d48118 100644 --- a/tools/cache_perf_tool/dcpDataGeneration.go +++ b/tools/cache_perf_tool/dcpDataGeneration.go @@ -31,7 +31,7 @@ type dcpDataGen struct { seqAlloc *sequenceAllocator delays []time.Duration dbCtx *db.DatabaseContext - client *base.DCPClient + client *base.GoCBDCPClient numChannelsPerDoc int numTotalChannels int simRapidUpdate bool @@ -327,10 +327,9 @@ func (dcp *dcpDataGen) mutateWithDedupe(seqs []uint64, chanCount int, casValue u return encodedVal, chanCount, nil } -func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.DCPClient, error) { - options := base.DCPClientOptions{ +func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.GoCBDCPClient, error) { + options := base.GoCBDCPClientOptions{ MetadataStoreType: base.DCPMetadataStoreInMemory, - GroupID: "", DbStats: dbStats, CollectionIDs: []uint32{0}, AgentPriority: gocbcore.DcpAgentPriorityMed, From dc06fb04c7ee4871f57efa0eaea998dcae0148e2 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 1 Dec 2025 15:20:51 -0500 Subject: [PATCH 2/3] Unskip tests --- base/dcp_client_test.go | 29 ++-------------- base/dcp_sharded.go | 1 - base/util_testing.go | 16 +-------- db/attachment_compaction.go | 33 ++++++++----------- db/attachment_compaction_test.go | 14 ++++---- db/background_mgr_attachment_compaction.go | 14 ++++---- db/background_mgr_attachment_migration.go | 10 +++--- db/database.go | 16 
+++++++-- rest/access_test.go | 1 - rest/adminapitest/admin_api_test.go | 11 ------- rest/adminapitest/resync_test.go | 5 +-- .../attachment_migration_test.go | 8 ++--- rest/indextest/resync_test.go | 1 - rest/sync_fn_test.go | 8 ----- 14 files changed, 54 insertions(+), 113 deletions(-) diff --git a/base/dcp_client_test.go b/base/dcp_client_test.go index 2e54fc9565..e60840a38c 100644 --- a/base/dcp_client_test.go +++ b/base/dcp_client_test.go @@ -424,7 +424,6 @@ func (dc *GoCBDCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) { // TestResumeInterruptedFeed uses persisted metadata to resume the feed func TestResumeStoppedFeed(t *testing.T) { - //SetUpTestLogging(t, LevelDebug, KeyAll) ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) @@ -628,21 +627,7 @@ func TestDCPOutOfRangeSequence(t *testing.T) { } -func getCollectionIDs(t *testing.T, bucket *TestBucket) []uint32 { - collection, err := AsCollection(bucket.GetSingleDataStore()) - require.NoError(t, err) - - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } - return collectionIDs - -} - func TestDCPFeedEventTypes(t *testing.T) { - TestRequiresGocbDCPClient(t) - ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) @@ -650,14 +635,6 @@ func TestDCPFeedEventTypes(t *testing.T) { collection := bucket.GetSingleDataStore() collectionNames := CollectionNames{collection.ScopeName(): []string{collection.CollectionName()}} - // start one shot feed - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } - - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) foundEvent := make(chan struct{}) docID := t.Name() @@ -696,7 +673,7 @@ func TestDCPFeedEventTypes(t *testing.T) { CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), Callback: callback, } - dcpClient, err := NewDCPClient(ctx, gocbv2Bucket, clientOptions) + dcpClient, err := NewDCPClient(ctx, bucket, clientOptions) require.NoError(t, err) doneChan, startErr := dcpClient.Start(ctx) @@ -738,9 +715,7 @@ func TestDCPFeedEventTypes(t *testing.T) { } func TestDCPClientAgentConfig(t *testing.T) { - if UnitTestUrlIsWalrus() { - t.Skip("exercises gocbcore code") - } + TestRequiresGocbDCPClient(t) ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 545dac7118..103f4a3c79 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -64,7 +64,6 @@ type CbgtContext struct { // StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket. // dbName is used to define a unique path name for local file storage of pindex files func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) { - fmt.Printf("cfg=%+v\n", cfg) // Ensure we don't try to start collections-enabled feed if there are any pre-collection SG nodes in the cluster. 
minVersion, err := getMinNodeVersion(cfg) if err != nil { diff --git a/base/util_testing.go b/base/util_testing.go index 6ca3b8e4b1..280bc2a310 100644 --- a/base/util_testing.go +++ b/base/util_testing.go @@ -758,24 +758,10 @@ func TestRequiresCollections(t testing.TB) { } } -// TestRequiresOneShotDCPClient will skip the current test until rosmar supports one-shot DCP. -func TestRequiresOneShotDCPClient(t testing.TB) { - if UnitTestUrlIsWalrus() { - t.Skip("rosmar doesn't have an abstracted one shot DCP client CBG-4246") - } -} - -// TestRequiresDCPResync will skip the current test DCP sync is not supported. -func TestRequiresDCPResync(t testing.TB) { - if UnitTestUrlIsWalrus() { - t.Skip("Walrus doesn't support DCP resync CBG-2661/CBG-4246") - } -} - // TestRequiresGocbDCPClient will skip the current test if using rosmar. func TestRequiresGocbDCPClient(t testing.TB) { if UnitTestUrlIsWalrus() { - t.Skip("rosmar doesn't support base.DCPClient") + t.Skip("rosmar doesn't support GocbDCPClient") } } diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index da5e8c60d2..2be96bb354 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -29,7 +29,7 @@ const ( CleanupPhase = "cleanup" ) -func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, terminator *base.SafeTerminator, markedAttachmentCount *base.AtomicInt) (count int64, vbUUIDs []uint64, checkpointPrefix string, err error) { +func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, terminator *base.SafeTerminator, markedAttachmentCount *base.AtomicInt) (count int64, vbUUIDs []uint64, checkpointPrefix string, feedPrefix string, err error) { base.InfofCtx(ctx, base.KeyAll, "Starting first phase of attachment compaction (mark phase) with compactionID: %q", compactionID) compactionLoggingID := "Compaction Mark: " + compactionID @@ -138,7 +138,7 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) - return 0, nil, "", err + return 0, nil, "", "", err } metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() @@ -146,7 +146,7 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) dcpClient.Close() - return 0, nil, metadataKeyPrefix, err + return 0, nil, metadataKeyPrefix, clientOptions.FeedPrefix, err } base.DebugfCtx(ctx, base.KeyAll, "[%s] DCP feed started.", compactionLoggingID) @@ -154,27 +154,27 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c case <-doneChan: base.InfofCtx(ctx, base.KeyAll, "[%s] Mark phase of attachment compaction completed. Marked %d attachments", compactionLoggingID, markedAttachmentCount.Value()) if markProcessFailureErr != nil { - return markedAttachmentCount.Value(), nil, metadataKeyPrefix, markProcessFailureErr + return markedAttachmentCount.Value(), nil, metadataKeyPrefix, clientOptions.FeedPrefix, markProcessFailureErr } case <-terminator.Done(): base.DebugfCtx(ctx, base.KeyAll, "[%s] Terminator closed. 
Stopping mark phase.", compactionLoggingID) dcpClient.Close() if markProcessFailureErr != nil { - return markedAttachmentCount.Value(), nil, metadataKeyPrefix, markProcessFailureErr + return markedAttachmentCount.Value(), nil, metadataKeyPrefix, clientOptions.FeedPrefix, markProcessFailureErr } if err != nil { - return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, err + return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, clientOptions.FeedPrefix, err } err = <-doneChan if err != nil { - return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, err + return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, clientOptions.FeedPrefix, err } base.InfofCtx(ctx, base.KeyAll, "[%s] Mark phase of attachment compaction was terminated. Marked %d attachments", compactionLoggingID, markedAttachmentCount.Value()) } - return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, err + return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, clientOptions.FeedPrefix, err } // AttachmentsMetaMap struct is a very minimal struct to unmarshal into when getting attachments from bodies @@ -406,7 +406,7 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, return purgedAttachmentCount.Value(), err } -func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (string, error) { +func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (checkpointPrefix string, feedPrefix string, err error) { base.InfofCtx(ctx, base.KeyAll, "Starting third phase of attachment compaction (cleanup phase) with compactionID: %q", compactionID) compactionLoggingID := "Compaction Cleanup: " + compactionID @@ -495,15 +495,10 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for cleanup phase of attachment compaction", compactionLoggingID) - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return "", err - } - - dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) - return "", err + return "", "", err } metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() @@ -511,7 +506,7 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! 
%v", compactionLoggingID, err) dcpClient.Close() - return metadataKeyPrefix, err + return metadataKeyPrefix, clientOptions.FeedPrefix, err } select { @@ -521,13 +516,13 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore dcpClient.Close() err = <-doneChan if err != nil { - return metadataKeyPrefix, err + return metadataKeyPrefix, clientOptions.FeedPrefix, err } base.InfofCtx(ctx, base.KeyAll, "[%s] Cleanup phase of attachment compaction was terminated", compactionLoggingID) } - return metadataKeyPrefix, err + return metadataKeyPrefix, clientOptions.FeedPrefix, err } // getCompactionIDSubDocPath is just a tiny helper func that just concatenates the subdoc path we're using to store diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go index 166a5934bd..e3102a212b 100644 --- a/db/attachment_compaction_test.go +++ b/db/attachment_compaction_test.go @@ -25,7 +25,7 @@ import ( ) func testRequiresRosmarHierachicalSubdocOps(t *testing.T) { - t.Skip("hierachical subdoc operations are not supported by rosmar yet - CBG-4232") + t.Skip("hierarchical subdoc operations are not supported by rosmar yet - CBG-4232") } func TestAttachmentMark(t *testing.T) { @@ -62,7 +62,7 @@ func TestAttachmentMark(t *testing.T) { attKeys = append(attKeys, createDocWithInBodyAttachment(t, ctx, "inBodyDoc", []byte(`{}`), "attForInBodyRef", []byte(`{"val": "inBodyAtt"}`), databaseCollection)) terminator := base.NewSafeTerminator() - attachmentsMarked, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{}) + attachmentsMarked, _, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{}) assert.NoError(t, err) assert.Equal(t, int64(13), attachmentsMarked) @@ -202,7 +202,7 @@ func TestAttachmentCleanup(t *testing.T) { } terminator := base.NewSafeTerminator() - _, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator) + _, _, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator) assert.NoError(t, err) for _, docID := range singleMarkedAttIDs { @@ -357,7 +357,7 @@ func TestAttachmentMarkAndSweepAndCleanup(t *testing.T) { } terminator := base.NewSafeTerminator() - attachmentsMarked, vbUUIDS, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{}) + attachmentsMarked, vbUUIDS, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{}) assert.NoError(t, err) assert.Equal(t, int64(10), attachmentsMarked) @@ -382,7 +382,7 @@ func TestAttachmentMarkAndSweepAndCleanup(t *testing.T) { } } - _, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator) + _, _, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator) assert.NoError(t, err) for _, attDocKey := range attKeys { @@ -696,7 +696,7 @@ func TestAttachmentDifferentVBUUIDsBetweenPhases(t *testing.T) { // Run mark phase as usual terminator := base.NewSafeTerminator() - _, vbUUIDs, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDB, t.Name(), terminator, &base.AtomicInt{}) + _, vbUUIDs, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDB, t.Name(), terminator, &base.AtomicInt{}) assert.NoError(t, err) // Manually modify a vbUUID and ensure the Sweep 
phase errors @@ -975,7 +975,7 @@ func TestAttachmentCompactIncorrectStat(t *testing.T) { stat := &base.AtomicInt{} count := int64(0) go func() { - attachmentCount, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, "mark", terminator, stat) + attachmentCount, _, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, "mark", terminator, stat) atomic.StoreInt64(&count, attachmentCount) require.NoError(t, err) }() diff --git a/db/background_mgr_attachment_compaction.go b/db/background_mgr_attachment_compaction.go index 3386ba155d..4bf1a9d4d6 100644 --- a/db/background_mgr_attachment_compaction.go +++ b/db/background_mgr_attachment_compaction.go @@ -133,9 +133,10 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin a.SetPhase("mark") worker := func() (shouldRetry bool, err error, value any) { persistClusterStatus() - _, a.VBUUIDs, metadataKeyPrefix, err = attachmentCompactMarkPhase(ctx, dataStore, collectionID, database, a.CompactID, terminator, &a.MarkedAttachments) + var feedPrefix string + _, a.VBUUIDs, metadataKeyPrefix, feedPrefix, err = attachmentCompactMarkPhase(ctx, dataStore, collectionID, database, a.CompactID, terminator, &a.MarkedAttachments) if err != nil { - shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, metadataKeyPrefix) + shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, metadataKeyPrefix, feedPrefix) } return shouldRetry, err, nil } @@ -161,9 +162,10 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin a.SetPhase("cleanup") worker := func() (shouldRetry bool, err error, value any) { persistClusterStatus() - metadataKeyPrefix, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator) + var feedPrefix string + metadataKeyPrefix, feedPrefix, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator) if err != nil { - shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, metadataKeyPrefix) + shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, metadataKeyPrefix, feedPrefix) } return shouldRetry, err, nil } @@ -182,13 +184,13 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return nil } -func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase, keyPrefix string) (bool, error) { +func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase, checkpointPrefix, feedPrefix string) (bool, error) { var rollbackErr gocbcore.DCPRollbackError if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) { base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase) // to rollback any phase for attachment compaction we need to purge all persisted dcp metadata base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID) - err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID) + err = PurgeDCPCheckpoints(ctx, 
database.DatabaseContext, checkpointPrefix, feedPrefix) if err != nil { base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err) return false, err diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index 36f868d01f..de984e9197 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -165,7 +165,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback) // check for mismatch in collection id's between current collections on the db and prev run - err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, currCollectionIDs) + err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, dcpOptions.FeedPrefix, currCollectionIDs) if err != nil { return err } @@ -315,15 +315,15 @@ func getAttachmentMigrationPrefix(migrationID string) string { } // resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run -func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) error { +func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string, collectionIDs []uint32) error { // if we are on our first run, no collections will be defined on the manager yet if len(a.CollectionIDs) == 0 { return nil } if len(a.CollectionIDs) != len(collectionIDs) { base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) - if err != nil { + err := PurgeDCPCheckpoints(ctx, database, checkpointPrefix, feedPrefix) + if err != nil && !base.IsDocNotFoundError(err) { return err } return nil @@ -333,7 +333,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs) if purgeNeeded != 0 { base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) + err := PurgeDCPCheckpoints(ctx, database, checkpointPrefix, feedPrefix) if err != nil { return err } diff --git a/db/database.go b/db/database.go index d0a166e97d..5575cad3d9 100644 --- a/db/database.go +++ b/db/database.go @@ -2466,7 +2466,7 @@ func (db *DatabaseContext) StartOnlineProcesses(ctx context.Context) (returnedEr db.AttachmentMigrationManager = NewAttachmentMigrationManager(db) // if we have collections requiring migration, run the job - if len(db.RequireAttachmentMigration) > 0 && !db.BucketSpec.IsWalrusBucket() { + if len(db.RequireAttachmentMigration) > 0 { err := db.AttachmentMigrationManager.Start(ctx, nil) if err != nil { base.WarnfCtx(ctx, "Error trying to migrate attachments for %s with error: %v", db.Name, err) @@ -2572,11 +2572,21 @@ func (db *DatabaseContext) GetCollectionIDs() []uint32 { } // PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0 -func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, taskID string) error { +func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string) error { bucket, 
err := base.AsGocbV2Bucket(database.Bucket) if err != nil { - return err + checkpoint := checkpointPrefix + ":" + feedPrefix + fmt.Printf("Deleting DCP checkpoint %q\n", checkpoint) + err := database.MetadataStore.Delete(checkpoint) + if err != nil && !base.IsDocNotFoundError(err) { + return err + } + if base.IsDocNotFoundError(err) { + fmt.Printf("No DCP checkpoint found %q\n", checkpoint) + return nil + } + return nil } numVbuckets, err := bucket.GetMaxVbno() if err != nil { diff --git a/rest/access_test.go b/rest/access_test.go index 8d79566ff9..ed5dd5f071 100644 --- a/rest/access_test.go +++ b/rest/access_test.go @@ -760,7 +760,6 @@ func TestAllDocsAccessControl(t *testing.T) { } func TestChannelAccessChanges(t *testing.T) { - base.TestRequiresDCPResync(t) base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges, base.KeyCRUD) rtConfig := RestTesterConfig{SyncFn: `function(doc) {access(doc.owner, doc._id);channel(doc.channel)}`, PersistentConfig: true} rt := NewRestTester(t, &rtConfig) diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index 6a30d1a6e6..a21c7f354b 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -620,7 +620,6 @@ func TestDBGetConfigCustomLogging(t *testing.T) { } func TestDBOfflineSingleResyncUsingDCPStream(t *testing.T) { - base.TestRequiresDCPResync(t) syncFn := ` function(doc) { channel("x") @@ -647,7 +646,6 @@ func TestDBOfflineSingleResyncUsingDCPStream(t *testing.T) { } func TestDCPResyncCollectionsStatus(t *testing.T) { - base.TestRequiresDCPResync(t) base.TestRequiresCollections(t) testCases := []struct { @@ -704,8 +702,6 @@ func TestDCPResyncCollectionsStatus(t *testing.T) { } func TestResyncUsingDCPStream(t *testing.T) { - base.TestRequiresDCPResync(t) - testCases := []struct { docsCreated int }{ @@ -770,8 +766,6 @@ func TestResyncUsingDCPStream(t *testing.T) { } func TestResyncUsingDCPStreamReset(t *testing.T) { - base.TestRequiresDCPResync(t) - syncFn := ` function(doc) { channel("x") @@ -832,7 +826,6 @@ func TestResyncUsingDCPStreamReset(t *testing.T) { } func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) { - base.TestRequiresDCPResync(t) base.TestRequiresCollections(t) numCollections := 2 @@ -894,8 +887,6 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) { } func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { - base.TestRequiresDCPResync(t) - syncFn := ` function(doc) { channel("x") @@ -1420,8 +1411,6 @@ func TestConfigPollingRemoveDatabase(t *testing.T) { } func TestResyncStopUsingDCPStream(t *testing.T) { - base.TestRequiresDCPResync(t) - syncFn := ` function(doc) { channel("x") diff --git a/rest/adminapitest/resync_test.go b/rest/adminapitest/resync_test.go index d8588c6907..845cde4ca1 100644 --- a/rest/adminapitest/resync_test.go +++ b/rest/adminapitest/resync_test.go @@ -24,9 +24,7 @@ import ( // TestResyncRollback ensures that we allow rollback of func TestResyncRollback(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("This test doesn't works with walrus") - } + base.TestRequiresGocbDCPClient(t) rt := rest.NewRestTester(t, &rest.RestTesterConfig{ SyncFn: `function(doc) { channel("x") }`, // use custom sync function to increment sync function counter }) @@ -128,7 +126,6 @@ func TestResyncRegenerateSequencesCorruptDocumentSequence(t *testing.T) { } func TestResyncRegenerateSequencesPrincipals(t *testing.T) { - base.TestRequiresDCPResync(t) if !base.TestsUseNamedCollections() { t.Skip("Test requires named 
collections, performs default collection handling independently") } diff --git a/rest/attachmentmigrationtest/attachment_migration_test.go b/rest/attachmentmigrationtest/attachment_migration_test.go index 1ee667de0c..8fcf653e5f 100644 --- a/rest/attachmentmigrationtest/attachment_migration_test.go +++ b/rest/attachmentmigrationtest/attachment_migration_test.go @@ -26,7 +26,6 @@ import ( // - Grab attachment migration manager and assert it has run upon db startup // - Assert job has written syncInfo metaVersion as expected to the bucket func TestMigrationJobStartOnDbStart(t *testing.T) { - base.TestRequiresOneShotDCPClient(t) rt := rest.NewRestTesterPersistentConfig(t) defer rt.Close() @@ -46,7 +45,9 @@ func TestMigrationJobStartOnDbStart(t *testing.T) { // to be processed twice in the job, so we can assert that the job has processed more docs than we added // - Assert sync info: metaVersion is written to BOTH collections in the db config func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { - base.TestRequiresOneShotDCPClient(t) + if base.UnitTestUrlIsWalrus() { + t.Skip("This test currently fails under rosmar due to bucket closing when updating the database.") + } base.TestRequiresCollections(t) base.RequireNumTestDataStores(t, 2) @@ -143,7 +144,6 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { // after update to db config + assert on collections requiring migration // - Assert that syncInfo: metaVersion is written for new collection (and is still present in original collection) func TestMigrationNewCollectionToDbNoRestart(t *testing.T) { - base.TestRequiresOneShotDCPClient(t) base.TestRequiresCollections(t) base.RequireNumTestDataStores(t, 2) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) @@ -242,7 +242,6 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) { // - Assert that the migration job is not re-run (docs processed is the same as before + collections // requiring migration is empty) func TestMigrationNoReRunStartStopDb(t *testing.T) { - base.TestRequiresOneShotDCPClient(t) base.TestRequiresCollections(t) base.RequireNumTestDataStores(t, 2) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) @@ -321,7 +320,6 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) { // - Wait for migration job to start // - Attempt to start job again on manager, assert we get error func TestStartMigrationAlreadyRunningProcess(t *testing.T) { - base.TestRequiresOneShotDCPClient(t) base.TestRequiresCollections(t) base.RequireNumTestDataStores(t, 1) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) diff --git a/rest/indextest/resync_test.go b/rest/indextest/resync_test.go index 5c7de3a8db..4a3efb5fd8 100644 --- a/rest/indextest/resync_test.go +++ b/rest/indextest/resync_test.go @@ -19,7 +19,6 @@ import ( ) func TestResyncWithoutIndexes(t *testing.T) { - base.TestRequiresDCPResync(t) rt := rest.NewRestTester(t, &rest.RestTesterConfig{ PersistentConfig: true}) defer rt.Close() diff --git a/rest/sync_fn_test.go b/rest/sync_fn_test.go index abba2fc3dc..76b132b857 100644 --- a/rest/sync_fn_test.go +++ b/rest/sync_fn_test.go @@ -407,8 +407,6 @@ func TestSyncFnTimeout(t *testing.T) { } func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { - base.TestRequiresDCPResync(t) - syncFn := ` function(doc) { channel("x") @@ -455,8 +453,6 @@ func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { } func TestResyncStopUsingDCPStream(t *testing.T) { - base.TestRequiresDCPResync(t) - syncFn := ` function(doc) { channel("x") @@ -497,8 +493,6 @@ func 
TestResyncStopUsingDCPStream(t *testing.T) { } func TestResyncRegenerateSequences(t *testing.T) { - base.TestRequiresDCPResync(t) - syncFn := ` function(doc) { if (doc.userdoc){ @@ -627,8 +621,6 @@ func TestResyncRegenerateSequences(t *testing.T) { // CBG-2150: Tests that resync status is cluster aware func TestResyncPersistence(t *testing.T) { - base.TestRequiresDCPResync(t) - tb := base.GetTestBucket(t) noCloseTB := tb.NoCloseClone() From f7dec4054985f55cf6625bbadeee290919920651 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 1 Dec 2025 21:17:07 -0500 Subject: [PATCH 3/3] copilot suggestions --- base/abstract_dcp_client.go | 2 ++ base/rosmar_dcp_client.go | 1 - db/attachment_compaction.go | 13 ++++--------- db/attachment_compaction_test.go | 4 +--- db/database.go | 2 -- 5 files changed, 7 insertions(+), 15 deletions(-) diff --git a/base/abstract_dcp_client.go b/base/abstract_dcp_client.go index 45449e8e3a..72cb816ab4 100644 --- a/base/abstract_dcp_client.go +++ b/base/abstract_dcp_client.go @@ -58,6 +58,8 @@ func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DC return nil, fmt.Errorf("DCPClientOptions.InitialMetadata cannot be provided when FromLatestSequence is true") } else if opts.MetadataStoreType == DCPMetadataStoreInMemory && opts.CheckpointPrefix != "" { return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory") + } else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.CheckpointPrefix == "" { + return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix must be provided when MetadataStoreType is persistent") } underlyingBucket := GetBaseBucket(bucket) if _, ok := underlyingBucket.(*rosmar.Bucket); ok { diff --git a/base/rosmar_dcp_client.go b/base/rosmar_dcp_client.go index 386d9cc27f..d4c49804bc 100644 --- a/base/rosmar_dcp_client.go +++ b/base/rosmar_dcp_client.go @@ -75,6 +75,5 @@ func (dc *RosmarDCPClient) GetMetadata() []DCPMetadata { // GetMetadataKeyPrefix returns the document prefix for the checkpoint documents. func (dc *RosmarDCPClient) GetMetadataKeyPrefix() string { - // this value is probably not correct return dc.opts.CheckpointPrefix } diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 2be96bb354..15c4aa07dc 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -368,13 +368,8 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, clientOptions := getCompactionDCPClientOptions(db, compactionID, SweepPhase, dataStore, callback, base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs)) - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return 0, err - } - base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, clientOptions.FeedPrefix) - dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! 
%v", compactionLoggingID, err) return 0, err @@ -534,7 +529,7 @@ func getCompactionIDSubDocPath(compactionID string) string { // getCompactionDCPClientOptions returns the default set of DCPClientOptions suitable for attachment compaction func getCompactionDCPClientOptions(db *Database, compactionID string, compactionAction string, dataStore sgbucket.DataStore, callback sgbucket.FeedEventCallbackFunc, initialMetadata []base.DCPMetadata) base.DCPClientOptions { return base.DCPClientOptions{ - FeedPrefix: getAttachmentionCompactionPrefix(compactionID, compactionAction), + FeedPrefix: getAttachmentCompactionPrefix(compactionID, compactionAction), OneShot: true, FailOnRollback: true, MetadataStoreType: base.DCPMetadataStoreCS, @@ -547,7 +542,7 @@ func getCompactionDCPClientOptions(db *Database, compactionID string, compaction } } -func getAttachmentionCompactionPrefix(compactionID string, compactionAction string) string { +func getAttachmentCompactionPrefix(compactionID string, compactionAction string) string { return fmt.Sprintf("sg-%v:att_compaction:%v_%v", base.ProductAPIVersion, compactionID, @@ -555,7 +550,7 @@ func getAttachmentionCompactionPrefix(compactionID string, compactionAction stri ) } func GetAttachmentCompactionCheckpointPrefix(db *DatabaseContext, compactionID string, compactionAction string) string { - return getAttachmentionCompactionPrefix(compactionID, compactionAction) + db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID) + return getAttachmentCompactionPrefix(compactionID, compactionAction) + db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID) } // getAttachmentCompactionXattr returns the value of the attachment compaction xattr from a DCP stream. The value will be nil if the xattr is not found. diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go index e3102a212b..29a01c2173 100644 --- a/db/attachment_compaction_test.go +++ b/db/attachment_compaction_test.go @@ -281,11 +281,9 @@ func TestAttachmentCleanupRollback(t *testing.T) { require.NoError(t, err) } - bucket, err := base.AsGocbV2Bucket(testDb.Bucket) - require.NoError(t, err) clientOptions := getCompactionDCPClientOptions(testDb, t.Name(), CleanupPhase, dataStore, func(sgbucket.FeedEvent) bool { return true }, nil) - dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) + dcpClient, err := base.NewDCPClient(ctx, testDb.Bucket, clientOptions) require.NoError(t, err) // alter dcp metadata to feed into the compaction manager diff --git a/db/database.go b/db/database.go index 5575cad3d9..12a7cd40b5 100644 --- a/db/database.go +++ b/db/database.go @@ -2577,13 +2577,11 @@ func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpo bucket, err := base.AsGocbV2Bucket(database.Bucket) if err != nil { checkpoint := checkpointPrefix + ":" + feedPrefix - fmt.Printf("Deleting DCP checkpoint %q\n", checkpoint) err := database.MetadataStore.Delete(checkpoint) if err != nil && !base.IsDocNotFoundError(err) { return err } if base.IsDocNotFoundError(err) { - fmt.Printf("No DCP checkpoint found %q\n", checkpoint) return nil } return nil