@@ -113,14 +113,23 @@ export class MongoCompactor {
113113
114114 private async compactDirtyBuckets ( ) {
115115 while ( ! this . signal ?. aborted ) {
116- // Process all buckets with 1 or more changes since last time
117- const buckets = await this . dirtyBucketBatch ( { minBucketChanges : 1 } ) ;
116+ // Process all buckets with 10 or more changes since last time.
117+ // We exclude the last 100 compacted buckets, to avoid repeatedly re-compacting the same buckets over and over
118+ // if they are modified while compacting.
119+ const TRACK_RECENTLY_COMPACTED_NUMBER = 100 ;
120+
121+ let recentlyCompacted : string [ ] = [ ] ;
122+ const buckets = await this . dirtyBucketBatch ( { minBucketChanges : 10 , exclude : recentlyCompacted } ) ;
118123 if ( buckets . length == 0 ) {
119124 // All done
120125 break ;
121126 }
122127 for ( let { bucket } of buckets ) {
123128 await this . compactSingleBucket ( bucket ) ;
129+ recentlyCompacted . push ( bucket ) ;
130+ }
131+ if ( recentlyCompacted . length > TRACK_RECENTLY_COMPACTED_NUMBER ) {
132+ recentlyCompacted = recentlyCompacted . slice ( - TRACK_RECENTLY_COMPACTED_NUMBER ) ;
124133 }
125134 }
126135 }
@@ -509,6 +518,7 @@ export class MongoCompactor {
509518 */
510519 private async dirtyBucketBatch ( options : {
511520 minBucketChanges : number ;
521+ exclude ?: string [ ] ;
512522 } ) : Promise < { bucket : string ; estimatedCount : number } [ ] > {
513523 if ( options . minBucketChanges <= 0 ) {
514524 throw new ReplicationAssertionError ( 'minBucketChanges must be >= 1' ) ;
@@ -518,7 +528,8 @@ export class MongoCompactor {
518528 . find (
519529 {
520530 '_id.g' : this . group_id ,
521- 'estimate_since_compact.count' : { $gte : options . minBucketChanges }
531+ 'estimate_since_compact.count' : { $gte : options . minBucketChanges } ,
532+ '_id.b' : { $nin : options . exclude ?? [ ] }
522533 } ,
523534 {
524535 projection : {
0 commit comments