Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/little-pants-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[MongoDB] Optimize change stream filters to avoid PSYNC_S1345 timeouts
28 changes: 22 additions & 6 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ export class ChangeStream {

private snapshotChunkLength: number;

private changeStreamTimeout: number;

constructor(options: ChangeStreamOptions) {
this.storage = options.storage;
this.metrics = options.metrics;
Expand All @@ -122,6 +124,9 @@ export class ChangeStream {
this.sync_rules = options.storage.getParsedSyncRules({
defaultSchema: this.defaultDb.databaseName
});
// The change stream aggregation command should timeout before the socket times out,
// so we use 90% of the socket timeout value.
this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9);

this.abort_signal = options.abort_signal;
this.abort_signal.addEventListener(
Expand Down Expand Up @@ -411,8 +416,10 @@ export class ChangeStream {
private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } {
const sourceTables = this.sync_rules.getSourceTables();

let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }];
let $refilters: any[] = [];
let $inFilters: { db: string; coll: string }[] = [
{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }
];
let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = [];
let multipleDatabases = false;
for (let tablePattern of sourceTables) {
if (tablePattern.connectionTag != this.connections.connectionTag) {
Expand All @@ -435,10 +442,15 @@ export class ChangeStream {
});
}
}
const nsFilter = multipleDatabases
? // cluster-level: filter on the entire namespace
{ ns: { $in: $inFilters } }
: // collection-level: filter on coll only
{ 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } };
if ($refilters.length > 0) {
return { $match: { $or: [{ ns: { $in: $inFilters } }, ...$refilters] }, multipleDatabases };
return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases };
}
return { $match: { ns: { $in: $inFilters } }, multipleDatabases };
return { $match: nsFilter, multipleDatabases };
}

static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
Expand Down Expand Up @@ -747,11 +759,11 @@ export class ChangeStream {
} else {
fullDocument = 'updateLookup';
}

const streamOptions: mongo.ChangeStreamOptions = {
showExpandedEvents: true,
maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS,
fullDocument: fullDocument
fullDocument: fullDocument,
maxTimeMS: this.changeStreamTimeout
};

/**
Expand Down Expand Up @@ -1103,6 +1115,10 @@ function mapChangeStreamError(e: any) {
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
// We wrap the error to make it more useful.
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
} else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') {
// maxTimeMS was reached. Example message:
// MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
} else if (
isMongoServerError(e) &&
e.codeName == 'NoMatchingDocument' &&
Expand Down