Skip to content

Commit 3780223

Browse files
Re-open database as soon as it's closed. Trigger uploads if database has closed.
1 parent deaf83c commit 3780223

File tree

4 files changed

+100
-32
lines changed

4 files changed

+100
-32
lines changed

packages/web/src/db/adapters/AsyncDatabaseConnection.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ export type ProxiedQueryResult = Omit<QueryResult, 'rows'> & {
1717
*/
1818
export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void;
1919

20+
export class ConnectionClosedError extends Error {
21+
constructor(message: string) {
22+
super(message);
23+
this.name = 'ConnectionClosedError';
24+
}
25+
}
26+
2027
/**
2128
* @internal
2229
* An async Database connection which provides basic async SQL methods.

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@ import {
1111
type ILogger
1212
} from '@powersync/common';
1313
import { getNavigatorLocks } from '../..//shared/navigator';
14-
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
14+
import { AsyncDatabaseConnection, ConnectionClosedError } from './AsyncDatabaseConnection';
1515
import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter';
16-
import {
17-
WorkerConnectionClosedError,
18-
WorkerWrappedAsyncDatabaseConnection
19-
} from './WorkerWrappedAsyncDatabaseConnection';
16+
import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection';
2017
import { WASQLiteVFS } from './wa-sqlite/WASQLiteConnection';
2118
import { ResolvedWASQLiteOpenFactoryOptions } from './wa-sqlite/WASQLiteOpenFactory';
2219
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
@@ -30,10 +27,15 @@ export interface LockedAsyncDatabaseAdapterOptions {
3027
debugMode?: boolean;
3128
logger?: ILogger;
3229
defaultLockTimeoutMs?: number;
30+
reOpenOnConnectionClosed?: boolean;
3331
}
3432

3533
export type LockedAsyncDatabaseAdapterListener = DBAdapterListener & {
3634
initialized?: () => void;
35+
/**
36+
* Fired when the database is re-opened after being closed.
37+
*/
38+
databaseReOpened?: () => void;
3739
};
3840

3941
/**
@@ -56,6 +58,7 @@ export class LockedAsyncDatabaseAdapter
5658
protected pendingAbortControllers: Set<AbortController>;
5759
protected requiresHolds: boolean | null;
5860
protected requiresReOpen: boolean;
61+
protected databaseOpenPromise: Promise<void> | null = null;
5962

6063
closing: boolean;
6164
closed: boolean;
@@ -116,10 +119,15 @@ export class LockedAsyncDatabaseAdapter
116119
this._disposeTableChangeListener?.();
117120
this._disposeTableChangeListener = null;
118121

122+
const isReOpen = !!this._db;
123+
119124
this._db = await this.options.openConnection();
120125
await this._db.init();
121126
this._config = await this._db.getConfig();
122127
await this.registerOnChangeListener(this._db);
128+
if (isReOpen) {
129+
this.iterateListeners((cb) => cb.databaseReOpened?.());
130+
}
123131
/**
124132
* This is only required for the long-lived shared IndexedDB connections.
125133
*/
@@ -253,18 +261,28 @@ export class LockedAsyncDatabaseAdapter
253261
}
254262
let holdId: string | null = null;
255263
try {
256-
if (this.requiresReOpen) {
257-
this.logger.debug('Re-opening database');
258-
await this.openInternalDB();
259-
this.logger.debug('Database re-opened');
260-
this.requiresReOpen = false;
264+
// The database is being opened in the background. Wait for it here.
265+
if (this.databaseOpenPromise) {
266+
try {
267+
await this.databaseOpenPromise;
268+
} catch (ex) {
269+
// This will cause a retry of opening the database.
270+
const wrappedError = new ConnectionClosedError('Could not open database');
271+
wrappedError.cause = ex;
272+
throw wrappedError;
273+
}
261274
}
262275

263276
holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
264277
return await callback();
265278
} catch (ex) {
266-
if (ex instanceof WorkerConnectionClosedError) {
267-
this.requiresReOpen = true;
279+
if (ex instanceof ConnectionClosedError) {
280+
if (this.options.reOpenOnConnectionClosed && !this.databaseOpenPromise && !this.closing) {
281+
// Immediately re-open the database. We need to miss as little table updates as possible.
282+
this.databaseOpenPromise = this.openInternalDB().finally(() => {
283+
this.databaseOpenPromise = null;
284+
});
285+
}
268286
}
269287
throw ex;
270288
} finally {

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as Comlink from 'comlink';
22
import {
33
AsyncDatabaseConnection,
4+
ConnectionClosedError,
45
OnTableChangeCallback,
56
OpenAsyncDatabaseConnection,
67
ProxiedQueryResult
@@ -23,13 +24,6 @@ export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOpti
2324
onClose?: () => void;
2425
};
2526

26-
export class WorkerConnectionClosedError extends Error {
27-
constructor(message: string) {
28-
super(message);
29-
this.name = 'WorkerConnectionClosedError';
30-
}
31-
}
32-
3327
/**
3428
* Wraps a provided instance of {@link AsyncDatabaseConnection}, providing necessary proxy
3529
* functions for worker listeners.
@@ -84,13 +78,13 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
8478
if (controller) {
8579
return new Promise((resolve, reject) => {
8680
if (controller.signal.aborted) {
87-
reject(new WorkerConnectionClosedError('Called operation on closed remote'));
81+
reject(new ConnectionClosedError('Called operation on closed remote'));
8882
// Don't run the operation if we're going to reject
8983
return;
9084
}
9185

9286
function handleAbort() {
93-
reject(new WorkerConnectionClosedError('Remote peer closed with request in flight'));
87+
reject(new ConnectionClosedError('Remote peer closed with request in flight'));
9488
}
9589

9690
function completePromise(action: () => void) {

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,20 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
249249
// Gets a connection from the clients when a new connection is requested.
250250
return await this.openInternalDB();
251251
},
252-
logger: this.logger
252+
logger: this.logger,
253+
reOpenOnConnectionClosed: true
253254
});
254255
this.distributedDB = lockedAdapter;
255256
await lockedAdapter.init();
256257

258+
lockedAdapter.registerListener({
259+
databaseReOpened: () => {
260+
// We may have missed some table updates while the database was closed.
261+
// We can poke the crud in case we missed any updates.
262+
this.connectionManager.syncStreamImplementation?.triggerCrudUpload();
263+
}
264+
});
265+
257266
self.onerror = (event) => {
258267
// Share any uncaught events on the broadcast logger
259268
this.logger.error('Uncaught exception in PowerSync shared sync worker', event);
@@ -480,7 +489,31 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
480489
throw new Error(`Could not open DB connection since no client is connected.`);
481490
}
482491

483-
const workerPort = await withTimeout(() => lastClient.clientProvider.getDBWorkerPort(), 5_000);
492+
/**
493+
* Handle cases where the client might close while opening a connection.
494+
*/
495+
const abortController = new AbortController();
496+
const closeListener = () => {
497+
abortController.abort();
498+
};
499+
500+
const removeCloseListener = () => {
501+
const index = lastClient.closeListeners.indexOf(closeListener);
502+
if (index >= 0) {
503+
lastClient.closeListeners.splice(index, 1);
504+
}
505+
};
506+
507+
lastClient.closeListeners.push(closeListener);
508+
509+
const workerPort = await withAbort(
510+
() => lastClient.clientProvider.getDBWorkerPort(),
511+
abortController.signal
512+
).catch((ex) => {
513+
removeCloseListener();
514+
throw ex;
515+
});
516+
484517
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
485518
const identifier = this.syncParams!.dbParams.dbFilename;
486519

@@ -490,7 +523,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
490523
* We typically execute the closeListeners using the portMutex in a different context.
491524
* We can't rely on the closeListeners to abort the operation if the tab is closed.
492525
*/
493-
const db = await withTimeout(() => remote(this.syncParams!.dbParams), 5_000);
526+
const db = await withAbort(() => remote(this.syncParams!.dbParams), abortController.signal).finally(() => {
527+
// We can remove the close listener here since we no longer need it past this point.
528+
removeCloseListener();
529+
});
494530

495531
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
496532
remote,
@@ -543,16 +579,29 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
543579
}
544580

545581
/**
546-
* Runs the action with a timeout. If the action takes longer than the timeout, the promise will be rejected.
582+
* Runs the action with an abort controller.
547583
*/
548-
function withTimeout<T>(action: () => Promise<T>, timeoutMs: number): Promise<T> {
584+
function withAbort<T>(action: () => Promise<T>, signal: AbortSignal): Promise<T> {
549585
return new Promise((resolve, reject) => {
550-
const timeout = setTimeout(() => {
551-
reject(new Error('Timeout waiting for action'));
552-
}, timeoutMs);
586+
if (signal.aborted) {
587+
reject(new AbortOperation('Operation aborted by abort controller'));
588+
return;
589+
}
590+
591+
function handleAbort() {
592+
signal.removeEventListener('abort', handleAbort);
593+
reject(new AbortOperation('Operation aborted by abort controller'));
594+
}
595+
596+
signal.addEventListener('abort', handleAbort, { once: true });
597+
598+
function completePromise(action: () => void) {
599+
signal.removeEventListener('abort', handleAbort);
600+
action();
601+
}
602+
553603
action()
554-
.then(resolve)
555-
.catch(reject)
556-
.finally(() => clearTimeout(timeout));
604+
.then((data) => completePromise(() => resolve(data)))
605+
.catch((e) => completePromise(() => reject(e)));
557606
});
558607
}

0 commit comments

Comments
 (0)