Skip to content

Commit cd29da4

Browse files
committed
Implement watch in isolate connections.
1 parent 6d41200 commit cd29da4

File tree

7 files changed

+181
-6
lines changed

7 files changed

+181
-6
lines changed

lib/src/database_utils.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22
import 'dart:convert';
3+
import 'dart:isolate';
34

45
import 'package:sqlite3/sqlite3.dart' as sqlite;
56

@@ -110,3 +111,15 @@ Future<Set<String>> getSourceTables(SqliteReadContext ctx, String sql) async {
110111
class InitDb {
111112
const InitDb();
112113
}
114+
115+
class SubscribeToUpdates {
116+
final SendPort port;
117+
118+
SubscribeToUpdates(this.port);
119+
}
120+
121+
class UnsubscribeToUpdates {
122+
final SendPort port;
123+
124+
UnsubscribeToUpdates(this.port);
125+
}
Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
import 'sqlite_connection_impl.dart';
2-
import 'sqlite_connection.dart';
1+
import 'dart:async';
2+
import 'dart:isolate';
3+
4+
import 'database_utils.dart';
35
import 'mutex.dart';
46
import 'port_channel.dart';
7+
import 'sqlite_connection.dart';
8+
import 'sqlite_connection_impl.dart';
59
import 'sqlite_open_factory.dart';
10+
import 'update_notification.dart';
611

712
class IsolateConnectionFactory {
813
SqliteOpenFactory openFactory;
@@ -15,12 +20,70 @@ class IsolateConnectionFactory {
1520
required this.upstreamPort});
1621

1722
SqliteConnection open({String? debugName, bool readOnly = false}) {
18-
return SqliteConnectionImpl(
23+
final updates = _IsolateUpdateListener(upstreamPort);
24+
25+
var openMutex = mutex.open();
26+
27+
return _IsolateSqliteConnection(
1928
openFactory: openFactory,
20-
mutex: mutex.open(),
29+
mutex: openMutex,
2130
upstreamPort: upstreamPort,
2231
readOnly: readOnly,
2332
debugName: debugName,
24-
primary: false);
33+
updates: updates.stream,
34+
closeFunction: () {
35+
openMutex.close();
36+
updates.close();
37+
});
38+
}
39+
}
40+
41+
class _IsolateUpdateListener {
42+
final ChildPortClient client;
43+
final ReceivePort port = ReceivePort();
44+
late final StreamController<UpdateNotification> controller;
45+
46+
_IsolateUpdateListener(SerializedPortClient upstreamPort)
47+
: client = upstreamPort.open() {
48+
controller = StreamController.broadcast(onListen: () {
49+
client.fire(SubscribeToUpdates(port.sendPort));
50+
}, onCancel: () {
51+
client.fire(UnsubscribeToUpdates(port.sendPort));
52+
});
53+
54+
port.listen((message) {
55+
if (message is UpdateNotification) {
56+
controller.add(message);
57+
}
58+
});
59+
}
60+
61+
Stream<UpdateNotification> get stream {
62+
return controller.stream;
63+
}
64+
65+
close() {
66+
client.fire(UnsubscribeToUpdates(port.sendPort));
67+
controller.close();
68+
port.close();
69+
}
70+
}
71+
72+
class _IsolateSqliteConnection extends SqliteConnectionImpl {
73+
final void Function() closeFunction;
74+
75+
_IsolateSqliteConnection(
76+
{required super.openFactory,
77+
required super.mutex,
78+
required super.upstreamPort,
79+
super.updates,
80+
super.debugName,
81+
super.readOnly = false,
82+
required this.closeFunction});
83+
84+
@override
85+
Future<void> close() async {
86+
await super.close();
87+
closeFunction();
2588
}
2689
}

lib/src/sqlite_connection_impl.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
2020
final SimpleMutex _connectionMutex = SimpleMutex();
2121
final Mutex _writeMutex;
2222

23+
/// Must be a broadcast stream
2324
@override
2425
final Stream<UpdateNotification>? updates;
2526
final ParentPortClient _isolateClient = ParentPortClient();

lib/src/sqlite_database.dart

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:isolate';
23

34
import 'connection_pool.dart';
45
import 'database_utils.dart';
@@ -101,6 +102,8 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
101102
void _listenForEvents() {
102103
UpdateNotification? updates;
103104

105+
Map<SendPort, StreamSubscription> subscriptions = {};
106+
104107
_eventsPort = PortServer((message) async {
105108
if (message is UpdateNotification) {
106109
if (updates == null) {
@@ -119,6 +122,17 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
119122
}
120123
} else if (message is InitDb) {
121124
await _initialized;
125+
} else if (message is SubscribeToUpdates) {
126+
if (subscriptions.containsKey(message.port)) {
127+
return;
128+
}
129+
final subscription = _updatesController.stream.listen((event) {
130+
message.port.send(event);
131+
});
132+
subscriptions[message.port] = subscription;
133+
} else if (message is UnsubscribeToUpdates) {
134+
final subscription = subscriptions.remove(message.port);
135+
subscription?.cancel();
122136
}
123137
});
124138
}

lib/src/sqlite_queries.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import 'update_notification.dart';
1010
/// Classes using this need to implement [SqliteConnection.readLock]
1111
/// and [SqliteConnection.writeLock].
1212
mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
13+
/// Broadcast stream that is notified of any table updates
1314
Stream<UpdateNotification>? get updates;
1415

1516
@override

test/basic_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void main() {
4747
]));
4848
});
4949

50-
// Manual test
50+
// Manually verified
5151
test('Concurrency', () async {
5252
final db = SqliteDatabase.withFactory(
5353
openFactory: testFactory(path: path), maxReaders: 3);

test/watch_test.dart

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import 'dart:async';
2+
import 'dart:isolate';
23
import 'dart:math';
34

5+
import 'package:async/async.dart';
6+
import 'package:sqlite3/sqlite3.dart';
47
import 'package:sqlite_async/sqlite_async.dart';
58
import 'package:sqlite_async/src/database_utils.dart';
9+
import 'package:sqlite_async/src/isolate_connection_factory.dart';
610
import 'package:test/test.dart';
711

812
import 'util.dart';
@@ -163,5 +167,84 @@ void main() {
163167
UpdateNotification.single('assets')
164168
]));
165169
});
170+
171+
test('watch in isolate', () async {
172+
final db = await setupDatabase(path: path);
173+
await createTables(db);
174+
175+
const baseTime = 20;
176+
177+
const throttleDuration = Duration(milliseconds: baseTime);
178+
179+
final rows = await db.execute(
180+
'INSERT INTO customers(name) VALUES (?) RETURNING id',
181+
['a customer']);
182+
final id = rows[0]['id'];
183+
184+
var done = false;
185+
inserts() async {
186+
while (!done) {
187+
await db.execute(
188+
'INSERT INTO assets(make, customer_id) VALUES (?, ?)',
189+
['test', id]);
190+
await Future.delayed(
191+
Duration(milliseconds: Random().nextInt(baseTime * 2)));
192+
}
193+
}
194+
195+
const numberOfQueries = 10;
196+
197+
inserts();
198+
199+
final factory = db.isolateConnectionFactory();
200+
201+
var l = await inIsolateWatch(factory, numberOfQueries, throttleDuration);
202+
203+
var results = l[0] as List<ResultSet>;
204+
var times = l[1] as List<DateTime>;
205+
done = true;
206+
207+
var lastCount = 0;
208+
for (var r in results) {
209+
final count = r.first['count'];
210+
// This is not strictly incrementing, since we can't guarantee the
211+
// exact order between reads and writes.
212+
// We can guarantee that there will always be a read after the last write,
213+
// but the previous read may have been after the same write in some cases.
214+
expect(count, greaterThanOrEqualTo(lastCount));
215+
lastCount = count;
216+
}
217+
218+
// The number of read queries must not be greater than the number of writes overall.
219+
expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count']));
220+
221+
DateTime? lastTime;
222+
for (var r in times) {
223+
if (lastTime != null) {
224+
var diff = r.difference(lastTime);
225+
expect(diff, greaterThanOrEqualTo(throttleDuration));
226+
}
227+
lastTime = r;
228+
}
229+
});
230+
});
231+
}
232+
233+
Future<List<Object>> inIsolateWatch(IsolateConnectionFactory factory,
234+
int numberOfQueries, Duration throttleDuration) async {
235+
return await Isolate.run(() async {
236+
final db = factory.open();
237+
238+
final stream = db.watch(
239+
'SELECT count() AS count FROM assets INNER JOIN customers ON customers.id = assets.customer_id',
240+
throttle: throttleDuration);
241+
List<DateTime> times = [];
242+
final results = await stream.take(numberOfQueries).map((e) {
243+
times.add(DateTime.now());
244+
return e;
245+
}).toList();
246+
247+
db.close();
248+
return [results, times];
166249
});
167250
}

0 commit comments

Comments
 (0)