
Commit 0041ff3

Implement soft deletes for current_data.
1 parent 77cd69b commit 0041ff3

File tree

5 files changed: +147 -17 lines

modules/module-mongodb-storage/src/migrations/db/migrations/1764667093139-current-data-cleanup.ts

Whitespace-only changes.

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 28 additions & 9 deletions
@@ -43,6 +43,8 @@ export const MAX_ROW_SIZE = 15 * 1024 * 1024;
 // In the future, we can investigate allowing multiple replication streams operating independently.
 const replicationMutex = new utils.Mutex();
 
+export const EMPTY_DATA = new bson.Binary(bson.serialize({}));
+
 export interface MongoBucketBatchOptions {
   db: PowerSyncMongo;
   syncRules: SqlSyncRules;
@@ -387,7 +389,7 @@ export class MongoBucketBatch
 
     let afterData: bson.Binary | undefined;
     if (afterId != null && !this.storeCurrentData) {
-      afterData = new bson.Binary(bson.serialize({}));
+      afterData = EMPTY_DATA;
     } else if (afterId != null) {
       try {
         // This will fail immediately if the record is > 16MB.
@@ -552,8 +554,10 @@ export class MongoBucketBatch
 
     if (afterId == null || !storage.replicaIdEquals(beforeId, afterId)) {
       // Either a delete (afterId == null), or replaced the old replication id
-      // TODO: soft delete
-      batch.deleteCurrentData(before_key);
+      // Note that this is a soft delete.
+      // We don't specifically need a new or unique op_id here, but it must be greater than the
+      // last checkpoint, so we use next().
+      batch.deleteCurrentData(before_key, opSeq.next());
     }
     return result;
   }
@@ -764,7 +768,8 @@ export class MongoBucketBatch
           snapshot_done: 1,
           last_checkpoint_lsn: 1,
           no_checkpoint_before: 1,
-          keepalive_op: 1
+          keepalive_op: 1,
+          last_checkpoint: 1
         }
       }
     );
@@ -777,7 +782,8 @@ export class MongoBucketBatch
           snapshot_done: 1,
           last_checkpoint_lsn: 1,
           no_checkpoint_before: 1,
-          keepalive_op: 1
+          keepalive_op: 1,
+          last_checkpoint: 1
         }
       }
     );
@@ -788,11 +794,14 @@ export class MongoBucketBatch
       // This can happen when last_checkpoint and keepalive_op would remain unchanged.
       updateResult = existing;
     }
-    const checkpointCreated = updateResult.snapshot_done === true && updateResult.last_checkpoint_lsn === lsn;
+    const checkpointCreated =
+      updateResult.snapshot_done === true &&
+      updateResult.last_checkpoint_lsn === lsn &&
+      updateResult.last_checkpoint != null;
 
     if (!checkpointCreated) {
       // Failed on snapshot_done or no_checkpoint_before.
-      if (Date.now() - this.lastWaitingLogThottled > 5_000 || true) {
+      if (Date.now() - this.lastWaitingLogThottled > 5_000) {
         this.logger.info(
           `Waiting before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}. Current state: ${JSON.stringify(
             {
@@ -805,15 +814,25 @@ export class MongoBucketBatch
         this.lastWaitingLogThottled = Date.now();
       }
     } else {
-      this.logger.info(`Created checkpoint at ${lsn}. Persisted op: ${this.persisted_op}`);
+      this.logger.info(`Created checkpoint at ${lsn}/${updateResult.last_checkpoint}`);
       await this.autoActivate(lsn);
       await this.db.notifyCheckpoint();
       this.persisted_op = null;
       this.last_checkpoint_lsn = lsn;
+      await this.cleanupCurrentData(updateResult.last_checkpoint!);
     }
     return true;
   }
 
+  private async cleanupCurrentData(lastCheckpoint: bigint) {
+    const result = await this.db.current_data.deleteMany({
+      pending_delete: { $exists: true, $lte: lastCheckpoint ?? 1_000_000 }
+    });
+    this.logger.info(
+      `Cleaned up ${result.deletedCount} pending delete current_data records for checkpoint ${lastCheckpoint}`
+    );
+  }
+
   /**
    * Switch from processing -> active if relevant.
    *
@@ -1001,7 +1020,7 @@ export class MongoBucketBatch
         sourceKey: value._id.k
       });
 
-      persistedBatch.deleteCurrentData(value._id);
+      persistedBatch.deleteCurrentData(value._id, opSeq.next());
     }
     await persistedBatch.flush(this.db, session);
     lastBatchCount = batch.length;
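
Taken together, these hunks implement a two-phase delete: deleteCurrentData now records an op id obtained from opSeq.next() (always greater than the last checkpoint), and cleanupCurrentData physically removes rows only once a consistent checkpoint at or beyond that op id has been committed. The following is a minimal standalone sketch of that lifecycle against the MongoDB Node.js driver; the database name, helper names, and simplified document shape are assumptions for illustration, not the service's actual API.

import { MongoClient } from 'mongodb';
import * as bson from 'bson';

// Same empty payload as EMPTY_DATA in the patch above.
const EMPTY_DATA = new bson.Binary(bson.serialize({}));

// Phase 1 (hypothetical helper): soft delete. The document is kept, its payload and
// bucket/lookup state are cleared, and pending_delete records the op id the delete
// was issued under. upsert: true leaves a tombstone even if the row was never written.
async function softDeleteCurrentData(client: MongoClient, id: any, opId: bigint) {
  await client
    .db('powersync_demo')
    .collection('current_data')
    .updateOne(
      { _id: id },
      { $set: { data: EMPTY_DATA, buckets: [], lookups: [], pending_delete: opId } },
      { upsert: true }
    );
}

// Phase 2 (hypothetical helper): hard delete after a checkpoint commits. Only rows whose
// pending_delete is covered by the committed checkpoint are removed, mirroring cleanupCurrentData.
async function cleanupAfterCheckpoint(client: MongoClient, lastCheckpoint: bigint) {
  const result = await client
    .db('powersync_demo')
    .collection('current_data')
    .deleteMany({ pending_delete: { $exists: true, $lte: lastCheckpoint } });
  console.log(`removed ${result.deletedCount} soft-deleted current_data rows`);
}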

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 13 additions & 4 deletions
@@ -5,7 +5,7 @@ import * as bson from 'bson';
 
 import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework';
 import { InternalOpId, storage, utils } from '@powersync/service-core';
-import { currentBucketKey, MAX_ROW_SIZE } from './MongoBucketBatch.js';
+import { currentBucketKey, EMPTY_DATA, MAX_ROW_SIZE } from './MongoBucketBatch.js';
 import { MongoIdSequence } from './MongoIdSequence.js';
 import { PowerSyncMongo } from './db.js';
 import {
@@ -243,10 +243,19 @@ export class PersistedBatch {
     }
   }
 
-  deleteCurrentData(id: SourceKey) {
+  deleteCurrentData(id: SourceKey, checkpointGreaterThan: bigint) {
     const op: mongo.AnyBulkWriteOperation<CurrentDataDocument> = {
-      deleteOne: {
-        filter: { _id: id }
+      updateOne: {
+        filter: { _id: id },
+        update: {
+          $set: {
+            data: EMPTY_DATA,
+            buckets: [],
+            lookups: [],
+            pending_delete: checkpointGreaterThan
+          }
+        },
+        upsert: true
       }
     };
     this.currentData.push(op);

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 6 additions & 0 deletions
@@ -35,6 +35,12 @@ export interface CurrentDataDocument {
   data: bson.Binary;
   buckets: CurrentBucket[];
   lookups: bson.Binary[];
+  /**
+   * If set, this can be deleted, once there is a consistent checkpoint >= pending_delete.
+   *
+   * This must only be set if buckets = [], lookups = [].
+   */
+  pending_delete?: bigint;
 }
 
 export interface CurrentBucket {
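
For reference, a row in this state might look like the sketch below (illustrative values only; the _id source key keeps its original value and is omitted here):

import * as bson from 'bson';

// Illustrative shape of a soft-deleted current_data document.
const softDeletedRow = {
  data: new bson.Binary(bson.serialize({})), // same payload as EMPTY_DATA
  buckets: [],                               // no longer contributes to any bucket
  lookups: [],                               // no parameter lookups remain
  pending_delete: 1024n                      // from opSeq.next(); removable once last_checkpoint >= 1024
};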

modules/module-postgres/test/src/slow_tests.test.ts

Lines changed: 100 additions & 4 deletions
@@ -14,12 +14,18 @@ import * as pgwire from '@powersync/service-jpgwire';
 import { SqliteRow } from '@powersync/service-sync-rules';
 
 import { PgManager } from '@module/replication/PgManager.js';
-import { createCoreReplicationMetrics, initializeCoreReplicationMetrics, storage } from '@powersync/service-core';
+import { ReplicationAbortedError } from '@powersync/lib-services-framework';
+import {
+  createCoreReplicationMetrics,
+  initializeCoreReplicationMetrics,
+  reduceBucket,
+  storage
+} from '@powersync/service-core';
 import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
 import * as mongo_storage from '@powersync/service-module-mongodb-storage';
 import * as postgres_storage from '@powersync/service-module-postgres-storage';
 import * as timers from 'node:timers/promises';
-import { ReplicationAbortedError } from '@powersync/lib-services-framework';
+import { WalStreamTestContext } from './wal_stream_utils.js';
 
 describe.skipIf(!(env.CI || env.SLOW_TESTS))('slow tests', function () {
   describeWithStorage({ timeout: 120_000 }, function (factory) {
@@ -300,7 +306,7 @@ bucket_definitions:
   //
   // If the first LSN does not correctly match with the first replication transaction,
   // we may miss some updates.
-  test('repeated initial replication', { timeout: TEST_DURATION_MS + TIMEOUT_MARGIN_MS }, async () => {
+  test('repeated initial replication (1)', { timeout: TEST_DURATION_MS + TIMEOUT_MARGIN_MS }, async () => {
     const pool = await connectPgPool();
     await clearTestDb(pool);
     await using f = await factory();
@@ -348,7 +354,7 @@ bucket_definitions:
 
       await storage.clear();
 
-      // 3. Start initial replication, then streaming, but don't wait for any of this
+      // 3. Start replication, but don't wait for it
       let initialReplicationDone = false;
       streamPromise = walStream.replicate();
       walStream
@@ -408,4 +414,94 @@ bucket_definitions:
       await connections.end();
     }
   });
+
+  // Test repeatedly performing initial replication while deleting data.
+  //
+  // This specifically checks for data in the initial snapshot being deleted while snapshotting.
+  test('repeated initial replication with deletes', { timeout: TEST_DURATION_MS + TIMEOUT_MARGIN_MS }, async () => {
+    const syncRuleContent = `
+bucket_definitions:
+  global:
+    data:
+      - SELECT id, description FROM "test_data"
+`;
+
+    const start = Date.now();
+    let i = 0;
+
+    while (Date.now() - start < TEST_DURATION_MS) {
+      i += 1;
+
+      // 1. Each iteration starts with a clean slate
+      await using context = await WalStreamTestContext.open(factory, {
+        walStreamOptions: { snapshotChunkLength: 100 }
+      });
+      const pool = context.pool;
+
+      // Introduce an artificial delay in snapshot queries, to make it more likely to reproduce an
+      // issue.
+      const originalSnapshotConnectionFn = context.connectionManager.snapshotConnection;
+      context.connectionManager.snapshotConnection = async () => {
+        const conn = await originalSnapshotConnectionFn.call(context.connectionManager);
+        // Wrap streaming query to add delays to snapshots
+        const originalStream = conn.stream;
+        conn.stream = async function* (...args: any[]) {
+          const delay = Math.random() * 20;
+          yield* originalStream.call(this, ...args);
+          await new Promise((resolve) => setTimeout(resolve, delay));
+        };
+        return conn;
+      };
+
+      await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`);
+      await context.updateSyncRules(syncRuleContent);
+
+      let statements: pgwire.Statement[] = [];
+
+      const n = Math.floor(Math.random() * 200);
+      for (let i = 0; i < n; i++) {
+        statements.push({
+          statement: `INSERT INTO test_data(description) VALUES('test_init') RETURNING id`
+        });
+      }
+      const results = await pool.query(...statements);
+      const ids = new Set(
+        results.results.map((sub) => {
+          return sub.rows[0][0] as string;
+        })
+      );
+
+      // 3. Start replication, but don't wait for it
+      let initialReplicationDone = false;
+
+      streamPromise = context.replicateSnapshot().finally(() => {
+        initialReplicationDone = true;
+      });
+
+      // 4. While initial replication is still running, delete random rows
+      while (!initialReplicationDone && ids.size > 0) {
+        let statements: pgwire.Statement[] = [];
+
+        const m = Math.floor(Math.random() * 10) + 1;
+        const idArray = Array.from(ids);
+        for (let i = 0; i < m; i++) {
+          const id = idArray[Math.floor(Math.random() * idArray.length)];
+          statements.push({
+            statement: `DELETE FROM test_data WHERE id = $1`,
+            params: [{ type: 'uuid', value: id }]
+          });
+          ids.delete(id);
+        }
+        await pool.query(...statements);
+        await new Promise((resolve) => setTimeout(resolve, Math.random() * 10));
+      }
+
+      await streamPromise;
+
+      // 5. Once initial replication is done, wait for the streaming changes to complete syncing.
+      const data = await context.getBucketData('global[]', 0n);
+      const normalized = reduceBucket(data).filter((op) => op.op !== 'CLEAR');
+      expect(normalized.length).toEqual(ids.size);
+    }
+  });
 }
