Skip to content

Commit d889219

Browse files
authored
Fix memory leaks when retrying replication (#395)
* Clean up ended ConnectionManagerFactory instances.
* Re-use connectionManager for keepAlive.
* Remove replicateLoop - let the replicator handle retries.
* Cleanup MongoManager instances.
* Include lock expiration time in warning message.
* Lazy-instantiate checksum caches to reduce replication memory usage.
* MongoDB: Remove replicateLoop - let the replicator handle retries.
* MySQL: Remove replicateLoop - let the replicator handle retries.
* Cleanup MySQLConnectionManager instances.
* More consistent error handling.
* Use a single MongoErrorRateLimiter, and properly report errors.
* More error handling tweaks.
* Fix test.
* Changeset.
* Fix test issues after changing context dispose logic.
1 parent 7eb7957 commit d889219

File tree

19 files changed

+348
-299
lines changed

19 files changed

+348
-299
lines changed

.changeset/nervous-dots-behave.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-module-postgres': patch
5+
'@powersync/service-module-mongodb': patch
6+
'@powersync/service-core': patch
7+
'@powersync/service-module-mysql': patch
8+
'@powersync/service-image': patch
9+
---
10+
11+
Fix memory leaks when retrying replication after errors.

modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,28 @@ const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
4242
* 4. computePartialChecksumsInternal() -> aggregate over 50_000 operations in bucket_data at a time
4343
*/
4444
export class MongoChecksums {
45-
private cache = new ChecksumCache({
46-
fetchChecksums: (batch) => {
47-
return this.computePartialChecksums(batch);
48-
}
49-
});
45+
private _cache: ChecksumCache | undefined;
5046

5147
constructor(
5248
private db: PowerSyncMongo,
5349
private group_id: number,
5450
private options?: MongoChecksumOptions
5551
) {}
5652

53+
/**
54+
* Lazy-instantiated cache.
55+
*
56+
* This means the cache only allocates memory once it is used for the first time.
57+
*/
58+
private get cache(): ChecksumCache {
59+
this._cache ??= new ChecksumCache({
60+
fetchChecksums: (batch) => {
61+
return this.computePartialChecksums(batch);
62+
}
63+
});
64+
return this._cache;
65+
}
66+
5767
/**
5868
* Calculate checksums, utilizing the cache for partial checksums, and querying the remainder from
5969
* the database (bucket_state + bucket_data).

modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,19 @@ export class MongoSyncRulesLock implements storage.ReplicationLock {
3333
);
3434

3535
if (doc == null) {
36-
throw new ServiceError(
37-
ErrorCode.PSYNC_S1003,
38-
`Sync rules: ${sync_rules.id} have been locked by another process for replication.`
39-
);
36+
// Query the existing lock to get the expiration time (best effort - it may have been released in the meantime).
37+
const heldLock = await db.sync_rules.findOne({ _id: sync_rules.id }, { projection: { lock: 1 } });
38+
if (heldLock?.lock?.expires_at) {
39+
throw new ServiceError(
40+
ErrorCode.PSYNC_S1003,
41+
`Sync rules: ${sync_rules.id} have been locked by another process for replication, expiring at ${heldLock.lock.expires_at.toISOString()}.`
42+
);
43+
} else {
44+
throw new ServiceError(
45+
ErrorCode.PSYNC_S1003,
46+
`Sync rules: ${sync_rules.id} have been locked by another process for replication.`
47+
);
48+
}
4049
}
4150
return new MongoSyncRulesLock(db, sync_rules.id, lockId);
4251
}

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ export interface SyncRuleDocument {
197197
last_fatal_error: string | null;
198198

199199
content: string;
200+
201+
lock?: {
202+
id: string;
203+
expires_at: Date;
204+
} | null;
200205
}
201206

202207
export interface CheckpointEventDocument {

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,39 +27,35 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
2727
// Nothing needed here
2828
}
2929

30-
private get slotName() {
31-
return this.options.storage.slot_name;
32-
}
33-
3430
async replicate() {
3531
try {
36-
await this.replicateLoop();
32+
await this.replicateOnce();
3733
} catch (e) {
38-
// Fatal exception
39-
container.reporter.captureException(e, {
40-
metadata: {}
41-
});
42-
this.logger.error(`Replication failed`, e);
34+
if (!this.abortController.signal.aborted) {
35+
container.reporter.captureException(e, {
36+
metadata: {}
37+
});
38+
39+
this.logger.error(`Replication error`, e);
40+
if (e.cause != null) {
41+
// Without this additional log, the cause may not be visible in the logs.
42+
this.logger.error(`cause`, e.cause);
43+
}
44+
45+
this.rateLimiter.reportError(e);
46+
}
4347

4448
if (e instanceof ChangeStreamInvalidatedError) {
4549
// This stops replication and restarts with a new instance
4650
await this.options.storage.factory.restartReplication(this.storage.group_id);
4751
}
52+
53+
// No need to rethrow - the error is already logged, and retry behavior is the same on error
4854
} finally {
4955
this.abortController.abort();
5056
}
5157
}
5258

53-
async replicateLoop() {
54-
while (!this.isStopped) {
55-
await this.replicateOnce();
56-
57-
if (!this.isStopped) {
58-
await new Promise((resolve) => setTimeout(resolve, 5000));
59-
}
60-
}
61-
}
62-
6359
async replicateOnce() {
6460
// New connections on every iteration (every error with retry),
6561
// otherwise we risk repeating errors related to the connection,
@@ -79,25 +75,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
7975
});
8076
this.lastStream = stream;
8177
await stream.replicate();
82-
} catch (e) {
83-
if (this.abortController.signal.aborted) {
84-
return;
85-
}
86-
this.logger.error(`Replication error`, e);
87-
if (e.cause != null) {
88-
// Without this additional log, the cause may not be visible in the logs.
89-
this.logger.error(`cause`, e.cause);
90-
}
91-
if (e instanceof ChangeStreamInvalidatedError) {
92-
throw e;
93-
} else {
94-
// Report the error if relevant, before retrying
95-
container.reporter.captureException(e, {
96-
metadata: {}
97-
});
98-
// This sets the retry delay
99-
this.rateLimiter?.reportError(e);
100-
}
10178
} finally {
10279
await connectionManager.end();
10380
}

modules/module-mongodb/src/replication/ChangeStreamReplicator.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator<Chang
2525
metrics: this.metrics,
2626
connectionFactory: this.connectionFactory,
2727
lock: options.lock,
28-
rateLimiter: new MongoErrorRateLimiter()
28+
rateLimiter: this.rateLimiter
2929
});
3030
}
3131

modules/module-mongodb/src/replication/ConnectionManagerFactory.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,28 @@ import { NormalizedMongoConnectionConfig } from '../types/types.js';
33
import { MongoManager } from './MongoManager.js';
44

55
export class ConnectionManagerFactory {
6-
private readonly connectionManagers: MongoManager[];
6+
private readonly connectionManagers = new Set<MongoManager>();
77
public readonly dbConnectionConfig: NormalizedMongoConnectionConfig;
88

99
constructor(dbConnectionConfig: NormalizedMongoConnectionConfig) {
1010
this.dbConnectionConfig = dbConnectionConfig;
11-
this.connectionManagers = [];
1211
}
1312

1413
create() {
1514
const manager = new MongoManager(this.dbConnectionConfig);
16-
this.connectionManagers.push(manager);
15+
this.connectionManagers.add(manager);
16+
17+
manager.registerListener({
18+
onEnded: () => {
19+
this.connectionManagers.delete(manager);
20+
}
21+
});
1722
return manager;
1823
}
1924

2025
async shutdown() {
2126
logger.info('Shutting down MongoDB connection Managers...');
22-
for (const manager of this.connectionManagers) {
27+
for (const manager of [...this.connectionManagers]) {
2328
await manager.end();
2429
}
2530
logger.info('MongoDB connection Managers shutdown completed.');

modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { ErrorRateLimiter } from '@powersync/service-core';
22
import { setTimeout } from 'timers/promises';
3+
import { ChangeStreamInvalidatedError } from './ChangeStream.js';
34

45
export class MongoErrorRateLimiter implements ErrorRateLimiter {
56
nextAllowed: number = Date.now();
67

78
async waitUntilAllowed(options?: { signal?: AbortSignal | undefined } | undefined): Promise<void> {
89
const delay = Math.max(0, this.nextAllowed - Date.now());
9-
// Minimum delay between connections, even without errors
10+
// Minimum delay between connections, even without errors (for the next attempt)
1011
this.setDelay(500);
1112
await setTimeout(delay, undefined, { signal: options?.signal });
1213
}
@@ -18,9 +19,12 @@ export class MongoErrorRateLimiter implements ErrorRateLimiter {
1819
reportError(e: any): void {
1920
// FIXME: Check mongodb-specific requirements
2021
const message = (e.message as string) ?? '';
21-
if (message.includes('password authentication failed')) {
22-
// Wait 15 minutes, to avoid triggering Supabase's fail2ban
23-
this.setDelay(900_000);
22+
if (e instanceof ChangeStreamInvalidatedError) {
23+
// Short delay
24+
this.setDelay(2_000);
25+
} else if (message.includes('Authentication failed')) {
26+
// Wait 2 minutes, to avoid triggering too many authentication attempts
27+
this.setDelay(120_000);
2428
} else if (message.includes('ENOTFOUND')) {
2529
// DNS lookup issue - incorrect URI or deleted instance
2630
this.setDelay(120_000);

modules/module-mongodb/src/replication/MongoManager.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,24 @@ import { mongo } from '@powersync/lib-service-mongodb';
22

33
import { NormalizedMongoConnectionConfig } from '../types/types.js';
44
import { BSON_DESERIALIZE_DATA_OPTIONS, POWERSYNC_VERSION } from '@powersync/service-core';
5+
import { BaseObserver } from '@powersync/lib-services-framework';
6+
7+
export interface MongoManagerListener {
8+
onEnded(): void;
9+
}
510

611
/**
712
* Manage a MongoDB source database connection.
813
*/
9-
export class MongoManager {
14+
export class MongoManager extends BaseObserver<MongoManagerListener> {
1015
public readonly client: mongo.MongoClient;
1116
public readonly db: mongo.Db;
1217

1318
constructor(
1419
public options: NormalizedMongoConnectionConfig,
1520
overrides?: mongo.MongoClientOptions
1621
) {
22+
super();
1723
// The pool is lazy - no connections are opened until a query is performed.
1824
this.client = new mongo.MongoClient(options.uri, {
1925
auth: {
@@ -59,9 +65,8 @@ export class MongoManager {
5965

6066
async end(): Promise<void> {
6167
await this.client.close();
62-
}
63-
64-
async destroy() {
65-
// TODO: Implement?
68+
this.iterateListeners((listener) => {
69+
listener.onEnded?.();
70+
});
6671
}
6772
}

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,18 @@ export class ChangeStreamTestContext {
5757
initializeCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
5858
}
5959

60-
async dispose() {
60+
/**
61+
* Abort snapshot and/or replication, without actively closing connections.
62+
*/
63+
abort() {
6164
this.abortController.abort();
65+
}
66+
67+
async dispose() {
68+
this.abort();
6269
await this.streamPromise?.catch((e) => e);
63-
await this.connectionManager.destroy();
6470
await this.factory[Symbol.asyncDispose]();
71+
await this.connectionManager.end();
6572
}
6673

6774
async [Symbol.asyncDispose]() {

0 commit comments

Comments
 (0)