Skip to content

Commit c226f1e

Browse files
committed
Add IsolateConnectionFactory.
1 parent 6f6f89d commit c226f1e

File tree

9 files changed

+159
-18
lines changed

9 files changed

+159
-18
lines changed

lib/src/connection_pool.dart

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
2626

2727
final Mutex mutex;
2828

29+
bool closed = false;
30+
2931
/// Open a new connection pool.
3032
///
3133
/// The provided factory is used to open connections on demand. Connections
@@ -100,6 +102,9 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
100102
@override
101103
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
102104
{Duration? lockTimeout}) {
105+
if (closed) {
106+
throw AssertionError('Closed');
107+
}
103108
_writeConnection ??= SqliteConnectionImpl(
104109
upstreamPort: _upstreamPort,
105110
primary: false,
@@ -112,7 +117,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
112117
}
113118

114119
Future<void> _expandPool() async {
115-
if (_readConnections.length >= maxReaders) {
120+
if (closed || _readConnections.length >= maxReaders) {
116121
return;
117122
}
118123
bool hasCapacity = _readConnections.any((connection) => !connection.locked);
@@ -138,4 +143,13 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
138143
await connection.ready;
139144
}
140145
}
146+
147+
@override
148+
Future<void> close() async {
149+
closed = true;
150+
await _writeConnection?.close();
151+
for (var connection in _readConnections) {
152+
await connection.close();
153+
}
154+
}
141155
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import 'sqlite_connection_impl.dart';
2+
import 'sqlite_connection.dart';
3+
import 'mutex.dart';
4+
import 'port_channel.dart';
5+
import 'sqlite_open_factory.dart';
6+
7+
class IsolateConnectionFactory {
8+
SqliteOpenFactory openFactory;
9+
Mutex mutex;
10+
SerializedPortClient upstreamPort;
11+
12+
IsolateConnectionFactory(
13+
{required this.openFactory,
14+
required this.mutex,
15+
required this.upstreamPort});
16+
17+
SqliteConnection open({String? debugName, bool readOnly = false}) {
18+
return SqliteConnectionImpl(
19+
openFactory: openFactory,
20+
mutex: mutex,
21+
upstreamPort: upstreamPort,
22+
readOnly: readOnly,
23+
debugName: debugName,
24+
primary: false);
25+
}
26+
}

lib/src/mutex.dart

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class SimpleMutex implements Mutex {
3939

4040
bool get locked => last != null;
4141

42+
SharedMutex? _shared;
43+
4244
@override
4345
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
4446
if (Zone.current[this] != null) {
@@ -98,17 +100,28 @@ class SimpleMutex implements Mutex {
98100

99101
@override
100102
Future<void> close() async {
103+
_shared?.close();
101104
await lock(() async {});
102105
}
106+
107+
SharedMutex get shared {
108+
_shared ??= SharedMutex._withMutex(this);
109+
return _shared!;
110+
}
103111
}
104112

105113
/// Like Mutex, but can be coped across Isolates.
106114
class SharedMutex implements Mutex {
107115
late final SendPort _lockPort;
108116

109-
SharedMutex._() {
110-
final ReceivePort receivePort = ReceivePort();
117+
factory SharedMutex._() {
111118
final Mutex mutex = Mutex();
119+
return SharedMutex._withMutex(mutex);
120+
}
121+
122+
SharedMutex._withMutex(Mutex mutex) {
123+
final ReceivePort receivePort = ReceivePort();
124+
112125
receivePort.listen((dynamic arg) {
113126
if (arg is _AcquireMessage) {
114127
IsolateResult unlock = IsolateResult();
@@ -134,7 +147,7 @@ class SharedMutex implements Mutex {
134147

135148
@override
136149
Future<void> close() async {
137-
final r = IsolateResult<bool>();
150+
final r = IsolateResult<void>();
138151
_lockPort.send(_CloseMessage(r.completer));
139152
await r.future;
140153
}

lib/src/port_channel.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class ParentPortClient implements PortClient {
5959
await sendPort;
6060
}
6161

62-
_cancelAll(Object error) {
62+
void _cancelAll(Object error) {
6363
var handlers = this.handlers;
6464
this.handlers = {};
6565
for (var message in handlers.values) {
@@ -91,7 +91,7 @@ class ParentPortClient implements PortClient {
9191
return RequestPortServer(receivePort.sendPort);
9292
}
9393

94-
close() async {
94+
void close() async {
9595
if (!closed) {
9696
closed = true;
9797

@@ -182,11 +182,11 @@ class PortServer {
182182
return SerializedPortClient(sendPort);
183183
}
184184

185-
close() {
185+
void close() {
186186
_receivePort.close();
187187
}
188188

189-
_init() {
189+
void _init() {
190190
_receivePort.listen((request) async {
191191
if (request is _FireMessage) {
192192
handle(request.message);

lib/src/sqlite_connection.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,6 @@ abstract class SqliteConnection extends SqliteWriteContext {
101101
/// In most cases, [writeTransaction] should be used instead.
102102
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
103103
{Duration? lockTimeout});
104+
105+
Future<void> close();
104106
}

lib/src/sqlite_connection_impl.dart

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
2222

2323
@override
2424
final Stream<UpdateNotification>? updates;
25-
final ParentPortClient _dbIsolate = ParentPortClient();
25+
final ParentPortClient _isolateClient = ParentPortClient();
26+
late final Isolate _isolate;
2627
final String? debugName;
2728
final bool readOnly;
2829

@@ -39,27 +40,35 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
3940
}
4041

4142
Future<void> get ready async {
42-
await _dbIsolate.ready;
43+
await _isolateClient.ready;
4344
}
4445

4546
Future<void> _open(SqliteOpenFactory openFactory,
4647
{required bool primary,
4748
required SerializedPortClient upstreamPort}) async {
4849
await _connectionMutex.lock(() async {
49-
var isolate = await Isolate.spawn(
50+
_isolate = await Isolate.spawn(
5051
_sqliteConnectionIsolate,
5152
_SqliteConnectionParams(
5253
openFactory: openFactory,
5354
port: upstreamPort,
5455
primary: primary,
55-
portServer: _dbIsolate.server(),
56+
portServer: _isolateClient.server(),
5657
readOnly: readOnly),
5758
debugName: debugName,
5859
paused: true);
59-
_dbIsolate.tieToIsolate(isolate);
60-
isolate.resume(isolate.pauseCapability!);
60+
_isolateClient.tieToIsolate(_isolate);
61+
_isolate.resume(_isolate.pauseCapability!);
6162

62-
await _dbIsolate.ready;
63+
await _isolateClient.ready;
64+
});
65+
}
66+
67+
@override
68+
Future<void> close() async {
69+
await _connectionMutex.lock(() async {
70+
await _isolateClient.post(_SqliteIsolateConnectionClose());
71+
_isolate.kill();
6372
});
6473
}
6574

@@ -73,7 +82,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
7382
// Private lock to synchronize this with other statements on the same connection,
7483
// to ensure that transactions aren't interleaved.
7584
return _connectionMutex.lock(() async {
76-
final ctx = _TransactionContext(_dbIsolate);
85+
final ctx = _TransactionContext(_isolateClient);
7786
try {
7887
return await callback(ctx);
7988
} finally {
@@ -96,7 +105,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
96105
}
97106
// DB lock so that only one write happens at a time
98107
return await _writeMutex.lock(() async {
99-
final ctx = _TransactionContext(_dbIsolate);
108+
final ctx = _TransactionContext(_isolateClient);
100109
try {
101110
return await callback(ctx);
102111
} finally {
@@ -269,6 +278,8 @@ void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
269278
updatedTables = {};
270279
}
271280
}
281+
} else if (data is _SqliteIsolateConnectionClose) {
282+
db.dispose();
272283
}
273284
});
274285

@@ -312,3 +323,7 @@ class _SqliteIsolateClose {
312323

313324
const _SqliteIsolateClose(this.ctxId);
314325
}
326+
327+
class _SqliteIsolateConnectionClose {
328+
const _SqliteIsolateConnectionClose();
329+
}

lib/src/sqlite_database.dart

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22

33
import 'connection_pool.dart';
44
import 'database_utils.dart';
5+
import 'isolate_connection_factory.dart';
56
import 'mutex.dart';
67
import 'port_channel.dart';
78
import 'sqlite_connection.dart';
@@ -22,7 +23,7 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
2223
final int maxReaders;
2324

2425
/// Global lock to serialize write transactions.
25-
final Mutex mutex = Mutex();
26+
final SimpleMutex mutex = SimpleMutex();
2627

2728
/// Factory that opens a raw database connection in each isolate.
2829
///
@@ -122,6 +123,13 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
122123
});
123124
}
124125

126+
IsolateConnectionFactory isolateConnectionFactory() {
127+
return IsolateConnectionFactory(
128+
openFactory: openFactory,
129+
mutex: mutex.shared,
130+
upstreamPort: _eventsPort.client());
131+
}
132+
125133
SqliteConnectionImpl _openPrimaryConnection({String? debugName}) {
126134
return SqliteConnectionImpl(
127135
upstreamPort: _eventsPort.client(),
@@ -133,6 +141,14 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
133141
openFactory: openFactory);
134142
}
135143

144+
@override
145+
Future<void> close() async {
146+
await _pool.close();
147+
_updatesController.close();
148+
_eventsPort.close();
149+
await mutex.close();
150+
}
151+
136152
/// Open a read-only transaction.
137153
///
138154
/// Up to [maxReaders] read transactions can run concurrently.

scripts/benchmark.dart

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:isolate';
2+
13
import 'package:benchmarking/benchmarking.dart';
24
import 'package:collection/collection.dart';
35

@@ -47,6 +49,20 @@ List<SqliteBenchmark> benchmarks = [
4749
}
4850
});
4951
}, maxBatchSize: 1000),
52+
SqliteBenchmark('Insert: writeTransaction in isolate',
53+
(SqliteDatabase db, List<List<String>> parameters) async {
54+
var factory = db.isolateConnectionFactory();
55+
await Isolate.run(() async {
56+
final db = factory.open();
57+
await db.writeTransaction((tx) async {
58+
for (var params in parameters) {
59+
await tx.execute(
60+
'INSERT INTO customers(name, email) VALUES(?, ?)', params);
61+
}
62+
});
63+
await db.close();
64+
});
65+
}, maxBatchSize: 1000),
5066
SqliteBenchmark('Insert: writeTransaction no await',
5167
(SqliteDatabase db, List<List<String>> parameters) async {
5268
await db.writeTransaction((tx) async {
@@ -148,4 +164,6 @@ void main() async {
148164
for (var entry in benchmarks) {
149165
await benchmark(entry);
150166
}
167+
168+
await db.close();
151169
}

test/isolate_test.dart

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import 'dart:isolate';
2+
import 'dart:math';
3+
4+
import 'package:sqlite_async/sqlite_async.dart';
5+
import 'package:test/test.dart';
6+
import 'package:sqlite3/sqlite3.dart' as sqlite;
7+
import 'util.dart';
8+
9+
void main() {
10+
setupLogger();
11+
12+
group('Isolate Tests', () {
13+
late String path;
14+
15+
setUp(() async {
16+
path = dbPath();
17+
await cleanDb(path: path);
18+
});
19+
20+
tearDown(() async {
21+
await cleanDb(path: path);
22+
});
23+
24+
test('Basic Isolate usage', () async {
25+
final db = await setupDatabase(path: path);
26+
final factory = db.isolateConnectionFactory();
27+
28+
final result = await Isolate.run(() async {
29+
final db = factory.open();
30+
await db
31+
.execute('CREATE TABLE test_in_isolate(id INTEGER PRIMARY KEY)');
32+
return await db.get('SELECT count() as count FROM test_in_isolate');
33+
});
34+
expect(result, equals({'count': 0}));
35+
});
36+
});
37+
}

0 commit comments

Comments
 (0)