Skip to content

Commit acda397

Browse files
committed
Improve performance.
1 parent e5d540d commit acda397

File tree

4 files changed

+296
-65
lines changed

4 files changed

+296
-65
lines changed

lib/src/connection_pool.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
4646
bool haveLock = false;
4747
var completer = Completer<T>();
4848

49-
var futures = _readConnections.map((connection) async {
49+
var futures = _readConnections.sublist(0).map((connection) async {
5050
try {
5151
return await connection.readLock((ctx) async {
5252
if (haveLock) {

lib/src/port_channel.dart

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import 'dart:async';
2+
import 'dart:collection';
3+
import 'dart:isolate';
4+
5+
import 'package:sqlite_async/src/isolate_completer.dart';
6+
7+
class PortClient {
8+
late Future<SendPort> sendPort;
9+
ReceivePort receivePort = ReceivePort();
10+
bool closed = false;
11+
int _nextId = 1;
12+
13+
Map<int, Completer<Object?>> handlers = HashMap();
14+
15+
PortClient() {
16+
final initCompleter = Completer<SendPort>();
17+
sendPort = initCompleter.future;
18+
receivePort.listen((message) {
19+
if (message is _InitMessage) {
20+
assert(!initCompleter.isCompleted);
21+
initCompleter.complete(message.port);
22+
} else if (message is _PortChannelResult) {
23+
final handler = handlers.remove(message.requestId);
24+
assert(handler != null);
25+
if (message.success) {
26+
handler!.complete(message.result);
27+
} else {
28+
handler!.completeError(message.error, message.stackTrace);
29+
}
30+
} else if (message == _closeMessage) {
31+
close();
32+
}
33+
}, onError: (e) {
34+
if (!initCompleter.isCompleted) {
35+
initCompleter.completeError(e);
36+
}
37+
38+
close();
39+
}, onDone: () {
40+
if (!initCompleter.isCompleted) {
41+
initCompleter.completeError(ClosedException());
42+
}
43+
close();
44+
});
45+
}
46+
47+
Future<void> get ready async {
48+
await sendPort;
49+
}
50+
51+
_cancelAll(Object error) {
52+
var handlers = this.handlers;
53+
this.handlers = {};
54+
for (var message in handlers.values) {
55+
message.completeError(error);
56+
}
57+
}
58+
59+
Future<T> post<T>(Object message) async {
60+
if (closed) {
61+
throw ClosedException();
62+
}
63+
var completer = Completer<T>();
64+
var id = _nextId++;
65+
handlers[id] = completer;
66+
(await sendPort).send(_RequestMessage(id, message));
67+
return await completer.future;
68+
}
69+
70+
PortServer server() {
71+
return PortServer(receivePort.sendPort);
72+
}
73+
74+
close() async {
75+
if (!closed) {
76+
closed = true;
77+
78+
receivePort.close();
79+
_cancelAll(const ClosedException());
80+
}
81+
}
82+
83+
tieToIsolate(Isolate isolate) {
84+
isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage);
85+
}
86+
}
87+
88+
class PortServer {
89+
SendPort port;
90+
late ReceivePort receivePort;
91+
late Future<Object?> Function(Object? message) handle;
92+
93+
PortServer(this.port);
94+
95+
void init(Future<Object?> Function(Object? message) handle) {
96+
this.handle = handle;
97+
receivePort = ReceivePort();
98+
port.send(_InitMessage(receivePort.sendPort));
99+
receivePort.listen((message) async {
100+
final request = message as _RequestMessage;
101+
try {
102+
var result = await handle(request.message);
103+
port.send(_PortChannelResult.success(request.id, result));
104+
} catch (e, stacktrace) {
105+
port.send(_PortChannelResult.error(request.id, e, stacktrace));
106+
}
107+
});
108+
}
109+
}
110+
111+
const _closeMessage = '_Close';
112+
113+
class _InitMessage {
114+
final SendPort port;
115+
116+
_InitMessage(this.port);
117+
}
118+
119+
class _RequestMessage {
120+
final int id;
121+
final Object message;
122+
123+
_RequestMessage(this.id, this.message);
124+
}
125+
126+
class ClosedException implements Exception {
127+
const ClosedException();
128+
}
129+
130+
class _PortChannelResult<T> {
131+
final int requestId;
132+
final bool success;
133+
final T? _result;
134+
final Object? _error;
135+
final StackTrace? stackTrace;
136+
137+
const _PortChannelResult.success(this.requestId, T result)
138+
: success = true,
139+
_error = null,
140+
stackTrace = null,
141+
_result = result;
142+
const _PortChannelResult.error(this.requestId, Object error,
143+
[this.stackTrace])
144+
: success = false,
145+
_result = null,
146+
_error = error;
147+
148+
T get value {
149+
if (success) {
150+
return _result as T;
151+
} else {
152+
if (_error != null && stackTrace != null) {
153+
Error.throwWithStackTrace(_error!, stackTrace!);
154+
} else {
155+
throw _error!;
156+
}
157+
}
158+
}
159+
160+
T get result {
161+
assert(success);
162+
return _result as T;
163+
}
164+
165+
Object get error {
166+
assert(!success);
167+
return _error!;
168+
}
169+
}

0 commit comments

Comments
 (0)