Skip to content

Commit 8677a6e

Browse files
committed
Add migration support.
1 parent 7dedd3c commit 8677a6e

File tree

7 files changed

+426
-9
lines changed

7 files changed

+426
-9
lines changed

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ query access.
1515

1616
## Features
1717

18-
* Fast.
19-
* Direct SQL query access.
20-
* Uses a connection pool to allow concurrent queries.
18+
* All operations are asynchronous by default - does not block the main isolate.
19+
* Concurrent transactions supported by default - one write transaction and many multiple read transactions.
20+
* Uses WAL mode with minimal locking.
21+
* Direct synchronous access in an isolate is supported for performance-sensitive use cases.

lib/sqlite_async.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ export 'src/update_notification.dart';
88
export 'src/sqlite_database.dart';
99
export 'src/sqlite_options.dart';
1010
export 'src/sqlite_open_factory.dart';
11+
export 'src/sqlite_migrations.dart';

lib/src/sqlite_migrations.dart

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
import 'dart:async';
2+
import 'dart:convert';
3+
4+
import 'package:sqlite3/sqlite3.dart';
5+
6+
import 'sqlite_connection.dart';
7+
8+
class SqliteMigrations {
9+
String migrationTable;
10+
List<SqliteMigration> migrations = [];
11+
SqliteMigration? createDatabase;
12+
13+
SqliteMigrations({this.migrationTable = "_migrations"});
14+
15+
add(SqliteMigration migration) {
16+
assert(
17+
migrations.isEmpty || migrations.last.toVersion < migration.toVersion);
18+
19+
final down = migration.downMigration;
20+
if (down != null) {
21+
if (migrations.isEmpty) {
22+
if (down.toVersion != 0) {
23+
throw MigrationError(
24+
'Down migration for first migration must have toVersion = 0');
25+
}
26+
} else {
27+
if (down.toVersion > migrations.last.toVersion) {
28+
throw MigrationError(
29+
'Down migration for ${migration.toVersion} must have a toVersion <= ${migrations.last.toVersion}');
30+
}
31+
}
32+
}
33+
34+
migrations.add(migration);
35+
}
36+
37+
get version {
38+
return migrations.isEmpty ? 0 : migrations.last.toVersion;
39+
}
40+
41+
Future<int> getCurrentVersion(SqliteWriteContext db) async {
42+
try {
43+
final currentVersionRow = await db.getOptional(
44+
'SELECT ifnull(max(id), 0) as version FROM $migrationTable');
45+
int currentVersion =
46+
currentVersionRow == null ? 0 : currentVersionRow['version'];
47+
return currentVersion;
48+
} on SqliteException catch (e) {
49+
if (e.message.contains('no such table')) {
50+
return 0;
51+
}
52+
rethrow;
53+
}
54+
}
55+
56+
_validateCreateDatabase() {
57+
if (createDatabase != null) {
58+
if (createDatabase!.downMigration != null) {
59+
throw MigrationError("createDatabase may not contain down migrations");
60+
}
61+
62+
var hasMatchingVersion = migrations
63+
.where((element) => element.toVersion == createDatabase!.toVersion)
64+
.isNotEmpty;
65+
if (!hasMatchingVersion) {
66+
throw MigrationError(
67+
"createDatabase.version (${createDatabase!.toVersion} must match a migration version");
68+
}
69+
}
70+
}
71+
72+
Future<void> migrate(SqliteConnection db) async {
73+
_validateCreateDatabase();
74+
75+
await db.writeTransaction((tx) async {
76+
await tx.execute(
77+
'CREATE TABLE IF NOT EXISTS $migrationTable(id INTEGER PRIMARY KEY, down_migrations TEXT)');
78+
79+
int currentVersion = await getCurrentVersion(tx);
80+
81+
if (currentVersion == version) {
82+
return;
83+
}
84+
85+
// Handle down migrations
86+
while (currentVersion > version) {
87+
final migrationRow = await tx.getOptional(
88+
'SELECT id, down_migrations FROM $migrationTable WHERE id > ? ORDER BY id DESC LIMIT 1',
89+
[version]);
90+
91+
if (migrationRow == null || migrationRow['down_migrations'] == null) {
92+
throw MigrationError(
93+
'No down migration found from $currentVersion to $version');
94+
}
95+
96+
final migrations = jsonDecode(migrationRow['down_migrations']);
97+
for (var migration in migrations) {
98+
await tx.execute(migration['sql'], migration['params']);
99+
}
100+
101+
// Refresh version
102+
int prevVersion = currentVersion;
103+
currentVersion = await getCurrentVersion(tx);
104+
if (prevVersion == currentVersion) {
105+
throw MigrationError(
106+
'Database down from version $currentVersion to $version failed - version not updated after dow migration');
107+
}
108+
}
109+
110+
// Clean setup
111+
if (currentVersion == 0 && createDatabase != null) {
112+
await createDatabase!.fn(tx);
113+
114+
// Still need to persist the migrations
115+
for (var migration in migrations) {
116+
if (migration.toVersion <= createDatabase!.toVersion) {
117+
await _persistMigration(migration, tx, migrationTable);
118+
}
119+
}
120+
121+
currentVersion = await getCurrentVersion(tx);
122+
}
123+
124+
// Up migrations
125+
for (var migration in migrations) {
126+
if (migration.toVersion > currentVersion) {
127+
await migration.fn(tx);
128+
await _persistMigration(migration, tx, migrationTable);
129+
}
130+
}
131+
});
132+
}
133+
}
134+
135+
Future<void> _persistMigration(SqliteMigration migration, SqliteWriteContext db,
136+
String migrationTable) async {
137+
final down = migration.downMigration;
138+
if (down != null) {
139+
List<_SqliteMigrationStatement> statements = down._statements;
140+
statements.insert(
141+
0,
142+
_SqliteMigrationStatement(
143+
'DELETE FROM $migrationTable WHERE id > ${down.toVersion}'));
144+
145+
var json = jsonEncode(statements);
146+
await db.execute(
147+
'INSERT INTO $migrationTable(id, down_migrations) VALUES(?, ?)',
148+
[migration.toVersion, json]);
149+
} else {
150+
await db.execute(
151+
'INSERT INTO $migrationTable(id, down_migrations) VALUES(?, ?)',
152+
[migration.toVersion, null]);
153+
}
154+
}
155+
156+
class MigrationError extends Error {
157+
final String message;
158+
159+
MigrationError(this.message);
160+
161+
@override
162+
String toString() {
163+
return 'MigrationError: $message';
164+
}
165+
}
166+
167+
typedef SqliteMigrationFunction = FutureOr<void> Function(
168+
SqliteWriteContext tx);
169+
170+
class SqliteMigration {
171+
final SqliteMigrationFunction fn;
172+
final int toVersion;
173+
174+
/// Optional: Add a down migration to allow this migration to be reverted
175+
/// if the user installs an older version.
176+
SqliteDownMigration? downMigration;
177+
178+
SqliteMigration(this.toVersion, this.fn, {this.downMigration});
179+
}
180+
181+
class _SqliteMigrationStatement {
182+
final String sql;
183+
final List<Object?> params;
184+
185+
_SqliteMigrationStatement(this.sql, [this.params = const []]);
186+
187+
Map<String, dynamic> toJson() {
188+
return {'sql': sql, 'params': params};
189+
}
190+
}
191+
192+
class SqliteDownMigration {
193+
final int toVersion;
194+
final List<_SqliteMigrationStatement> _statements = [];
195+
196+
SqliteDownMigration({required this.toVersion});
197+
198+
add(String sql, [List<Object?>? params]) {
199+
_statements.add(_SqliteMigrationStatement(sql, params ?? []));
200+
}
201+
}

lib/src/sqlite_options.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class SqliteOptions {
1313

1414
const SqliteOptions.defaults()
1515
: journalMode = SqliteJournalMode.wal,
16-
journalSizeLimit = 2 * 1024 * 1024,
16+
journalSizeLimit = 6 * 1024 * 1024, // 1.5x the default checkpoint size
1717
synchronous = SqliteSynchronous.normal;
1818

1919
const SqliteOptions(

scripts/benchmark.dart

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import 'dart:convert';
12
import 'dart:isolate';
3+
import 'dart:math';
24

35
import 'package:benchmarking/benchmarking.dart';
46
import 'package:collection/collection.dart';
@@ -21,6 +23,29 @@ class SqliteBenchmark {
2123
}
2224

2325
List<SqliteBenchmark> benchmarks = [
26+
SqliteBenchmark('Insert: JSON1',
27+
(SqliteDatabase db, List<List<String>> parameters) async {
28+
await db.writeTransaction((tx) async {
29+
for (var i = 0; i < parameters.length; i += 5000) {
30+
var sublist = parameters.sublist(i, min(parameters.length, i + 5000));
31+
await tx.execute(
32+
"WITH list AS (SELECT e.value ->> 0 as name, e.value ->> 1 as email FROM json_each(?) e)"
33+
'INSERT INTO customers(name, email) SELECT name, email FROM list',
34+
[jsonEncode(sublist)]);
35+
}
36+
});
37+
}, maxBatchSize: 20000),
38+
SqliteBenchmark('Read: JSON1',
39+
(SqliteDatabase db, List<List<String>> parameters) async {
40+
await db.readTransaction((tx) async {
41+
for (var i = 0; i < parameters.length; i += 10000) {
42+
var sublist = List.generate(10000, (index) => index);
43+
await tx.getAll(
44+
'SELECT name, email FROM customers WHERE id IN (SELECT e.value FROM json_each(?) e)',
45+
[jsonEncode(sublist)]);
46+
}
47+
});
48+
}, maxBatchSize: 200000, enabled: false),
2449
SqliteBenchmark('writeLock in isolate',
2550
(SqliteDatabase db, List<List<String>> parameters) async {
2651
var factory = db.isolateConnectionFactory();
@@ -145,9 +170,10 @@ void main() async {
145170

146171
createTables(SqliteDatabase db) async {
147172
await db.writeTransaction((tx) async {
173+
await tx.execute('DROP TABLE IF EXISTS customers');
148174
await tx.execute(
149-
'CREATE TABLE IF NOT EXISTS customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, email TEXT)');
150-
await tx.execute('DELETE FROM customers WHERE 1');
175+
'CREATE TABLE customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, email TEXT)');
176+
// await tx.execute('CREATE INDEX customer_email ON customers(email, id)');
151177
});
152178
await db.execute('VACUUM');
153179
await db.execute('PRAGMA wal_checkpoint(TRUNCATE)');
@@ -171,10 +197,13 @@ void main() async {
171197
final rows1 = await db.execute('SELECT count(*) as count FROM customers');
172198
assert(rows1[0]['count'] == 0);
173199
final results = await asyncBenchmark(benchmark.name, () async {
200+
final stopwatch = Stopwatch()..start();
174201
await benchmark.fn(db, limitedParameters);
202+
final duration = stopwatch.elapsedMilliseconds;
203+
print("${benchmark.name} $duration");
175204
}, teardown: () async {
176-
// This would make the benchmark fair, but only if each benchmark uses the
177-
// same batch size.
205+
// This would make the benchmark fair if it runs inside the benchmark,
206+
// but only if each benchmark uses the same batch size.
178207
await db.execute('PRAGMA wal_checkpoint(TRUNCATE)');
179208
});
180209

0 commit comments

Comments
 (0)