From b98dc00b13791e74a76ca0ba975dd66af1339520 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 19 Nov 2025 15:58:43 -0500 Subject: [PATCH 1/6] Resync: make sure to create HLV if non exists --- db/database.go | 4 ++ db/database_test.go | 104 ++++++++++++++++++++++++++++++-------------- 2 files changed, 75 insertions(+), 33 deletions(-) diff --git a/db/database.go b/db/database.go index be32d2aff3..68220ad126 100644 --- a/db/database.go +++ b/db/database.go @@ -1854,6 +1854,10 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, if db.useMou() { doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate) } + doc, err = db.updateHLV(ctx, doc, Import, false) + if err != nil { + return sgbucket.UpdatedDoc{}, err + } _, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs() updatedDoc := sgbucket.UpdatedDoc{ diff --git a/db/database_test.go b/db/database_test.go index cc10be8878..9226829c0e 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -3657,61 +3657,99 @@ func Test_invalidateAllPrincipalsCache(t *testing.T) { } func Test_resyncDocument(t *testing.T) { - if !base.TestUseXattrs() { - t.Skip("Walrus doesn't support xattr") - } db, ctx := setupTestDB(t) defer db.Close(ctx) db.Options.EnableXattr = true collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - syncFn := ` + testCases := []struct { + name string + useHLV bool + }{ + { + name: "pre-4.0", + useHLV: true, + }, + { + name: "has_hlv", + useHLV: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + startingSyncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value()) + syncFn := ` function sync(doc, oldDoc){ channel("channel." + "ABC"); } ` - _, err := collection.UpdateSyncFun(ctx, syncFn) - require.NoError(t, err) + _, err := collection.UpdateSyncFun(ctx, syncFn) + require.NoError(t, err) - docID := uuid.NewString() + docID := uuid.NewString() - updateBody := make(map[string]any) - updateBody["val"] = "value" - _, doc, err := collection.Put(ctx, docID, updateBody) - require.NoError(t, err) - assert.NotNil(t, doc) + updateBody := make(map[string]any) + updateBody["val"] = "value" + if tc.useHLV { + _, _, err := collection.Put(ctx, docID, updateBody) + require.NoError(t, err) + } else { + collection.CreateDocNoHLV(t, ctx, docID, updateBody) + } - syncFn = ` + syncFn = ` function sync(doc, oldDoc){ channel("channel." + "ABC12332423234"); } ` - _, err = collection.UpdateSyncFun(ctx, syncFn) - require.NoError(t, err) - - _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}) - require.NoError(t, err) - err = collection.WaitForPendingChanges(ctx) - require.NoError(t, err) + _, err = collection.UpdateSyncFun(ctx, syncFn) + require.NoError(t, err) - syncData, err := collection.GetDocSyncData(ctx, docID) - assert.NoError(t, err) + preResyncDoc, err := collection.GetDocument(ctx, docID, DocUnmarshalAll) + require.NoError(t, err) + if !tc.useHLV { + require.Nil(t, preResyncDoc.HLV) + } + _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}) + require.NoError(t, err) + err = collection.WaitForPendingChanges(ctx) + require.NoError(t, err) - assert.Len(t, syncData.ChannelSet, 2) - assert.Len(t, syncData.Channels, 2) - found := false + postResyncDoc, _, err := collection.getDocWithXattrs(ctx, docID, collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), DocUnmarshalAll) + assert.NoError(t, err) - for _, chSet := range syncData.ChannelSet { - if chSet.Name == "channel.ABC12332423234" { - found = true - break - } - } + assert.Len(t, postResyncDoc.ChannelSet, 2) + assert.Len(t, postResyncDoc.Channels, 2) + found := false - assert.True(t, found) - assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value())) + for _, chSet := range postResyncDoc.ChannelSet { + if chSet.Name == "channel.ABC12332423234" { + found = true + break + } + } + assert.True(t, found) + require.NoError(t, err) + require.NotNil(t, postResyncDoc.HLV) + require.Equal(t, Version{ + SourceID: db.EncodedSourceID, + Value: preResyncDoc.Cas, + }, Version{ + SourceID: postResyncDoc.HLV.SourceID, + Value: postResyncDoc.HLV.Version, + }) + require.NotNil(t, postResyncDoc.MetadataOnlyUpdate) + require.Equal(t, MetadataOnlyUpdate{ + HexCAS: base.CasToString(postResyncDoc.Cas), + PreviousHexCAS: base.CasToString(preResyncDoc.Cas), + PreviousRevSeqNo: preResyncDoc.RevSeqNo, + }, *postResyncDoc.MetadataOnlyUpdate) + assert.Equal(t, startingSyncFnCount+2, int(db.DbStats.Database().SyncFunctionCount.Value())) + }) + } } func Test_getUpdatedDocument(t *testing.T) { From eb809bf36ded144a008de3741c629a11a999d827 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 19 Nov 2025 17:11:47 -0500 Subject: [PATCH 2/6] CBG-3690 don't re-read document for resync --- base/constants.go | 4 + db/background_mgr_resync_dcp.go | 11 ++- db/crud.go | 2 +- db/database.go | 132 ++++++++++++-------------------- db/database_test.go | 10 ++- db/document.go | 54 +++++++++---- db/document_test.go | 31 ++++++++ 7 files changed, 141 insertions(+), 103 deletions(-) diff --git a/base/constants.go b/base/constants.go index 22af3b5750..4eb155d1f0 100644 --- a/base/constants.go +++ b/base/constants.go @@ -133,6 +133,10 @@ const ( // VirtualXattrRevSeqNo is used to fetch rev seq no from documents virtual xattr VirtualXattrRevSeqNo = "$document.revid" + + // VirtualExpiry is used to fetch the expiry from documents + VirtualExpiry = "$document.exptime" + // VirtualDocumentXattr is used to fetch the documents virtual xattr VirtualDocumentXattr = "$document" diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 9f1ed723cd..6948a8f9ad 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -124,7 +124,6 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers callback := func(event sgbucket.FeedEvent) bool { docID := string(event.Key) - key := realDocID(docID) base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID)) // Ignore documents without xattrs if possible, to avoid processing unnecessary documents @@ -147,9 +146,14 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers databaseCollection := db.CollectionByID[event.CollectionID] databaseCollection.collectionStats.ResyncNumProcessed.Add(1) collectionCtx := databaseCollection.AddCollectionContext(ctx) - _, unusedSequences, err := (&DatabaseCollectionWithUser{ + doc, err := bucketDocumentFromFeed(event) + if err != nil { + base.WarnfCtx(collectionCtx, "[%s] Error getting document from DCP event for doc %q: %v", resyncLoggingID, base.UD(docID), err) + return false + } + unusedSequences, err := (&DatabaseCollectionWithUser{ DatabaseCollection: databaseCollection, - }).resyncDocument(collectionCtx, docID, key, regenerateSequences, []uint64{}) + }).resyncDocument(collectionCtx, docID, doc, regenerateSequences) databaseCollection.releaseSequences(collectionCtx, unusedSequences) @@ -159,6 +163,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers databaseCollection.collectionStats.ResyncNumChanged.Add(1) } else if err != base.ErrUpdateCancel { base.WarnfCtx(collectionCtx, "[%s] Error updating doc %q: %v", resyncLoggingID, base.UD(docID), err) + return false } return true } diff --git a/db/crud.go b/db/crud.go index 9890cfebca..e889abe3b4 100644 --- a/db/crud.go +++ b/db/crud.go @@ -983,7 +983,7 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document case ExistingVersion: // preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed d.HLV.CurrentVersionCAS = expandMacroCASValueUint64 - case Import: + case Import, Resync: // Do not update HLV if the current document version (cas) is already included in the existing HLV, as either: // 1. _vv.cvCAS == document.cas (current mutation is already present as cv), or // 2. _mou.cas == document.cas (current mutation is already present as cv, and was imported on a different cluster) diff --git a/db/database.go b/db/database.go index 68220ad126..195a0b1cb3 100644 --- a/db/database.go +++ b/db/database.go @@ -57,6 +57,7 @@ const ( ExistingVersionLegacyRev ExistingVersionWithUpdateToHLV NoHLVUpdateForTest + Resync ) type DocUpdateType uint32 @@ -1822,101 +1823,70 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d return doc, shouldUpdate, updatedExpiry, doc.Sequence, updatedUnusedSequences, nil } -func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) { +func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid string, previousDoc *sgbucket.BucketDocument, regenerateSequences bool) (updatedUnusedSequences []uint64, err error) { var updatedDoc *Document var shouldUpdate bool var updatedExpiry *uint32 - if db.UseXattrs() { - writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { - // There's no scenario where a doc should from non-deleted to deleted during UpdateAllDocChannels processing, - // so deleteDoc is always returned as false. - if currentValue == nil || len(currentValue) == 0 { - return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel - } - doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll) - if err != nil { - return sgbucket.UpdatedDoc{}, err - } - updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences) - if err != nil { - return sgbucket.UpdatedDoc{}, err - } - if !shouldUpdate { - return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel - } - base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid)) - if updatedExpiry != nil { - updatedDoc.UpdateExpiry(*updatedExpiry) - } - doc.SetCrc32cUserXattrHash() + writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + // There's no scenario where a doc should from non-deleted to deleted during UpdateAllDocChannels processing, + // so deleteDoc is always returned as false. + if len(currentValue) == 0 { + return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel + } + doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll) + if err != nil { + return sgbucket.UpdatedDoc{}, err + } + updatedDoc, shouldUpdate, updatedExpiry, _, updatedUnusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, nil) + if err != nil { + return sgbucket.UpdatedDoc{}, err + } + if !shouldUpdate { + return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel + } + base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid)) + if updatedExpiry != nil { + updatedDoc.UpdateExpiry(*updatedExpiry) + } + doc.SetCrc32cUserXattrHash() - // Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate - if db.useMou() { - doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate) - } - doc, err = db.updateHLV(ctx, doc, Import, false) - if err != nil { - return sgbucket.UpdatedDoc{}, err - } + // Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate + doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate) + mouMatch := false // after writing this document, mou.cas != doc.cas since otherwise shouldUpdate would be false + doc, err = db.updateHLV(ctx, doc, Resync, mouMatch) + if err != nil { + return sgbucket.UpdatedDoc{}, err + } - _, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs() - updatedDoc := sgbucket.UpdatedDoc{ - Doc: nil, // Resync does not require document body update - Xattrs: map[string][]byte{ - base.SyncXattrName: rawSyncXattr, - base.VvXattrName: rawVvXattr, - }, - Expiry: updatedExpiry, - } - if db.useMou() { - updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr - if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString { - updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas)) - } - } - if rawGlobalXattr != nil { - updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr - } - return updatedDoc, err + _, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs() + updatedDoc := sgbucket.UpdatedDoc{ + Doc: nil, // Resync does not require document body update + Xattrs: map[string][]byte{ + base.SyncXattrName: rawSyncXattr, + base.VvXattrName: rawVvXattr, + base.MouXattrName: rawMouXattr, + }, + Expiry: updatedExpiry, } - opts := &sgbucket.MutateInOptions{ - MacroExpansion: macroExpandSpec(base.SyncXattrName), + if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString { + updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas)) } - _, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc) - } else { - _, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) { - // Be careful: this block can be invoked multiple times if there are races! - if currentValue == nil { - return nil, nil, false, base.ErrUpdateCancel // someone deleted it?! - } - doc, err := unmarshalDocument(docid, currentValue) - if err != nil { - return nil, nil, false, err - } - updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences) - if err != nil { - return nil, nil, false, err - } - if shouldUpdate { - base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid)) - if updatedExpiry != nil { - updatedDoc.UpdateExpiry(*updatedExpiry) - } - - updatedBytes, marshalErr := base.JSONMarshal(updatedDoc) - return updatedBytes, updatedExpiry, false, marshalErr - } else { - return nil, nil, false, base.ErrUpdateCancel - } - }) + if rawGlobalXattr != nil { + updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr + } + return updatedDoc, err + } + opts := &sgbucket.MutateInOptions{ + MacroExpansion: macroExpandSpec(base.SyncXattrName), } + _, err = db.dataStore.WriteUpdateWithXattrs(ctx, docid, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, previousDoc, opts, writeUpdateFunc) if err == nil { base.Audit(ctx, base.AuditIDDocumentResync, base.AuditFields{ base.AuditFieldDocID: docid, base.AuditFieldDocVersion: updatedDoc.CVOrRevTreeID(), }) } - return updatedHighSeq, unusedSequences, err + return updatedUnusedSequences, err } // invalidateAllPrincipals invalidates computed channels and roles for all users/roles, for the specified collections: diff --git a/db/database_test.go b/db/database_test.go index 9226829c0e..5a4182c704 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -3669,11 +3669,11 @@ func Test_resyncDocument(t *testing.T) { }{ { name: "pre-4.0", - useHLV: true, + useHLV: false, }, { name: "has_hlv", - useHLV: false, + useHLV: true, }, } @@ -3712,7 +3712,10 @@ func Test_resyncDocument(t *testing.T) { if !tc.useHLV { require.Nil(t, preResyncDoc.HLV) } - _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}) + doc, err := getBucketDocument(ctx, collection.DatabaseCollection, docID) + require.NoError(t, err) + + _, err = collection.resyncDocument(ctx, docID, doc, false) require.NoError(t, err) err = collection.WaitForPendingChanges(ctx) require.NoError(t, err) @@ -3747,6 +3750,7 @@ func Test_resyncDocument(t *testing.T) { PreviousHexCAS: base.CasToString(preResyncDoc.Cas), PreviousRevSeqNo: preResyncDoc.RevSeqNo, }, *postResyncDoc.MetadataOnlyUpdate) + assert.Equal(t, preResyncDoc.Cas, postResyncDoc.HLV.CurrentVersionCAS) assert.Equal(t, startingSyncFnCount+2, int(db.DbStats.Database().SyncFunctionCount.Value())) }) } diff --git a/db/document.go b/db/document.go index cc0e88ab00..79c0bf47a4 100644 --- a/db/document.go +++ b/db/document.go @@ -612,21 +612,6 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey return rawDoc, syncData, nil } -func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) { - if dataType&base.MemcachedDataTypeXattr == 0 { - return unmarshalDocument(docid, data) - } - xattrKeys := []string{base.SyncXattrName} - if userXattrKey != "" { - xattrKeys = append(xattrKeys, userXattrKey) - } - body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data) - if err != nil { - return nil, err - } - return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], nil, cas, DocUnmarshalAll) -} - func (doc *SyncData) HasValidSyncData() bool { valid := doc != nil && doc.GetRevTreeID() != "" && (doc.Sequence > 0) @@ -1578,3 +1563,42 @@ func (d DocVersion) CVOrRevTreeID() string { } return d.RevTreeID } + +// bucketDocumentFromFeed converts a sgbucket.FeedEvent to a sgbucket.BucketDocument +func bucketDocumentFromFeed(event sgbucket.FeedEvent) (*sgbucket.BucketDocument, error) { + body, xattrs, err := sgbucket.DecodeValueWithAllXattrs(event.Value) + if err != nil { + return nil, err + } + return &sgbucket.BucketDocument{ + Body: body, + Xattrs: xattrs, + Cas: event.Cas, + Expiry: event.Expiry, + IsTombstone: event.Opcode == sgbucket.FeedOpDeletion, + }, nil +} + +func getBucketDocument(ctx context.Context, collection *DatabaseCollection, docID string) (*sgbucket.BucketDocument, error) { + xattrNames := append(collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), base.VirtualExpiry) + body, xattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, docID, xattrNames) + if err != nil { + return nil, err + } + var expiry uint32 + if expiryBytes, ok := xattrs[base.VirtualExpiry]; ok { + err := base.JSONUnmarshal(expiryBytes, &expiry) + if err != nil { + return nil, fmt.Errorf("Failed to unmarshal virtual expiry xattr for key %v, ignoring virtual expiry. Err: %v", base.UD(docID), err) + } + delete(xattrs, base.VirtualExpiry) + } + + return &sgbucket.BucketDocument{ + Body: body, + Xattrs: xattrs, + Cas: cas, + Expiry: expiry, + IsTombstone: len(body) == 0, + }, nil +} diff --git a/db/document_test.go b/db/document_test.go index 11d7c6dd9a..efde4c62a5 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -16,6 +16,7 @@ import ( "log" "reflect" "testing" + "time" sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" @@ -772,3 +773,33 @@ func TestAlignRevTreeHistory(t *testing.T) { }) } } + +func TestGetBucketDocument(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) + db, ctx := setupTestDB(t) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + // use 120 days so expiry is actually a unix time + exp := uint32(time.Now().Add(24 * 120 * time.Hour).Unix()) + body := []byte(`{"value":"test"}`) + xattrs := map[string][]byte{ + base.SyncXattrName: []byte(`{"rev":"1-abc","sequence":1}`), + } + cas, err := collection.dataStore.WriteWithXattrs(ctx, "doc1", exp, 0, body, xattrs, nil, nil) + require.NoError(t, err) + + doc, err := getBucketDocument(ctx, collection.DatabaseCollection, "doc1") + require.NoError(t, err) + + expectedDoc := sgbucket.BucketDocument{ + Body: body, + Xattrs: xattrs, + Cas: cas, + Expiry: exp, + } + if base.UnitTestUrlIsWalrus() { + expectedDoc.Expiry = 0 // Walrus doesn't support expiry via virtual xattr yet + } + require.Equal(t, expectedDoc, *doc) +} From b417eba9dce6ca8d606e57958c637cedab672281 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 20 Nov 2025 09:09:13 -0500 Subject: [PATCH 3/6] Update comments, delete unnecessary line --- db/background_mgr_resync_dcp.go | 3 ++- db/database.go | 4 ++-- db/database_test.go | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 6948a8f9ad..c9d846f72b 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -131,7 +131,8 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers return true } // Don't want to process raw binary docs - // The binary check should suffice but for additional safety also check for empty bodies + // The binary check should suffice but for additional safety also check for empty bodies. This will also avoid + // processing tombstones. if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 { return true } diff --git a/db/database.go b/db/database.go index 195a0b1cb3..c91ed9fa6f 100644 --- a/db/database.go +++ b/db/database.go @@ -1828,8 +1828,8 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid var shouldUpdate bool var updatedExpiry *uint32 writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { - // There's no scenario where a doc should from non-deleted to deleted during UpdateAllDocChannels processing, - // so deleteDoc is always returned as false. + // resyncDocument is not called on tombstoned documents, so this value will only be empty if the document was + // deleted between DCP event and calling this function. In any case, we do not need to update it. if len(currentValue) == 0 { return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel } diff --git a/db/database_test.go b/db/database_test.go index 5a4182c704..b583359c41 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -3735,7 +3735,6 @@ func Test_resyncDocument(t *testing.T) { } assert.True(t, found) - require.NoError(t, err) require.NotNil(t, postResyncDoc.HLV) require.Equal(t, Version{ SourceID: db.EncodedSourceID, From 798488e5513ac2176c9e2229324006fc2a4a2835 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 24 Nov 2025 11:16:11 -0500 Subject: [PATCH 4/6] simplify mou handling --- db/crud.go | 41 +++++++++++++++++++++++---------------- db/database.go | 8 ++------ db/database_collection.go | 13 ++++++++++++- db/database_test.go | 3 +++ db/document.go | 19 +++++++++++++----- db/document_test.go | 2 ++ db/import.go | 20 ++++++++++++------- db/import_test.go | 9 ++------- 8 files changed, 72 insertions(+), 43 deletions(-) diff --git a/db/crud.go b/db/crud.go index e889abe3b4..b85a0aed0a 100644 --- a/db/crud.go +++ b/db/crud.go @@ -969,10 +969,11 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context return nil } -// updateHLV updates the HLV in the sync data appropriately based on what type of document update event we are encountering. mouMatch represents if the _mou.cas == doc.cas -func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document, docUpdateEvent DocUpdateType, mouMatch bool) (*Document, error) { +// updateHLV updates the HLV in the sync data appropriately based on what type of document update event we are encountering. +func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document, docUpdateEvent DocUpdateType) (*Document, error) { hasHLV := d.HLV != nil + if d.HLV == nil { d.HLV = &HybridLogicalVector{} base.DebugfCtx(ctx, base.KeyVV, "No existing HLV for doc %s", base.UD(d.ID)) @@ -983,7 +984,9 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document case ExistingVersion: // preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed d.HLV.CurrentVersionCAS = expandMacroCASValueUint64 - case Import, Resync: + case MetadataOnly: + mouMatch := d.MetadataOnlyUpdate != nil && d.MetadataOnlyUpdate.CAS() == d.Cas + d.MetadataOnlyUpdate = computeMetadataOnlyUpdate(ctx, d) // Do not update HLV if the current document version (cas) is already included in the existing HLV, as either: // 1. _vv.cvCAS == document.cas (current mutation is already present as cv), or // 2. _mou.cas == document.cas (current mutation is already present as cv, and was imported on a different cluster) @@ -1023,6 +1026,7 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document } // update the cvCAS on the SGWrite event too d.HLV.CurrentVersionCAS = expandMacroCASValueUint64 + d.MetadataOnlyUpdate = nil case ExistingVersionLegacyRev: revTreeEncodedCV, err := LegacyRevToRevTreeEncodedVersion(d.GetRevTreeID()) if err != nil { @@ -1034,6 +1038,7 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document } // update the cvCAS on the SGWrite event too d.HLV.CurrentVersionCAS = expandMacroCASValueUint64 + d.MetadataOnlyUpdate = nil case NoHLVUpdateForTest: // no hlv update event for testing purposes only (used to simulate pre upgraded write) return d, nil @@ -2242,7 +2247,6 @@ func (db *DatabaseCollectionWithUser) storeOldBodyInRevTreeAndUpdateCurrent(ctx // Store the new revision body into the doc: doc.setRevisionBody(ctx, newRevID, newDoc, db.AllowExternalRevBodyStorage(), newDocHasAttachments) doc.SetAttachments(newDoc.Attachments()) - doc.MetadataOnlyUpdate = newDoc.MetadataOnlyUpdate if doc.GetRevTreeID() == newRevID { doc.NewestRev = "" @@ -2539,14 +2543,6 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( return } - // compute mouMatch before the callback modifies doc.MetadataOnlyUpdate - mouMatch := false - if doc.MetadataOnlyUpdate != nil && doc.MetadataOnlyUpdate.CAS() == doc.Cas { - mouMatch = doc.MetadataOnlyUpdate.CAS() == doc.Cas - base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): _mou:%+v Metadata-only update match:%t", base.UD(doc.ID), doc.MetadataOnlyUpdate, mouMatch) - } else { - base.DebugfCtx(ctx, base.KeyVV, "updateDoc(%q): has no _mou", base.UD(doc.ID)) - } // Invoke the callback to update the document and with a new revision body to be used by the Sync Function: newDoc, newAttachments, createNewRevIDSkipped, updatedExpiry, err := callback(doc) if err != nil { @@ -2574,8 +2570,13 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( } isWinningRev := doc.GetRevTreeID() == newRevID - if len(channelSet) > 0 && !isWinningRev { - doc.History[newRevID].Channels = channelSet + if !isWinningRev { + if len(channelSet) > 0 { + doc.History[newRevID].Channels = channelSet + } + if docUpdateEvent == MetadataOnly { + docUpdateEvent = NewVersion + } } if newAttachments != nil { @@ -2610,7 +2611,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( // The callback has updated the HLV for mutations coming from CBL. Update the HLV so that the current version is set before // we call updateChannels, which needs to set the current version for removals // update the HLV values - doc, err = col.updateHLV(ctx, doc, docUpdateEvent, mouMatch) + doc, err = col.updateHLV(ctx, doc, docUpdateEvent) if err != nil { return } @@ -2702,7 +2703,13 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do if expiry != nil { initialExpiry = *expiry } - casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) { + var xattrKeys []string + if isImport { + xattrKeys = db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys() + } else { + xattrKeys = db.syncGlobalSyncMouAndUserXattrKeys() + } + casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, xattrKeys, initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) { // Be careful: this block can be invoked multiple times if there are races! if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil { return @@ -2773,7 +2780,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do } updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr, base.VvXattrName: rawVvXattr} - if rawMouXattr != nil && db.useMou() { + if rawMouXattr != nil { updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr } if rawGlobalSync != nil { diff --git a/db/database.go b/db/database.go index c91ed9fa6f..ba8ca3867e 100644 --- a/db/database.go +++ b/db/database.go @@ -51,13 +51,12 @@ const ( ) const ( - Import DocUpdateType = iota + MetadataOnly DocUpdateType = iota NewVersion ExistingVersion ExistingVersionLegacyRev ExistingVersionWithUpdateToHLV NoHLVUpdateForTest - Resync ) type DocUpdateType uint32 @@ -1850,10 +1849,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid } doc.SetCrc32cUserXattrHash() - // Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate - doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate) - mouMatch := false // after writing this document, mou.cas != doc.cas since otherwise shouldUpdate would be false - doc, err = db.updateHLV(ctx, doc, Resync, mouMatch) + doc, err = db.updateHLV(ctx, doc, MetadataOnly) if err != nil { return sgbucket.UpdatedDoc{}, err } diff --git a/db/database_collection.go b/db/database_collection.go index daa77de851..a6b3ff275b 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -323,9 +323,20 @@ func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string { return xattrKeys } +// syncGlobalSyncMouAndUserXattrKeys returns the xattr keys for the user, mou and sync xattrs. +func (c *DatabaseCollection) syncGlobalSyncMouAndUserXattrKeys() []string { + xattrKeys := []string{base.SyncXattrName, base.VvXattrName, + base.MouXattrName, base.GlobalXattrName} + userXattrKey := c.userXattrKey() + if userXattrKey != "" { + xattrKeys = append(xattrKeys, userXattrKey) + } + return xattrKeys +} + // syncGlobalSyncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs. func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string { - xattrKeys := []string{base.SyncXattrName, base.VvXattrName} + xattrKeys := []string{base.SyncXattrName, base.VvXattrName, base.VirtualXattrRevSeqNo} if c.useMou() { xattrKeys = append(xattrKeys, base.MouXattrName, base.GlobalXattrName) } diff --git a/db/database_test.go b/db/database_test.go index b583359c41..0138235652 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -3714,6 +3714,9 @@ func Test_resyncDocument(t *testing.T) { } doc, err := getBucketDocument(ctx, collection.DatabaseCollection, docID) require.NoError(t, err) + revSeqNo, err := unmarshalRevSeqNo(doc.Xattrs[base.VirtualXattrRevSeqNo]) + require.NoError(t, err) + preResyncDoc.RevSeqNo = revSeqNo _, err = collection.resyncDocument(ctx, docID, doc, false) require.NoError(t, err) diff --git a/db/document.go b/db/document.go index 79c0bf47a4..e844eaeb3d 100644 --- a/db/document.go +++ b/db/document.go @@ -1428,19 +1428,24 @@ func (doc *Document) channelsForRevTreeID(revTreeID string) (base.Set, bool) { } // computeMetadataOnlyUpdate computes a new metadataOnlyUpdate based on the existing document's CAS and metadataOnlyUpdate -func computeMetadataOnlyUpdate(currentCas uint64, revNo uint64, currentMou *MetadataOnlyUpdate) *MetadataOnlyUpdate { +func computeMetadataOnlyUpdate(ctx context.Context, doc *Document) *MetadataOnlyUpdate { var prevCas string - currentCasString := base.CasToString(currentCas) - if currentMou != nil && currentCasString == currentMou.HexCAS { - prevCas = currentMou.PreviousHexCAS + currentCasString := base.CasToString(doc.Cas) + if doc.MetadataOnlyUpdate != nil && currentCasString == doc.MetadataOnlyUpdate.HexCAS { + prevCas = doc.MetadataOnlyUpdate.PreviousHexCAS } else { prevCas = currentCasString } + // if this value is zero, then the document was not fetched with $document.revid set + if doc.RevSeqNo == 0 { + base.AssertfCtx(ctx, "revSeqNo must be non-zero when computing MetadataOnlyUpdate for %d, not writing _mou for this document", base.UD(doc.ID)) + return nil + } metadataOnlyUpdate := &MetadataOnlyUpdate{ HexCAS: expandMacroCASValueString, // when non-empty, this is replaced with cas macro expansion PreviousHexCAS: prevCas, - PreviousRevSeqNo: revNo, + PreviousRevSeqNo: doc.RevSeqNo, } return metadataOnlyUpdate } @@ -1488,6 +1493,10 @@ func unmarshalRevSeqNo(revSeqNoBytes []byte) (uint64, error) { return revSeqNo, nil } +func marshalRevSeqNo(revSeqNo uint64) []byte { + return []byte(strconv.FormatUint(revSeqNo, 10)) +} + func (doc *Document) ExtractDocVersion() DocVersion { return DocVersion{ RevTreeID: doc.GetRevTreeID(), diff --git a/db/document_test.go b/db/document_test.go index efde4c62a5..69e4ded360 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -801,5 +801,7 @@ func TestGetBucketDocument(t *testing.T) { if base.UnitTestUrlIsWalrus() { expectedDoc.Expiry = 0 // Walrus doesn't support expiry via virtual xattr yet } + require.Contains(t, doc.Xattrs, base.VirtualXattrRevSeqNo) + delete(doc.Xattrs, base.VirtualXattrRevSeqNo) // remove rev seq no for comparison require.Equal(t, expectedDoc, *doc) } diff --git a/db/import.go b/db/import.go index a1501c0d47..15e83fee5f 100644 --- a/db/import.go +++ b/db/import.go @@ -58,6 +58,10 @@ func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid st } delete(body, BodyPurged) } + if xattrs == nil { + xattrs = make(map[string][]byte) + } + xattrs[base.VirtualXattrRevSeqNo] = []byte(fmt.Sprintf("\"%d\"", importOpts.revSeqNo)) existingBucketDoc := &sgbucket.BucketDocument{ Body: value, @@ -111,6 +115,12 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin return nil, err } + // put _mou on the xattrs to be able to find it later + if existingBucketDoc.Xattrs == nil { + existingBucketDoc.Xattrs = make(map[string][]byte) + } + existingBucketDoc.Xattrs[base.VirtualXattrRevSeqNo] = []byte(fmt.Sprintf("\"%d\"", importOpts.revSeqNo)) + return db.importDoc(ctx, docid, existingDoc.Body(ctx), importOpts.expiry, importOpts.isDelete, importOpts.revSeqNo, existingBucketDoc, importOpts.mode) } @@ -159,7 +169,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin existingDoc.Expiry = *expiry } - docUpdateEvent := Import + docUpdateEvent := MetadataOnly // do not update rev cache for any imports (CBG-4494 and CBG-4550) const updateRevCache = false docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, expiry, mutationOptions, docUpdateEvent, existingDoc, true, updateRevCache, func(doc *Document) (resultDocument *Document, resultAttachmentData updatedAttachments, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { @@ -202,9 +212,10 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin return nil, nil, false, nil, err } } + } else { + } - metadataOnlyUpdate := true // If the existing doc is a legacy SG write (_sync in body), check for migrate instead of import. _, ok := body[base.SyncPropertyName] if ok || doc.inlineSyncData { @@ -336,11 +347,6 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin newDoc.SetAttachments(doc.Attachments()) } - // If this is a metadata-only update, set metadataOnlyUpdate based on old doc's cas and mou - if metadataOnlyUpdate && db.useMou() { - newDoc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, revNo, doc.MetadataOnlyUpdate) - } - return newDoc, nil, !shouldGenerateNewRev, updatedExpiry, nil }) diff --git a/db/import_test.go b/db/import_test.go index f1e63f27f0..888262cfc5 100644 --- a/db/import_test.go +++ b/db/import_test.go @@ -477,14 +477,9 @@ func TestImportWithStaleBucketDocCorrectExpiry(t *testing.T) { assert.NoError(t, err, "Error writing doc w/ expiry") // Get the existing bucket doc - _, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll) - assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err)) - - body = Body{} - err = body.Unmarshal(existingBucketDoc.Body) - assert.NoError(t, err, "Error unmarshalling body") + existingBucketDoc, err := getBucketDocument(ctx, collection.DatabaseCollection, key) + require.NoError(t, err) - // Set the expiry value syncMetaExpiryUnix := syncMetaExpiry.Unix() expiry := uint32(syncMetaExpiryUnix) From 3ce0f72919039784430ee0a1f817448aeb87cc4b Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 24 Nov 2025 11:45:50 -0500 Subject: [PATCH 5/6] clean lint errors --- db/import.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/db/import.go b/db/import.go index 15e83fee5f..3eb10a8643 100644 --- a/db/import.go +++ b/db/import.go @@ -61,7 +61,7 @@ func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid st if xattrs == nil { xattrs = make(map[string][]byte) } - xattrs[base.VirtualXattrRevSeqNo] = []byte(fmt.Sprintf("\"%d\"", importOpts.revSeqNo)) + xattrs[base.VirtualXattrRevSeqNo] = marshalRevSeqNo(importOpts.revSeqNo) existingBucketDoc := &sgbucket.BucketDocument{ Body: value, @@ -115,11 +115,10 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin return nil, err } - // put _mou on the xattrs to be able to find it later if existingBucketDoc.Xattrs == nil { existingBucketDoc.Xattrs = make(map[string][]byte) } - existingBucketDoc.Xattrs[base.VirtualXattrRevSeqNo] = []byte(fmt.Sprintf("\"%d\"", importOpts.revSeqNo)) + existingBucketDoc.Xattrs[base.VirtualXattrRevSeqNo] = marshalRevSeqNo(importOpts.revSeqNo) return db.importDoc(ctx, docid, existingDoc.Body(ctx), importOpts.expiry, importOpts.isDelete, importOpts.revSeqNo, existingBucketDoc, importOpts.mode) } @@ -212,8 +211,6 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin return nil, nil, false, nil, err } } - } else { - } // If the existing doc is a legacy SG write (_sync in body), check for migrate instead of import. From 50814395f9283a6f3693311b4a9fd3f9ecfa9666 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 24 Nov 2025 13:56:40 -0500 Subject: [PATCH 6/6] fix marshal --- db/document.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/document.go b/db/document.go index e844eaeb3d..d50c848c29 100644 --- a/db/document.go +++ b/db/document.go @@ -1493,8 +1493,9 @@ func unmarshalRevSeqNo(revSeqNoBytes []byte) (uint64, error) { return revSeqNo, nil } +// marshalRevSeqNo converts revSeqNo into the format of $document.revid func marshalRevSeqNo(revSeqNo uint64) []byte { - return []byte(strconv.FormatUint(revSeqNo, 10)) + return []byte(fmt.Sprintf(`"%d"`, revSeqNo)) } func (doc *Document) ExtractDocVersion() DocVersion {