Skip to content

Commit 3696bc2

Browse files
committed
Fix watch with parameters.
1 parent 3966614 commit 3696bc2

File tree

3 files changed

+71
-3
lines changed

3 files changed

+71
-3
lines changed

lib/src/database_utils.dart

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ Future<Set<String>> getSourceTablesText(
5757
}
5858

5959
/// Given a SELECT query, return the tables that the query depends on.
60-
Future<Set<String>> getSourceTables(SqliteReadContext ctx, String sql) async {
61-
final rows = await ctx.getAll('EXPLAIN $sql');
60+
Future<Set<String>> getSourceTables(SqliteReadContext ctx, String sql,
61+
[List<Object?> parameters = const []]) async {
62+
final rows = await ctx.getAll('EXPLAIN $sql', parameters);
6263
List<int> rootpages = [];
6364
for (var row in rows) {
6465
if (row['opcode'] == 'OpenRead' && row['p3'] == 0 && row['p2'] is int) {

lib/src/sqlite_queries.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
5050
Iterable<String>? triggerOnTables}) async* {
5151
assert(updates != null,
5252
'updates stream must be provided to allow query watching');
53-
final tables = triggerOnTables ?? await getSourceTables(this, sql);
53+
final tables =
54+
triggerOnTables ?? await getSourceTables(this, sql, parameters);
5455
final filteredStream =
5556
updates!.transform(UpdateNotification.filterTablesTransformer(tables));
5657
final throttledStream = UpdateNotification.throttleStream(

test/watch_test.dart

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,72 @@ void main() {
245245
lastTime = r;
246246
}
247247
});
248+
249+
test('watch with parameters', () async {
250+
final db = await setupDatabase(path: path);
251+
await createTables(db);
252+
253+
const baseTime = 20;
254+
255+
const throttleDuration = Duration(milliseconds: baseTime);
256+
257+
final rows = await db.execute(
258+
'INSERT INTO customers(name) VALUES (?) RETURNING id',
259+
['a customer']);
260+
final id = rows[0]['id'];
261+
262+
final stream = db.watch(
263+
'SELECT count() AS count FROM assets WHERE customer_id = ?',
264+
parameters: [id],
265+
throttle: throttleDuration);
266+
267+
var done = false;
268+
inserts() async {
269+
while (!done) {
270+
await db.execute(
271+
'INSERT INTO assets(make, customer_id) VALUES (?, ?)',
272+
['test', id]);
273+
await Future.delayed(
274+
Duration(milliseconds: Random().nextInt(baseTime * 2)));
275+
}
276+
}
277+
278+
const numberOfQueries = 10;
279+
280+
inserts();
281+
try {
282+
List<DateTime> times = [];
283+
final results = await stream.take(numberOfQueries).map((e) {
284+
times.add(DateTime.now());
285+
return e;
286+
}).toList();
287+
288+
var lastCount = 0;
289+
for (var r in results) {
290+
final count = r.first['count'];
291+
// This is not strictly incrementing, since we can't guarantee the
292+
// exact order between reads and writes.
293+
// We can guarantee that there will always be a read after the last write,
294+
// but the previous read may have been after the same write in some cases.
295+
expect(count, greaterThanOrEqualTo(lastCount));
296+
lastCount = count;
297+
}
298+
299+
// The number of read queries must not be greater than the number of writes overall.
300+
expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count']));
301+
302+
DateTime? lastTime;
303+
for (var r in times) {
304+
if (lastTime != null) {
305+
var diff = r.difference(lastTime);
306+
expect(diff, greaterThanOrEqualTo(throttleDuration));
307+
}
308+
lastTime = r;
309+
}
310+
} finally {
311+
done = true;
312+
}
313+
});
248314
});
249315
}
250316

0 commit comments

Comments
 (0)