Skip to content

Commit 6d41200

Browse files
committed
Slightly faster shared mutex.
1 parent c226f1e commit 6d41200

File tree

5 files changed

+116
-196
lines changed

5 files changed

+116
-196
lines changed

lib/src/isolate_completer.dart

Lines changed: 0 additions & 115 deletions
This file was deleted.

lib/src/isolate_connection_factory.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import 'sqlite_open_factory.dart';
66

77
class IsolateConnectionFactory {
88
SqliteOpenFactory openFactory;
9-
Mutex mutex;
9+
SerializedMutex mutex;
1010
SerializedPortClient upstreamPort;
1111

1212
IsolateConnectionFactory(
@@ -17,7 +17,7 @@ class IsolateConnectionFactory {
1717
SqliteConnection open({String? debugName, bool readOnly = false}) {
1818
return SqliteConnectionImpl(
1919
openFactory: openFactory,
20-
mutex: mutex,
20+
mutex: mutex.open(),
2121
upstreamPort: upstreamPort,
2222
readOnly: readOnly,
2323
debugName: debugName,

lib/src/mutex.dart

Lines changed: 70 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,20 @@
22
// https://github.com/tekartik/synchronized.dart
33
// (MIT)
44
import 'dart:async';
5-
import 'dart:isolate';
65

7-
import './isolate_completer.dart';
6+
import 'port_channel.dart';
87

98
abstract class Mutex {
109
factory Mutex() {
1110
return SimpleMutex();
1211
}
1312

14-
factory Mutex.shared() {
15-
return SharedMutex._();
16-
}
17-
1813
/// timeout is a timeout for acquiring the lock, not for the callback
1914
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout});
2015

2116
Future<void> close();
2217
}
2318

24-
int mutexId = 0;
25-
2619
/// Mutex maintains a queue of Future-returning functions that
2720
/// are executed sequentially.
2821
/// The internal lock is not shared across Isolates by default.
@@ -39,7 +32,7 @@ class SimpleMutex implements Mutex {
3932

4033
bool get locked => last != null;
4134

42-
SharedMutex? _shared;
35+
SharedMutexServer? _shared;
4336

4437
@override
4538
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
@@ -104,83 +97,56 @@ class SimpleMutex implements Mutex {
10497
await lock(() async {});
10598
}
10699

107-
SharedMutex get shared {
108-
_shared ??= SharedMutex._withMutex(this);
109-
return _shared!;
100+
SerializedMutex get shared {
101+
_shared ??= SharedMutexServer._withMutex(this);
102+
return _shared!.serialized;
110103
}
111104
}
112105

113-
/// Like Mutex, but can be coped across Isolates.
114-
class SharedMutex implements Mutex {
115-
late final SendPort _lockPort;
106+
class SerializedMutex {
107+
final SerializedPortClient client;
116108

117-
factory SharedMutex._() {
118-
final Mutex mutex = Mutex();
119-
return SharedMutex._withMutex(mutex);
120-
}
109+
const SerializedMutex(this.client);
121110

122-
SharedMutex._withMutex(Mutex mutex) {
123-
final ReceivePort receivePort = ReceivePort();
124-
125-
receivePort.listen((dynamic arg) {
126-
if (arg is _AcquireMessage) {
127-
IsolateResult unlock = IsolateResult();
128-
mutex.lock(() async {
129-
arg.completer.complete(unlock.completer);
130-
await unlock.future;
131-
unlock.close();
132-
});
133-
} else if (arg is _CloseMessage) {
134-
if (arg.isSameIsolate()) {
135-
mutex.lock(() async {
136-
receivePort.close();
137-
arg.port.complete();
138-
});
139-
} else {
140-
arg.port.completeError(AssertionError(
141-
'A Mutex may only be closed from the Isolate that created it'));
142-
}
143-
}
144-
});
145-
_lockPort = receivePort.sendPort;
111+
SharedMutex open() {
112+
return SharedMutex._(client.open());
146113
}
114+
}
147115

148-
@override
149-
Future<void> close() async {
150-
final r = IsolateResult<void>();
151-
_lockPort.send(_CloseMessage(r.completer));
152-
await r.future;
153-
}
116+
class SharedMutex implements Mutex {
117+
final ChildPortClient client;
118+
119+
SharedMutex._(this.client);
154120

155121
@override
156122
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
157123
if (Zone.current[this] != null) {
158124
throw AssertionError('Recursive lock is not allowed');
159125
}
160126
return runZoned(() async {
161-
final releaseCompleter = await acquire(timeout: timeout);
127+
await _acquire(timeout: timeout);
162128
try {
163129
final T result = await callback();
164130
return result;
165131
} finally {
166-
releaseCompleter.complete(true);
132+
_unlock();
167133
}
168134
}, zoneValues: {this: true});
169135
}
170136

171-
Future<PortCompleter> acquire({Duration? timeout}) async {
172-
final r = IsolateResult<PortCompleter>();
173-
_lockPort.send(_AcquireMessage(r.completer));
174-
var lockFuture = r.future;
137+
_unlock() {
138+
client.fire(const _UnlockMessage());
139+
}
140+
141+
Future<void> _acquire({Duration? timeout}) async {
142+
final lockFuture = client.post(const _AcquireMessage());
175143
bool timedout = false;
176144

177-
var handledLockFuture = lockFuture.then((lock) {
178-
lock.addExitHandler();
145+
var handledLockFuture = lockFuture.then((_) {
179146
if (timedout) {
180-
lock.complete();
147+
_unlock();
181148
throw TimeoutException('Failed to acquire lock', timeout);
182149
}
183-
return lock;
184150
});
185151

186152
if (timeout != null) {
@@ -195,23 +161,59 @@ class SharedMutex implements Mutex {
195161
}
196162
return await handledLockFuture;
197163
}
164+
165+
@override
166+
Future<void> close() async {
167+
client.close();
168+
}
198169
}
199170

200-
class _CloseMessage {
201-
final PortCompleter port;
202-
late final int code;
171+
/// Like Mutex, but can be coped across Isolates.
172+
class SharedMutexServer {
173+
Completer? unlock;
174+
late final SerializedMutex serialized;
175+
final Mutex mutex;
176+
177+
late final PortServer server;
178+
179+
factory SharedMutexServer._() {
180+
final Mutex mutex = Mutex();
181+
return SharedMutexServer._withMutex(mutex);
182+
}
203183

204-
_CloseMessage(this.port) {
205-
code = Isolate.current.hashCode;
184+
SharedMutexServer._withMutex(this.mutex) {
185+
server = PortServer((Object? arg) async {
186+
return await _handle(arg);
187+
});
188+
serialized = SerializedMutex(server.client());
206189
}
207190

208-
isSameIsolate() {
209-
return Isolate.current.hashCode == code;
191+
Future<void> _handle(Object? arg) async {
192+
if (arg is _AcquireMessage) {
193+
var lock = Completer();
194+
mutex.lock(() async {
195+
assert(unlock == null);
196+
unlock = Completer();
197+
lock.complete();
198+
await unlock!.future;
199+
unlock = null;
200+
});
201+
await lock.future;
202+
} else if (arg is _UnlockMessage) {
203+
assert(unlock != null);
204+
unlock!.complete();
205+
}
206+
}
207+
208+
void close() async {
209+
server.close();
210210
}
211211
}
212212

213213
class _AcquireMessage {
214-
final PortCompleter completer;
214+
const _AcquireMessage();
215+
}
215216

216-
_AcquireMessage(this.completer);
217+
class _UnlockMessage {
218+
const _UnlockMessage();
217219
}

lib/src/port_channel.dart

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,19 @@ class ChildPortClient implements PortClient {
149149
void fire(Object message) {
150150
sendPort.send(_FireMessage(message));
151151
}
152+
153+
void _cancelAll(Object error) {
154+
var handlers = this.handlers;
155+
this.handlers.clear();
156+
for (var message in handlers.values) {
157+
message.completeError(error);
158+
}
159+
}
160+
161+
void close() {
162+
_cancelAll(const ClosedException());
163+
receivePort.close();
164+
}
152165
}
153166

154167
class RequestPortServer {
@@ -178,7 +191,7 @@ class PortServer {
178191
return _receivePort.sendPort;
179192
}
180193

181-
client() {
194+
SerializedPortClient client() {
182195
return SerializedPortClient(sendPort);
183196
}
184197

0 commit comments

Comments
 (0)