Skip to content

Commit c0e06a8

Browse files
committed
Remove SqliteConnectionFactory - single-isolate support for now.
1 parent acda397 commit c0e06a8

File tree

9 files changed

+102
-137
lines changed

9 files changed

+102
-137
lines changed

lib/src/connection_pool.dart

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import 'dart:async';
2+
import 'dart:isolate';
23

4+
import 'mutex.dart';
35
import 'sqlite_connection.dart';
4-
import 'sqlite_connection_factory.dart';
56
import 'sqlite_connection_impl.dart';
7+
import 'sqlite_open_factory.dart';
68
import 'sqlite_queries.dart';
79
import 'update_notification.dart';
810

@@ -12,7 +14,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
1214

1315
final List<SqliteConnectionImpl> _readConnections = [];
1416

15-
final SqliteConnectionFactory _factory;
17+
final SqliteOpenFactory _factory;
18+
final SendPort _upstreamPort;
1619

1720
@override
1821
final Stream<UpdateNotification>? updates;
@@ -21,6 +24,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
2124

2225
final String? debugName;
2326

27+
final Mutex mutex;
28+
2429
/// Open a new connection pool.
2530
///
2631
/// The provided factory is used to open connections on demand. Connections
@@ -35,8 +40,11 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
3540
{this.updates,
3641
this.maxReaders = 5,
3742
SqliteConnection? writeConnection,
38-
this.debugName})
39-
: _writeConnection = writeConnection;
43+
this.debugName,
44+
required this.mutex,
45+
required SendPort upstreamPort})
46+
: _writeConnection = writeConnection,
47+
_upstreamPort = upstreamPort;
4048

4149
@override
4250
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
@@ -92,8 +100,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
92100
@override
93101
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
94102
{Duration? lockTimeout}) {
95-
_writeConnection ??= _factory.openConnection(
96-
debugName: debugName != null ? '$debugName-writer' : null);
103+
_writeConnection ??= SqliteConnectionImpl(
104+
upstreamPort: _upstreamPort,
105+
primary: false,
106+
updates: updates,
107+
debugName: debugName != null ? '$debugName-writer' : null,
108+
mutex: mutex,
109+
readOnly: false,
110+
openFactory: _factory);
97111
return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout);
98112
}
99113

@@ -106,10 +120,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
106120
var name = debugName == null
107121
? null
108122
: '$debugName-${_readConnections.length + 1}';
109-
var connection = _factory.openConnection(
123+
var connection = SqliteConnectionImpl(
124+
upstreamPort: _upstreamPort,
125+
primary: false,
110126
updates: updates,
111127
debugName: name,
112-
readOnly: true) as SqliteConnectionImpl;
128+
mutex: mutex,
129+
readOnly: true,
130+
openFactory: _factory);
113131
_readConnections.add(connection);
114132

115133
// Edge case:

lib/src/port_channel.dart

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ import 'dart:async';
22
import 'dart:collection';
33
import 'dart:isolate';
44

5-
import 'package:sqlite_async/src/isolate_completer.dart';
6-
75
class PortClient {
86
late Future<SendPort> sendPort;
97
ReceivePort receivePort = ReceivePort();

lib/src/sqlite_connection_factory.dart

Lines changed: 0 additions & 60 deletions
This file was deleted.

lib/src/sqlite_connection_impl.dart

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import 'dart:async';
2-
import 'dart:collection';
32
import 'dart:isolate';
43

54
import 'package:sqlite3/sqlite3.dart' as sqlite;
6-
import 'package:sqlite_async/src/port_channel.dart';
75

6+
import 'isolate_completer.dart';
87
import 'mutex.dart';
8+
import 'port_channel.dart';
99
import 'sqlite_connection.dart';
10-
import 'sqlite_connection_factory.dart';
10+
import 'sqlite_open_factory.dart';
1111
import 'sqlite_queries.dart';
1212
import 'update_notification.dart';
1313

@@ -16,31 +16,42 @@ typedef TxCallback<T> = Future<T> Function(sqlite.Database db);
1616
/// Implements a SqliteConnection using a separate isolate for the database
1717
/// operations.
1818
class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
19-
final SqliteConnectionFactory _factory;
20-
2119
/// Private to this connection
2220
final SimpleMutex _connectionMutex = SimpleMutex();
21+
final Mutex _writeMutex;
2322

2423
@override
2524
final Stream<UpdateNotification>? updates;
2625
final PortClient _dbIsolate = PortClient();
2726
final String? debugName;
2827
final bool readOnly;
2928

30-
SqliteConnectionImpl(this._factory,
31-
{this.updates, this.debugName, this.readOnly = false}) {
32-
_open();
29+
SqliteConnectionImpl(
30+
{required SqliteOpenFactory openFactory,
31+
required Mutex mutex,
32+
required SendPort upstreamPort,
33+
this.updates,
34+
this.debugName,
35+
this.readOnly = false,
36+
bool primary = false})
37+
: _writeMutex = mutex {
38+
_open(openFactory, primary: primary, upstreamPort: upstreamPort);
3339
}
3440

3541
Future<void> get ready async {
3642
await _dbIsolate.ready;
3743
}
3844

39-
Future<void> _open() async {
45+
Future<void> _open(SqliteOpenFactory openFactory,
46+
{required bool primary, required SendPort upstreamPort}) async {
4047
await _connectionMutex.lock(() async {
4148
var isolate = await Isolate.spawn(
4249
_sqliteConnectionIsolate,
43-
_SqliteConnectionParams(_factory, _dbIsolate.server(),
50+
_SqliteConnectionParams(
51+
openFactory: openFactory,
52+
port: upstreamPort,
53+
primary: primary,
54+
portServer: _dbIsolate.server(),
4455
readOnly: readOnly),
4556
debugName: debugName,
4657
paused: true);
@@ -88,7 +99,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
8899
stopWatch.stop();
89100
}
90101
// DB lock so that only one write happens at a time
91-
return await _factory.mutex.lock(() async {
102+
return await _writeMutex.lock(() async {
92103
final ctx = _TransactionContext(_dbIsolate);
93104
try {
94105
return await callback(ctx);
@@ -173,7 +184,7 @@ class _TransactionContext implements SqliteWriteContext {
173184
@override
174185
Future<void> executeBatch(String sql, List<List<Object?>> parameterSets) {
175186
return computeWithDatabase((db) async {
176-
final statement = db.prepare(sql);
187+
final statement = db.prepare(sql, checkNoTail: true);
177188
try {
178189
for (var parameters in parameterSets) {
179190
statement.execute(parameters);
@@ -186,15 +197,24 @@ class _TransactionContext implements SqliteWriteContext {
186197
}
187198

188199
void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
189-
final db = await params.factory.openRawDatabase(readOnly: params.readOnly);
190-
final port = params.factory.port;
200+
final port = params.port;
201+
if (!params.primary) {
202+
// Wait until the primary connection has been initialized.
203+
// The primary connection is responsible for configuring journal mode,
204+
// running migrations, and other setup.
205+
var initialized = IsolateResult<void>();
206+
port.send(['init-db', initialized.completer]);
207+
await initialized.future;
208+
}
209+
final db = await params.openFactory.open(SqliteOpenOptions(
210+
primaryConnection: params.primary, readOnly: params.readOnly));
191211

192212
final server = params.portServer;
193213
final commandPort = ReceivePort();
194214

195215
Set<String> updatedTables = {};
196-
int? txId = null;
197-
Object? txError = null;
216+
int? txId;
217+
Object? txError;
198218

199219
db.updates.listen((event) {
200220
updatedTables.add(event.tableName);
@@ -261,12 +281,19 @@ void _sqliteConnectionIsolate(_SqliteConnectionParams params) async {
261281
}
262282

263283
class _SqliteConnectionParams {
264-
SqliteConnectionFactory factory;
265-
PortServer portServer;
266-
bool readOnly;
284+
final PortServer portServer;
285+
final bool readOnly;
286+
287+
final SendPort port;
288+
final bool primary;
289+
final SqliteOpenFactory openFactory;
267290

268-
_SqliteConnectionParams(this.factory, this.portServer,
269-
{required this.readOnly});
291+
_SqliteConnectionParams(
292+
{required this.openFactory,
293+
required this.portServer,
294+
required this.port,
295+
required this.readOnly,
296+
required this.primary});
270297
}
271298

272299
class _SqliteIsolateStatement {

lib/src/sqlite_database.dart

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
import 'dart:async';
22
import 'dart:isolate';
33

4-
import 'package:sqlite_async/src/sqlite_open_factory.dart';
5-
64
import 'connection_pool.dart';
75
import 'isolate_completer.dart';
86
import 'mutex.dart';
97
import 'sqlite_connection.dart';
10-
import 'sqlite_connection_factory.dart';
118
import 'sqlite_connection_impl.dart';
9+
import 'sqlite_open_factory.dart';
1210
import 'sqlite_options.dart';
1311
import 'sqlite_queries.dart';
1412
import 'update_notification.dart';
@@ -24,7 +22,7 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
2422
final int maxReaders;
2523

2624
/// Global lock to serialize write transactions.
27-
final Mutex mutex = Mutex.shared();
25+
final Mutex mutex = Mutex();
2826

2927
/// Factory that opens a raw database connection in each isolate.
3028
///
@@ -74,11 +72,13 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
7472
SqliteDatabase.withFactory({required this.openFactory, this.maxReaders = 5}) {
7573
updates = _updatesController.stream;
7674
_internalConnection = _openPrimaryConnection(debugName: 'sqlite-writer');
77-
_pool = SqliteConnectionPool(_factory(),
75+
_pool = SqliteConnectionPool(openFactory,
76+
upstreamPort: _eventsPort.sendPort,
7877
updates: updates,
7978
writeConnection: _internalConnection,
8079
debugName: 'sqlite',
81-
maxReaders: maxReaders);
80+
maxReaders: maxReaders,
81+
mutex: mutex);
8282

8383
_listenForEvents();
8484

@@ -129,23 +129,13 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
129129
}
130130

131131
SqliteConnectionImpl _openPrimaryConnection({String? debugName}) {
132-
return SqliteConnectionImpl(_factory(primary: true),
133-
updates: updates, debugName: debugName);
134-
}
135-
136-
/// Advanced: Get a connection factory.
137-
///
138-
/// This factory can be passed to other isolates, to allow querying from
139-
/// different isolates.
140-
SqliteConnectionFactory connectionFactory() {
141-
return _factory();
142-
}
143-
144-
SqliteConnectionFactory _factory({bool primary = false}) {
145-
return SqliteConnectionFactory(
146-
port: _eventsPort.sendPort,
132+
return SqliteConnectionImpl(
133+
upstreamPort: _eventsPort.sendPort,
134+
primary: true,
135+
updates: updates,
136+
debugName: debugName,
147137
mutex: mutex,
148-
primary: primary,
138+
readOnly: false,
149139
openFactory: openFactory);
150140
}
151141

lib/src/sqlite_queries.dart

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import 'sqlite_connection.dart';
2-
import 'update_notification.dart';
1+
import 'package:sqlite3/sqlite3.dart' as sqlite;
2+
33
import 'database_utils.dart';
4+
import 'sqlite_connection.dart';
45
import 'throttle.dart';
5-
import 'package:sqlite3/sqlite3.dart' as sqlite;
6+
import 'update_notification.dart';
67

78
/// Mixin to provide default query functionality.
89
///

lib/src/sqlite_setup.dart

Lines changed: 0 additions & 14 deletions
This file was deleted.

0 commit comments

Comments
 (0)