diff --git a/db/blip_handler.go b/db/blip_handler.go index 5ffc6595e4..2de9aff356 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -629,7 +629,7 @@ func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]a bh.replicationStats.SendChangesCount.Add(int64(len(changeArray))) // Spawn a goroutine to await the client's response: go func(bh *blipHandler, sender *blip.Sender, response *blip.Message, changeArray [][]any, sendTime time.Time, dbCollection *DatabaseCollectionWithUser) { - if err := bh.handleChangesResponse(bh.loggingCtx, sender, response, changeArray, sendTime, dbCollection, bh.collectionIdx); err != nil { + if err := bh.handleChangesResponse(bh.loggingCtx, sender, response, changeArray, sendTime, dbCollection, bh.collectionIdx); err != nil && !errors.Is(err, ErrClosedBLIPSender) { base.WarnfCtx(bh.loggingCtx, "Error from bh.handleChangesResponse: %v", err) if bh.fatalErrorCallback != nil { bh.fatalErrorCallback(err) diff --git a/db/changes.go b/db/changes.go index 18c58aba7b..9304d5743c 100644 --- a/db/changes.go +++ b/db/changes.go @@ -996,7 +996,9 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex } else { // On feed error, send the error and exit changes processing if current[i].Err == base.ErrChannelFeed { - base.WarnfCtx(ctx, "MultiChangesFeed got error reading changes feed: %v", current[i].Err) + if options.ChangesCtx.Err() == nil { + base.WarnfCtx(ctx, "MultiChangesFeed got error reading changes feed: %v", current[i].Err) + } select { case <-options.ChangesCtx.Done(): case output <- current[i]: diff --git a/db/channel_cache_single.go b/db/channel_cache_single.go index 09cfe2f8fc..336d08ea63 100644 --- a/db/channel_cache_single.go +++ b/db/channel_cache_single.go @@ -410,10 +410,9 @@ func (c *singleChannelCacheImpl) GetChanges(ctx context.Context, options Changes return resultFromCache, nil } - // Check whether the changes process has been terminated while we waited for the view lock, to avoid the view - // overhead in that case (and prevent feedback loop on query backlog) + // Check whether the changes process has been terminated before running a query if options.ChangesCtx.Err() != nil { - return nil, fmt.Errorf("Changes feed cancelled while waiting for view lock") + return nil, fmt.Errorf("Changes feed cancelled %w", options.ChangesCtx.Err()) } // Now query the view. We set the max sequence equal to cacheValidFrom, so we'll get one