Skip to content

Commit effa749

Browse files
committed
Better handling of deadlocks on current transactions.
1 parent 4958054 commit effa749

File tree

2 files changed

+31
-6
lines changed

2 files changed

+31
-6
lines changed

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,8 +1118,9 @@ export class PostgresBucketBatch
11181118
return await callback(db);
11191119
});
11201120
} catch (err) {
1121-
// Serialization failure, retry
1122-
if (err[Symbol.for('pg.ErrorCode')] === '40001' && Date.now() < lastTry) {
1121+
const code = err[Symbol.for('pg.ErrorCode')];
1122+
if ((code == '40001' || code == '40P01') && Date.now() < lastTry) {
1123+
// Serialization (lock) failure, retry
11231124
this.logger.warn(`Serialization failure during replication transaction, retrying: ${err.message}`);
11241125
await timers.setTimeout(100 + Math.random() * 200);
11251126
continue;

modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,12 @@ export class PostgresPersistedBatch {
269269
} updates, ${Math.round(this.currentSize / 1024)}kb.`
270270
);
271271

272-
await this.flushBucketData(db);
273-
await this.flushParameterData(db);
272+
// Flush current_data first, since this is where lock errors are most likely to occur, and we
273+
// want to detect those as soon as possible.
274274
await this.flushCurrentData(db);
275275

276+
await this.flushBucketData(db);
277+
await this.flushParameterData(db);
276278
this.bucketDataInserts = [];
277279
this.parameterDataInserts = [];
278280
this.currentDataInserts = new Map();
@@ -357,6 +359,18 @@ export class PostgresPersistedBatch {
357359

358360
protected async flushCurrentData(db: lib_postgres.WrappedConnection) {
359361
if (this.currentDataInserts.size > 0) {
362+
const updates = Array.from(this.currentDataInserts.values());
363+
// Sort by source_table, source_key to ensure consistent order.
364+
// While the order of updates doesn't directly matter, using a consistent order helps to reduce 40P01 deadlock errors.
365+
// We may still have deadlocks between deletes and inserts, but those should be less frequent.
366+
updates.sort((a, b) => {
367+
if (a.source_table < b.source_table) return -1;
368+
if (a.source_table > b.source_table) return 1;
369+
if (a.source_key < b.source_key) return -1;
370+
if (a.source_key > b.source_key) return 1;
371+
return 0;
372+
});
373+
360374
await db.sql`
361375
INSERT INTO
362376
current_data (
@@ -385,7 +399,7 @@ export class PostgresPersistedBatch {
385399
ELSE NULL
386400
END AS pending_delete
387401
FROM
388-
json_to_recordset(${{ type: 'json', value: Array.from(this.currentDataInserts.values()) }}::json) AS t (
402+
json_to_recordset(${{ type: 'json', value: updates }}::json) AS t (
389403
group_id integer,
390404
source_table text,
391405
source_key text, -- Input as hex string
@@ -404,6 +418,16 @@ export class PostgresPersistedBatch {
404418
}
405419

406420
if (this.currentDataDeletes.size > 0) {
421+
const deletes = Array.from(this.currentDataDeletes.values());
422+
// Same sorting as for inserts
423+
deletes.sort((a, b) => {
424+
if (a.source_table < b.source_table) return -1;
425+
if (a.source_table > b.source_table) return 1;
426+
if (a.source_key_hex < b.source_key_hex) return -1;
427+
if (a.source_key_hex > b.source_key_hex) return 1;
428+
return 0;
429+
});
430+
407431
await db.sql`
408432
WITH
409433
conditions AS (
@@ -413,7 +437,7 @@ export class PostgresPersistedBatch {
413437
FROM
414438
jsonb_to_recordset(${{
415439
type: 'jsonb',
416-
value: Array.from(this.currentDataDeletes.values())
440+
value: deletes
417441
}}::jsonb) AS t (source_table text, source_key_hex text)
418442
)
419443
DELETE FROM current_data USING conditions

0 commit comments

Comments
 (0)