Skip to content

Commit bb09c29

Browse files
committed
Reduce batch size for bucket checksum pre-calculations.
1 parent d889219 commit bb09c29

File tree

2 files changed: 23 additions and 8 deletions.

modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -169,7 +169,7 @@ export class MongoChecksums {
169169
// Limit the number of buckets we query for at a time.
170170
const bucketBatchLimit = this.options?.bucketBatchLimit ?? DEFAULT_BUCKET_BATCH_LIMIT;
171171

172-
if (batch.length < bucketBatchLimit) {
172+
if (batch.length <= bucketBatchLimit) {
173173
// Single batch - no need for splitting the batch and merging results
174174
return await this.computePartialChecksumsInternal(batch);
175175
}

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 22 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -119,7 +119,7 @@ export class MongoCompactor {
119119
// All done
120120
break;
121121
}
122-
for (let bucket of buckets) {
122+
for (let { bucket } of buckets) {
123123
await this.compactSingleBucket(bucket);
124124
}
125125
}
@@ -482,10 +482,20 @@ export class MongoCompactor {
482482
break;
483483
}
484484
const start = Date.now();
485-
logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
485+
logger.info(`Calculating checksums for batch of ${buckets.length} buckets`);
486486

487-
await this.updateChecksumsBatch(buckets);
488-
logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
487+
// Filter batch by estimated bucket size, to reduce possibility of timeouts
488+
let checkBuckets: typeof buckets = [];
489+
let totalCountEstimate = 0;
490+
for (let bucket of buckets) {
491+
checkBuckets.push(bucket);
492+
totalCountEstimate += bucket.estimatedCount;
493+
if (totalCountEstimate > 50_000) {
494+
break;
495+
}
496+
}
497+
await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket));
498+
logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`);
489499
count += buckets.length;
490500
}
491501
return { buckets: count };
@@ -497,7 +507,9 @@ export class MongoCompactor {
497507
* This cannot be used to iterate on its own - the client is expected to process these buckets and
498508
* set estimate_since_compact.count: 0 when done, before fetching the next batch.
499509
*/
500-
private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
510+
private async dirtyBucketBatch(options: {
511+
minBucketChanges: number;
512+
}): Promise<{ bucket: string; estimatedCount: number }[]> {
501513
if (options.minBucketChanges <= 0) {
502514
throw new ReplicationAssertionError('minBucketChanges must be >= 1');
503515
}
@@ -515,13 +527,16 @@ export class MongoCompactor {
515527
sort: {
516528
'estimate_since_compact.count': -1
517529
},
518-
limit: 5_000,
530+
limit: 200,
519531
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
520532
}
521533
)
522534
.toArray();
523535

524-
return dirtyBuckets.map((bucket) => bucket._id.b);
536+
return dirtyBuckets.map((bucket) => ({
537+
bucket: bucket._id.b,
538+
estimatedCount: bucket.estimate_since_compact!.count + (bucket.compacted_state?.count ?? 0)
539+
}));
525540
}
526541

527542
private async updateChecksumsBatch(buckets: string[]) {

0 commit comments

Comments
 (0)