Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,19 @@ func (hlv *HybridLogicalVector) ToHistoryForHLV() string {
return hlv.toHistoryForHLV(HLVVersions.unsorted)
}

// HLVDebugString represents the HLV as a string with entries sorted for easier comparison in tests
func (hlv *HybridLogicalVector) HLVDebugString() string {
fullHLV := hlv.GetCurrentVersionString()
history := hlv.toHistoryForHLV(HLVVersions.sorted)
if history == "" {
return fullHLV
}
if hlv.MergeVersions != nil {
return fullHLV + "," + history
}
return fullHLV + ";" + history
}

// toHistoryForHLV formats HLV property for blip history.
func (hlv *HybridLogicalVector) toHistoryForHLV(sortFunc func(HLVVersions) iter.Seq2[string, uint64]) string {
// take pv and mv from hlv if defined and add to history
Expand Down
13 changes: 8 additions & 5 deletions topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (p *CouchbaseLiteMockPeer) CreateDocument(dsName sgbucket.DataStoreName, do
require.Fail(p.TB(), fmt.Sprintf("unsupported peer type %s for creating document", p.Type()))
}
docMetadata := DocMetadataFromDocVersion(p.TB(), docID, hlv, docVersion)
p.TB().Logf("%s: Created document %s with %#v", p, docID, docMetadata)
base.TracefCtx(p.Context(), base.KeySGTest, "%s: Created document %s with %#v", p, docID, docMetadata)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Created document %s with %#v", p, docID, docMetadata.HLVString())
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand All @@ -137,7 +138,8 @@ func (p *CouchbaseLiteMockPeer) WriteDocument(dsName sgbucket.DataStoreName, doc
require.Fail(p.TB(), fmt.Sprintf("unsupported peer type %s for writing document", p.Type()))
}
docMetadata := DocMetadataFromDocVersion(p.TB(), docID, hlv, docVersion)
p.TB().Logf("%s: Wrote document %s with %#+v", p, docID, docMetadata)
base.TracefCtx(p.Context(), base.KeySGTest, "%s: Wrote document %s with %#v", p, docID, docMetadata)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Wrote document %s with %#v", p, docID, docMetadata.HLVString())
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand All @@ -155,7 +157,8 @@ func (p *CouchbaseLiteMockPeer) DeleteDocument(dsName sgbucket.DataStoreName, do
}
docVersion, hlv := client.Delete(docID, parentVersion)
docMeta := DocMetadataFromDocVersion(p.TB(), docID, hlv, docVersion)
p.TB().Logf("%s: Deleted document %s with %#+v", p, docID, docMeta)
base.TracefCtx(p.Context(), base.KeySGTest, "%s: Deleted document %s with %#v", p, docID, docMeta)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Deleted document %s with %#v", p, docID, docMeta.HLVString())
return docMeta
}

Expand Down Expand Up @@ -324,7 +327,7 @@ func (r *CouchbaseLiteMockReplication) PassivePeer() Peer {

// Start starts the replication
func (r *CouchbaseLiteMockReplication) Start() {
r.btc.TB().Logf("starting CBL replication: %s", r)
base.InfofCtx(r.btc.TB().Context(), base.KeySGTest, "Starting CBL replication: %s", r)
switch r.direction {
case PeerReplicationDirectionPush:
r.btcRunner.StartPush(r.btc.ID())
Expand All @@ -337,7 +340,7 @@ func (r *CouchbaseLiteMockReplication) Start() {

// Stop halts the replication. The replication can be restarted after it is stopped.
func (r *CouchbaseLiteMockReplication) Stop() {
r.btc.TB().Logf("stopping CBL replication: %s", r)
base.InfofCtx(r.btc.TB().Context(), base.KeySGTest, "Stopping CBL replication: %s", r)
switch r.direction {
case PeerReplicationDirectionPush:
r.btcRunner.StopPush(r.btc.ID())
Expand Down
12 changes: 7 additions & 5 deletions topologytest/couchbase_server_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ func (r *CouchbaseServerReplication) PassivePeer() Peer {

// Start starts the replication
func (r *CouchbaseServerReplication) Start() {
r.t.Logf("starting XDCR replication %s", r)
ctx := base.TestCtx(r.t)
base.InfofCtx(ctx, base.KeySGTest, "starting XDCR replication %s", r)
require.NoError(r.t, r.manager.Start(r.ctx))
}

// Stop halts the replication. The replication can be restarted after it is stopped.
func (r *CouchbaseServerReplication) Stop() {
r.t.Logf("stopping XDCR replication %s", r)
ctx := base.TestCtx(r.t)
base.InfofCtx(ctx, base.KeySGTest, "stopping XDCR replication %s", r)
require.NoError(r.t, r.manager.Stop(r.ctx))
}

Expand Down Expand Up @@ -130,7 +132,7 @@ func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docI
Cas: cas,
ImplicitHLV: implicitHLV,
}
p.TB().Logf("%s: Created document %s with %#v", p, docID, docMetadata)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Created document %s with %#v", p, docID, docMetadata.HLVString())
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand Down Expand Up @@ -158,7 +160,7 @@ func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID
cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback)
require.NoError(p.TB(), err)
docMeta := getDocVersion(docID, p, cas, lastXattrs)
p.TB().Logf("%s: Wrote document %s with %#+v", p, docID, docMeta)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Wrote document %s with %#+v", p, docID, docMeta.HLVString())
return BodyAndVersion{
docMeta: docMeta,
body: body,
Expand All @@ -178,7 +180,7 @@ func (p *CouchbaseServerPeer) DeleteDocument(dsName sgbucket.DataStoreName, docI
cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback)
require.NoError(p.TB(), err)
version := getDocVersion(docID, p, cas, lastXattrs)
p.TB().Logf("%s: Deleted document %s with %#+v", p, docID, version)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Deleted document %s with %#+v", p, docID, version.HLVString())
return version
}

Expand Down
25 changes: 18 additions & 7 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ func stripInternalProperties(body db.Body) {

// waitForVersionAndBody waits for a document to reach a specific version on all peers.
func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
ctx := base.TestCtx(t)
base.TracefCtx(ctx, base.KeySGTest, "waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
base.InfofCtx(ctx, base.KeySGTest, "waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion.docMeta.HLVString())
for _, peer := range topology.SortedPeers() {
t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
base.TracefCtx(ctx, base.KeySGTest, "waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
body := peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, topology)
requireBodyEqual(t, expectedVersion.body, body)
}
base.InfofCtx(ctx, base.KeySGTest, "found matching doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion.docMeta.HLVString())
}

// waitForCVAndBody waits for a document to reach a specific cv on all peers.
Expand Down Expand Up @@ -95,9 +98,11 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, doc
// - cv:2@rosmar2 on cbs1, cbs2, cbl2
// - cv:2@rosmar2, pv:1@rosmar1 on cbl1
func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
ctx := base.TestCtx(t)
base.TracefCtx(ctx, base.KeySGTest, "waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
base.InfofCtx(ctx, base.KeySGTest, "waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion.docMeta.HLVString())
for _, peer := range topology.SortedPeers() {
t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
base.TracefCtx(ctx, base.KeySGTest, "waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
var body db.Body
if peer.Type() == PeerTypeCouchbaseLite {
body = peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
Expand All @@ -106,6 +111,7 @@ func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID st
}
requireBodyEqual(t, expectedVersion.body, body)
}
base.InfofCtx(ctx, base.KeySGTest, "found matching doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion.docMeta.HLVString())
}

// waitForConvergingTombstones waits for all peers to have a tombstone document for a given doc ID. This is the
Expand Down Expand Up @@ -151,7 +157,8 @@ func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID st
// - CBL1: 7@rosmar1;5@cbl1
// - CBL2: 8@rosmar2;6@cbl2
func waitForConvergingTombstones(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) {
t.Logf("waiting for converging tombstones")
ctx := base.TestCtx(t)
base.InfofCtx(ctx, base.KeySGTest, "waiting for converging tombstones")
require.EventuallyWithT(t, func(c *assert.CollectT) {
nonCBLVersions := make(map[string]DocMetadata)
for peerName, peer := range topology.SortedPeers() {
Expand Down Expand Up @@ -179,15 +186,19 @@ func waitForConvergingTombstones(t *testing.T, dsName base.ScopeAndCollectionNam
}
}
}, totalWaitTime, pollInterval)
base.InfofCtx(ctx, base.KeySGTest, "found converging tombstones")
}

// waitForTombstoneVersion waits for a tombstone document with a particular HLV to be present on all peers.
func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
ctx := base.TestCtx(t)
base.TracefCtx(ctx, base.KeySGTest, "waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
base.InfofCtx(ctx, base.KeySGTest, "waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion.docMeta.HLVString())
for _, peer := range topology.SortedPeers() {
t.Logf("waiting for tombstone version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
base.InfofCtx(ctx, base.KeySGTest, "waiting for tombstone version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
peer.WaitForTombstoneVersion(dsName, docID, expectedVersion.docMeta, topology)
}
base.InfofCtx(ctx, base.KeySGTest, "found matching tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion.docMeta.HLVString())
}

// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
Expand Down
3 changes: 2 additions & 1 deletion topologytest/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,10 @@ func getPeerBuckets(t *testing.T, peerOptions map[string]PeerOptions) map[PeerBu
func createPeers(t *testing.T, peersOptions map[string]PeerOptions) Peers {
buckets := getPeerBuckets(t, peersOptions)
peers := make(Peers, len(peersOptions))
ctx := base.TestCtx(t)
for id, peerOptions := range peersOptions {
peer := NewPeer(t, id, buckets, peerOptions)
t.Logf("TopologyTest: created peer %s", peer)
base.InfofCtx(ctx, base.KeySGTest, "TopologyTest: created peer %s", peer)
t.Cleanup(func() {
peer.Close()
})
Expand Down
9 changes: 6 additions & 3 deletions topologytest/sync_gateway_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (p *SyncGatewayPeer) GetDocumentIfExists(dsName sgbucket.DataStoreName, doc
// CreateDocument creates a document on the peer. The test will fail if the document already exists.
func (p *SyncGatewayPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
docMetadata := p.writeDocument(dsName, docID, body)
p.TB().Logf("%s: Created document %s with %#+v", p, docID, docMetadata)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Created document %s with %#+v", p, docID, docMetadata.HLVString())
base.TracefCtx(p.Context(), base.KeySGTest, "%s: Created document %s with %#+v", p, docID, docMetadata)
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand Down Expand Up @@ -125,7 +126,8 @@ func (p *SyncGatewayPeer) writeDocument(dsName sgbucket.DataStoreName, docID str
// WriteDocument writes a document to the peer. The test will fail if the write does not succeed.
func (p *SyncGatewayPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
docMetadata := p.writeDocument(dsName, docID, body)
p.TB().Logf("%s: Wrote document %s with %#+v", p, docID, docMetadata)
base.TracefCtx(p.Context(), base.KeySGTest, "%s: Wrote document %s with %#+v", p, docID, docMetadata)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Wrote document %s with %s", p, docID, docMetadata.HLVString())
return BodyAndVersion{
docMeta: docMetadata,
body: body,
Expand All @@ -142,7 +144,8 @@ func (p *SyncGatewayPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID st
_, doc, err = collection.DeleteDoc(ctx, docID, doc.ExtractDocVersion())
require.NoError(p.TB(), err)
docMeta := DocMetadataFromDocument(doc)
p.TB().Logf("%s: Deleted document %s with %#+v", p, docID, docMeta)
base.TracefCtx(p.Context(), base.KeySGTest, "%s: Deleted document %s with %#+v", p, docID, docMeta)
base.InfofCtx(p.Context(), base.KeySGTest, "%s: Deleted document %s with %s", p, docID, docMeta.HLVString())
return docMeta
}

Expand Down
9 changes: 9 additions & 0 deletions topologytest/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ func (v DocMetadata) GoString() string {
return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitHLV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV)
}

func (v DocMetadata) HLVString() string {
if v.ImplicitHLV != nil {
return v.ImplicitHLV.HLVDebugString()
} else if v.HLV != nil {
return v.HLV.HLVDebugString()
}
return ""
}

// DocMetadataFromDocVersion returns metadata DocVersion from the given document and version.
func DocMetadataFromDocVersion(t testing.TB, docID string, hlv *db.HybridLogicalVector, version rest.DocVersion) DocMetadata {
m := DocMetadata{
Expand Down
Loading