Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
fda46b6
Refactor replication connections.
rkistner Nov 17, 2025
7a812ee
Start streaming concurrently - lets see what breaks.
rkistner Nov 17, 2025
be3390a
Split out snapshot logic from streaming logic.
rkistner Nov 17, 2025
fcd539d
Quick workaround for now.
rkistner Nov 17, 2025
7701a9a
Fix test issues.
rkistner Nov 17, 2025
7e3b0d1
WIP.
rkistner Nov 18, 2025
98da657
Split out "snapshot done" check.
rkistner Nov 18, 2025
db5d578
Tweaks to tests.
rkistner Nov 18, 2025
b7a5d5f
Fix for postgres storage.
rkistner Nov 18, 2025
df0d4cb
Refactor for snapshotting.
rkistner Nov 18, 2025
ce500a1
Refactor commit logic to better handle concurrent replication.
rkistner Nov 20, 2025
f5ec031
Cover case of no tables to snapshot.
rkistner Nov 20, 2025
049b32a
Implement missing method for Postgres storage.
rkistner Nov 20, 2025
531e887
Port changes to postgres storage.
rkistner Nov 20, 2025
64e2abc
Fix data storage tests - markSnapshotComplete is required.
rkistner Nov 20, 2025
333bdad
Fix storage sync tests.
rkistner Nov 20, 2025
29f4302
Fix snapshot_lsn handling.
rkistner Nov 20, 2025
cd19ba2
Fix snapshot_lsn in postgres storage.
rkistner Nov 20, 2025
8dc726f
Fix test promise handling.
rkistner Nov 20, 2025
ac790a7
Fix more tests.
rkistner Nov 20, 2025
12d60ed
Make schema test more stable.
rkistner Nov 24, 2025
85aaccc
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
rkistner Dec 1, 2025
f09dea3
Refactor streaming promise management.
rkistner Dec 1, 2025
b3a23ef
Fix more tests.
rkistner Dec 1, 2025
8b9ef4f
More stable abort handling.
rkistner Dec 1, 2025
1d0088f
Skip empty checkpoints on Postgres again.
rkistner Dec 1, 2025
1c85ca2
More test fixes.
rkistner Dec 1, 2025
8a55aa1
Add tests for empty checkpoints.
rkistner Dec 1, 2025
77cd69b
Implement createEmptyCheckpoints filter for mongodb storage.
rkistner Dec 1, 2025
0041ff3
Implement soft deletes for current_data.
rkistner Dec 2, 2025
b5a4428
Cleaner abort handling.
rkistner Dec 2, 2025
44afe81
Implement current_data soft deletes on postgres storage; add tests.
rkistner Dec 2, 2025
df8b2dc
Fix compacting tests.
rkistner Dec 2, 2025
8f33247
Add missing migration.
rkistner Dec 2, 2025
1d17232
Fix more tests.
rkistner Dec 2, 2025
d3bfdd6
Update snapshot.
rkistner Dec 2, 2025
b68ffa6
Improve handling of abort errors.
rkistner Dec 2, 2025
8cc27bd
Improve transaction isolation for postgres storage batches to avoid
rkistner Dec 2, 2025
ec82bb6
More abort error improvements.
rkistner Dec 2, 2025
3129507
Fix truncate on MongoDB storage.
rkistner Dec 3, 2025
3c77f43
Fix postgres storage truncate; typed SourceTable id.
rkistner Dec 3, 2025
7b87214
Fix build errors.
rkistner Dec 3, 2025
4961662
Fix another build error.
rkistner Dec 3, 2025
4a6a5ae
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
rkistner Dec 3, 2025
d45c212
Workaround for tests.
rkistner Dec 3, 2025
d968d39
Same workaround in different test.
rkistner Dec 3, 2025
4958054
Better fix for tests.
rkistner Dec 4, 2025
effa749
Better handling of deadlocks on current transactions.
rkistner Dec 4, 2025
2a7a572
Another error test tweak.
rkistner Dec 4, 2025
bfc1259
Restructure TEST_TABLE for tests.
rkistner Dec 4, 2025
6a5590f
Checksums are different for postgres now.
rkistner Dec 4, 2025
eefddef
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
rkistner Dec 4, 2025
9dab9ef
Fix merge conflict.
rkistner Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/lib-services/src/logger/logger-index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './Logger.js';
export { Logger } from 'winston';
export { createLogger, format, transports } from 'winston';
17 changes: 12 additions & 5 deletions libs/lib-services/src/migrations/AbstractMigrationAgent.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { LockManager } from '../locks/LockManager.js';
import { logger } from '../logger/Logger.js';
import { logger as defaultLogger, Logger } from '../logger/logger-index.js';
import * as defs from './migration-definitions.js';

export type MigrationParams<Generics extends MigrationAgentGenerics = MigrationAgentGenerics> = {
count?: number;
direction: defs.Direction;
migrationContext?: Generics['MIGRATION_CONTEXT'];
logger?: Logger;
};

type WriteLogsParams = {
Expand All @@ -20,10 +21,12 @@ export type MigrationAgentGenerics = {
export type RunMigrationParams<Generics extends MigrationAgentGenerics = MigrationAgentGenerics> = MigrationParams & {
migrations: defs.Migration<Generics['MIGRATION_CONTEXT']>[];
maxLockWaitMs?: number;
logger?: Logger;
};

type ExecuteParams = RunMigrationParams & {
state?: defs.MigrationState;
logger: Logger;
};

export const DEFAULT_MAX_LOCK_WAIT_MS = 3 * 60 * 1000; // 3 minutes
Expand All @@ -46,9 +49,11 @@ export abstract class AbstractMigrationAgent<Generics extends MigrationAgentGene
async run(params: RunMigrationParams) {
await this.init();

const logger = params.logger ?? defaultLogger;

const { direction, migrations, migrationContext } = params;
// Only one process should execute this at a time.
logger.info('Acquiring lock for migrations');
logger.debug('Acquiring lock for migrations');
const lockHandle = await this.locks.acquire({ max_wait_ms: params.maxLockWaitMs ?? DEFAULT_MAX_LOCK_WAIT_MS });

if (!lockHandle) {
Expand All @@ -75,22 +80,24 @@ export abstract class AbstractMigrationAgent<Generics extends MigrationAgentGene
direction,
migrations,
state,
migrationContext
migrationContext,
logger
});

await this.writeLogsToStore({
log_stream: logStream,
state
});
} finally {
logger.info('Releasing migration lock');
logger.debug('Releasing migration lock');
await releaseLock();
process.removeListener('beforeExit', releaseLock);
logger.info('Done with migrations');
logger.debug('Done with migrations');
}
}

protected async *execute(params: ExecuteParams): AsyncGenerator<defs.ExecutedMigration> {
const logger = params.logger;
const internalMigrations = await this.loadInternalMigrations();
let migrations = [...internalMigrations, ...params.migrations];

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

const INDEX_NAME = 'pending_delete';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.current_data.createIndex(
{
'_id.g': 1,
pending_delete: 1
},
{
partialFilterExpression: { pending_delete: { $exists: true } },
name: INDEX_NAME
}
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
if (await db.current_data.indexExists(INDEX_NAME)) {
await db.current_data.dropIndex(INDEX_NAME);
}
} finally {
await db.client.close();
}
};
Loading