Skip to content

Commit b5a4428

Browse files
committed
Cleaner abort handling.
1 parent 0041ff3 commit b5a4428

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -828,9 +828,11 @@ export class MongoBucketBatch
828828
const result = await this.db.current_data.deleteMany({
829829
pending_delete: { $exists: true, $lte: lastCheckpoint ?? 1_000_000 }
830830
});
831-
this.logger.info(
832-
`Cleaned up ${result.deletedCount} pending delete current_data records for checkpoint ${lastCheckpoint}`
833-
);
831+
if (result.deletedCount > 0) {
832+
this.logger.info(
833+
`Cleaned up ${result.deletedCount} pending delete current_data records for checkpoint ${lastCheckpoint}`
834+
);
835+
}
834836
}
835837

836838
/**

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
container,
44
logger as defaultLogger,
55
Logger,
6+
ReplicationAbortedError,
67
ReplicationAssertionError
78
} from '@powersync/lib-services-framework';
89
import {
@@ -345,16 +346,33 @@ export class WalStream {
345346
try {
346347
this.initPromise = this.initReplication();
347348
await this.initPromise;
348-
streamPromise = this.streamChanges();
349-
loopPromise = this.snapshotter.replicationLoop();
350-
await Promise.race([loopPromise, streamPromise]);
349+
// These Promises are both expected to run until aborted or error.
350+
streamPromise = this.streamChanges().finally(() => {
351+
this.abortController.abort();
352+
});
353+
loopPromise = this.snapshotter.replicationLoop().finally(() => {
354+
this.abortController.abort();
355+
});
356+
const results = await Promise.allSettled([loopPromise, streamPromise]);
357+
// First, prioritize non-aborted errors
358+
for (let result of results) {
359+
if (result.status == 'rejected' && !(result.reason instanceof ReplicationAbortedError)) {
360+
throw result.reason;
361+
}
362+
}
363+
// Then include aborted errors
364+
for (let result of results) {
365+
if (result.status == 'rejected') {
366+
throw result.reason;
367+
}
368+
}
369+
// If we get here, both Promises completed successfully, which is unexpected.
370+
throw new ReplicationAssertionError(`Replication loop exited unexpectedly`);
351371
} catch (e) {
352372
await this.storage.reportError(e);
353373
throw e;
354374
} finally {
355375
this.abortController.abort();
356-
// Wait for both to finish, to ensure proper cleanup.
357-
await Promise.all([loopPromise?.catch((_) => {}), streamPromise?.catch((_) => {})]);
358376
}
359377
}
360378

0 commit comments

Comments
 (0)