Commit e427fb9

Implement current_data soft deletes on postgres storage; add tests.
1 parent b5a4428 commit e427fb9

6 files changed, +335 -47 lines changed
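
In broad strokes, the change turns hard deletes of current_data rows into soft deletes: instead of removing a row immediately, the write path stamps it with a pending_delete op id, and the row is only physically removed once a checkpoint at or beyond that op id has been persisted. A minimal conceptual sketch of the "mark" side against a plain Postgres connection (the pg client and the single-row UPDATE are illustrative assumptions; the actual implementation routes deletes through the same batched upsert path as other current_data writes, as shown in the PostgresPersistedBatch diff below, and the checkpoint-time cleanup is sketched after the PostgresBucketBatch diff):

import { Client } from 'pg';

// Sketch only: mark a current_data row as pending delete instead of deleting it.
// Assumes the current_data columns and the op_id_sequence sequence from this commit exist.
async function markPendingDelete(db: Client, groupId: number, sourceTable: string, sourceKey: Buffer) {
  await db.query(
    `UPDATE current_data
     SET pending_delete = nextval('op_id_sequence')
     WHERE group_id = $1 AND source_table = $2 AND source_key = $3`,
    [groupId, sourceTable, sourceKey]
  );
}
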
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+import { migrations } from '@powersync/service-core';
+import * as storage from '../../../storage/storage-index.js';
+import { MongoStorageConfig } from '../../../types/types.js';
+
+const INDEX_NAME = 'pending_delete';
+
+export const up: migrations.PowerSyncMigrationFunction = async (context) => {
+  const {
+    service_context: { configuration }
+  } = context;
+  const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
+
+  try {
+    await db.current_data.createIndex(
+      {
+        '_id.g': 1,
+        pending_delete: 1
+      },
+      {
+        partialFilterExpression: { pending_delete: { $exists: true } },
+        name: INDEX_NAME
+      }
+    );
+  } finally {
+    await db.client.close();
+  }
+};
+
+export const down: migrations.PowerSyncMigrationFunction = async (context) => {
+  const {
+    service_context: { configuration }
+  } = context;
+
+  const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
+
+  try {
+    if (await db.current_data.indexExists(INDEX_NAME)) {
+      await db.current_data.dropIndex(INDEX_NAME);
+    }
+  } finally {
+    await db.client.close();
+  }
+};
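
The new MongoDB migration backs the same idea on the Mongo storage side: pending_delete is only ever set on documents that are waiting for cleanup, so the index is partial (partialFilterExpression on pending_delete existing) and documents that were never soft-deleted are not indexed at all. The cleanup query in MongoBucketBatch below is shaped to match it. A standalone sketch of that query using the mongodb driver (the connection URI and database name here are assumptions; the collection name and filter come from the diff):

import { Long, MongoClient } from 'mongodb';

// Sketch only: delete current_data documents whose pending_delete falls at or
// before the last persisted checkpoint, scoped to one sync rules group.
async function cleanupPendingDeletes(uri: string, groupId: number, lastCheckpoint: Long) {
  const client = new MongoClient(uri);
  try {
    await client.connect();
    // 'powersync' is an assumed database name for this sketch.
    const currentData = client.db('powersync').collection('current_data');
    // The filter matches the partialFilterExpression, so the pending_delete index applies;
    // documents without the field are never scanned.
    const result = await currentData.deleteMany({
      '_id.g': groupId,
      pending_delete: { $exists: true, $lte: lastCheckpoint }
    });
    return result.deletedCount;
  } finally {
    await client.close();
  }
}
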

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 2 additions & 1 deletion
@@ -826,7 +826,8 @@ export class MongoBucketBatch

   private async cleanupCurrentData(lastCheckpoint: bigint) {
     const result = await this.db.current_data.deleteMany({
-      pending_delete: { $exists: true, $lte: lastCheckpoint ?? 1_000_000 }
+      '_id.g': this.group_id,
+      pending_delete: { $exists: true, $lte: lastCheckpoint }
     });
     if (result.deletedCount > 0) {
       this.logger.info(

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 15 additions & 4 deletions
@@ -219,7 +219,8 @@ export class PostgresBucketBatch
         persistedBatch.deleteCurrentData({
           // This is serialized since we got it from a DB query
           serialized_source_key: value.source_key,
-          source_table_id: sourceTable.id
+          source_table_id: sourceTable.id,
+          checkpoint_greater_than: 1n
         });
       }
     }
@@ -456,6 +457,14 @@ export class PostgresBucketBatch
       this.logger.info(
         `Created checkpoint at ${lsn}. Persisted op: ${result.last_checkpoint} (${this.persisted_op}). keepalive: ${result.keepalive_op}`
       );
+
+      await this.db.sql`
+        DELETE FROM current_data
+        WHERE
+          group_id = ${{ type: 'int4', value: this.group_id }}
+          AND pending_delete IS NOT NULL
+          AND pending_delete <= ${{ type: 'int8', value: result.last_checkpoint }}
+      `.execute();
     } else {
       this.logger.info(
         `Skipped empty checkpoint at ${lsn}. Persisted op: ${result.last_checkpoint}. keepalive: ${result.keepalive_op}`
@@ -1003,16 +1012,18 @@ export class PostgresBucketBatch
           data: afterData!,
           source_table: sourceTable.id,
           buckets: newBuckets,
-          lookups: newLookups
+          lookups: newLookups,
+          pending_delete: null
         };
         persistedBatch.upsertCurrentData(result);
       }

       if (afterId == null || !storage.replicaIdEquals(beforeId, afterId)) {
         // Either a delete (afterId == null), or replaced the old replication id
         persistedBatch.deleteCurrentData({
-          source_table_id: record.sourceTable.id,
-          source_key: beforeId!
+          source_table_id: sourceTable.id,
+          source_key: beforeId!,
+          checkpoint_greater_than: 1n
         });
       }
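
The DELETE added to the checkpoint path above is the Postgres counterpart of the Mongo cleanup: once a checkpoint has been created, every row whose pending_delete op id falls at or before result.last_checkpoint can be removed for real, scoped by group_id. The same statement expressed against a plain pg connection (connection handling and positional parameters are assumptions for the sketch; the service uses its own wrapped connection and typed parameters):

import { Client } from 'pg';

// Sketch only: drop soft-deleted current_data rows that are covered by the
// checkpoint that was just persisted.
async function dropCommittedSoftDeletes(db: Client, groupId: number, lastCheckpoint: bigint): Promise<number> {
  const result = await db.query(
    `DELETE FROM current_data
     WHERE group_id = $1
       AND pending_delete IS NOT NULL
       AND pending_delete <= $2`,
    [groupId, lastCheckpoint.toString()]
  );
  return result.rowCount ?? 0;
}
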

modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts

Lines changed: 25 additions & 40 deletions
@@ -1,6 +1,6 @@
 import * as lib_postgres from '@powersync/lib-service-postgres';
 import { logger } from '@powersync/lib-services-framework';
-import { storage, utils } from '@powersync/service-core';
+import { bson, InternalOpId, storage, utils } from '@powersync/service-core';
 import { JSONBig } from '@powersync/service-jsonbig';
 import * as sync_rules from '@powersync/service-sync-rules';
 import { models, RequiredOperationBatchLimits } from '../../types/types.js';
@@ -24,7 +24,7 @@ export type SaveParameterDataOptions = {
 };

 export type DeleteCurrentDataOptions = {
-  source_table_id: bigint;
+  source_table_id: any;
   /**
    * ReplicaID which needs to be serialized in order to be queried
    * or inserted into the DB
@@ -40,6 +40,8 @@ export type PostgresPersistedBatchOptions = RequiredOperationBatchLimits & {
   group_id: number;
 };

+const EMPTY_DATA = Buffer.from(bson.serialize({}));
+
 export class PostgresPersistedBatch {
   group_id: number;

@@ -56,7 +58,6 @@ export class PostgresPersistedBatch {
    */
   protected bucketDataInserts: models.BucketData[];
   protected parameterDataInserts: models.BucketParameters[];
-  protected currentDataDeletes: Pick<models.CurrentData, 'group_id' | 'source_key' | 'source_table'>[];
   /**
    * This is stored as a map to avoid multiple inserts (or conflicts) for the same key
    */
@@ -70,7 +71,6 @@ export class PostgresPersistedBatch {

     this.bucketDataInserts = [];
     this.parameterDataInserts = [];
-    this.currentDataDeletes = [];
     this.currentDataInserts = new Map();
     this.currentSize = 0;
   }
@@ -180,13 +180,15 @@ export class PostgresPersistedBatch {
   }

   deleteCurrentData(options: DeleteCurrentDataOptions) {
-    const serializedReplicaId = options.serialized_source_key ?? storage.serializeReplicaId(options.source_key);
-    this.currentDataDeletes.push({
+    return this.upsertCurrentData({
       group_id: this.group_id,
-      source_table: options.source_table_id.toString(),
-      source_key: serializedReplicaId.toString('hex')
+      source_table: options.source_table_id,
+      source_key: options.source_key,
+      buckets: [],
+      data: EMPTY_DATA,
+      lookups: [],
+      pending_delete: 1n
     });
-    this.currentSize += serializedReplicaId.byteLength + 100;
   }

   upsertCurrentData(options: models.CurrentDataDecoded) {
@@ -212,7 +214,8 @@ export class PostgresPersistedBatch {
       source_key: hexReplicaId,
       buckets: serializedBuckets,
       data: options.data.toString('hex'),
-      lookups: options.lookups.map((l) => l.toString('hex'))
+      lookups: options.lookups.map((l) => l.toString('hex')),
+      pending_delete: options.pending_delete?.toString() ?? null
     });

     this.currentSize +=
@@ -230,15 +233,14 @@ export class PostgresPersistedBatch {
       this.currentSize >= this.maxTransactionBatchSize ||
       this.bucketDataInserts.length >= this.maxTransactionDocCount ||
       this.currentDataInserts.size >= this.maxTransactionDocCount ||
-      this.currentDataDeletes.length >= this.maxTransactionDocCount ||
       this.parameterDataInserts.length >= this.maxTransactionDocCount
     );
   }

   async flush(db: lib_postgres.WrappedConnection) {
     logger.info(
       `powersync_${this.group_id} Flushed ${this.bucketDataInserts.length} + ${this.parameterDataInserts.length} + ${
-        this.currentDataInserts.size + this.currentDataDeletes.length
+        this.currentDataInserts.size
       } updates, ${Math.round(this.currentSize / 1024)}kb.`
     );

@@ -248,7 +250,6 @@ export class PostgresPersistedBatch {

     this.bucketDataInserts = [];
     this.parameterDataInserts = [];
-    this.currentDataDeletes = [];
     this.currentDataInserts = new Map();
     this.currentSize = 0;
   }
@@ -338,7 +339,8 @@ export class PostgresPersistedBatch {
           source_key,
           buckets,
           data,
-          lookups
+          lookups,
+          pending_delete
         )
         SELECT
           group_id,
@@ -351,44 +353,27 @@ export class PostgresPersistedBatch {
               decode(element, 'hex')
             FROM
               unnest(lookups) AS element
-          ) AS lookups
+          ) AS lookups,
+          CASE
+            WHEN pending_delete IS NOT NULL THEN nextval('op_id_sequence')
+            ELSE NULL
+          END AS pending_delete
         FROM
          json_to_recordset(${{ type: 'json', value: Array.from(this.currentDataInserts.values()) }}::json) AS t (
            group_id integer,
            source_table text,
            source_key text, -- Input as hex string
            buckets text,
            data text, -- Input as hex string
-           lookups TEXT[] -- Input as stringified JSONB array of hex strings
+           lookups TEXT[], -- Input as stringified JSONB array of hex strings
+           pending_delete bigint
          )
        ON CONFLICT (group_id, source_table, source_key) DO UPDATE
        SET
          buckets = EXCLUDED.buckets,
          data = EXCLUDED.data,
-         lookups = EXCLUDED.lookups;
-      `.execute();
-    }
-
-    if (this.currentDataDeletes.length > 0) {
-      await db.sql`
-        WITH
-          conditions AS (
-            SELECT
-              group_id,
-              source_table,
-              decode(source_key, 'hex') AS source_key -- Decode hex to bytea
-            FROM
-              jsonb_to_recordset(${{ type: 'jsonb', value: this.currentDataDeletes }}::jsonb) AS t (
-                group_id integer,
-                source_table text,
-                source_key text -- Input as hex string
-              )
-          )
-        DELETE FROM current_data USING conditions
-        WHERE
-          current_data.group_id = conditions.group_id
-          AND current_data.source_table = conditions.source_table
-          AND current_data.source_key = conditions.source_key;
+         lookups = EXCLUDED.lookups,
+         pending_delete = EXCLUDED.pending_delete;
       `.execute();
     }
   }
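
One detail worth calling out in the flush SQL above: the in-memory batch only records that a row is a soft delete (the 1n placeholder passed from deleteCurrentData); the real pending_delete op id is assigned by Postgres at insert time via nextval('op_id_sequence'), which is what lets the checkpoint-time DELETE compare it against result.last_checkpoint. A small standalone sketch of that CASE/nextval pattern (the pg client and the table-free SELECT are assumptions; op_id_sequence must already exist):

import { Client } from 'pg';

// Sketch only: rows flagged as soft deletes receive a fresh op id from the
// sequence, all other rows keep pending_delete NULL. This mirrors the CASE
// expression in the INSERT ... SELECT above.
async function assignPendingDeleteIds(db: Client) {
  const input = [
    { source_key: 'aa', pending_delete: 1 }, // soft delete: gets nextval('op_id_sequence')
    { source_key: 'bb', pending_delete: null } // normal upsert: stays NULL
  ];
  const { rows } = await db.query(
    `SELECT
       t.source_key,
       CASE
         WHEN t.pending_delete IS NOT NULL THEN nextval('op_id_sequence')
         ELSE NULL
       END AS pending_delete
     FROM json_to_recordset($1::json) AS t (source_key text, pending_delete bigint)`,
    [JSON.stringify(input)]
  );
  return rows;
}
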

modules/module-postgres-storage/src/types/models/CurrentData.ts

Lines changed: 3 additions & 2 deletions
@@ -1,5 +1,5 @@
 import * as t from 'ts-codec';
-import { hexBuffer, jsonb, pgwire_number } from '../codecs.js';
+import { bigint, hexBuffer, jsonb, pgwire_number } from '../codecs.js';

 export const CurrentBucket = t.object({
   bucket: t.string,
@@ -16,7 +16,8 @@ export const CurrentData = t.object({
   group_id: pgwire_number,
   lookups: t.array(hexBuffer),
   source_key: hexBuffer,
-  source_table: t.string
+  source_table: t.string,
+  pending_delete: t.Null.or(bigint)
 });

 export type CurrentData = t.Encoded<typeof CurrentData>;
