@@ -19,6 +19,8 @@ import 'package:powersync_core/src/streaming_sync.dart';
1919import 'package:powersync_core/src/sync_status.dart' ;
2020import 'package:sqlite_async/sqlite3_common.dart' ;
2121import 'package:sqlite_async/sqlite_async.dart' ;
22+ // ignore: implementation_imports
23+ import 'package:sqlite_async/src/native/native_isolate_mutex.dart' ;
2224
2325/// A PowerSync managed database.
2426///
@@ -44,6 +46,8 @@ class PowerSyncDatabaseImpl
4446 @protected
4547 late Future <void > isInitialized;
4648
49+ final SimpleMutex _syncMutex = SimpleMutex (), _crudMutex = SimpleMutex ();
50+
4751 @override
4852
4953 /// The Logger used by this [PowerSyncDatabase] .
@@ -224,7 +228,13 @@ class PowerSyncDatabaseImpl
224228 await Isolate .spawn (
225229 _syncIsolate,
226230 _PowerSyncDatabaseIsolateArgs (
227- receiveMessages.sendPort, dbRef, retryDelay, clientParams),
231+ receiveMessages.sendPort,
232+ dbRef,
233+ retryDelay,
234+ clientParams,
235+ _crudMutex.shared,
236+ _syncMutex.shared,
237+ ),
228238 debugName: 'Sync ${database .openFactory .path }' ,
229239 onError: receiveUnhandledErrors.sendPort,
230240 errorsAreFatal: true ,
@@ -259,16 +269,32 @@ class PowerSyncDatabaseImpl
259269 return database.writeLock (callback,
260270 debugContext: debugContext, lockTimeout: lockTimeout);
261271 }
272+
273+ @override
274+ Future <void > close () async {
275+ await super .close ();
276+
277+ await _crudMutex.close ();
278+ await _crudMutex.close ();
279+ }
262280}
263281
264282class _PowerSyncDatabaseIsolateArgs {
265283 final SendPort sPort;
266284 final IsolateConnectionFactory dbRef;
267285 final Duration retryDelay;
268286 final Map <String , dynamic >? parameters;
287+ final SerializedMutex crudMutex;
288+ final SerializedMutex syncMutex;
269289
270290 _PowerSyncDatabaseIsolateArgs (
271- this .sPort, this .dbRef, this .retryDelay, this .parameters);
291+ this .sPort,
292+ this .dbRef,
293+ this .retryDelay,
294+ this .parameters,
295+ this .crudMutex,
296+ this .syncMutex,
297+ );
272298}
273299
274300Future <void > _syncIsolate (_PowerSyncDatabaseIsolateArgs args) async {
@@ -277,6 +303,9 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
277303 StreamController <String > crudUpdateController = StreamController .broadcast ();
278304 final upstreamDbClient = args.dbRef.upstreamPort.open ();
279305
306+ final crudMutex = args.crudMutex.open ();
307+ final syncMutex = args.syncMutex.open ();
308+
280309 CommonDatabase ? db;
281310 final Mutex mutex = args.dbRef.mutex.open ();
282311 StreamingSyncImplementation ? openedStreamingSync;
@@ -294,6 +323,8 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
294323 // It needs to be closed before killing the isolate
295324 // in order to free the mutex for other operations.
296325 await mutex.close ();
326+ await crudMutex.close ();
327+ await syncMutex.close ();
297328 rPort.close ();
298329
299330 // TODO: If we closed our resources properly, this wouldn't be necessary...
@@ -348,14 +379,17 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
348379
349380 final storage = BucketStorage (connection);
350381 final sync = StreamingSyncImplementation (
351- adapter: storage,
352- credentialsCallback: loadCredentials,
353- invalidCredentialsCallback: invalidateCredentials,
354- uploadCrud: uploadCrud,
355- crudUpdateTriggerStream: crudUpdateController.stream,
356- retryDelay: args.retryDelay,
357- client: http.Client (),
358- syncParameters: args.parameters);
382+ adapter: storage,
383+ credentialsCallback: loadCredentials,
384+ invalidCredentialsCallback: invalidateCredentials,
385+ uploadCrud: uploadCrud,
386+ crudUpdateTriggerStream: crudUpdateController.stream,
387+ retryDelay: args.retryDelay,
388+ client: http.Client (),
389+ syncParameters: args.parameters,
390+ crudMutex: crudMutex,
391+ syncMutex: syncMutex,
392+ );
359393 openedStreamingSync = sync ;
360394 sync .streamingSync ();
361395 sync .statusStream.listen ((event) {
0 commit comments