111 changes: 111 additions & 0 deletions base/abstract_dcp_client.go
@@ -0,0 +1,111 @@
// Copyright 2025-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package base

import (
"context"
"expvar"
"fmt"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbaselabs/rosmar"
)

// DCPClient is an interface for all DCP implementations.
type DCPClient interface {
// Start will start the DCP feed. It returns a channel marking the end of the feed.
Start(ctx context.Context) (chan error, error)
// Close will shut down the DCP feed.
Close()
// GetMetadata returns the current DCP metadata.
GetMetadata() []DCPMetadata
// GetMetadataKeyPrefix returns the key prefix used for storing any persistent data.
GetMetadataKeyPrefix() string
}
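
To make this contract concrete, a minimal no-op stub might look like the following. This is a hypothetical illustration only (noopDCPClient is not part of this change); it shows that Start hands back a done channel and that the remaining methods report client state.

// noopDCPClient is a hypothetical stub illustrating the DCPClient contract.
type noopDCPClient struct{}

// Start returns a channel that a real client closes (after sending at most one
// error) when the feed terminates; this stub terminates immediately.
func (c *noopDCPClient) Start(ctx context.Context) (chan error, error) {
	doneChan := make(chan error)
	close(doneChan)
	return doneChan, nil
}

// Close is a no-op here; a real client shuts down the feed.
func (c *noopDCPClient) Close() {}

// GetMetadata returns nothing here; a real client returns its current DCP metadata.
func (c *noopDCPClient) GetMetadata() []DCPMetadata { return nil }

// GetMetadataKeyPrefix returns an empty prefix, since this stub persists nothing.
func (c *noopDCPClient) GetMetadataKeyPrefix() string { return "" }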

// DCPClientOptions are options for creating a DCPClient.
type DCPClientOptions struct {
FeedPrefix string // name of the DCP feed, used for logging locally and stored by Couchbase Server
Callback sgbucket.FeedEventCallbackFunc // callback function for DCP events
DBStats *expvar.Map // used only by the gocbcore implementation; these stats are not shared with Prometheus stats
CheckpointPrefix string // key prefix for checkpoint documents
CollectionNames CollectionNames // scopes and collections to monitor
InitialMetadata []DCPMetadata // initial metadata to seed the DCP client with
MetadataStoreType DCPMetadataStoreType // persistent or in memory storage
OneShot bool // if true, the feed runs to the latest sequence found when the client is started, then stops
FailOnRollback bool // if true, fail Start if the current DCP checkpoints encounter a rollback condition
Terminator chan bool // optional channel that can be closed to terminate the DCP feed; this will be replaced by a context option
FromLatestSequence bool // If true, start at latest sequence.
}

// NewDCPClient creates a new DCPClient to receive events from a bucket.
func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DCPClient, error) {
if opts.FeedPrefix == "" {
return nil, fmt.Errorf("DCPClientOptions.IDPrefix must be provided")
} else if bucket == nil {
return nil, fmt.Errorf("bucket must be provided")
} else if opts.Callback == nil {
return nil, fmt.Errorf("DCPClientOptions.Callback must be provided")
} else if len(opts.CollectionNames) == 0 {
return nil, fmt.Errorf("DCPClientOptions.CollectionNames must be provided")
} else if opts.FromLatestSequence && len(opts.InitialMetadata) > 0 {
return nil, fmt.Errorf("DCPClientOptions.InitialMetadata cannot be provided when FromLatestSequence is true")
} else if opts.MetadataStoreType == DCPMetadataStoreInMemory && opts.CheckpointPrefix != "" {
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory")
} else if opts.MetadataStoreType == DCPMetadataStoreCS && opts.CheckpointPrefix == "" {
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix must be provided when MetadataStoreType is persistent")
}
underlyingBucket := GetBaseBucket(bucket)
if _, ok := underlyingBucket.(*rosmar.Bucket); ok {
return NewRosmarDCPClient(bucket, opts)
} else if gocbBucket, ok := underlyingBucket.(*GocbV2Bucket); ok {
return newGocbDCPClient(ctx, gocbBucket, opts)
}
return nil, fmt.Errorf("bucket type %T does not have a DCPClient implementation", underlyingBucket)
}
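
For reference, here is a hedged sketch of two option sets that pass the validation above. The feed name, checkpoint prefix, and callback are illustrative values rather than defaults from this change, and the callback signature is assumed from sg-bucket's FeedEventCallbackFunc.

// Illustrative callback; assumes sgbucket.FeedEventCallbackFunc is func(sgbucket.FeedEvent) bool.
callback := func(event sgbucket.FeedEvent) bool { return true }

// In-memory metadata: CheckpointPrefix must be left empty.
inMemoryOpts := DCPClientOptions{
	FeedPrefix:        "example-feed", // illustrative, not a default
	Callback:          callback,
	CollectionNames:   CollectionNames{"_default": {"_default"}},
	MetadataStoreType: DCPMetadataStoreInMemory,
	OneShot:           true,
}

// Persistent metadata: CheckpointPrefix is required.
persistentOpts := DCPClientOptions{
	FeedPrefix:        "example-feed",
	Callback:          callback,
	CollectionNames:   CollectionNames{"_default": {"_default"}},
	MetadataStoreType: DCPMetadataStoreCS,
	CheckpointPrefix:  "example-checkpoint:", // illustrative, not a default
}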

// StartDCPFeed creates and starts a DCP feed. This function will return as soon as the feed is started. doneChan is
// sent a single error value when the feed terminates.
func StartDCPFeed(ctx context.Context, bucket Bucket, opts DCPClientOptions) (doneChan <-chan error, err error) {
client, err := NewDCPClient(ctx, bucket, opts)
if err != nil {
return nil, err
}
bucketName := bucket.GetName()
feedName := opts.FeedPrefix

doneChan, err = client.Start(ctx)
if err != nil {
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
client.Close()
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
if doneChan != nil {
<-doneChan
}
return nil, err
}
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
select {
case err := <-doneChan:
[Review comment from Collaborator] I thought that for a one-shot feed, a closed doneChan wasn't an error case.

WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
// FIXME: close dbContext here
case <-opts.Terminator:
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
client.Close()
dcpCloseErr := <-doneChan
if dcpCloseErr != nil {
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
}
}
}()
return doneChan, err
}
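
As a usage sketch, a caller could drive a one-shot feed to completion as below. runOneShotFeed is a hypothetical helper, not part of this change, and it assumes options like those shown after NewDCPClient above.

// runOneShotFeed starts a one-shot feed and blocks until it has caught up.
func runOneShotFeed(ctx context.Context, bucket Bucket, opts DCPClientOptions) error {
	opts.OneShot = true
	opts.Terminator = make(chan bool)
	doneChan, err := StartDCPFeed(ctx, bucket, opts)
	if err != nil {
		return err
	}
	// doneChan receives a single error when the feed terminates; per the review
	// comment above, a closed doneChan (nil error) is the expected outcome for a
	// one-shot feed rather than an error case.
	return <-doneChan
}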
3 changes: 1 addition & 2 deletions base/collection.go
@@ -263,8 +263,7 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
}

func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
- groupID := ""
- return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
+ return errors.New("GocbV2Bucket does not support StartDCPFeed; use NewDCPClient instead")
}

func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {
17 changes: 17 additions & 0 deletions base/collection_common.go
@@ -12,6 +12,7 @@ package base

import (
"errors"
"slices"

sgbucket "github.com/couchbase/sg-bucket"
)
@@ -20,6 +21,9 @@ var ErrCollectionsUnsupported = errors.New("collections not supported")

type ScopeAndCollectionName = sgbucket.DataStoreNameImpl

// CollectionNames is a map of scope name to a slice of collection names.
type CollectionNames map[string][]string

func DefaultScopeAndCollectionName() ScopeAndCollectionName {
return ScopeAndCollectionName{Scope: DefaultScope, Collection: DefaultCollection}
}
@@ -45,3 +49,16 @@ func (s ScopeAndCollectionNames) ScopeAndCollectionNames() []string {
func FullyQualifiedCollectionName(bucketName, scopeName, collectionName string) string {
return bucketName + "." + scopeName + "." + collectionName
}

// Add adds the given data stores' scope and collection names to the map. Duplicates are ignored.
func (c CollectionNames) Add(ds ...sgbucket.DataStoreName) {
for _, d := range ds {
if _, ok := c[d.ScopeName()]; !ok {
c[d.ScopeName()] = []string{}
} else if slices.Contains(c[d.ScopeName()], d.CollectionName()) {
[Review comment from Member] I don't really like the slices.Contains inside the ds iteration, especially thinking about the future when we're considering increasing the collection limits. It kind of feels like this would be better suited as a map[string]map[string]struct{} or similar, to avoid the slice iterations at insertion time and handle dedupe. (See the sketch after this function.)

// avoid duplicates
continue
}
c[d.ScopeName()] = append(c[d.ScopeName()], d.CollectionName())
}
}
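
As flagged in the review comment above, a set-backed variant avoids the slices.Contains scan at insertion time. A hedged sketch follows; the collectionNameSet name is illustrative and not part of this change.

// collectionNameSet maps scope name to a set of collection names, making
// insertion O(1) and deduplication automatic.
type collectionNameSet map[string]map[string]struct{}

// Add inserts the given data stores' scope and collection names; duplicate
// inserts are no-ops by construction.
func (c collectionNameSet) Add(ds ...sgbucket.DataStoreName) {
	for _, d := range ds {
		scope := d.ScopeName()
		if c[scope] == nil {
			c[scope] = make(map[string]struct{})
		}
		c[scope][d.CollectionName()] = struct{}{}
	}
}

The trade-off is that insertion order is lost; callers that need a deterministic []string per scope would have to collect and sort the keys.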