Skip to content

Conversation

@torcolvin
Copy link
Collaborator

Create an abstract DCPClient to be able to work with rosmar

I expect that this can be extended to work as a sharded or non shared DCP client as well, but I think there's interesting work here.

I made this "work" but not for attachment compaction since this uses hierarchical paths in rosmar for xattr subdoc operations https://jira.issues.couchbase.com/browse/CBG-4232

TODO

Pre-review checklist

  • Removed debug logging (fmt.Print, log.Print, ...)
  • Logging sensitive data? Make sure it's tagged (e.g. base.UD(docID), base.MD(dbName))
  • Updated relevant information in the API specifications (such as endpoint descriptions, schemas, ...) in docs/api

Dependencies (if applicable)

  • Link upstream PRs
  • Update Go module dependencies when merged

Integration Tests

Copilot AI review requested due to automatic review settings November 13, 2025 23:06
@torcolvin torcolvin marked this pull request as draft November 13, 2025 23:06
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces an abstract DCPClient interface to support both Couchbase Server (via gocbcore) and Rosmar backends. The key change is creating a unified DCP client abstraction that dispatches to implementation-specific clients based on the underlying bucket type.

Key changes:

  • Created DCPClient interface with implementations GoCBDCPClient (renamed from DCPClient) and RosmarDCPClient
  • Unified DCPClientOptions struct replacing separate options for each implementation
  • Removed test skips for Rosmar/Walrus, enabling DCP-based tests to run against all bucket types

Reviewed Changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
base/abstract_dcp_client.go New abstraction layer with DCPClient interface and factory function NewDCPClient
base/rosmar_dcp_client.go New Rosmar-specific DCP client implementation
base/dcp_client.go Renamed DCPClient to GoCBDCPClient and DCPClientOptions to GoCBDCPClientOptions
base/dcp_client_stream_observer.go Updated receiver types from DCPClient to GoCBDCPClient
base/gocb_dcp_feed.go Refactored to use new client creation pattern
db/background_mgr_resync_dcp.go Updated to use new abstract client with scope-based collection specification
db/background_mgr_attachment_migration.go Updated to use new abstract client with scope-based collection specification
db/attachment_compaction.go Updated to use new abstract client with scope-based collection specification
db/util_testing.go Updated to use new abstract client with scope-based collection specification
db/background_mgr_resync_dcp_test.go Removed Walrus test skips
db/background_mgr_attachment_migration_test.go Removed Walrus test skips
db/attachment_compaction_test.go Removed Walrus test skips
base/dcp_client_test.go Removed Walrus test skips and commented out unported tests
tools/cache_perf_tool/dcpDataGeneration.go Updated type references to GoCBDCPClient

@torcolvin torcolvin requested a review from Copilot December 2, 2025 02:06
@torcolvin torcolvin changed the title Prototype: Create an abstract DCPClient to be able to work with rosmar CBG-4249: Create an abstract DCPClient to be able to work with rosmar Dec 2, 2025
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 33 out of 33 changed files in this pull request and generated 9 comments.

Copy link
Collaborator

@adamcfraser adamcfraser left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few initial comments, haven't done a full review.

InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
go func() {
select {
case err := <-doneChan:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that for a one-shot feed, a closed doneChan wasn't an error case.

// Close is used externally to stop the DCP client.
func (dc *GoCBDCPClient) Close() {
dc.close()
return dc.getCloseError()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not care about reporting any potential close error, or is that being done elsewhere?

var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set")

type DCPClient struct {
type GoCBDCPClient struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, take it or leave it, but I think GocbDCPClient and GocbDCPClientOptions are more readable.

bucket, err := base.AsGocbV2Bucket(database.Bucket)
if err != nil {
return err
checkpoint := checkpointPrefix + ":" + feedPrefix
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this Rosmar-specific handling - it seems to require some knowledge of the naming convention for rosmar DCP checkpoints. Seems like an unexpected place for rosmar-specific code.

Copy link
Member

@bbrks bbrks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping the few comments I had from yesterday's review

FailOnRollback: false,
CollectionIDs: slices.Collect(maps.Keys(collections)),
MetadataStoreType: base.DCPMetadataStoreInMemory,
collectionNames.Add(dataStoreName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Add takes a variadic and Add itself is doing a reasonable amount of work to handle the de-dupe, it might make more sense to push this down outside of the loop?

for _, d := range ds {
if _, ok := c[d.ScopeName()]; !ok {
c[d.ScopeName()] = []string{}
} else if slices.Contains(c[d.ScopeName()], d.CollectionName()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like the slices.Contains inside the ds iteration - especially thinking about in the future when we're considering increasing the collection limits.

It kind of feels like this would be better suited as a map[string]map[string]struct{} or similar to avoid the slice iterations at insertion time, and handle dedupe.


func getResyncDCPPrefix(resyncID string) string {
return fmt.Sprintf(
"sg-%v:resync:%v",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should decide on whether CBG-4286 means we should make this prefix sg: or one that includes a version. I would be inclined to say sg: and then group version numbers in the trailing part of the name.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants