Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ const (

// VirtualXattrRevSeqNo is used to fetch rev seq no from documents virtual xattr
VirtualXattrRevSeqNo = "$document.revid"

// VirtualExpiry is used to fetch the expiry from documents
VirtualExpiry = "$document.exptime"

// VirtualDocumentXattr is used to fetch the documents virtual xattr
VirtualDocumentXattr = "$document"

Expand Down
14 changes: 10 additions & 4 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers

callback := func(event sgbucket.FeedEvent) bool {
docID := string(event.Key)
key := realDocID(docID)
base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID))

// Ignore documents without xattrs if possible, to avoid processing unnecessary documents
if r.useXattrs && event.DataType&base.MemcachedDataTypeXattr == 0 {
return true
}
// Don't want to process raw binary docs
// The binary check should suffice but for additional safety also check for empty bodies
// The binary check should suffice but for additional safety also check for empty bodies. This will also avoid
// processing tombstones.
if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 {
return true
}
Expand All @@ -147,9 +147,14 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
databaseCollection := db.CollectionByID[event.CollectionID]
databaseCollection.collectionStats.ResyncNumProcessed.Add(1)
collectionCtx := databaseCollection.AddCollectionContext(ctx)
_, unusedSequences, err := (&DatabaseCollectionWithUser{
doc, err := bucketDocumentFromFeed(event)
if err != nil {
base.WarnfCtx(collectionCtx, "[%s] Error getting document from DCP event for doc %q: %v", resyncLoggingID, base.UD(docID), err)
return false
}
unusedSequences, err := (&DatabaseCollectionWithUser{
DatabaseCollection: databaseCollection,
}).ResyncDocument(collectionCtx, docID, key, regenerateSequences, []uint64{})
}).ResyncDocument(collectionCtx, docID, doc, regenerateSequences)

databaseCollection.releaseSequences(collectionCtx, unusedSequences)

Expand All @@ -159,6 +164,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
databaseCollection.collectionStats.ResyncNumChanged.Add(1)
} else if err != base.ErrUpdateCancel {
base.WarnfCtx(collectionCtx, "[%s] Error updating doc %q: %v", resyncLoggingID, base.UD(docID), err)
return false
}
return true
}
Expand Down
2 changes: 1 addition & 1 deletion db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2724,7 +2724,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if expiry != nil {
initialExpiry = *expiry
}
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
// Be careful: this block can be invoked multiple times if there are races!
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
Expand Down
123 changes: 47 additions & 76 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,96 +1824,67 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d

// ResyncDocument will re-run the sync function on the document and write an updated version to the bucket. If
// the sync function doesn't change any channels or access grants, no write will be performed.
func (db *DatabaseCollectionWithUser) ResyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
func (db *DatabaseCollectionWithUser) ResyncDocument(ctx context.Context, docid string, previousDoc *sgbucket.BucketDocument, regenerateSequences bool) (updatedUnusedSequences []uint64, err error) {
var updatedDoc *Document
var shouldUpdate bool
var updatedExpiry *uint32
if db.UseXattrs() {
writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
// There's no scenario where a doc should from non-deleted to deleted during UpdateAllDocChannels processing,
// so deleteDoc is always returned as false.
if currentValue == nil || len(currentValue) == 0 {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
if !shouldUpdate {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
if updatedExpiry != nil {
updatedDoc.UpdateExpiry(*updatedExpiry)
}
doc.SetCrc32cUserXattrHash()

// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
if db.useMou() {
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
}

_, rawSyncXattr, _, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawSyncXattr,
},
Expiry: updatedExpiry,
}
if db.useMou() {
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
}
if rawGlobalXattr != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
}
return updatedDoc, err
writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
// resyncDocument is not called on tombstoned documents, so this value will only be empty if the document was
// deleted between DCP event and calling this function. In any case, we do not need to update it.
if len(currentValue) == 0 {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
} else {
_, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) {
// Be careful: this block can be invoked multiple times if there are races!
if currentValue == nil {
return nil, nil, false, base.ErrUpdateCancel // someone deleted it?!
}
doc, err := unmarshalDocument(docid, currentValue)
if err != nil {
return nil, nil, false, err
}
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
if err != nil {
return nil, nil, false, err
}
if shouldUpdate {
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
if updatedExpiry != nil {
updatedDoc.UpdateExpiry(*updatedExpiry)
}
fmt.Printf("%+v\n", doc)
updatedDoc, shouldUpdate, updatedExpiry, _, updatedUnusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, nil)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
if !shouldUpdate {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
if updatedExpiry != nil {
updatedDoc.UpdateExpiry(*updatedExpiry)
}
doc.SetCrc32cUserXattrHash()

updatedBytes, marshalErr := base.JSONMarshal(updatedDoc)
return updatedBytes, updatedExpiry, false, marshalErr
} else {
return nil, nil, false, base.ErrUpdateCancel
// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
fmt.Printf("updatedDoc= %+v\n", updatedDoc)
_, rawSyncXattr, _, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: This declaration here shadows the declaration on line 1827

Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawSyncXattr,
base.MouXattrName: rawMouXattr,
},
Expiry: updatedExpiry,
}
if doc.MetadataOnlyUpdate != nil {
if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
}
})
}
if rawGlobalXattr != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
}
return updatedDoc, err
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
}
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, docid, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, previousDoc, opts, writeUpdateFunc)
if err == nil {
base.Audit(ctx, base.AuditIDDocumentResync, base.AuditFields{
base.AuditFieldDocID: docid,
base.AuditFieldDocVersion: updatedDoc.CVOrRevTreeID(),
})
}
return updatedHighSeq, unusedSequences, err
return updatedUnusedSequences, err
}

// invalidateAllPrincipals invalidates computed channels and roles for all users/roles, for the specified collections:
Expand Down
13 changes: 12 additions & 1 deletion db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,20 @@ func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
return xattrKeys
}

// syncGlobalSyncMouAndUserXattrKeys returns the xattr keys for the sync, version vector,
// mou and global xattrs, plus the user xattr key when one is configured for the collection.
func (c *DatabaseCollection) syncGlobalSyncMouAndUserXattrKeys() []string {
	keys := []string{base.SyncXattrName, base.VvXattrName, base.MouXattrName, base.GlobalXattrName}
	if userKey := c.userXattrKey(); userKey != "" {
		keys = append(keys, userKey)
	}
	return keys
}

// syncGlobalSyncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
xattrKeys := []string{base.SyncXattrName, base.VvXattrName, base.VirtualXattrRevSeqNo}
if c.useMou() {
xattrKeys = append(xattrKeys, base.MouXattrName, base.GlobalXattrName)
}
Expand Down
7 changes: 4 additions & 3 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3918,12 +3918,13 @@ func Test_resyncDocument(t *testing.T) {
_, err = collection.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

preResyncDoc, err := collection.GetDocument(ctx, docID, DocUnmarshalAll)
preResyncDoc, _, err := collection.getDocWithXattrs(ctx, docID, collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), DocUnmarshalAll)
require.NoError(t, err)
if !tc.useHLV {
require.Nil(t, preResyncDoc.HLV)
}
_, _, err = collection.ResyncDocument(ctx, docID, realDocID(docID), false, []uint64{10})

_, err = collection.ResyncDocument(ctx, docID, getBucketDocument(t, collection.DatabaseCollection, docID), false)
require.NoError(t, err)
err = collection.WaitForPendingChanges(ctx)
require.NoError(t, err)
Expand All @@ -3943,7 +3944,6 @@ func Test_resyncDocument(t *testing.T) {
}
assert.True(t, found)

require.NoError(t, err)
if tc.useHLV {
require.NotNil(t, postResyncDoc.HLV)
require.Equal(t, Version{
Expand All @@ -3953,6 +3953,7 @@ func Test_resyncDocument(t *testing.T) {
SourceID: postResyncDoc.HLV.SourceID,
Value: postResyncDoc.HLV.Version,
})
assert.Equal(t, preResyncDoc.Cas, postResyncDoc.HLV.CurrentVersionCAS)
} else {
require.Nil(t, postResyncDoc.HLV)
}
Expand Down
30 changes: 15 additions & 15 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,21 +612,6 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
return rawDoc, syncData, nil
}

func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) {
if dataType&base.MemcachedDataTypeXattr == 0 {
return unmarshalDocument(docid, data)
}
xattrKeys := []string{base.SyncXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}
body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data)
if err != nil {
return nil, err
}
return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], nil, cas, DocUnmarshalAll)
}

func (doc *SyncData) HasValidSyncData() bool {

valid := doc != nil && doc.GetRevTreeID() != "" && (doc.Sequence > 0)
Expand Down Expand Up @@ -1578,3 +1563,18 @@ func (d DocVersion) CVOrRevTreeID() string {
}
return d.RevTreeID
}

// bucketDocumentFromFeed converts a sgbucket.FeedEvent to a sgbucket.BucketDocument,
// decoding the DCP value into its body and xattrs. A deletion opcode marks the
// resulting document as a tombstone.
func bucketDocumentFromFeed(event sgbucket.FeedEvent) (*sgbucket.BucketDocument, error) {
	body, xattrs, err := sgbucket.DecodeValueWithAllXattrs(event.Value)
	if err != nil {
		return nil, err
	}
	doc := &sgbucket.BucketDocument{
		Body:        body,
		Xattrs:      xattrs,
		Cas:         event.Cas,
		Expiry:      event.Expiry,
		IsTombstone: event.Opcode == sgbucket.FeedOpDeletion,
	}
	return doc, nil
}
30 changes: 23 additions & 7 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,9 @@ func TestImportWithStaleBucketDocCorrectExpiry(t *testing.T) {
assert.NoError(t, err, "Error writing doc w/ expiry")

// Get the existing bucket doc
_, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll)
assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err))

body = Body{}
err = body.Unmarshal(existingBucketDoc.Body)
assert.NoError(t, err, "Error unmarshalling body")
existingBucketDoc := getBucketDocument(t, collection.DatabaseCollection, key)
require.NoError(t, err)

// Set the expiry value
syncMetaExpiryUnix := syncMetaExpiry.Unix()
expiry := uint32(syncMetaExpiryUnix)

Expand Down Expand Up @@ -1494,3 +1489,24 @@ func TestImportWithSyncCVAndNoVV(t *testing.T) {
base.RequireWaitForStat(t, db.DbStats.Database().Crc32MatchCount.Value, 1)

}

// getBucketDocument fetches a document and its xattrs directly from the bucket and
// packages them as a sgbucket.BucketDocument for tests, populating Expiry from the
// $document virtual xattr (which is stripped from the returned xattr map).
func getBucketDocument(t *testing.T, collection *DatabaseCollection, docID string) *sgbucket.BucketDocument {
	ctx := base.TestCtx(t)
	xattrKeys := append(collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), base.VirtualExpiry)
	body, xattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, docID, xattrKeys)
	require.NoError(t, err)

	var expiry uint32
	if rawExpiry, found := xattrs[base.VirtualExpiry]; found {
		require.NoError(t, base.JSONUnmarshal(rawExpiry, &expiry))
		// Remove the virtual xattr so only real xattrs remain on the document.
		delete(xattrs, base.VirtualExpiry)
	}

	return &sgbucket.BucketDocument{
		Body:   body,
		Xattrs: xattrs,
		Cas:    cas,
		Expiry: expiry,
		// An empty body indicates the document is a tombstone.
		IsTombstone: len(body) == 0,
	}
}
3 changes: 1 addition & 2 deletions rest/legacy_rev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func TestResyncLegacyRev(t *testing.T) {
// use ResyncDocument and TakeDbOffline/Online instead of /ks/_config/sync && /db/_resync to work under rosmar which
// doesn't yet support DCP resync or updating config on an existing bucket.
regenerateSequences := false
var unusedSequences []uint64
_, _, err = collection.ResyncDocument(ctx, docID, docID, regenerateSequences, unusedSequences)
_, err = collection.ResyncDocument(ctx, docID, nil, regenerateSequences)
require.NoError(t, err)

rt.TakeDbOffline()
Expand Down