From 0d33616e911b2e1bc123c26bbe397b279d68a4c6 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 13 Nov 2025 12:12:29 -0500 Subject: [PATCH 1/4] CBG-4928 do not log warning for closed blip sender --- db/blip_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/blip_handler.go b/db/blip_handler.go index 5ffc6595e4..2de9aff356 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -629,7 +629,7 @@ func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]a bh.replicationStats.SendChangesCount.Add(int64(len(changeArray))) // Spawn a goroutine to await the client's response: go func(bh *blipHandler, sender *blip.Sender, response *blip.Message, changeArray [][]any, sendTime time.Time, dbCollection *DatabaseCollectionWithUser) { - if err := bh.handleChangesResponse(bh.loggingCtx, sender, response, changeArray, sendTime, dbCollection, bh.collectionIdx); err != nil { + if err := bh.handleChangesResponse(bh.loggingCtx, sender, response, changeArray, sendTime, dbCollection, bh.collectionIdx); err != nil && !errors.Is(err, ErrClosedBLIPSender) { base.WarnfCtx(bh.loggingCtx, "Error from bh.handleChangesResponse: %v", err) if bh.fatalErrorCallback != nil { bh.fatalErrorCallback(err) From dd8319794235845b41d7934ea390ecc557d260e0 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Fri, 14 Nov 2025 14:39:33 +0530 Subject: [PATCH 2/4] update log messages --- base/error.go | 2 +- db/changes.go | 4 ++-- db/channel_cache_single.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/base/error.go b/base/error.go index 7365967e74..bed9095d4f 100644 --- a/base/error.go +++ b/base/error.go @@ -41,7 +41,7 @@ var ( ErrNotFound = &sgError{"Not Found"} ErrUpdateCancel = &sgError{"Cancel update"} ErrImportCancelledPurged = HTTPErrorf(http.StatusNotFound, "Import Cancelled Due to Purge") - ErrChannelFeed = &sgError{"Error while building channel feed"} + ErrChannelFeed = &sgError{"Failed to build channel feed"} ErrTimeout = &sgError{"Operation timed out"} ErrPathNotFound = sgbucket.ErrPathNotFound ErrPathExists = sgbucket.ErrPathExists diff --git a/db/changes.go b/db/changes.go index 18c58aba7b..508a3dd93b 100644 --- a/db/changes.go +++ b/db/changes.go @@ -480,7 +480,7 @@ func (db *DatabaseCollectionWithUser) changesFeed(ctx context.Context, singleCha base.TracefCtx(ctx, base.KeyChanges, "Querying channel %q with options: %+v", base.UD(singleChannelCache.ChannelID().Name), paginationOptions) changes, err := singleChannelCache.GetChanges(ctx, paginationOptions) if err != nil { - base.WarnfCtx(ctx, "Error retrieving changes for channel %q: %v", base.UD(singleChannelCache.ChannelID().Name), err) + base.InfofCtx(ctx, base.KeyChanges, "Could not retrieve changes for channel %q: %v", base.UD(singleChannelCache.ChannelID().Name), err) change := ChangeEntry{ Err: base.ErrChannelFeed, } @@ -996,7 +996,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex } else { // On feed error, send the error and exit changes processing if current[i].Err == base.ErrChannelFeed { - base.WarnfCtx(ctx, "MultiChangesFeed got error reading changes feed: %v", current[i].Err) + base.InfofCtx(ctx, base.KeyChanges, "Could not read changes feed: %v", current[i].Err) select { case <-options.ChangesCtx.Done(): case output <- current[i]: diff --git a/db/channel_cache_single.go b/db/channel_cache_single.go index 09cfe2f8fc..f2b5665d11 100644 --- a/db/channel_cache_single.go +++ b/db/channel_cache_single.go @@ -413,7 +413,7 @@ func (c *singleChannelCacheImpl) GetChanges(ctx context.Context, options Changes // Check whether the changes process has been terminated while we waited for the view lock, to avoid the view // overhead in that case (and prevent feedback loop on query backlog) if options.ChangesCtx.Err() != nil { - return nil, fmt.Errorf("Changes feed cancelled while waiting for view lock") + return nil, fmt.Errorf("Changes feed cancelled") } // Now query the view. We set the max sequence equal to cacheValidFrom, so we'll get one From ae24539dbd22c88ff872c633f9eb42448f4112d5 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Wed, 19 Nov 2025 17:30:19 +0530 Subject: [PATCH 3/4] fixes based on PR comments --- db/changes.go | 6 +++--- db/channel_cache_single.go | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/db/changes.go b/db/changes.go index 508a3dd93b..915a4cde41 100644 --- a/db/changes.go +++ b/db/changes.go @@ -480,7 +480,7 @@ func (db *DatabaseCollectionWithUser) changesFeed(ctx context.Context, singleCha base.TracefCtx(ctx, base.KeyChanges, "Querying channel %q with options: %+v", base.UD(singleChannelCache.ChannelID().Name), paginationOptions) changes, err := singleChannelCache.GetChanges(ctx, paginationOptions) if err != nil { - base.InfofCtx(ctx, base.KeyChanges, "Could not retrieve changes for channel %q: %v", base.UD(singleChannelCache.ChannelID().Name), err) + base.WarnfCtx(ctx, "Error retrieving changes for channel %q: %v", base.UD(singleChannelCache.ChannelID().Name), err) change := ChangeEntry{ Err: base.ErrChannelFeed, } @@ -995,8 +995,8 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex feeds[i] = nil } else { // On feed error, send the error and exit changes processing - if current[i].Err == base.ErrChannelFeed { - base.InfofCtx(ctx, base.KeyChanges, "Could not read changes feed: %v", current[i].Err) + if options.ChangesCtx.Err() != nil { + base.WarnfCtx(ctx, "MultiChangesFeed got error reading changes feed: %v", current[i].Err) select { case <-options.ChangesCtx.Done(): case output <- current[i]: diff --git a/db/channel_cache_single.go b/db/channel_cache_single.go index f2b5665d11..336d08ea63 100644 --- a/db/channel_cache_single.go +++ b/db/channel_cache_single.go @@ -410,10 +410,9 @@ func (c *singleChannelCacheImpl) GetChanges(ctx context.Context, options Changes return resultFromCache, nil } - // Check whether the changes process has been terminated while we waited for the view lock, to avoid the view - // overhead in that case (and prevent feedback loop on query backlog) + // Check whether the changes process has been terminated before running a query if options.ChangesCtx.Err() != nil { - return nil, fmt.Errorf("Changes feed cancelled") + return nil, fmt.Errorf("Changes feed cancelled %w", options.ChangesCtx.Err()) } // Now query the view. We set the max sequence equal to cacheValidFrom, so we'll get one From b028e25eb7ab566d7e66348be022f54562a978c7 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Fri, 21 Nov 2025 19:12:12 +0530 Subject: [PATCH 4/4] fixes based on pr comments --- base/error.go | 2 +- db/changes.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/base/error.go b/base/error.go index bed9095d4f..7365967e74 100644 --- a/base/error.go +++ b/base/error.go @@ -41,7 +41,7 @@ var ( ErrNotFound = &sgError{"Not Found"} ErrUpdateCancel = &sgError{"Cancel update"} ErrImportCancelledPurged = HTTPErrorf(http.StatusNotFound, "Import Cancelled Due to Purge") - ErrChannelFeed = &sgError{"Failed to build channel feed"} + ErrChannelFeed = &sgError{"Error while building channel feed"} ErrTimeout = &sgError{"Operation timed out"} ErrPathNotFound = sgbucket.ErrPathNotFound ErrPathExists = sgbucket.ErrPathExists diff --git a/db/changes.go b/db/changes.go index 915a4cde41..9304d5743c 100644 --- a/db/changes.go +++ b/db/changes.go @@ -995,8 +995,10 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex feeds[i] = nil } else { // On feed error, send the error and exit changes processing - if options.ChangesCtx.Err() != nil { - base.WarnfCtx(ctx, "MultiChangesFeed got error reading changes feed: %v", current[i].Err) + if current[i].Err == base.ErrChannelFeed { + if options.ChangesCtx.Err() == nil { + base.WarnfCtx(ctx, "MultiChangesFeed got error reading changes feed: %v", current[i].Err) + } select { case <-options.ChangesCtx.Done(): case output <- current[i]: