Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ const (

// VirtualXattrRevSeqNo is used to fetch the rev seq no from the document's virtual xattr
VirtualXattrRevSeqNo = "$document.revid"

// VirtualExpiry is used to fetch the expiry from documents
VirtualExpiry = "$document.exptime"

// VirtualDocumentXattr is used to fetch the document's virtual xattr
VirtualDocumentXattr = "$document"

Expand Down
11 changes: 8 additions & 3 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers

callback := func(event sgbucket.FeedEvent) bool {
docID := string(event.Key)
key := realDocID(docID)
base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID))

// Ignore documents without xattrs if possible, to avoid processing unnecessary documents
Expand All @@ -147,9 +146,14 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
databaseCollection := db.CollectionByID[event.CollectionID]
databaseCollection.collectionStats.ResyncNumProcessed.Add(1)
collectionCtx := databaseCollection.AddCollectionContext(ctx)
_, unusedSequences, err := (&DatabaseCollectionWithUser{
doc, err := bucketDocumentFromFeed(event)
if err != nil {
base.WarnfCtx(collectionCtx, "[%s] Error getting document from DCP event for doc %q: %v", resyncLoggingID, base.UD(docID), err)
return false
}
unusedSequences, err := (&DatabaseCollectionWithUser{
DatabaseCollection: databaseCollection,
}).resyncDocument(collectionCtx, docID, key, regenerateSequences, []uint64{})
}).resyncDocument(collectionCtx, docID, doc, regenerateSequences)

databaseCollection.releaseSequences(collectionCtx, unusedSequences)

Expand All @@ -159,6 +163,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
databaseCollection.collectionStats.ResyncNumChanged.Add(1)
} else if err != base.ErrUpdateCancel {
base.WarnfCtx(collectionCtx, "[%s] Error updating doc %q: %v", resyncLoggingID, base.UD(docID), err)
return false
}
return true
}
Expand Down
2 changes: 1 addition & 1 deletion db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document
case ExistingVersion:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = expandMacroCASValueUint64
case Import:
case Import, Resync:
// Do not update HLV if the current document version (cas) is already included in the existing HLV, as either:
// 1. _vv.cvCAS == document.cas (current mutation is already present as cv), or
// 2. _mou.cas == document.cas (current mutation is already present as cv, and was imported on a different cluster)
Expand Down
128 changes: 51 additions & 77 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
ExistingVersionLegacyRev
ExistingVersionWithUpdateToHLV
NoHLVUpdateForTest
Resync
)

type DocUpdateType uint32
Expand Down Expand Up @@ -1822,97 +1823,70 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
return doc, shouldUpdate, updatedExpiry, doc.Sequence, updatedUnusedSequences, nil
}

func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid string, previousDoc *sgbucket.BucketDocument, regenerateSequences bool) (updatedUnusedSequences []uint64, err error) {
var updatedDoc *Document
var shouldUpdate bool
var updatedExpiry *uint32
if db.UseXattrs() {
writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
// There's no scenario where a doc should go from non-deleted to deleted during UpdateAllDocChannels processing,
// so deleteDoc is always returned as false.
if currentValue == nil || len(currentValue) == 0 {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
if !shouldUpdate {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
if updatedExpiry != nil {
updatedDoc.UpdateExpiry(*updatedExpiry)
}
doc.SetCrc32cUserXattrHash()
writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
// There's no scenario where a doc should go from non-deleted to deleted during UpdateAllDocChannels processing,
// so deleteDoc is always returned as false.
if len(currentValue) == 0 {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
updatedDoc, shouldUpdate, updatedExpiry, _, updatedUnusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, nil)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
if !shouldUpdate {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
if updatedExpiry != nil {
updatedDoc.UpdateExpiry(*updatedExpiry)
}
doc.SetCrc32cUserXattrHash()

// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
if db.useMou() {
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
}
// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
mouMatch := false // after writing this document, mou.cas != doc.cas since otherwise shouldUpdate would be false
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment explains that mouMatch should be false, but doesn't clearly explain why shouldUpdate being false would make mou.cas == doc.cas. Consider clarifying that when shouldUpdate is false, no resync occurs and the function returns early via ErrUpdateCancel, so at this point shouldUpdate must be true.

Suggested change
mouMatch := false // after writing this document, mou.cas != doc.cas since otherwise shouldUpdate would be false
// At this point, shouldUpdate must be true, because if it were false, we would have already returned early via ErrUpdateCancel.
// Therefore, mou.cas != doc.cas, and mouMatch should be false.
mouMatch := false

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this comment confusing: after this update has completed, mou.cas should be equal to the doc's cas, right?

doc, err = db.updateHLV(ctx, doc, Resync, mouMatch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to just pass in false here given mouMatch is always false?

if err != nil {
return sgbucket.UpdatedDoc{}, err
}

_, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawSyncXattr,
base.VvXattrName: rawVvXattr,
},
Expiry: updatedExpiry,
}
if db.useMou() {
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
}
if rawGlobalXattr != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
}
return updatedDoc, err
_, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: This declaration here shadows the declaration on line 1827

Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawSyncXattr,
base.VvXattrName: rawVvXattr,
base.MouXattrName: rawMouXattr,
},
Expiry: updatedExpiry,
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
} else {
_, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) {
// Be careful: this block can be invoked multiple times if there are races!
if currentValue == nil {
return nil, nil, false, base.ErrUpdateCancel // someone deleted it?!
}
doc, err := unmarshalDocument(docid, currentValue)
if err != nil {
return nil, nil, false, err
}
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
if err != nil {
return nil, nil, false, err
}
if shouldUpdate {
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
if updatedExpiry != nil {
updatedDoc.UpdateExpiry(*updatedExpiry)
}

updatedBytes, marshalErr := base.JSONMarshal(updatedDoc)
return updatedBytes, updatedExpiry, false, marshalErr
} else {
return nil, nil, false, base.ErrUpdateCancel
}
})
if rawGlobalXattr != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
}
return updatedDoc, err
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
}
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, docid, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, previousDoc, opts, writeUpdateFunc)
if err == nil {
base.Audit(ctx, base.AuditIDDocumentResync, base.AuditFields{
base.AuditFieldDocID: docid,
base.AuditFieldDocVersion: updatedDoc.CVOrRevTreeID(),
})
}
return updatedHighSeq, unusedSequences, err
return updatedUnusedSequences, err
}

// invalidateAllPrincipals invalidates computed channels and roles for all users/roles, for the specified collections:
Expand Down
106 changes: 74 additions & 32 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3657,61 +3657,103 @@ func Test_invalidateAllPrincipalsCache(t *testing.T) {
}

func Test_resyncDocument(t *testing.T) {
if !base.TestUseXattrs() {
t.Skip("Walrus doesn't support xattr")
}
db, ctx := setupTestDB(t)
defer db.Close(ctx)

db.Options.EnableXattr = true
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

syncFn := `
testCases := []struct {
name string
useHLV bool
}{
{
name: "pre-4.0",
useHLV: false,
},
{
name: "has_hlv",
useHLV: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
startingSyncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value())
syncFn := `
function sync(doc, oldDoc){
channel("channel." + "ABC");
}
`
_, err := collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)
_, err := collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

docID := uuid.NewString()
docID := uuid.NewString()

updateBody := make(map[string]any)
updateBody["val"] = "value"
_, doc, err := collection.Put(ctx, docID, updateBody)
require.NoError(t, err)
assert.NotNil(t, doc)
updateBody := make(map[string]any)
updateBody["val"] = "value"
if tc.useHLV {
_, _, err := collection.Put(ctx, docID, updateBody)
require.NoError(t, err)
} else {
collection.CreateDocNoHLV(t, ctx, docID, updateBody)
}

syncFn = `
syncFn = `
function sync(doc, oldDoc){
channel("channel." + "ABC12332423234");
}
`
_, err = collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)
_, err = collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

_, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10})
require.NoError(t, err)
err = collection.WaitForPendingChanges(ctx)
require.NoError(t, err)
preResyncDoc, err := collection.GetDocument(ctx, docID, DocUnmarshalAll)
require.NoError(t, err)
if !tc.useHLV {
require.Nil(t, preResyncDoc.HLV)
}
doc, err := getBucketDocument(ctx, collection.DatabaseCollection, docID)
require.NoError(t, err)

syncData, err := collection.GetDocSyncData(ctx, docID)
assert.NoError(t, err)
_, err = collection.resyncDocument(ctx, docID, doc, false)
require.NoError(t, err)
err = collection.WaitForPendingChanges(ctx)
require.NoError(t, err)

assert.Len(t, syncData.ChannelSet, 2)
assert.Len(t, syncData.Channels, 2)
found := false
postResyncDoc, _, err := collection.getDocWithXattrs(ctx, docID, collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), DocUnmarshalAll)
assert.NoError(t, err)

for _, chSet := range syncData.ChannelSet {
if chSet.Name == "channel.ABC12332423234" {
found = true
break
}
}
assert.Len(t, postResyncDoc.ChannelSet, 2)
assert.Len(t, postResyncDoc.Channels, 2)
found := false

assert.True(t, found)
assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value()))
for _, chSet := range postResyncDoc.ChannelSet {
if chSet.Name == "channel.ABC12332423234" {
found = true
break
}
}
assert.True(t, found)

require.NoError(t, err)
require.NotNil(t, postResyncDoc.HLV)
require.Equal(t, Version{
SourceID: db.EncodedSourceID,
Value: preResyncDoc.Cas,
}, Version{
SourceID: postResyncDoc.HLV.SourceID,
Value: postResyncDoc.HLV.Version,
})
require.NotNil(t, postResyncDoc.MetadataOnlyUpdate)
require.Equal(t, MetadataOnlyUpdate{
HexCAS: base.CasToString(postResyncDoc.Cas),
PreviousHexCAS: base.CasToString(preResyncDoc.Cas),
PreviousRevSeqNo: preResyncDoc.RevSeqNo,
}, *postResyncDoc.MetadataOnlyUpdate)
assert.Equal(t, preResyncDoc.Cas, postResyncDoc.HLV.CurrentVersionCAS)
assert.Equal(t, startingSyncFnCount+2, int(db.DbStats.Database().SyncFunctionCount.Value()))
})
}
}

func Test_getUpdatedDocument(t *testing.T) {
Expand Down
Loading
Loading