Skip to content

Commit 6f6f89d

Browse files
committed
Cleaner port logic.
1 parent c0e06a8 commit 6f6f89d

File tree

5 files changed

+176
-74
lines changed

5 files changed

+176
-74
lines changed

lib/src/connection_pool.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import 'dart:async';
2-
import 'dart:isolate';
32

43
import 'mutex.dart';
4+
import 'port_channel.dart';
55
import 'sqlite_connection.dart';
66
import 'sqlite_connection_impl.dart';
77
import 'sqlite_open_factory.dart';
@@ -15,7 +15,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
1515
final List<SqliteConnectionImpl> _readConnections = [];
1616

1717
final SqliteOpenFactory _factory;
18-
final SendPort _upstreamPort;
18+
final SerializedPortClient _upstreamPort;
1919

2020
@override
2121
final Stream<UpdateNotification>? updates;
@@ -42,7 +42,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
4242
SqliteConnection? writeConnection,
4343
this.debugName,
4444
required this.mutex,
45-
required SendPort upstreamPort})
45+
required SerializedPortClient upstreamPort})
4646
: _writeConnection = writeConnection,
4747
_upstreamPort = upstreamPort;
4848

lib/src/database_utils.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,7 @@ Future<Set<String>> getSourceTables(SqliteReadContext ctx, String sql) async {
106106
Set<String> tables = {for (var row in tableRows) row['tbl_name']};
107107
return tables;
108108
}
109+
110+
class InitDb {
111+
const InitDb();
112+
}

lib/src/port_channel.dart

Lines changed: 132 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,28 @@ import 'dart:async';
22
import 'dart:collection';
33
import 'dart:isolate';
44

5-
class PortClient {
5+
abstract class PortClient {
6+
Future<T> post<T>(Object message);
7+
void fire(Object message);
8+
9+
factory PortClient.parent() {
10+
return ParentPortClient();
11+
}
12+
13+
factory PortClient.child(SendPort upstream) {
14+
return ChildPortClient(upstream);
15+
}
16+
}
17+
18+
class ParentPortClient implements PortClient {
619
late Future<SendPort> sendPort;
720
ReceivePort receivePort = ReceivePort();
821
bool closed = false;
922
int _nextId = 1;
1023

1124
Map<int, Completer<Object?>> handlers = HashMap();
1225

13-
PortClient() {
26+
ParentPortClient() {
1427
final initCompleter = Completer<SendPort>();
1528
sendPort = initCompleter.future;
1629
receivePort.listen((message) {
@@ -54,19 +67,28 @@ class PortClient {
5467
}
5568
}
5669

70+
@override
5771
Future<T> post<T>(Object message) async {
5872
if (closed) {
5973
throw ClosedException();
6074
}
6175
var completer = Completer<T>();
6276
var id = _nextId++;
6377
handlers[id] = completer;
64-
(await sendPort).send(_RequestMessage(id, message));
78+
(await sendPort).send(_RequestMessage(id, message, receivePort.sendPort));
6579
return await completer.future;
6680
}
6781

68-
PortServer server() {
69-
return PortServer(receivePort.sendPort);
82+
@override
83+
void fire(Object message) async {
84+
if (closed) {
85+
throw ClosedException();
86+
}
87+
(await sendPort).send(_FireMessage(message));
88+
}
89+
90+
RequestPortServer server() {
91+
return RequestPortServer(receivePort.sendPort);
7092
}
7193

7294
close() async {
@@ -83,24 +105,104 @@ class PortClient {
83105
}
84106
}
85107

108+
class SerializedPortClient {
109+
final SendPort sendPort;
110+
111+
SerializedPortClient(this.sendPort);
112+
113+
ChildPortClient open() {
114+
return ChildPortClient(sendPort);
115+
}
116+
}
117+
118+
class ChildPortClient implements PortClient {
119+
final SendPort sendPort;
120+
final ReceivePort receivePort = ReceivePort();
121+
int _nextId = 1;
122+
123+
final Map<int, Completer<Object?>> handlers = HashMap();
124+
125+
ChildPortClient(this.sendPort) {
126+
receivePort.listen((message) {
127+
if (message is _PortChannelResult) {
128+
final handler = handlers.remove(message.requestId);
129+
assert(handler != null);
130+
if (message.success) {
131+
handler!.complete(message.result);
132+
} else {
133+
handler!.completeError(message.error, message.stackTrace);
134+
}
135+
}
136+
});
137+
}
138+
139+
@override
140+
Future<T> post<T>(Object message) async {
141+
var completer = Completer<T>();
142+
var id = _nextId++;
143+
handlers[id] = completer;
144+
sendPort.send(_RequestMessage(id, message, receivePort.sendPort));
145+
return await completer.future;
146+
}
147+
148+
@override
149+
void fire(Object message) {
150+
sendPort.send(_FireMessage(message));
151+
}
152+
}
153+
154+
class RequestPortServer {
155+
final SendPort port;
156+
157+
RequestPortServer(this.port);
158+
159+
open(Future<Object?> Function(Object? message) handle) {
160+
return PortServer.forSendPort(port, handle);
161+
}
162+
}
163+
86164
class PortServer {
87-
SendPort port;
88-
late ReceivePort receivePort;
89-
late Future<Object?> Function(Object? message) handle;
90-
91-
PortServer(this.port);
92-
93-
void init(Future<Object?> Function(Object? message) handle) {
94-
this.handle = handle;
95-
receivePort = ReceivePort();
96-
port.send(_InitMessage(receivePort.sendPort));
97-
receivePort.listen((message) async {
98-
final request = message as _RequestMessage;
99-
try {
100-
var result = await handle(request.message);
101-
port.send(_PortChannelResult.success(request.id, result));
102-
} catch (e, stacktrace) {
103-
port.send(_PortChannelResult.error(request.id, e, stacktrace));
165+
final ReceivePort _receivePort = ReceivePort();
166+
final Future<Object?> Function(Object? message) handle;
167+
168+
PortServer(this.handle) {
169+
_init();
170+
}
171+
172+
PortServer.forSendPort(SendPort port, this.handle) {
173+
port.send(_InitMessage(_receivePort.sendPort));
174+
_init();
175+
}
176+
177+
SendPort get sendPort {
178+
return _receivePort.sendPort;
179+
}
180+
181+
client() {
182+
return SerializedPortClient(sendPort);
183+
}
184+
185+
close() {
186+
_receivePort.close();
187+
}
188+
189+
_init() {
190+
_receivePort.listen((request) async {
191+
if (request is _FireMessage) {
192+
handle(request.message);
193+
} else if (request is _RequestMessage) {
194+
if (request.id == 0) {
195+
// Fire and forget
196+
handle(request.message);
197+
} else {
198+
try {
199+
var result = await handle(request.message);
200+
request.reply.send(_PortChannelResult.success(request.id, result));
201+
} catch (e, stacktrace) {
202+
request.reply
203+
.send(_PortChannelResult.error(request.id, e, stacktrace));
204+
}
205+
}
104206
}
105207
});
106208
}
@@ -114,11 +216,18 @@ class _InitMessage {
114216
_InitMessage(this.port);
115217
}
116218

219+
class _FireMessage {
220+
final Object message;
221+
222+
const _FireMessage(this.message);
223+
}
224+
117225
class _RequestMessage {
118226
final int id;
119227
final Object message;
228+
final SendPort reply;
120229

121-
_RequestMessage(this.id, this.message);
230+
_RequestMessage(this.id, this.message, this.reply);
122231
}
123232

124233
class ClosedException implements Exception {

lib/src/sqlite_connection_impl.dart

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import 'dart:isolate';
33

44
import 'package:sqlite3/sqlite3.dart' as sqlite;
55

6-
import 'isolate_completer.dart';
6+
import 'database_utils.dart';
77
import 'mutex.dart';
88
import 'port_channel.dart';
99
import 'sqlite_connection.dart';
@@ -22,14 +22,14 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
2222

2323
@override
2424
final Stream<UpdateNotification>? updates;
25-
final PortClient _dbIsolate = PortClient();
25+
final ParentPortClient _dbIsolate = ParentPortClient();
2626
final String? debugName;
2727
final bool readOnly;
2828

2929
SqliteConnectionImpl(
3030
{required SqliteOpenFactory openFactory,
3131
required Mutex mutex,
32-
required SendPort upstreamPort,
32+
required SerializedPortClient upstreamPort,
3333
this.updates,
3434
this.debugName,
3535
this.readOnly = false,
@@ -43,7 +43,8 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
4343
}
4444

4545
Future<void> _open(SqliteOpenFactory openFactory,
46-
{required bool primary, required SendPort upstreamPort}) async {
46+
{required bool primary,
47+
required SerializedPortClient upstreamPort}) async {
4748
await _connectionMutex.lock(() async {
4849
var isolate = await Isolate.spawn(
4950
_sqliteConnectionIsolate,
@@ -66,11 +67,6 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
6667
return _connectionMutex.locked;
6768
}
6869

69-
/// For internal use only
70-
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
71-
return _connectionMutex.lock(callback, timeout: timeout);
72-
}
73-
7470
@override
7571
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
7672
{Duration? lockTimeout}) async {
@@ -197,14 +193,13 @@ class _TransactionContext implements SqliteWriteContext {
197193
}
198194

199195
void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
200-
final port = params.port;
196+
final client = params.port.open();
197+
201198
if (!params.primary) {
202199
// Wait until the primary connection has been initialized.
203200
// The primary connection is responsible for configuring journal mode,
204201
// running migrations, and other setup.
205-
var initialized = IsolateResult<void>();
206-
port.send(['init-db', initialized.completer]);
207-
await initialized.future;
202+
await client.post(const InitDb());
208203
}
209204
final db = await params.openFactory.open(SqliteOpenOptions(
210205
primaryConnection: params.primary, readOnly: params.readOnly));
@@ -219,7 +214,7 @@ void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
219214
db.updates.listen((event) {
220215
updatedTables.add(event.tableName);
221216
});
222-
server.init((data) async {
217+
server.open((data) async {
223218
if (data is _SqliteIsolateClose) {
224219
if (txId != null) {
225220
try {
@@ -255,7 +250,7 @@ void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
255250
try {
256251
final result = db.select(data.sql, data.args);
257252
if (updatedTables.isNotEmpty) {
258-
port.send(['update', updatedTables]);
253+
client.fire(UpdateNotification(updatedTables));
259254
updatedTables = {};
260255
}
261256
return result;
@@ -270,7 +265,7 @@ void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
270265
return await data.cb(db);
271266
} finally {
272267
if (updatedTables.isNotEmpty) {
273-
port.send(['update', updatedTables]);
268+
client.fire(UpdateNotification(updatedTables));
274269
updatedTables = {};
275270
}
276271
}
@@ -281,10 +276,10 @@ void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
281276
}
282277

283278
class _SqliteConnectionParams {
284-
final PortServer portServer;
279+
final RequestPortServer portServer;
285280
final bool readOnly;
286281

287-
final SendPort port;
282+
final SerializedPortClient port;
288283
final bool primary;
289284
final SqliteOpenFactory openFactory;
290285

0 commit comments

Comments
 (0)