From 434ec8fa1e0ceca6e0c9a7fb886ce14107246bf3 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 24 Nov 2025 11:28:55 +0200 Subject: [PATCH 01/22] Added integration test shell. --- .../PowerSync.Common.IntegrationTests.csproj | 25 ++++++++++++++ .../TestSchema.cs | 33 +++++++++++++++++++ .../UnitTest1.cs | 10 ++++++ root.sln | 15 +++++++++ 4 files changed, 83 insertions(+) create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj new file mode 100644 index 0000000..48d252a --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj @@ -0,0 +1,25 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + + + + + diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs new file mode 100644 index 0000000..ef294a0 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs @@ -0,0 +1,33 @@ +namespace PowerSync.Common.Tests; + +using PowerSync.Common.DB.Schema; + +public class TestSchema +{ + public static Table Todos = new Table(new Dictionary + { + { "list_id", ColumnType.TEXT }, + { "created_at", ColumnType.TEXT }, + { "completed_at", ColumnType.TEXT }, + { "description", ColumnType.TEXT }, + { "created_by", ColumnType.TEXT }, + { "completed_by", ColumnType.TEXT }, + { "completed", ColumnType.INTEGER } + }, new TableOptions + { + Indexes = new Dictionary> { { "list", new List { "list_id" } } } + }); + + public static Table Lists = new Table(new Dictionary + { + { "created_at", ColumnType.TEXT }, + { "name", ColumnType.TEXT }, + { "owner_id", ColumnType.TEXT } + }); + + public static Schema PowerSyncSchema = new Schema(new Dictionary + { + { "todos", Todos }, + { "lists", Lists } + }); +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs new file mode 100644 index 0000000..6213246 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs @@ -0,0 +1,10 @@ +namespace PowerSync.Common.IntegrationTests; + +public class UnitTest1 +{ + [Fact] + public void Test1() + { + + } +} diff --git a/root.sln b/root.sln index f83584e..b4c92ae 100644 --- a/root.sln +++ b/root.sln @@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WPF", "demos\WPF\WPF.csproj EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PowerSync.Maui", "PowerSync\PowerSync.Maui\PowerSync.Maui.csproj", "{A4A91B9F-0C86-41CB-BEF0-C002819C43BE}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PowerSync.Common.IntegrationTests", "Tests\PowerSync\PowerSync.Common.IntegrationTests\PowerSync.Common.IntegrationTests.csproj", "{EB81D453-777D-40B5-A504-4144906ADBF4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -105,6 +107,18 @@ Global {A4A91B9F-0C86-41CB-BEF0-C002819C43BE}.Release|x64.Build.0 = Release|Any CPU {A4A91B9F-0C86-41CB-BEF0-C002819C43BE}.Release|x86.ActiveCfg = Release|Any CPU {A4A91B9F-0C86-41CB-BEF0-C002819C43BE}.Release|x86.Build.0 = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x64.ActiveCfg = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x64.Build.0 = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x86.ActiveCfg = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Debug|x86.Build.0 = Debug|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|Any CPU.Build.0 = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x64.ActiveCfg = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x64.Build.0 = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x86.ActiveCfg = Release|Any CPU + {EB81D453-777D-40B5-A504-4144906ADBF4}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -117,5 +131,6 @@ Global {B8B2A9E2-FEC9-495B-B03E-23078E7B651D} = {9144195A-C68F-4B1E-A574-474EDD424D6C} {AF297026-0BEA-4B8E-97C9-6540C6D52B36} = {9144195A-C68F-4B1E-A574-474EDD424D6C} {A4A91B9F-0C86-41CB-BEF0-C002819C43BE} = {B1D87BA9-8812-4EFA-BBBE-1FF1EEEB5433} + {EB81D453-777D-40B5-A504-4144906ADBF4} = {C784FBE4-CC1E-4A0A-AE8E-6B818DD3724D} EndGlobalSection EndGlobal From 563012f98f9a893c34fa8560a9cd9b4d3af8c2e5 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 24 Nov 2025 16:39:57 +0200 Subject: [PATCH 02/22] Added initial integration tests. --- README.md | 14 + .../NodeClient.cs | 110 ++++++++ .../NodeConnector.cs | 113 ++++++++ .../PowerSync.Common.IntegrationTests.csproj | 4 +- .../SyncTests.cs | 244 ++++++++++++++++++ .../TestSchema.cs | 2 +- .../UnitTest1.cs | 10 - Tools/Setup/Setup.cs | 42 +-- 8 files changed, 506 insertions(+), 33 deletions(-) create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs delete mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs diff --git a/README.md b/README.md index 2607d75..468d6f3 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,20 @@ Run a specific test dotnet test -v n --framework net8.0 --filter "test-file-pattern" ``` +### Integration Tests +Integration tests in `PowerSync.Common.IntegrationTests` are intended to run against the [self-host nodejs demo](https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs). +The integration tests are disabled by default, define the following environment variable to let them run. + +```bash +RUN_INTEGRATION_TESTS=true dotnet test -v n --framework net8.0 +``` + +Run integration tests exclusively. + +```bash +RUN_INTEGRATION_TESTS=true dotnet test -v n --framework net8.0 --filter "Category=Integration" +``` + ## Using the PowerSync.Common package in your project ```bash dotnet add package PowerSync.Common --prerelease diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs new file mode 100644 index 0000000..eab9a3e --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeClient.cs @@ -0,0 +1,110 @@ +namespace PowerSync.Common.IntegrationTests; + +using System; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using System.Collections.Generic; +using PowerSync.Common.DB.Crud; + +public class NodeClient +{ + private readonly HttpClient _httpClient; + private readonly string _backendUrl; + private readonly string _userId; + + public NodeClient(string userId) + { + _httpClient = new HttpClient(); + _backendUrl = "http://localhost:6060"; + _userId = userId; + } + + public NodeClient(string backendUrl, string userId) + { + _httpClient = new HttpClient(); + _backendUrl = backendUrl; + _userId = userId; + } + + public Task CreateList(string id, string name) + { + return CreateItem("lists", id, name); + } + async Task CreateItem(string table, string id, string name) + { + var data = new Dictionary + { + { "created_at", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss") }, + { "name", name }, + { "owner_id", _userId } + }; + + var batch = new[] + { + new + { + op = UpdateType.PUT.ToString(), + table = table, + id = id, + data = data + } + }; + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{_backendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + Console.WriteLine(await response.Content.ReadAsStringAsync()); + throw new Exception( + $"Failed to create item. Status: {response.StatusCode}, " + + $"Response: {await response.Content.ReadAsStringAsync()}" + ); + } + + return await response.Content.ReadAsStringAsync(); + } + + public Task DeleteList(string id) + { + return DeleteItem("lists", id); + } + + async Task DeleteItem(string table, string id) + { + var batch = new[] + { + new + { + op = UpdateType.DELETE.ToString(), + table = table, + id = id + } + }; + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{_backendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + Console.WriteLine(await response.Content.ReadAsStringAsync()); + throw new Exception( + $"Failed to delete item. Status: {response.StatusCode}, " + + $"Response: {await response.Content.ReadAsStringAsync()}" + ); + } + + return await response.Content.ReadAsStringAsync(); + } + + public void Dispose() + { + _httpClient?.Dispose(); + } +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs new file mode 100644 index 0000000..b9448b3 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/NodeConnector.cs @@ -0,0 +1,113 @@ +namespace PowerSync.Common.IntegrationTests; + + +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using System.IO; +using PowerSync.Common.Client; +using PowerSync.Common.Client.Connection; +using PowerSync.Common.DB.Crud; + + +public class NodeConnector : IPowerSyncBackendConnector +{ + private readonly HttpClient _httpClient; + + public string BackendUrl { get; } + public string PowerSyncUrl { get; } + public string UserId { get; private set; } + private string? clientId; + + public NodeConnector(string userId) + { + _httpClient = new HttpClient(); + + // Load or generate User ID + UserId = userId; + + BackendUrl = "http://localhost:6060"; + PowerSyncUrl = "http://localhost:8080"; + + clientId = null; + } + + public async Task FetchCredentials() + { + string tokenEndpoint = "api/auth/token"; + string url = $"{BackendUrl}/{tokenEndpoint}?user_id={UserId}"; + + HttpResponseMessage response = await _httpClient.GetAsync(url); + if (!response.IsSuccessStatusCode) + { + throw new Exception($"Received {response.StatusCode} from {tokenEndpoint}: {await response.Content.ReadAsStringAsync()}"); + } + + string responseBody = await response.Content.ReadAsStringAsync(); + var jsonResponse = JsonSerializer.Deserialize>(responseBody); + + if (jsonResponse == null || !jsonResponse.ContainsKey("token")) + { + throw new Exception("Invalid response received from authentication endpoint."); + } + + return new PowerSyncCredentials(PowerSyncUrl, jsonResponse["token"]); + } + + public async Task UploadData(IPowerSyncDatabase database) + { + CrudTransaction? transaction; + try + { + transaction = await database.GetNextCrudTransaction(); + } + catch (Exception ex) + { + Console.WriteLine($"UploadData Error: {ex.Message}"); + return; + } + + if (transaction == null) + { + return; + } + + clientId ??= await database.GetClientId(); + + try + { + var batch = new List(); + + foreach (var operation in transaction.Crud) + { + batch.Add(new + { + op = operation.Op.ToString(), + table = operation.Table, + id = operation.Id, + data = operation.OpData + }); + } + + var payload = JsonSerializer.Serialize(new { batch }); + var content = new StringContent(payload, Encoding.UTF8, "application/json"); + + HttpResponseMessage response = await _httpClient.PostAsync($"{BackendUrl}/api/data", content); + + if (!response.IsSuccessStatusCode) + { + throw new Exception($"Received {response.StatusCode} from /api/data: {await response.Content.ReadAsStringAsync()}"); + } + + await transaction.Complete(); + } + catch (Exception ex) + { + Console.WriteLine($"UploadData Error: {ex.Message}"); + throw; + } + } +} diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj index 48d252a..06652fd 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj @@ -1,7 +1,9 @@  - net9.0 + + net8.0 + enable enable false diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs new file mode 100644 index 0000000..01b22b8 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs @@ -0,0 +1,244 @@ +using Newtonsoft.Json; +using PowerSync.Common.Client; +using System.Data.Common; +using System.Diagnostics; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + + +namespace PowerSync.Common.IntegrationTests; + +[Trait("Category", "Integration")] +public class SyncTests : IAsyncLifetime +{ + private record ListResult(string id, string name, string owner_id, string created_at); + + private string userId = Uuid(); + + private NodeClient nodeClient = default!; + + private PowerSyncDatabase db = default!; + + public async Task InitializeAsync() + { + // Create a logger factory + ILoggerFactory loggerFactory = LoggerFactory.Create(builder => + { + builder.AddConsole(); + builder.SetMinimumLevel(LogLevel.Information); + }); + + var logger = loggerFactory.CreateLogger("PowerSyncLogger"); + + nodeClient = new NodeClient(userId); + db = new PowerSyncDatabase(new PowerSyncDatabaseOptions + { + Database = new SQLOpenOptions { DbFilename = "powersync-sync-tests.db" }, + Schema = TestSchema.PowerSyncSchema, + Logger = logger + + }); + await db.Init(); + var connector = new NodeConnector(userId); + + await ClearAllData(); + + Console.WriteLine($"Using User ID: {userId}"); + try + { + await db.Connect(connector); + await db.WaitForFirstSync(); + } + catch (Exception ex) + { + Console.WriteLine($"Exception during InitializeAsync: {ex}"); + throw; + } + } + + public async Task DisposeAsync() + { + await ClearAllData(); + await Task.Delay(2000); + await db.DisconnectAndClear(); + await db.Close(); + } + + [IntegrationFact(Timeout = 3000)] + public async Task SyncDownCreateOperationTest() + { + var watched = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + + await db.Watch("select * from lists where id = ?", [id], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was added locally + if (x.Length == 1) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + await nodeClient.CreateList(id, name: "Test List magic"); + await watched.Task; + } + + [IntegrationFact(Timeout = 3000)] + public async Task SyncDownDeleteOperationTest() + { + var watched = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + + await nodeClient.CreateList(id, name: "Test List to delete"); + + await db.Watch("select * from lists where id = ?", [id], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was added locally + if (x.Length == 1) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + await watched.Task; + await nodeClient.DeleteList(id); + + watched = new TaskCompletionSource(); + cts = new CancellationTokenSource(); + + await db.Watch("select * from lists where id = ?", [id], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was deleted locally + if (x.Length == 0) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + await watched.Task; + } + + [IntegrationFact(Timeout = 5000)] + public async Task SyncDownLargeCreateOperationTest() + { + var watched = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + var listName = Uuid(); + + await db.Watch("select * from lists where name = ?", [listName], new WatchHandler + { + OnResult = (x) => + { + // Verify that the item was added locally + if (x.Length == 100) + { + watched.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + for (int i = 0; i < 100; i++) + { + await nodeClient.CreateList(Uuid(), listName); + } + await watched.Task; + } + + [IntegrationFact(Timeout = 5000)] + public async Task SyncDownCreateOperationAfterLargeUploadTest() + { + var localInsertWatch = new TaskCompletionSource(); + var backendInsertWatch = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var id = Uuid(); + var listName = Uuid(); + + await db.Watch("select * from lists where name = ?", [listName], new WatchHandler + { + OnResult = (x) => + { + // Verify that the items were added locally + if (x.Length == 100) + { + localInsertWatch.SetResult(true); + } + // Verify that the new item added to backend was synced down + else if (x.Length == 101) + { + backendInsertWatch.SetResult(true); + cts.Cancel(); + } + } + }, new SQLWatchOptions + { + Signal = cts.Token + }); + + for (int i = 0; i < 100; i++) + { + await db.Execute("insert into lists (id, name, owner_id, created_at) values (uuid(), ?, ?, datetime())", + [listName, userId]); + } + await localInsertWatch.Task; + + // let the crud upload finish + await Task.Delay(2000); + + await nodeClient.CreateList(Uuid(), listName); + await backendInsertWatch.Task; + } + + private async Task ClearAllData() + { + // Inefficient but simple way to clear all data, avoiding payload limitations + var results = await db.GetAll("select * from lists"); + foreach (var item in results) + { + await nodeClient.DeleteList(item.id); + } + } + static string Uuid() + { + return Guid.NewGuid().ToString(); + } +} + +[Trait("Category", "Integration")] +public class IntegrationFactAttribute : FactAttribute +{ + public IntegrationFactAttribute() + { + if (Environment.GetEnvironmentVariable("RUN_INTEGRATION_TESTS") != "true") + { + Skip = "Integration tests are disabled. Set RUN_INTEGRATION_TESTS=true to run."; + } + } +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs index ef294a0..dbf79a2 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/TestSchema.cs @@ -1,4 +1,4 @@ -namespace PowerSync.Common.Tests; +namespace PowerSync.Common.IntegrationTests; using PowerSync.Common.DB.Schema; diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs deleted file mode 100644 index 6213246..0000000 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/UnitTest1.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace PowerSync.Common.IntegrationTests; - -public class UnitTest1 -{ - [Fact] - public void Test1() - { - - } -} diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index b47192e..09136bc 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -10,7 +10,7 @@ public class PowerSyncSetup private const string VERSION = "0.3.14"; private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/co/powersync/powersync-sqlite-core/{VERSION}"; - + private readonly HttpClient _httpClient; private readonly string _basePath; @@ -37,7 +37,7 @@ public async Task RunSetup() public async Task SetupDesktop() { Console.WriteLine("Setting up Desktop libraries..."); - + var runtimeConfigs = GetDesktopRuntimeConfigs(); var commonPath = Path.Combine(_basePath, "PowerSync.Common"); @@ -63,19 +63,19 @@ private async Task ProcessDesktopRuntime(string basePath, KeyValuePair Date: Mon, 1 Dec 2025 09:16:43 +0200 Subject: [PATCH 03/22] Support 0.4.0 for C_SHARP sync implementation. --- .../PowerSync.Common.IntegrationTests.csproj | 21 ++++++++++ .../Client/Sync/Bucket/SqliteBucketStorage.cs | 2 +- .../Client/Sync/Bucket/SyncDataBucket.cs | 39 ++++++++++--------- Tools/Setup/Setup.cs | 2 +- 4 files changed, 43 insertions(+), 21 deletions(-) create mode 100644 PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj diff --git a/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj new file mode 100644 index 0000000..d7f0b2e --- /dev/null +++ b/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj @@ -0,0 +1,21 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 495f6e0..c6e25ec 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -111,7 +111,7 @@ await db.WriteTransaction(async tx => foreach (var b in batch.Buckets) { var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - ["save", JsonConvert.SerializeObject(new { buckets = new[] { b.ToJSON() } })]); + ["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]); logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result)); count += b.Data.Length; } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs index 7699778..8c9fea5 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs @@ -9,14 +9,14 @@ public class SyncDataBucketJSON [JsonProperty("bucket")] public string Bucket { get; set; } = null!; - [JsonProperty("has_more")] - public bool? HasMore { get; set; } + // [JsonProperty("has_more")] + // public bool? HasMore { get; set; } - [JsonProperty("after")] - public string? After { get; set; } + // [JsonProperty("after")] + // public string? After { get; set; } - [JsonProperty("next_after")] - public string? NextAfter { get; set; } + // [JsonProperty("next_after")] + // public string? NextAfter { get; set; } [JsonProperty("data")] public List Data { get; set; } = []; @@ -25,15 +25,16 @@ public class SyncDataBucketJSON public class SyncDataBucket( string bucket, OplogEntry[] data, - bool hasMore, - string? after = null, - string? nextAfter = null) + bool hasMore = false + // string? after = null, + // string? nextAfter = null + ) { public string Bucket { get; private set; } = bucket; public OplogEntry[] Data { get; private set; } = data; - public bool HasMore { get; private set; } = hasMore; - public string? After { get; private set; } = after; - public string? NextAfter { get; private set; } = nextAfter; + + // public string? After { get; private set; } = after; + // public string? NextAfter { get; private set; } = nextAfter; public static SyncDataBucket FromRow(SyncDataBucketJSON row) { @@ -46,10 +47,10 @@ public static SyncDataBucket FromRow(SyncDataBucketJSON row) return new SyncDataBucket( row.Bucket, - dataEntries, - row.HasMore ?? false, - row.After, - row.NextAfter + dataEntries + // row.HasMore ?? false, + // row.After, + // row.NextAfter ); } @@ -63,9 +64,9 @@ public string ToJSON() var jsonObject = new SyncDataBucketJSON { Bucket = Bucket, - HasMore = HasMore, - After = After, - NextAfter = NextAfter, + // HasMore = HasMore, + // After = After, + // NextAfter = NextAfter, Data = dataObjects }; diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index 09136bc..9416075 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -7,7 +7,7 @@ public class PowerSyncSetup { - private const string VERSION = "0.3.14"; + private const string VERSION = "0.4.0"; private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/co/powersync/powersync-sqlite-core/{VERSION}"; From f731d70381c43736e7772b04bdf875959a61d5c7 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 1 Dec 2025 09:57:36 +0200 Subject: [PATCH 04/22] Initial rust sync support. --- .../Client/PowerSyncDatabase.cs | 8 +- .../Sync/Bucket/BucketStorageAdapter.cs | 5 + .../Client/Sync/Bucket/SqliteBucketStorage.cs | 65 +- .../Client/Sync/Bucket/SyncDataBucket.cs | 14 +- .../Client/Sync/Stream/CoreInstructions.cs | 129 ++++ .../Client/Sync/Stream/Remote.cs | 31 + .../Stream/StreamingSyncImplementation.cs | 669 ++++++++++++------ .../PowerSync.Common/DB/Crud/SyncProgress.cs | 81 +++ .../PowerSync.Common/DB/Crud/SyncStatus.cs | 121 +++- .../MDSQLite/MDSQLiteAdapter.cs | 12 +- .../MDSQLite/MDSQLiteConnection.cs | 2 +- .../PowerSync.Common.IntegrationTests.csproj | 6 + .../{SyncTests.cs => SyncIntegrationTests.cs} | 16 +- Tools/Setup/Setup.cs | 6 +- 14 files changed, 859 insertions(+), 306 deletions(-) create mode 100644 PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs create mode 100644 PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs rename Tests/PowerSync/PowerSync.Common.IntegrationTests/{SyncTests.cs => SyncIntegrationTests.cs} (93%) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 835e75c..7dc0b77 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -60,11 +60,11 @@ public interface IPowerSyncDatabase : IEventStream Task Execute(string query, object[]? parameters = null); - Task GetAll(string sql, params object[]? parameters); + Task GetAll(string sql, object[]? parameters = null); - Task GetOptional(string sql, params object[]? parameters); + Task GetOptional(string sql, object[]? parameters = null); - Task Get(string sql, params object[]? parameters); + Task Get(string sql, object[]? parameters = null); Task ReadLock(Func> fn, DBLockOptions? options = null); @@ -236,7 +236,7 @@ protected async Task UpdateHasSynced() ); DateTime? lastCompleteSync = null; - + // TODO: Will be altered/extended when reporting individual sync priority statuses is supported foreach (var result in results) { diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index aa76d58..ba94eb7 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -113,4 +113,9 @@ public interface IBucketStorageAdapter : IEventStream /// Get a unique client ID. /// Task GetClientId(); + + /// + /// Invokes the `powersync_control` function for the sync client. + /// + Task Control(string op, object? payload); } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index c6e25ec..4ec5e51 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -351,16 +351,18 @@ public async Task UpdateLocalTarget(Func> callback) if (seqAfter != seqBefore) { - logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, seqBefore); + logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, + seqBefore); return false; } var response = await tx.Execute( - "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", - [opId] - ); + "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", + [opId] + ); - logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", JsonConvert.SerializeObject(response)); + logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", + JsonConvert.SerializeObject(response)); return true; }); } @@ -388,33 +390,33 @@ public async Task UpdateLocalTarget(Func> callback) var last = all[all.Length - 1]; return new CrudBatch( - Crud: all, - HaveMore: true, - CompleteCallback: async (string? writeCheckpoint) => - { - await db.WriteTransaction(async tx => + Crud: all, + HaveMore: true, + CompleteCallback: async (string? writeCheckpoint) => { - await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]); - - if (!string.IsNullOrEmpty(writeCheckpoint)) + await db.WriteTransaction(async tx => { - var crudResult = await tx.GetAll("SELECT 1 FROM ps_crud LIMIT 1"); - if (crudResult?.Length > 0) + await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]); + + if (!string.IsNullOrEmpty(writeCheckpoint)) + { + var crudResult = await tx.GetAll("SELECT 1 FROM ps_crud LIMIT 1"); + if (crudResult?.Length > 0) + { + await tx.Execute( + "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", + [writeCheckpoint]); + } + } + else { await tx.Execute( "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", - [writeCheckpoint]); + [GetMaxOpId()]); } - } - else - { - await tx.Execute( - "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", - [GetMaxOpId()]); - } - }); - } - ); + }); + } + ); } public async Task NextCrudItem() @@ -434,4 +436,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint) // No Op await Task.CompletedTask; } + + record ControlResult(string? r); + + public async Task Control(string op, object? payload = null) + { + return await db.WriteTransaction(async tx => + { + var result = await tx.Get("SELECT powersync_control(?, ?) AS r", [op, payload ?? ""]); + return result.r!; + }); + } } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs index 8c9fea5..5499f4b 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs @@ -24,18 +24,12 @@ public class SyncDataBucketJSON public class SyncDataBucket( string bucket, - OplogEntry[] data, - bool hasMore = false - // string? after = null, - // string? nextAfter = null + OplogEntry[] data ) { public string Bucket { get; private set; } = bucket; public OplogEntry[] Data { get; private set; } = data; - // public string? After { get; private set; } = after; - // public string? NextAfter { get; private set; } = nextAfter; - public static SyncDataBucket FromRow(SyncDataBucketJSON row) { var dataEntries = row.Data != null @@ -48,9 +42,6 @@ public static SyncDataBucket FromRow(SyncDataBucketJSON row) return new SyncDataBucket( row.Bucket, dataEntries - // row.HasMore ?? false, - // row.After, - // row.NextAfter ); } @@ -64,9 +55,6 @@ public string ToJSON() var jsonObject = new SyncDataBucketJSON { Bucket = Bucket, - // HasMore = HasMore, - // After = After, - // NextAfter = NextAfter, Data = dataObjects }; diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs new file mode 100644 index 0000000..7bfef98 --- /dev/null +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs @@ -0,0 +1,129 @@ +using Newtonsoft.Json.Linq; +using Newtonsoft.Json; + +namespace PowerSync.Common.Client.Sync.Stream; + +/// +/// An internal instruction emitted by the sync client in the core extension in response to the +/// SDK passing sync data into the extension. +/// +public abstract class Instruction +{ + + public static Instruction[] ParseInstructions(string rawResponse) + { + var jsonArray = JArray.Parse(rawResponse); + List instructions = []; + + foreach (JObject item in jsonArray) + { + var instruction = ParseInstruction(item); + if (instruction == null) + { + throw new JsonSerializationException("Failed to parse instruction from JSON."); + } + instructions.Add(instruction); + } + + return instructions.ToArray(); + } + + public static Instruction? ParseInstruction(JObject json) + { + if (json.ContainsKey("LogLine")) + return json["LogLine"]!.ToObject(); + if (json.ContainsKey("UpdateSyncStatus")) + return json["UpdateSyncStatus"]!.ToObject(); + if (json.ContainsKey("EstablishSyncStream")) + return json["EstablishSyncStream"]!.ToObject(); + if (json.ContainsKey("FetchCredentials")) + return json["FetchCredentials"]!.ToObject(); + if (json.ContainsKey("CloseSyncStream")) + return new CloseSyncStream(); + if (json.ContainsKey("FlushFileSystem")) + return new FlushFileSystem(); + if (json.ContainsKey("DidCompleteSync")) + return new DidCompleteSync(); + + throw new JsonSerializationException("Unknown Instruction type."); + } +} + +public class LogLine : Instruction +{ + [JsonProperty("severity")] + public string Severity { get; set; } = null!; // "DEBUG", "INFO", "WARNING" + + [JsonProperty("line")] + public string Line { get; set; } = null!; +} + +public class EstablishSyncStream : Instruction +{ + [JsonProperty("request")] + public StreamingSyncRequest Request { get; set; } = null!; +} + +public class UpdateSyncStatus : Instruction +{ + [JsonProperty("status")] + public CoreSyncStatus Status { get; set; } = null!; +} + +public class CoreSyncStatus +{ + [JsonProperty("connected")] + public bool Connected { get; set; } + + [JsonProperty("connecting")] + public bool Connecting { get; set; } + + [JsonProperty("priority_status")] + public List PriorityStatus { get; set; } = []; + + [JsonProperty("downloading")] + public DownloadProgress? Downloading { get; set; } +} + +public class SyncPriorityStatus +{ + [JsonProperty("priority")] + public int Priority { get; set; } + + [JsonProperty("last_synced_at")] + public long LastSyncedAt { get; set; } + + [JsonProperty("has_synced")] + public bool? HasSynced { get; set; } +} + +public class DownloadProgress +{ + [JsonProperty("buckets")] + public Dictionary Buckets { get; set; } = null!; +} + +public class BucketProgress +{ + [JsonProperty("priority")] + public int Priority { get; set; } + + [JsonProperty("at_last")] + public int AtLast { get; set; } + + [JsonProperty("since_last")] + public int SinceLast { get; set; } + + [JsonProperty("target_count")] + public int TargetCount { get; set; } +} + +public class FetchCredentials : Instruction +{ + [JsonProperty("did_expire")] + public bool DidExpire { get; set; } +} + +public class CloseSyncStream : Instruction { } +public class FlushFileSystem : Instruction { } +public class DidCompleteSync : Instruction { } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs index f47228f..d26795f 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs @@ -124,6 +124,37 @@ public async Task Get(string path, Dictionary? headers = n return JsonConvert.DeserializeObject(responseData)!; } + /// + /// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line. + /// + public async Task PostStreamRaw(SyncStreamOptions options) + { + var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers); + var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken); + + if (response.Content == null) + { + throw new HttpRequestException($"HTTP {response.StatusCode}: No content"); + } + + if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized) + { + InvalidateCredentials(); + } + + if (!response.IsSuccessStatusCode) + { + var errorText = await response.Content.ReadAsStringAsync(); + throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}"); + } + + return await response.Content.ReadAsStreamAsync(); + } + + + /// + /// Originally used for the C# streaming sync implementation. + /// public async IAsyncEnumerable PostStream(SyncStreamOptions options) { using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers); diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index c2c56fb..b3e8366 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -1,6 +1,7 @@ namespace PowerSync.Common.Client.Sync.Stream; using System.Net.Sockets; +using System.Text; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -10,6 +11,30 @@ namespace PowerSync.Common.Client.Sync.Stream; using PowerSync.Common.DB.Crud; using PowerSync.Common.Utils; + + +public enum SyncClientImplementation +{ + + /// + /// This implementation offloads the sync line decoding and handling into the PowerSync core extension. + /// + /// ## Compatibility warning + /// + /// The Rust sync client stores sync data in a format that is slightly different than the one used + /// by the old C# implementation. When adopting the RUST client on existing + /// databases, the PowerSync SDK will migrate the format automatically. + RUST, + /// + /// Decodes and handles sync lines received from the sync service in C#. + /// + /// This is the legacy option. + /// + /// The explicit choice to use the C#-based sync implementation will be removed from a future version of the SDK. + /// + C_SHARP +} + public class AdditionalConnectionOptions(int? retryDelayMs = null, int? crudUploadThrottleMs = null) { /// @@ -48,17 +73,24 @@ public class StreamingSyncImplementationOptions : AdditionalConnectionOptions public ILogger? Logger { get; init; } } -public class BaseConnectionOptions(Dictionary? parameters = null) +public class BaseConnectionOptions(Dictionary? parameters = null, SyncClientImplementation? clientImplementation = null) { /// /// These parameters are passed to the sync rules and will be available under the `user_parameters` object. /// public Dictionary? Params { get; set; } = parameters; + + /// + /// Wehether to use the RUST or C# sync client implementation. + /// + public SyncClientImplementation? ClientImplementation { get; set; } = clientImplementation; } public class RequiredPowerSyncConnectionOptions : BaseConnectionOptions { public new Dictionary Params { get; set; } = new(); + + public new SyncClientImplementation ClientImplementation { get; set; } = new(); } public class StreamingSyncImplementationEvent @@ -95,7 +127,8 @@ public class StreamingSyncImplementation : EventStream { - if (!SyncStatus.Connected || SyncStatus.DataFlowStatus.Uploading) + if (!SyncStatus.Connected) { return; } + notifyCompletedUploads?.Invoke(); + Task.Run(async () => await InternalUploadAllCrud()); }; } @@ -343,8 +378,11 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio protected record StreamingSyncIterationResult { public bool Retry { get; init; } + + public bool? ImmediateRestart { get; init; } } + protected async Task StreamingSyncIteration(CancellationToken signal, PowerSyncConnectionOptions? options) { return await locks.ObtainLock(new LockOptions @@ -355,284 +393,451 @@ protected async Task StreamingSyncIteration(Cancel { var resolvedOptions = new RequiredPowerSyncConnectionOptions { - Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params + Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params, + ClientImplementation = options?.ClientImplementation ?? DEFAULT_STREAM_CONNECTION_OPTIONS.ClientImplementation }; + Console.WriteLine("Using sync client implementation: " + resolvedOptions.ClientImplementation); - logger.LogDebug("Streaming sync iteration started"); - Options.Adapter.StartSession(); - var bucketEntries = await Options.Adapter.GetBucketStates(); - var initialBuckets = new Dictionary(); + if (resolvedOptions.ClientImplementation == SyncClientImplementation.RUST) + { + // Use Rust-based sync implementation + // var rustImpl = new RustStreamingSyncImplementation(Options); + // return await rustImpl.RustSyncIteration(signal, resolvedOptions); - foreach (var entry in bucketEntries) + return await RustStreamingSyncIteration(signal, resolvedOptions); + } + else { - initialBuckets[entry.Bucket] = entry.OpId; + // Use legacy C#-based sync implementation + return await LegacyStreamingSyncIteration(signal, resolvedOptions); } + } + }); + } - var req = initialBuckets - .Select(kvp => new BucketRequest - { - Name = kvp.Key, - After = kvp.Value - }) - .ToList(); + // protected async Task RustStreamingSyncIteration(StreamingSyncImplementationOptions options, CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) + // { + // // var rustImpl = new RustStreamingSyncImplementation(options); + // // return await rustImpl.RustSyncIteration(signal, resolvedOptions); + // throw new NotImplementedException("Rust streaming sync implementation is not yet implemented."); + // } - var targetCheckpoint = (Checkpoint?)null; - var validatedCheckpoint = (Checkpoint?)null; - var appliedCheckpoint = (Checkpoint?)null; + protected async Task RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions) + { + Task? receivingLines = null; - var bucketSet = new HashSet(initialBuckets.Keys); + var nestedCts = new CancellationTokenSource(); + signal?.Register(() => { nestedCts.Cancel(); }); - var clientId = await Options.Adapter.GetClientId(); + async Task Connect(EstablishSyncStream instruction) + { + var syncOptions = new SyncStreamOptions + { + Path = "/sync/stream", + CancellationToken = nestedCts.Token, + Data = instruction.Request + }; - logger.LogDebug("Requesting stream from server"); + var stream = await Options.Remote.PostStreamRaw(syncOptions); + using var reader = new StreamReader(stream, Encoding.UTF8); - var syncOptions = new SyncStreamOptions - { - Path = "/sync/stream", - CancellationToken = signal, - Data = new StreamingSyncRequest - { - Buckets = req, - IncludeChecksum = true, - RawData = true, - Parameters = resolvedOptions.Params, // Replace with actual params - ClientId = clientId - } - }; + syncOptions.CancellationToken.Register(() => + { + try { stream?.Close(); } catch { } + }); - var stream = Options.Remote.PostStream(syncOptions); - var first = true; - await foreach (var line in stream) - { - if (first) - { - first = false; - logger.LogDebug("Stream established. Processing events"); - } + string? line; - if (line == null) - { - logger.LogDebug("Stream has closed while waiting"); - // The stream has closed while waiting - return new StreamingSyncIterationResult { Retry = true }; - } + while ((line = await reader.ReadLineAsync()) != null) + { + logger.LogDebug("Parsing line for rust sync stream {message}", "xx"); + await Control("line_text", line); + } + } + + async Task Stop() + { + await Control("stop"); + } - // A connection is active and messages are being received - if (!SyncStatus.Connected) + async Task Control(string op, object? payload = null) + { + logger.LogDebug("Control call {message}", op); + + var rawResponse = await Options.Adapter.Control(op, payload); + HandleInstructions(Instruction.ParseInstructions(rawResponse)); + } + + void HandleInstructions(Instruction[] instructions) + { + foreach (var instruction in instructions) + { + HandleInstruction(instruction); + } + } + + void HandleInstruction(Instruction instruction) + { + switch (instruction) + { + case LogLine logLine: + switch (logLine.Severity) { - // There is a connection now - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true - }); - TriggerCrudUpload(); + case "DEBUG": + logger.LogDebug("{message}", logLine.Line); + break; + case "INFO": + logger.LogInformation("{message}", logLine.Line); + break; + case "WARNING": + logger.LogWarning("{message}", logLine.Line); + break; } + break; + case UpdateSyncStatus syncStatus: + var info = syncStatus.Status; + var coreCompleteSync = + info.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY); + var completeSync = coreCompleteSync != null ? CoreStatusToSyncStatus(coreCompleteSync) : null; - if (line is StreamingSyncCheckpoint syncCheckpoint) + UpdateSyncStatus(new SyncStatusOptions { - logger.LogDebug("Sync checkpoint: {message}", syncCheckpoint); - - targetCheckpoint = syncCheckpoint.Checkpoint; - var bucketsToDelete = new HashSet(bucketSet); - var newBuckets = new HashSet(); - - foreach (var checksum in syncCheckpoint.Checkpoint.Buckets) + Connected = info.Connected, + Connecting = info.Connecting, + LastSyncedAt = completeSync?.LastSyncedAt, + HasSynced = completeSync?.HasSynced, + PriorityStatusEntries = info.PriorityStatus.Select(CoreStatusToSyncStatus).ToArray(), + DataFlow = new SyncDataFlowStatus { - newBuckets.Add(checksum.Bucket); - bucketsToDelete.Remove(checksum.Bucket); + Downloading = info.Downloading != null, + DownloadProgress = info.Downloading?.Buckets } - if (bucketsToDelete.Count > 0) + }, + new UpdateSyncStatusOptions { - logger.LogDebug("Removing buckets: {message}", string.Join(", ", bucketsToDelete)); + ClearDownloadError = true, } - - bucketSet = newBuckets; - await Options.Adapter.RemoveBuckets([.. bucketsToDelete]); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); - } - else if (line is StreamingSyncCheckpointComplete checkpointComplete) + ); + break; + case EstablishSyncStream establishSyncStream: + if (receivingLines != null) { - logger.LogDebug("Checkpoint complete: {message}", targetCheckpoint); + throw new Exception("Unexpected request to establish sync stream, already connected"); + } + receivingLines = Connect(establishSyncStream); + break; + case FetchCredentials fetchCredentials: + Options.Remote.InvalidateCredentials(); + break; + case CloseSyncStream: + nestedCts.Cancel(); + logger.LogWarning("Closing stream"); + break; + case FlushFileSystem: + // ignore + break; + case DidCompleteSync: + UpdateSyncStatus( + new SyncStatusOptions { }, + new UpdateSyncStatusOptions { ClearDownloadError = true }); + break; + } + } - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); + try + { + notifyCompletedUploads = () => { Task.Run(async () => await Control("completed_upload")); }; + await Control("start", JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params })); + if (receivingLines != null) + { + await receivingLines; + } + } + finally + { + notifyCompletedUploads = null; + await Stop(); + } - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await Task.Delay(50); - return new StreamingSyncIterationResult { Retry = true }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - // Landing here the whole time - } - else - { - appliedCheckpoint = targetCheckpoint; - logger.LogDebug("Validated checkpoint: {message}", appliedCheckpoint); + return new StreamingSyncIterationResult { Retry = true }; + } - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false - } - }, new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); + protected async Task LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) + { + // There's also a warning explaining that the legacy client will be removed in the future + logger.LogWarning("The legacy sync client implementation is deprecated and will be removed in a future release."); + logger.LogDebug("Streaming sync iteration started"); + Options.Adapter.StartSession(); + var bucketEntries = await Options.Adapter.GetBucketStates(); + var initialBuckets = new Dictionary(); + + foreach (var entry in bucketEntries) + { + initialBuckets[entry.Bucket] = entry.OpId; + } - } + var req = initialBuckets + .Select(kvp => new BucketRequest + { + Name = kvp.Key, + After = kvp.Value + }) + .ToList(); - validatedCheckpoint = targetCheckpoint; - } - else if (line is StreamingSyncCheckpointDiff checkpointDiff) + var targetCheckpoint = (Checkpoint?)null; + var validatedCheckpoint = (Checkpoint?)null; + var appliedCheckpoint = (Checkpoint?)null; + + var bucketSet = new HashSet(initialBuckets.Keys); + + var clientId = await Options.Adapter.GetClientId(); + + logger.LogDebug("Requesting stream from server"); + + var syncOptions = new SyncStreamOptions + { + Path = "/sync/stream", + CancellationToken = signal, + Data = new StreamingSyncRequest + { + Buckets = req, + IncludeChecksum = true, + RawData = true, + Parameters = resolvedOptions.Params, // Replace with actual params + ClientId = clientId + } + }; + + var stream = Options.Remote.PostStream(syncOptions); + var first = true; + await foreach (var line in stream) + { + if (first) + { + first = false; + logger.LogDebug("Stream established. Processing events"); + } + + if (line == null) + { + logger.LogDebug("Stream has closed while waiting"); + // The stream has closed while waiting + return new StreamingSyncIterationResult { Retry = true }; + } + + // A connection is active and messages are being received + if (!SyncStatus.Connected) + { + // There is a connection now + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true + }); + TriggerCrudUpload(); + } + + if (line is StreamingSyncCheckpoint syncCheckpoint) + { + logger.LogDebug("Sync checkpoint: {message}", syncCheckpoint); + + targetCheckpoint = syncCheckpoint.Checkpoint; + var bucketsToDelete = new HashSet(bucketSet); + var newBuckets = new HashSet(); + + foreach (var checksum in syncCheckpoint.Checkpoint.Buckets) + { + newBuckets.Add(checksum.Bucket); + bucketsToDelete.Remove(checksum.Bucket); + } + if (bucketsToDelete.Count > 0) + { + logger.LogDebug("Removing buckets: {message}", string.Join(", ", bucketsToDelete)); + } + + bucketSet = newBuckets; + await Options.Adapter.RemoveBuckets([.. bucketsToDelete]); + await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + } + else if (line is StreamingSyncCheckpointComplete checkpointComplete) + { + logger.LogDebug("Checkpoint complete: {message}", targetCheckpoint); + + var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); + + if (!result.CheckpointValid) + { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + await Task.Delay(50); + return new StreamingSyncIterationResult { Retry = true }; + } + else if (!result.Ready) + { + // Checksums valid, but need more data for a consistent checkpoint. + // Continue waiting. + // Landing here the whole time + } + else + { + appliedCheckpoint = targetCheckpoint; + logger.LogDebug("Validated checkpoint: {message}", appliedCheckpoint); + + UpdateSyncStatus(new SyncStatusOptions { - // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint - if (targetCheckpoint == null) + Connected = true, + LastSyncedAt = DateTime.Now, + DataFlow = new SyncDataFlowStatus { - throw new Exception("Checkpoint diff without previous checkpoint"); + Downloading = false } + }, new UpdateSyncStatusOptions + { + ClearDownloadError = true + }); - var diff = checkpointDiff.CheckpointDiff; - var newBuckets = new Dictionary(); + } - foreach (var checksum in targetCheckpoint.Buckets) - { - newBuckets[checksum.Bucket] = checksum; - } + validatedCheckpoint = targetCheckpoint; + } + else if (line is StreamingSyncCheckpointDiff checkpointDiff) + { + // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint + if (targetCheckpoint == null) + { + throw new Exception("Checkpoint diff without previous checkpoint"); + } - foreach (var checksum in diff.UpdatedBuckets) - { - newBuckets[checksum.Bucket] = checksum; - } + var diff = checkpointDiff.CheckpointDiff; + var newBuckets = new Dictionary(); - foreach (var bucket in diff.RemovedBuckets) - { - newBuckets.Remove(bucket); - } + foreach (var checksum in targetCheckpoint.Buckets) + { + newBuckets[checksum.Bucket] = checksum; + } - var newWriteCheckpoint = !string.IsNullOrEmpty(diff.WriteCheckpoint) ? diff.WriteCheckpoint : null; - var newCheckpoint = new Checkpoint - { - LastOpId = diff.LastOpId, - Buckets = [.. newBuckets.Values], - WriteCheckpoint = newWriteCheckpoint - }; + foreach (var checksum in diff.UpdatedBuckets) + { + newBuckets[checksum.Bucket] = checksum; + } + + foreach (var bucket in diff.RemovedBuckets) + { + newBuckets.Remove(bucket); + } + + var newWriteCheckpoint = !string.IsNullOrEmpty(diff.WriteCheckpoint) ? diff.WriteCheckpoint : null; + var newCheckpoint = new Checkpoint + { + LastOpId = diff.LastOpId, + Buckets = [.. newBuckets.Values], + WriteCheckpoint = newWriteCheckpoint + }; - targetCheckpoint = newCheckpoint; + targetCheckpoint = newCheckpoint; - bucketSet = [.. newBuckets.Keys]; + bucketSet = [.. newBuckets.Keys]; - var bucketsToDelete = diff.RemovedBuckets.ToArray(); - if (bucketsToDelete.Length > 0) - { - logger.LogDebug("Remove buckets: {message}", string.Join(", ", bucketsToDelete)); - } + var bucketsToDelete = diff.RemovedBuckets.ToArray(); + if (bucketsToDelete.Length > 0) + { + logger.LogDebug("Remove buckets: {message}", string.Join(", ", bucketsToDelete)); + } + + // Perform async operations + await Options.Adapter.RemoveBuckets(bucketsToDelete); + await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + } + else if (line is StreamingSyncDataJSON dataJSON) + { + UpdateSyncStatus(new SyncStatusOptions + { + DataFlow = new SyncDataFlowStatus + { + Downloading = true + } + }); + await Options.Adapter.SaveSyncData(new SyncDataBatch([SyncDataBucket.FromRow(dataJSON.Data)])); + } + else if (line is StreamingSyncKeepalive keepalive) + { + var remainingSeconds = keepalive.TokenExpiresIn; + if (remainingSeconds == 0) + { + // Connection would be closed automatically right after this + logger.LogDebug("Token expiring; reconnect"); + Options.Remote.InvalidateCredentials(); + + // For a rare case where the backend connector does not update the token + // (uses the same one), this should have some delay. + // + await DelayRetry(); + return new StreamingSyncIterationResult { Retry = true }; + } + else if (remainingSeconds < 30) + { + logger.LogDebug("Token will expire soon; reconnect"); + // Pre-emptively refresh the token + Options.Remote.InvalidateCredentials(); + return new StreamingSyncIterationResult { Retry = true }; + } + TriggerCrudUpload(); + } + else + { + logger.LogDebug("Sync complete"); - // Perform async operations - await Options.Adapter.RemoveBuckets(bucketsToDelete); - await Options.Adapter.SetTargetCheckpoint(targetCheckpoint); + if (targetCheckpoint == appliedCheckpoint) + { + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true, + LastSyncedAt = DateTime.Now, + }, + new UpdateSyncStatusOptions + { + ClearDownloadError = true } - else if (line is StreamingSyncDataJSON dataJSON) + ); + } + else if (validatedCheckpoint == targetCheckpoint) + { + var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); + if (!result.CheckpointValid) { - UpdateSyncStatus(new SyncStatusOptions - { - DataFlow = new SyncDataFlowStatus - { - Downloading = true - } - }); - await Options.Adapter.SaveSyncData(new SyncDataBatch([SyncDataBucket.FromRow(dataJSON.Data)])); + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + await Task.Delay(50); + return new StreamingSyncIterationResult { Retry = false }; } - else if (line is StreamingSyncKeepalive keepalive) + else if (!result.Ready) { - var remainingSeconds = keepalive.TokenExpiresIn; - if (remainingSeconds == 0) - { - // Connection would be closed automatically right after this - logger.LogDebug("Token expiring; reconnect"); - Options.Remote.InvalidateCredentials(); - - // For a rare case where the backend connector does not update the token - // (uses the same one), this should have some delay. - // - await DelayRetry(); - return new StreamingSyncIterationResult { Retry = true }; - } - else if (remainingSeconds < 30) - { - logger.LogDebug("Token will expire soon; reconnect"); - // Pre-emptively refresh the token - Options.Remote.InvalidateCredentials(); - return new StreamingSyncIterationResult { Retry = true }; - } - TriggerCrudUpload(); + // Checksums valid, but need more data for a consistent checkpoint. + // Continue waiting. } else { - logger.LogDebug("Sync complete"); - - if (targetCheckpoint == appliedCheckpoint) + appliedCheckpoint = targetCheckpoint; + UpdateSyncStatus(new SyncStatusOptions { - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - }, - new UpdateSyncStatusOptions + Connected = true, + LastSyncedAt = DateTime.Now, + DataFlow = new SyncDataFlowStatus { - ClearDownloadError = true + Downloading = false, } - ); - } - else if (validatedCheckpoint == targetCheckpoint) + }, + new UpdateSyncStatusOptions { - var result = await Options.Adapter.SyncLocalDatabase(targetCheckpoint!); - if (!result.CheckpointValid) - { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await Task.Delay(50); - return new StreamingSyncIterationResult { Retry = false }; - } - else if (!result.Ready) - { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - } - else - { - appliedCheckpoint = targetCheckpoint; - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true, - LastSyncedAt = DateTime.Now, - DataFlow = new SyncDataFlowStatus - { - Downloading = false, - } - }, - new UpdateSyncStatusOptions - { - ClearDownloadError = true - }); - } - } + ClearDownloadError = true + }); } } - - logger.LogDebug("Stream input empty"); - // Connection closed. Likely due to auth issue. - return new StreamingSyncIterationResult { Retry = true }; } - }); + } + logger.LogDebug("Stream input empty"); + // Connection closed. Likely due to auth issue. + return new StreamingSyncIterationResult { Retry = true }; } public new void Close() @@ -781,6 +986,16 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); } + private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) + { + return new DB.Crud.SyncPriorityStatus + { + Priority = status.Priority, + HasSynced = status.HasSynced ?? null, + LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null + }; + } + private async Task DelayRetry() { if (Options.RetryDelayMs.HasValue) diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs new file mode 100644 index 0000000..b709a2c --- /dev/null +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncProgress.cs @@ -0,0 +1,81 @@ +using PowerSync.Common.Client.Sync.Stream; + +namespace PowerSync.Common.DB.Crud; + +/// +/// Provides realtime progress on how PowerSync is downloading rows. +/// +/// The reported progress always reflects the status towards th end of a sync iteration (after +/// which a consistent snapshot of all buckets is available locally). +/// +/// In rare cases (in particular, when a [compacting](https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets) +/// operation takes place between syncs), it's possible for the returned numbers to be slightly +/// inaccurate. For this reason, the sync progress should be seen as an approximation of progress. +/// The information returned is good enough to build progress bars, but not exact enough to track +/// individual download counts. +/// +/// Also note that data is downloaded in bulk, which means that individual counters are unlikely +/// to be updated one-by-one. +/// +public class SyncProgress : ProgressWithOperations +{ + public static readonly int FULL_SYNC_PRIORITY = 2147483647; + protected Dictionary InternalProgress { get; } + + public SyncProgress(Dictionary progress) + { + this.InternalProgress = progress; + var untilCompletion = UntilPriority(FULL_SYNC_PRIORITY); + + TotalOperations = untilCompletion.TotalOperations; + DownloadedOperations = untilCompletion.DownloadedOperations; + DownloadedFraction = untilCompletion.DownloadedFraction; + } + + public ProgressWithOperations UntilPriority(int priority) + { + var total = 0; + var downloaded = 0; + + foreach (var progress in InternalProgress.Values) + { + // Include higher-priority buckets, which are represented by lower numbers. + if (progress.Priority <= priority) + { + downloaded += progress.SinceLast; + total += progress.TargetCount - progress.AtLast; + } + } + + return new ProgressWithOperations + { + TotalOperations = total, + DownloadedOperations = downloaded, + DownloadedFraction = total == 0 ? 1.0 : (double)downloaded / total + }; + } +} + +/// +/// Information about a progressing download made by the PowerSync SDK. +/// +/// +public class ProgressWithOperations +{ + /// + /// The total number of operations to download for the current sync iteration to complete. + /// + public int TotalOperations { get; set; } + + /// + /// The numnber of operations that have already been downloaded. + /// + public int DownloadedOperations { get; set; } + + /// + /// This will be a number between 0.0 and 1.0 (inclusive). + /// + /// When this number reaches 1.0, all changes have been received from the sync service. + /// + public double DownloadedFraction { get; set; } +} \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index 4107a59..7416816 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -1,39 +1,49 @@ namespace PowerSync.Common.DB.Crud; +using PowerSync.Common.Client.Sync.Stream; using Newtonsoft.Json; public class SyncDataFlowStatus { - [JsonProperty("downloading")] - public bool Downloading { get; set; } = false; + [JsonProperty("downloading")] public bool Downloading { get; set; } = false; - [JsonProperty("uploading")] - public bool Uploading { get; set; } = false; + [JsonProperty("uploading")] public bool Uploading { get; set; } = false; - [JsonProperty("downloadError")] - public string? DownloadErrorMessage => DownloadError?.Message; - - [JsonProperty("uploadError")] - public string? UploadErrorMessage => UploadError?.Message; - /// /// Error during downloading (including connecting). /// Cleared on the next successful data download. /// - [JsonIgnore] + [JsonProperty("downloadError")] public Exception? DownloadError { get; set; } = null; /// /// Error during uploading. /// Cleared on the next successful upload. /// - [JsonIgnore] + [JsonProperty("uploadError")] public Exception? UploadError { get; set; } = null; + + + /// + /// Internal information about how far we are downloading operations in buckets. + /// + public Dictionary? DownloadProgress { get; set; } = null; +} + +public class SyncPriorityStatus +{ + [JsonProperty("uploading")] public int Priority { get; set; } + + [JsonProperty("lastSyncedAt")] public DateTime? LastSyncedAt { get; set; } + + [JsonProperty("hasSynced")] public bool? HasSynced { get; set; } } public class SyncStatusOptions { - public SyncStatusOptions() { } + public SyncStatusOptions() + { + } public SyncStatusOptions(SyncStatusOptions options) { @@ -42,22 +52,21 @@ public SyncStatusOptions(SyncStatusOptions options) DataFlow = options.DataFlow; LastSyncedAt = options.LastSyncedAt; HasSynced = options.HasSynced; + PriorityStatusEntries = options.PriorityStatusEntries; } - [JsonProperty("connected")] - public bool? Connected { get; set; } + [JsonProperty("connected")] public bool? Connected { get; set; } + + [JsonProperty("connecting")] public bool? Connecting { get; set; } - [JsonProperty("connecting")] - public bool? Connecting { get; set; } + [JsonProperty("dataFlow")] public SyncDataFlowStatus? DataFlow { get; set; } - [JsonProperty("dataFlow")] - public SyncDataFlowStatus? DataFlow { get; set; } + [JsonProperty("lastSyncedAt")] public DateTime? LastSyncedAt { get; set; } - [JsonProperty("lastSyncedAt")] - public DateTime? LastSyncedAt { get; set; } + [JsonProperty("hasSynced")] public bool? HasSynced { get; set; } - [JsonProperty("hasSynced")] - public bool? HasSynced { get; set; } + [JsonProperty("priorityStatusEntries")] + public SyncPriorityStatus[]? PriorityStatusEntries { get; set; } } public class SyncStatus(SyncStatusOptions options) @@ -85,6 +94,63 @@ public class SyncStatus(SyncStatusOptions options) /// public SyncDataFlowStatus DataFlowStatus => Options.DataFlow ?? new SyncDataFlowStatus(); + /// + /// Provides sync status information for all bucket priorities, sorted by priority (highest first). + /// + public SyncPriorityStatus[] PriorityStatusEntries => + (Options.PriorityStatusEntries ?? []) + .OrderBy(entry => entry.Priority) + .ToArray(); + + /// + /// A realtime progress report on how many operations have been downloaded and + /// how many are necessary in total to complete the next sync iteration. + /// + public SyncProgress? DownloadProgress() + { + var internalProgress = Options.DataFlow?.DownloadProgress; + if (internalProgress == null) + { + return null; + } + + return new SyncProgress(internalProgress); + } + + /// + /// Reports the sync status (a pair of HasSynced and LastSyncedAt fields) + /// for a specific bucket priority level. + /// + /// When buckets with different priorities are declared, PowerSync may choose to synchronize higher-priority + /// buckets first. When a consistent view over all buckets for all priorities up until the given priority is + /// reached, PowerSync makes data from those buckets available before lower-priority buckets have finished + /// syncing. + /// + /// This method returns the status for the requested priority or the next higher priority level that has + /// status information available. This is because when PowerSync makes data for a given priority available, + /// all buckets in higher-priorities are guaranteed to be consistent with that checkpoint. + /// For example, if PowerSync just finished synchronizing buckets in priority level 3, calling this method + /// with a priority of 1 may return information for priority level 3. + /// + public SyncPriorityStatus StatusForPriority(int priority) + { + foreach (var known in PriorityStatusEntries) + { + if (known.Priority >= priority) + { + return known; + } + } + + // Fallback if no matching or higher-priority entry is found + return new SyncPriorityStatus + { + Priority = priority, + LastSyncedAt = LastSyncedAt, + HasSynced = HasSynced + }; + } + public bool IsEqual(SyncStatus status) { return JsonConvert.SerializeObject(Options) == JsonConvert.SerializeObject(status.Options); @@ -93,11 +159,18 @@ public bool IsEqual(SyncStatus status) public string GetMessage() { var dataFlow = DataFlowStatus; - return $"SyncStatus"; + return + $"SyncStatus"; } public string ToJSON() { return JsonConvert.SerializeObject(this); } + + private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b) + { + // Lower numbers = higher priority + return a.Priority.CompareTo(b.Priority); + } } \ No newline at end of file diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs index 0b4c7bb..1ca43a0 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs @@ -155,18 +155,18 @@ public Task ExecuteBatch(string query, object[][]? parameters = nul throw new NotImplementedException(); } - public async Task Get(string sql, params object[]? parameters) + public async Task Get(string sql, object[]? parameters = null) { return await ReadLock((ctx) => ctx.Get(sql, parameters)); ; } - public async Task GetAll(string sql, params object[]? parameters) + public async Task GetAll(string sql, object[]? parameters = null) { return await ReadLock((ctx) => ctx.GetAll(sql, parameters)); } - public async Task GetOptional(string sql, params object[]? parameters) + public async Task GetOptional(string sql, object[]? parameters = null) { return await ReadLock((ctx) => ctx.GetOptional(sql, parameters)); } @@ -307,17 +307,17 @@ public Task Execute(string query, object[]? parameters = null) return connection.Execute(query, parameters); } - public Task Get(string sql, params object[]? parameters) + public Task Get(string sql, object[]? parameters = null) { return connection.Get(sql, parameters); } - public Task GetAll(string sql, params object[]? parameters) + public Task GetAll(string sql, object[]? parameters = null) { return connection.GetAll(sql, parameters); } - public Task GetOptional(string sql, params object[]? parameters) + public Task GetOptional(string sql, object[]? parameters = null) { return connection.GetOptional(sql, parameters); } diff --git a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs index 16c5c30..6c73c8b 100644 --- a/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs +++ b/PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs @@ -142,7 +142,7 @@ public async Task ExecuteQuery(string query, object[]? parameters = var row = new Dictionary(); for (int i = 0; i < reader.FieldCount; i++) { - row[reader.GetName(i)] = reader.IsDBNull(i) ? null : reader.GetValue(i); + row[reader.GetName(i)] = reader.IsDBNull(i) ? null! : reader.GetValue(i); } rows.Add(row); } diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj index 06652fd..fc29ad9 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj @@ -20,6 +20,12 @@ + + + PreserveNewest + + + diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs similarity index 93% rename from Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs rename to Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs index 01b22b8..8e9ad97 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncTests.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -1,15 +1,16 @@ -using Newtonsoft.Json; +using Newtonsoft.Json; using PowerSync.Common.Client; using System.Data.Common; using System.Diagnostics; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using PowerSync.Common.Client.Sync.Stream; namespace PowerSync.Common.IntegrationTests; [Trait("Category", "Integration")] -public class SyncTests : IAsyncLifetime +public class SyncIntegrationTests : IAsyncLifetime { private record ListResult(string id, string name, string owner_id, string created_at); @@ -46,7 +47,10 @@ public async Task InitializeAsync() Console.WriteLine($"Using User ID: {userId}"); try { - await db.Connect(connector); + await db.Connect(connector, new PowerSyncConnectionOptions + { + ClientImplementation = SyncClientImplementation.C_SHARP, + }); await db.WaitForFirstSync(); } catch (Exception ex) @@ -240,5 +244,11 @@ public IntegrationFactAttribute() { Skip = "Integration tests are disabled. Set RUN_INTEGRATION_TESTS=true to run."; } + + // Set default timeout if not already set + if (Timeout == 0) + { + Timeout = 5000; // 30 seconds default for all integration tests + } } } \ No newline at end of file diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index 9416075..81fd4c0 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -7,9 +7,11 @@ public class PowerSyncSetup { - private const string VERSION = "0.4.0"; + private const string VERSION = "0.4.9"; + + // https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v0.4.9/libpowersync_aarch64.android.a private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; - private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/co/powersync/powersync-sqlite-core/{VERSION}"; + private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/com/powersync/powersync-sqlite-core/{VERSION}"; private readonly HttpClient _httpClient; private readonly string _basePath; From e372cf622d619b8d404a0bd8c91b93ca5a61785a Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 1 Dec 2025 13:14:11 +0200 Subject: [PATCH 05/22] Fixed crud tests. Removed boolean parameter from BucketStorageTests. --- .../xunit.runner.json | 5 ++ .../Client/Sync/BucketStorageTests.cs | 60 +++++++++---------- .../Client/Sync/CRUDTests.cs | 16 ++--- 3 files changed, 44 insertions(+), 37 deletions(-) create mode 100644 Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json b/Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json new file mode 100644 index 0000000..fcdb064 --- /dev/null +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/xunit.runner.json @@ -0,0 +1,5 @@ +{ + "methodDisplay": "method", + "diagnosticMessages": true, + "longRunningTestSeconds": 10 +} \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs index b19727f..8e8dcb2 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs @@ -130,7 +130,7 @@ public async Task BasicSetup() var initialBucketStates = await bucketStorage.GetBucketStates(); Assert.Empty(initialBucketStates); - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)])); + await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3])])); var bucketStates = await bucketStorage.GetBucketStates(); @@ -154,7 +154,7 @@ public async Task ShouldGetObjectFromMultipleBuckets() { await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_3])]) ); await SyncLocalChecked(new Checkpoint @@ -176,7 +176,7 @@ public async Task ShouldPrioritizeLaterUpdates() await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_1], false)]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_1])]) ); await SyncLocalChecked(new Checkpoint @@ -193,7 +193,7 @@ public async Task ShouldIgnoreRemoveFromOneBucket() { // When we have 1 PUT and 1 REMOVE, the object must be kept.); await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4])]) ); await SyncLocalChecked(new Checkpoint @@ -210,7 +210,7 @@ public async Task ShouldRemoveWhenRemovedFromAllBuckets() { // When we only have REMOVE left for an object, it must be deleted. await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3, TestData.removeAsset1_5], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3, TestData.removeAsset1_5]), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4])]) ); await SyncLocalChecked(new Checkpoint @@ -250,7 +250,7 @@ public async Task ShouldUseSubkeys() }); await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3, put4], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3, put4])]) ); await SyncLocalChecked(new Checkpoint @@ -262,7 +262,7 @@ await SyncLocalChecked(new Checkpoint var result = await db.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); Assert.Equal(new AssetResult("O1", "B", null), result[0]); - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [remove5], false)])); + await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [remove5])])); await SyncLocalChecked(new Checkpoint { @@ -278,7 +278,7 @@ public async Task ShouldFailChecksumValidation() { // Simple checksum validation await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3])]) ); var result = await bucketStorage.SyncLocalDatabase(new Checkpoint @@ -304,7 +304,7 @@ public async Task ShouldDeleteBuckets() { await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_3])]) ); await bucketStorage.RemoveBuckets(["bucket2"]); @@ -335,7 +335,7 @@ public async Task ShouldDeleteAndRecreateBuckets() { // Save some data await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1])]) ); // Delete the bucket @@ -343,7 +343,7 @@ await bucketStorage.SaveSyncData( // Save some data again await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3])]) ); // Delete again @@ -351,7 +351,7 @@ await bucketStorage.SaveSyncData( // Final save of data await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3])]) ); // Check that the data is there @@ -388,12 +388,12 @@ await bucketStorage.SaveSyncData( Op = new OpType(OpTypeEnum.MOVE).ToJSON(), Checksum = 1 }) - ], false) + ]) ]) ); await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false)]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3])]) ); await SyncLocalChecked(new Checkpoint @@ -412,7 +412,7 @@ public async Task ShouldHandleClear() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1]) ]) ); @@ -446,7 +446,7 @@ await bucketStorage.SaveSyncData( ObjectId = TestData.putAsset2_2.ObjectId, ObjectType = TestData.putAsset2_2.ObjectType }) - ], false) + ]) ]) ); @@ -478,7 +478,7 @@ public async Task UpdateWithNewTypes() await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3])]) ); await SyncLocalChecked(new Checkpoint @@ -524,7 +524,7 @@ public async Task ShouldRemoveTypes() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -578,7 +578,7 @@ public async Task ShouldCompact() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.removeAsset1_4], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.removeAsset1_4]) ]) ); @@ -613,7 +613,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_ServerRemoved() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -676,7 +676,7 @@ await SyncLocalChecked(new Checkpoint await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", Array.Empty(), false) + new SyncDataBucket("bucket1", Array.Empty()) ]) ); @@ -700,7 +700,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_1() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -743,7 +743,7 @@ await SyncLocalChecked(new Checkpoint await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", Array.Empty(), false) + new SyncDataBucket("bucket1", Array.Empty()) ]) ); @@ -769,7 +769,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_2() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -797,7 +797,7 @@ await SyncLocalChecked(new Checkpoint await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [], false) + new SyncDataBucket("bucket1", []) ]) ); @@ -826,7 +826,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_UpdateOnServer() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -865,7 +865,7 @@ await bucketStorage.SaveSyncData( Checksum = 5, Data = new { description = "server updated" } }) - ], false) + ]) ]) ); @@ -889,7 +889,7 @@ public async Task ShouldRevertAFailingInsert() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -931,7 +931,7 @@ public async Task ShouldRevertAFailingDelete() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); @@ -974,7 +974,7 @@ public async Task ShouldRevertAFailingUpdate() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) ]) ); diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs index 965cd13..e094639 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs @@ -45,8 +45,8 @@ public async Task Insert_RecordCrudEntryTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { description = "test" } }), crudEntry.Data @@ -80,8 +80,8 @@ public async Task InsertOrReplaceTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { description = "test2" } }), crudEntry.Data @@ -112,8 +112,8 @@ public async Task UpdateTest() JsonConvert.SerializeObject(new { op = "PATCH", - type = "assets", id = testId, + type = "assets", data = new { description = "test2" } }), crudEntry.Data @@ -127,6 +127,8 @@ public async Task UpdateTest() { "description", "test2" } }); + Console.WriteLine(JsonConvert.SerializeObject(tx.Crud.First())); + Console.WriteLine(JsonConvert.SerializeObject(expectedCrudEntry)); Assert.True(tx.Crud.First().Equals(expectedCrudEntry)); } @@ -144,8 +146,8 @@ public async Task DeleteTest() JsonConvert.SerializeObject(new { op = "DELETE", + id = testId, type = "assets", - id = testId }), crudEntry.Data ); @@ -234,8 +236,8 @@ public async Task BigNumbersIntegerTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { quantity = bigNumber } }), crudEntry.Data @@ -267,8 +269,8 @@ public async Task BigNumbersTextTest() JsonConvert.SerializeObject(new { op = "PUT", - type = "assets", id = testId, + type = "assets", data = new { quantity = bigNumber.ToString() } }), crudEntry.Data @@ -287,8 +289,8 @@ await db.Execute("UPDATE assets SET description = ?, quantity = CAST(quantity AS JsonConvert.SerializeObject(new { op = "PATCH", - type = "assets", id = testId, + type = "assets", data = new { description = "updated", quantity = bigNumber + 1 } }), crudEntry.Data From 88006ecbce81650eec02af7456f8030b2a0c17b1 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 1 Dec 2025 13:25:41 +0200 Subject: [PATCH 06/22] Revert SyncDataBucket changes. --- .../Client/Sync/Bucket/SyncDataBucket.cs | 29 ++++++--- .../Client/Sync/BucketStorageTests.cs | 60 +++++++++---------- 2 files changed, 50 insertions(+), 39 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs index 5499f4b..7699778 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SyncDataBucket.cs @@ -9,14 +9,14 @@ public class SyncDataBucketJSON [JsonProperty("bucket")] public string Bucket { get; set; } = null!; - // [JsonProperty("has_more")] - // public bool? HasMore { get; set; } + [JsonProperty("has_more")] + public bool? HasMore { get; set; } - // [JsonProperty("after")] - // public string? After { get; set; } + [JsonProperty("after")] + public string? After { get; set; } - // [JsonProperty("next_after")] - // public string? NextAfter { get; set; } + [JsonProperty("next_after")] + public string? NextAfter { get; set; } [JsonProperty("data")] public List Data { get; set; } = []; @@ -24,11 +24,16 @@ public class SyncDataBucketJSON public class SyncDataBucket( string bucket, - OplogEntry[] data - ) + OplogEntry[] data, + bool hasMore, + string? after = null, + string? nextAfter = null) { public string Bucket { get; private set; } = bucket; public OplogEntry[] Data { get; private set; } = data; + public bool HasMore { get; private set; } = hasMore; + public string? After { get; private set; } = after; + public string? NextAfter { get; private set; } = nextAfter; public static SyncDataBucket FromRow(SyncDataBucketJSON row) { @@ -41,7 +46,10 @@ public static SyncDataBucket FromRow(SyncDataBucketJSON row) return new SyncDataBucket( row.Bucket, - dataEntries + dataEntries, + row.HasMore ?? false, + row.After, + row.NextAfter ); } @@ -55,6 +63,9 @@ public string ToJSON() var jsonObject = new SyncDataBucketJSON { Bucket = Bucket, + HasMore = HasMore, + After = After, + NextAfter = NextAfter, Data = dataObjects }; diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs index 8e8dcb2..b19727f 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs @@ -130,7 +130,7 @@ public async Task BasicSetup() var initialBucketStates = await bucketStorage.GetBucketStates(); Assert.Empty(initialBucketStates); - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3])])); + await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)])); var bucketStates = await bucketStorage.GetBucketStates(); @@ -154,7 +154,7 @@ public async Task ShouldGetObjectFromMultipleBuckets() { await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_3])]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) ); await SyncLocalChecked(new Checkpoint @@ -176,7 +176,7 @@ public async Task ShouldPrioritizeLaterUpdates() await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_1])]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_1], false)]) ); await SyncLocalChecked(new Checkpoint @@ -193,7 +193,7 @@ public async Task ShouldIgnoreRemoveFromOneBucket() { // When we have 1 PUT and 1 REMOVE, the object must be kept.); await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) ); await SyncLocalChecked(new Checkpoint @@ -210,7 +210,7 @@ public async Task ShouldRemoveWhenRemovedFromAllBuckets() { // When we only have REMOVE left for an object, it must be deleted. await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3, TestData.removeAsset1_5]), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3, TestData.removeAsset1_5], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3, TestData.removeAsset1_4], false)]) ); await SyncLocalChecked(new Checkpoint @@ -250,7 +250,7 @@ public async Task ShouldUseSubkeys() }); await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3, put4])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3, put4], false)]) ); await SyncLocalChecked(new Checkpoint @@ -262,7 +262,7 @@ await SyncLocalChecked(new Checkpoint var result = await db.GetAll("SELECT id, description, make FROM assets WHERE id = 'O1'"); Assert.Equal(new AssetResult("O1", "B", null), result[0]); - await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [remove5])])); + await bucketStorage.SaveSyncData(new SyncDataBatch([new SyncDataBucket("bucket1", [remove5], false)])); await SyncLocalChecked(new Checkpoint { @@ -278,7 +278,7 @@ public async Task ShouldFailChecksumValidation() { // Simple checksum validation await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) ); var result = await bucketStorage.SyncLocalDatabase(new Checkpoint @@ -304,7 +304,7 @@ public async Task ShouldDeleteBuckets() { await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_3]), new SyncDataBucket("bucket2", [TestData.putAsset1_3])]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_3], false), new SyncDataBucket("bucket2", [TestData.putAsset1_3], false)]) ); await bucketStorage.RemoveBuckets(["bucket2"]); @@ -335,7 +335,7 @@ public async Task ShouldDeleteAndRecreateBuckets() { // Save some data await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1], false)]) ); // Delete the bucket @@ -343,7 +343,7 @@ await bucketStorage.SaveSyncData( // Save some data again await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) ); // Delete again @@ -351,7 +351,7 @@ await bucketStorage.SaveSyncData( // Final save of data await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset1_3], false)]) ); // Check that the data is there @@ -388,12 +388,12 @@ await bucketStorage.SaveSyncData( Op = new OpType(OpTypeEnum.MOVE).ToJSON(), Checksum = 1 }) - ]) + ], false) ]) ); await bucketStorage.SaveSyncData( - new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3])]) + new SyncDataBatch([new SyncDataBucket("bucket1", [TestData.putAsset1_3], false)]) ); await SyncLocalChecked(new Checkpoint @@ -412,7 +412,7 @@ public async Task ShouldHandleClear() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1], false) ]) ); @@ -446,7 +446,7 @@ await bucketStorage.SaveSyncData( ObjectId = TestData.putAsset2_2.ObjectId, ObjectType = TestData.putAsset2_2.ObjectType }) - ]) + ], false) ]) ); @@ -478,7 +478,7 @@ public async Task UpdateWithNewTypes() await bucketStorage.SaveSyncData( new SyncDataBatch( - [new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3])]) + [new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false)]) ); await SyncLocalChecked(new Checkpoint @@ -524,7 +524,7 @@ public async Task ShouldRemoveTypes() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -578,7 +578,7 @@ public async Task ShouldCompact() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.removeAsset1_4]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.removeAsset1_4], false) ]) ); @@ -613,7 +613,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_ServerRemoved() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -676,7 +676,7 @@ await SyncLocalChecked(new Checkpoint await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", Array.Empty()) + new SyncDataBucket("bucket1", Array.Empty(), false) ]) ); @@ -700,7 +700,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_1() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -743,7 +743,7 @@ await SyncLocalChecked(new Checkpoint await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", Array.Empty()) + new SyncDataBucket("bucket1", Array.Empty(), false) ]) ); @@ -769,7 +769,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_WhenMoreCrudIsAdded_2() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -797,7 +797,7 @@ await SyncLocalChecked(new Checkpoint await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", []) + new SyncDataBucket("bucket1", [], false) ]) ); @@ -826,7 +826,7 @@ public async Task ShouldNotSyncLocalDbWithPendingCrud_UpdateOnServer() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -865,7 +865,7 @@ await bucketStorage.SaveSyncData( Checksum = 5, Data = new { description = "server updated" } }) - ]) + ], false) ]) ); @@ -889,7 +889,7 @@ public async Task ShouldRevertAFailingInsert() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -931,7 +931,7 @@ public async Task ShouldRevertAFailingDelete() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); @@ -974,7 +974,7 @@ public async Task ShouldRevertAFailingUpdate() await bucketStorage.SaveSyncData( new SyncDataBatch( [ - new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3]) + new SyncDataBucket("bucket1", [TestData.putAsset1_1, TestData.putAsset2_2, TestData.putAsset1_3], false) ]) ); From b6599c91d4d6cf0e4f8a8792768dfa7890e30478 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 1 Dec 2025 13:43:16 +0200 Subject: [PATCH 07/22] Fixed bucket storage tests related to core extension changes. --- .../Client/Sync/BucketStorageTests.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs index b19727f..7a2f9bf 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/BucketStorageTests.cs @@ -4,7 +4,7 @@ namespace PowerSync.Common.Tests.Client.Sync; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; - +using Newtonsoft.Json; using PowerSync.Common.Client; using PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.DB.Schema; @@ -17,7 +17,7 @@ class TestData Op = new OpType(OpTypeEnum.PUT).ToJSON(), ObjectType = "assets", ObjectId = "O1", - Data = new { description = "bar" }, + Data = JsonConvert.SerializeObject(new { description = "bar" }), Checksum = 1 }); @@ -27,7 +27,7 @@ class TestData Op = new OpType(OpTypeEnum.PUT).ToJSON(), ObjectType = "assets", ObjectId = "O2", - Data = new { description = "bar" }, + Data = JsonConvert.SerializeObject(new { description = "bar" }), Checksum = 2 }); @@ -37,7 +37,7 @@ class TestData Op = new OpType(OpTypeEnum.PUT).ToJSON(), ObjectType = "assets", ObjectId = "O1", - Data = new { description = "bard" }, + Data = JsonConvert.SerializeObject(new { description = "bard" }), Checksum = 3 }); @@ -235,7 +235,7 @@ public async Task ShouldUseSubkeys() Subkey = "b", ObjectType = "assets", ObjectId = "O1", - Data = new { description = "B" }, + Data = JsonConvert.SerializeObject(new { description = "B" }), Checksum = 4 }); @@ -863,7 +863,7 @@ await bucketStorage.SaveSyncData( ObjectType = "assets", ObjectId = "O3", Checksum = 5, - Data = new { description = "server updated" } + Data = JsonConvert.SerializeObject(new { description = "server updated" }) }) ], false) ]) From 0fbc7e47f76eca2ad6b78b902df850a30d59689f Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 1 Dec 2025 15:59:44 +0200 Subject: [PATCH 08/22] Introduced PowerSyncControlCommand constants. --- .../Sync/Bucket/BucketStorageAdapter.cs | 11 ++++ .../Stream/StreamingSyncImplementation.cs | 56 +++++++++---------- 2 files changed, 37 insertions(+), 30 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs index ba94eb7..4611bb2 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs @@ -8,6 +8,17 @@ namespace PowerSync.Common.Client.Sync.Bucket; using PowerSync.Common.Utils; using Newtonsoft.Json; +public static class PowerSyncControlCommand +{ + public const string PROCESS_TEXT_LINE = "line_text"; + public const string PROCESS_BSON_LINE = "line_binary"; + public const string STOP = "stop"; + public const string START = "start"; + public const string NOTIFY_TOKEN_REFRESHED = "refreshed_token"; + public const string NOTIFY_CRUD_UPLOAD_COMPLETED = "completed_upload"; + public const string UPDATE_SUBSCRIPTIONS = "update_subscriptions"; +} + public class Checkpoint { [JsonProperty("last_op_id")] diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index b3e8366..7dd9c53 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -301,15 +301,15 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio while (true) { UpdateSyncStatus(new SyncStatusOptions { Connecting = true }); - + var iterationResult = (StreamingSyncIterationResult?)null; try { if (signal.Value.IsCancellationRequested) { break; } - var iterationResult = await StreamingSyncIteration(nestedCts.Token, options); - if (!iterationResult.Retry) + iterationResult = await StreamingSyncIteration(nestedCts.Token, options); + if (!iterationResult.Retry.GetValueOrDefault(false)) { // A sync error ocurred that we cannot recover from here. @@ -345,12 +345,10 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio DownloadError = ex } }); - - // On error, wait a little before retrying - await DelayRetry(); } finally { + notifyCompletedUploads = null; if (!signal.Value.IsCancellationRequested) { @@ -359,11 +357,18 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio nestedCts = new CancellationTokenSource(); } - UpdateSyncStatus(new SyncStatusOptions + + if (iterationResult != null && iterationResult.ImmediateRestart.GetValueOrDefault(false)) { - Connected = false, - Connecting = true // May be unnecessary - }); + UpdateSyncStatus(new SyncStatusOptions + { + Connected = false, + Connecting = true // May be unnecessary + }); + + // On error, wait a little before retrying + await DelayRetry(); + } } } @@ -377,7 +382,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio protected record StreamingSyncIterationResult { - public bool Retry { get; init; } + public bool? Retry { get; init; } public bool? ImmediateRestart { get; init; } } @@ -396,35 +401,24 @@ protected async Task StreamingSyncIteration(Cancel Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params, ClientImplementation = options?.ClientImplementation ?? DEFAULT_STREAM_CONNECTION_OPTIONS.ClientImplementation }; - Console.WriteLine("Using sync client implementation: " + resolvedOptions.ClientImplementation); if (resolvedOptions.ClientImplementation == SyncClientImplementation.RUST) { - // Use Rust-based sync implementation - // var rustImpl = new RustStreamingSyncImplementation(Options); - // return await rustImpl.RustSyncIteration(signal, resolvedOptions); - return await RustStreamingSyncIteration(signal, resolvedOptions); } else { - // Use legacy C#-based sync implementation return await LegacyStreamingSyncIteration(signal, resolvedOptions); } } }); } - // protected async Task RustStreamingSyncIteration(StreamingSyncImplementationOptions options, CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) - // { - // // var rustImpl = new RustStreamingSyncImplementation(options); - // // return await rustImpl.RustSyncIteration(signal, resolvedOptions); - // throw new NotImplementedException("Rust streaming sync implementation is not yet implemented."); - // } - protected async Task RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions) { Task? receivingLines = null; + var hideDisconnectOnRestart = false; + var nestedCts = new CancellationTokenSource(); signal?.Register(() => { nestedCts.Cancel(); }); @@ -451,18 +445,18 @@ async Task Connect(EstablishSyncStream instruction) while ((line = await reader.ReadLineAsync()) != null) { logger.LogDebug("Parsing line for rust sync stream {message}", "xx"); - await Control("line_text", line); + await Control(PowerSyncControlCommand.PROCESS_TEXT_LINE, line); } } async Task Stop() { - await Control("stop"); + await Control(PowerSyncControlCommand.STOP); } async Task Control(string op, object? payload = null) { - logger.LogDebug("Control call {message}", op); + logger.LogTrace("Control call {message}", op); var rawResponse = await Options.Adapter.Control(op, payload); HandleInstructions(Instruction.ParseInstructions(rawResponse)); @@ -531,6 +525,7 @@ void HandleInstruction(Instruction instruction) break; case CloseSyncStream: nestedCts.Cancel(); + hideDisconnectOnRestart = true; logger.LogWarning("Closing stream"); break; case FlushFileSystem: @@ -546,8 +541,9 @@ void HandleInstruction(Instruction instruction) try { - notifyCompletedUploads = () => { Task.Run(async () => await Control("completed_upload")); }; - await Control("start", JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params })); + await Control(PowerSyncControlCommand.START, JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params })); + notifyCompletedUploads = () => { Task.Run(async () => await Control(PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED)); }; + if (receivingLines != null) { await receivingLines; @@ -559,7 +555,7 @@ void HandleInstruction(Instruction instruction) await Stop(); } - return new StreamingSyncIterationResult { Retry = true }; + return new StreamingSyncIterationResult { ImmediateRestart = hideDisconnectOnRestart }; } protected async Task LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) From b2585702195c6a4417ffc39ad994051957906bf7 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 2 Dec 2025 14:15:15 +0200 Subject: [PATCH 09/22] Removed old package. --- .../PowerSync.Common.IntegrationTests.csproj | 21 ------------------- 1 file changed, 21 deletions(-) delete mode 100644 PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj diff --git a/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj deleted file mode 100644 index d7f0b2e..0000000 --- a/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj +++ /dev/null @@ -1,21 +0,0 @@ - - - - net9.0 - enable - enable - false - - - - - - - - - - - - - - From 10371915bf2c3ab878db4a5ea7a19f302f17b30e Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 2 Dec 2025 16:43:49 +0200 Subject: [PATCH 10/22] Tracer fix for csharp sync. --- .../Stream/StreamingSyncImplementation.cs | 90 +++++++++++-------- .../PowerSync.Common/DB/Crud/SyncStatus.cs | 14 ++- 2 files changed, 65 insertions(+), 39 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 7dd9c53..23757e7 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -294,23 +294,29 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio }); }); - /// This loops runs until [retry] is false or the abort signal is set to aborted. - /// Aborting the nestedCts will: - /// - Abort any pending fetch requests - /// - Close any sync stream ReadableStreams (which will also close any established network requests) + // This loops runs until [retry] is false or the abort signal is set to aborted. + // Aborting the nestedCts will: + // - Abort any pending fetch requests + // - Close any sync stream ReadableStreams (which will also close any established network requests) while (true) { + Console.WriteLine(1); UpdateSyncStatus(new SyncStatusOptions { Connecting = true }); + Console.WriteLine(2); var iterationResult = (StreamingSyncIterationResult?)null; try { + Console.WriteLine(3); if (signal.Value.IsCancellationRequested) { + Console.WriteLine("BREAKINGNNNNG"); break; } + Console.WriteLine(4); iterationResult = await StreamingSyncIteration(nestedCts.Token, options); - if (!iterationResult.Retry.GetValueOrDefault(false)) + if (!iterationResult.Retry) { + Console.WriteLine(5); // A sync error ocurred that we cannot recover from here. // This loop must terminate. @@ -318,6 +324,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio break; } // Continue immediately + Console.WriteLine(6); } catch (Exception ex) { @@ -327,9 +334,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio exMessage = "Stream closed or timed out -" + ex.InnerException.Message; } - + Console.WriteLine(7); logger.LogError("Caught exception in streaming sync: {message}", exMessage); - + Console.WriteLine(8); // Either: // - A network request failed with a failed connection or not OKAY response code. // - There was a sync processing error. @@ -345,9 +352,14 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio DownloadError = ex } }); + + Console.WriteLine(9); + await DelayRetry(); + Console.WriteLine(10); } finally { + Console.WriteLine(11); notifyCompletedUploads = null; if (!signal.Value.IsCancellationRequested) @@ -358,17 +370,18 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio } - if (iterationResult != null && iterationResult.ImmediateRestart.GetValueOrDefault(false)) + // if (iterationResult != null && iterationResult.ImmediateRestart.GetValueOrDefault(false)) + // { + Console.WriteLine(12); + UpdateSyncStatus(new SyncStatusOptions { - UpdateSyncStatus(new SyncStatusOptions - { - Connected = false, - Connecting = true // May be unnecessary - }); + Connected = false, + Connecting = true // May be unnecessary + }); - // On error, wait a little before retrying - await DelayRetry(); - } + // On error, wait a little before retrying + + // } } } @@ -382,7 +395,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio protected record StreamingSyncIterationResult { - public bool? Retry { get; init; } + public bool Retry { get; init; } public bool? ImmediateRestart { get; init; } } @@ -956,30 +969,37 @@ protected record UpdateSyncStatusOptions( ); protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptions? updateOptions = null) { - var updatedStatus = new SyncStatus(new SyncStatusOptions + try { - Connected = options.Connected ?? SyncStatus.Connected, - Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), - LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, - DataFlow = new SyncDataFlowStatus + var updatedStatus = new SyncStatus(new SyncStatusOptions { - Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, - Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, - DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, - UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, + Connected = options.Connected ?? SyncStatus.Connected, + Connecting = !options.Connected.GetValueOrDefault() && (options.Connecting ?? SyncStatus.Connecting), + LastSyncedAt = options.LastSyncedAt ?? SyncStatus.LastSyncedAt, + DataFlow = new SyncDataFlowStatus + { + Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading, + Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading, + DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError, + UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError, + } + }); + + if (!SyncStatus.Equals(updatedStatus)) + { + SyncStatus = updatedStatus; + logger.LogDebug("[Sync status updated]: {message}", updatedStatus.ToJSON()); + // Only trigger this if there was a change + Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); } - }); - if (!SyncStatus.Equals(updatedStatus)) + // Trigger this for all updates + Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); + } + catch (Exception ex) { - SyncStatus = updatedStatus; - logger.LogDebug("[Sync status updated]: {message}", updatedStatus.ToJSON()); - // Only trigger this if there was a change - Emit(new StreamingSyncImplementationEvent { StatusChanged = updatedStatus }); + logger.LogError("Error updating sync status: {message}", ex.Message); } - - // Trigger this for all updates - Emit(new StreamingSyncImplementationEvent { StatusUpdated = options }); } private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status) diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index 7416816..5f931ab 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -2,6 +2,7 @@ namespace PowerSync.Common.DB.Crud; using PowerSync.Common.Client.Sync.Stream; using Newtonsoft.Json; +using Microsoft.Extensions.Options; public class SyncDataFlowStatus { @@ -13,14 +14,14 @@ public class SyncDataFlowStatus /// Error during downloading (including connecting). /// Cleared on the next successful data download. /// - [JsonProperty("downloadError")] + [JsonIgnore] public Exception? DownloadError { get; set; } = null; /// /// Error during uploading. /// Cleared on the next successful upload. /// - [JsonProperty("uploadError")] + [JsonIgnore] public Exception? UploadError { get; set; } = null; @@ -151,9 +152,14 @@ public SyncPriorityStatus StatusForPriority(int priority) }; } + private string SerializeObject() + { + return JsonConvert.SerializeObject(new { Options = Options, UploadErrorMessage = Options.DataFlow?.UploadError?.Message, DownloadErrorMessage = DataFlowStatus.DownloadError?.Message }); + } + public bool IsEqual(SyncStatus status) { - return JsonConvert.SerializeObject(Options) == JsonConvert.SerializeObject(status.Options); + return this.SerializeObject() == status.SerializeObject(); } public string GetMessage() @@ -165,7 +171,7 @@ public string GetMessage() public string ToJSON() { - return JsonConvert.SerializeObject(this); + return SerializeObject(); } private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b) From a310bac793d256a2a1087822066d21cdbf09e4d5 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 2 Dec 2025 17:07:19 +0200 Subject: [PATCH 11/22] Cleanup. Final iteration result handling that works for both Rust and C_SHARP sync implementations. --- .../Stream/StreamingSyncImplementation.cs | 72 ++++++++----------- 1 file changed, 30 insertions(+), 42 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 23757e7..30d5ce5 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -300,31 +300,17 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio // - Close any sync stream ReadableStreams (which will also close any established network requests) while (true) { - Console.WriteLine(1); UpdateSyncStatus(new SyncStatusOptions { Connecting = true }); - Console.WriteLine(2); var iterationResult = (StreamingSyncIterationResult?)null; + var shouldDelayRetry = true; + try { - Console.WriteLine(3); if (signal.Value.IsCancellationRequested) { - Console.WriteLine("BREAKINGNNNNG"); break; } - Console.WriteLine(4); iterationResult = await StreamingSyncIteration(nestedCts.Token, options); - if (!iterationResult.Retry) - { - Console.WriteLine(5); - - // A sync error ocurred that we cannot recover from here. - // This loop must terminate. - // The nestedCts will close any open network requests and streams below. - break; - } - // Continue immediately - Console.WriteLine(6); } catch (Exception ex) { @@ -334,15 +320,20 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio exMessage = "Stream closed or timed out -" + ex.InnerException.Message; } - Console.WriteLine(7); - logger.LogError("Caught exception in streaming sync: {message}", exMessage); - Console.WriteLine(8); // Either: // - A network request failed with a failed connection or not OKAY response code. // - There was a sync processing error. // This loop will retry. // The nested abort controller will cleanup any open network requests and streams. - // The WebRemote should only abort pending fetch requests or close active Readable streams. + if (nestedCts.IsCancellationRequested) + { + logger.LogWarning("Caught exception in streaming sync: {message}", exMessage); + shouldDelayRetry = false; + } + else + { + logger.LogError("Caught exception in streaming sync: {message}", exMessage); + } UpdateSyncStatus(new SyncStatusOptions { @@ -352,14 +343,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio DownloadError = ex } }); - - Console.WriteLine(9); - await DelayRetry(); - Console.WriteLine(10); } finally { - Console.WriteLine(11); notifyCompletedUploads = null; if (!signal.Value.IsCancellationRequested) @@ -369,19 +355,21 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio nestedCts = new CancellationTokenSource(); } - - // if (iterationResult != null && iterationResult.ImmediateRestart.GetValueOrDefault(false)) - // { - Console.WriteLine(12); - UpdateSyncStatus(new SyncStatusOptions + if (iterationResult != null && (iterationResult.ImmediateRestart != true && iterationResult.LegacyRetry != true)) { - Connected = false, - Connecting = true // May be unnecessary - }); - // On error, wait a little before retrying + UpdateSyncStatus(new SyncStatusOptions + { + Connected = false, + Connecting = true + }); - // } + // On error, wait a little before retrying + if (shouldDelayRetry) + { + await DelayRetry(); + } + } } } @@ -395,7 +383,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio protected record StreamingSyncIterationResult { - public bool Retry { get; init; } + public bool? LegacyRetry { get; init; } public bool? ImmediateRestart { get; init; } } @@ -631,7 +619,7 @@ protected async Task LegacyStreamingSyncIteration( { logger.LogDebug("Stream has closed while waiting"); // The stream has closed while waiting - return new StreamingSyncIterationResult { Retry = true }; + return new StreamingSyncIterationResult { LegacyRetry = true }; } // A connection is active and messages are being received @@ -678,7 +666,7 @@ protected async Task LegacyStreamingSyncIteration( // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off await Task.Delay(50); - return new StreamingSyncIterationResult { Retry = true }; + return new StreamingSyncIterationResult { LegacyRetry = true }; } else if (!result.Ready) { @@ -780,14 +768,14 @@ protected async Task LegacyStreamingSyncIteration( // (uses the same one), this should have some delay. // await DelayRetry(); - return new StreamingSyncIterationResult { Retry = true }; + return new StreamingSyncIterationResult { LegacyRetry = true }; } else if (remainingSeconds < 30) { logger.LogDebug("Token will expire soon; reconnect"); // Pre-emptively refresh the token Options.Remote.InvalidateCredentials(); - return new StreamingSyncIterationResult { Retry = true }; + return new StreamingSyncIterationResult { LegacyRetry = true }; } TriggerCrudUpload(); } @@ -816,7 +804,7 @@ protected async Task LegacyStreamingSyncIteration( // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off await Task.Delay(50); - return new StreamingSyncIterationResult { Retry = false }; + return new StreamingSyncIterationResult { LegacyRetry = false }; } else if (!result.Ready) { @@ -846,7 +834,7 @@ protected async Task LegacyStreamingSyncIteration( logger.LogDebug("Stream input empty"); // Connection closed. Likely due to auth issue. - return new StreamingSyncIterationResult { Retry = true }; + return new StreamingSyncIterationResult { LegacyRetry = true }; } public new void Close() From 08b4ee73401b2d68f9a885f7a5c4e501f97ab9ab Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 09:10:24 +0200 Subject: [PATCH 12/22] Ignoring NET-iOS8 warning. --- Directory.build.props | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Directory.build.props b/Directory.build.props index 5b4d6fa..bcdc24a 100644 --- a/Directory.build.props +++ b/Directory.build.props @@ -1,5 +1,8 @@ + + $(MSBuildWarningsAsMessages);NETSDK1202 + From 077ee4c0742cbf736d1db55d48578898abebcd09 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 09:43:45 +0200 Subject: [PATCH 13/22] Changelog and cleanup entries. --- PowerSync/PowerSync.Common/CHANGELOG.md | 3 +++ .../Client/Sync/Stream/StreamingSyncImplementation.cs | 7 ++----- PowerSync/PowerSync.Maui/CHANGELOG.md | 4 ++++ README.md | 4 ++-- .../PowerSync.Common.IntegrationTests.csproj | 2 -- .../SyncIntegrationTests.cs | 2 +- .../PowerSync.Common.Tests/Client/Sync/CRUDTests.cs | 2 -- Tools/Setup/Setup.cs | 1 - 8 files changed, 12 insertions(+), 13 deletions(-) diff --git a/PowerSync/PowerSync.Common/CHANGELOG.md b/PowerSync/PowerSync.Common/CHANGELOG.md index 21f94ac..4fc0f47 100644 --- a/PowerSync/PowerSync.Common/CHANGELOG.md +++ b/PowerSync/PowerSync.Common/CHANGELOG.md @@ -1,5 +1,8 @@ # PowerSync.Common Changelog +## 0.0.5-alpha.1 +- Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`. + ## 0.0.4-alpha.1 - Fixed MAUI issues related to extension loading when installing package outside of the monorepo. diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index 30d5ce5..f1992b3 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -81,7 +81,7 @@ public class BaseConnectionOptions(Dictionary? parameters = null public Dictionary? Params { get; set; } = parameters; /// - /// Wehether to use the RUST or C# sync client implementation. + /// Whether to use the RUST or C# sync client implementation. /// public SyncClientImplementation? ClientImplementation { get; set; } = clientImplementation; } @@ -561,7 +561,6 @@ void HandleInstruction(Instruction instruction) protected async Task LegacyStreamingSyncIteration(CancellationToken signal, RequiredPowerSyncConnectionOptions resolvedOptions) { - // There's also a warning explaining that the legacy client will be removed in the future logger.LogWarning("The legacy sync client implementation is deprecated and will be removed in a future release."); logger.LogDebug("Streaming sync iteration started"); Options.Adapter.StartSession(); @@ -600,7 +599,7 @@ protected async Task LegacyStreamingSyncIteration( Buckets = req, IncludeChecksum = true, RawData = true, - Parameters = resolvedOptions.Params, // Replace with actual params + Parameters = resolvedOptions.Params, ClientId = clientId } }; @@ -664,7 +663,6 @@ protected async Task LegacyStreamingSyncIteration( if (!result.CheckpointValid) { // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off await Task.Delay(50); return new StreamingSyncIterationResult { LegacyRetry = true }; } @@ -802,7 +800,6 @@ protected async Task LegacyStreamingSyncIteration( if (!result.CheckpointValid) { // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off await Task.Delay(50); return new StreamingSyncIterationResult { LegacyRetry = false }; } diff --git a/PowerSync/PowerSync.Maui/CHANGELOG.md b/PowerSync/PowerSync.Maui/CHANGELOG.md index a0f0049..ee6b964 100644 --- a/PowerSync/PowerSync.Maui/CHANGELOG.md +++ b/PowerSync/PowerSync.Maui/CHANGELOG.md @@ -1,5 +1,9 @@ # PowerSync.Maui Changelog +## 0.0.3-alpha.1 +- Upstream PowerSync.Common version bump +- Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`. + ## 0.0.2-alpha.1 - Fixed issues related to extension loading when installing package outside of the monorepo. diff --git a/README.md b/README.md index 468d6f3..98a61bb 100644 --- a/README.md +++ b/README.md @@ -106,13 +106,13 @@ dotnet test -v n --framework net8.0 --filter "test-file-pattern" ### Integration Tests Integration tests in `PowerSync.Common.IntegrationTests` are intended to run against the [self-host nodejs demo](https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs). -The integration tests are disabled by default, define the following environment variable to let them run. +The integration tests are disabled by default, define the following environment variable when running the tests. ```bash RUN_INTEGRATION_TESTS=true dotnet test -v n --framework net8.0 ``` -Run integration tests exclusively. +Only run integration tests, without any unit tests. ```bash RUN_INTEGRATION_TESTS=true dotnet test -v n --framework net8.0 --filter "Category=Integration" diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj index fc29ad9..21189c9 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/PowerSync.Common.IntegrationTests.csproj @@ -1,9 +1,7 @@  - net8.0 - enable enable false diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs index 8e9ad97..baa2a8d 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -248,7 +248,7 @@ public IntegrationFactAttribute() // Set default timeout if not already set if (Timeout == 0) { - Timeout = 5000; // 30 seconds default for all integration tests + Timeout = 5000; // 5 seconds default for all integration tests } } } \ No newline at end of file diff --git a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs index e094639..c3ce177 100644 --- a/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs +++ b/Tests/PowerSync/PowerSync.Common.Tests/Client/Sync/CRUDTests.cs @@ -127,8 +127,6 @@ public async Task UpdateTest() { "description", "test2" } }); - Console.WriteLine(JsonConvert.SerializeObject(tx.Crud.First())); - Console.WriteLine(JsonConvert.SerializeObject(expectedCrudEntry)); Assert.True(tx.Crud.First().Equals(expectedCrudEntry)); } diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index 81fd4c0..80d454c 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -9,7 +9,6 @@ public class PowerSyncSetup { private const string VERSION = "0.4.9"; - // https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v0.4.9/libpowersync_aarch64.android.a private const string GITHUB_BASE_URL = $"https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v{VERSION}"; private const string MAVEN_BASE_URL = $"https://repo1.maven.org/maven2/com/powersync/powersync-sqlite-core/{VERSION}"; From bbdcd2748eccad3e9fc422ee1c79cc4ea4d68aa5 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 09:53:30 +0200 Subject: [PATCH 14/22] MAUI workloads for test restore. --- .github/workflows/test.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 42f1e8f..7e70847 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,6 +17,9 @@ jobs: with: dotnet-version: '8.0' + - name: Install MAUI Workloads + run: dotnet workload restore + - name: Download PowerSync extension run: dotnet run --project Tools/Setup From 0e012b80917815063b671029f23c46f9790f69e2 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 11:35:28 +0200 Subject: [PATCH 15/22] Removed unused function. --- PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index 5f931ab..3e7507a 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -173,10 +173,4 @@ public string ToJSON() { return SerializeObject(); } - - private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b) - { - // Lower numbers = higher priority - return a.Priority.CompareTo(b.Priority); - } } \ No newline at end of file From 9fb2ec7e7a5f908787d3ce0a6715cad1f8fe9ceb Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 13:43:30 +0200 Subject: [PATCH 16/22] Passing null payloads to control (if exists). Invoke notifyCompletedUploads after crud upload completes. --- .../Client/Sync/Bucket/SqliteBucketStorage.cs | 2 +- .../Sync/Stream/StreamingSyncImplementation.cs | 15 +++++++++++---- PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs | 2 +- .../SyncIntegrationTests.cs | 2 +- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 4ec5e51..7dfff74 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -443,7 +443,7 @@ public async Task Control(string op, object? payload = null) { return await db.WriteTransaction(async tx => { - var result = await tx.Get("SELECT powersync_control(?, ?) AS r", [op, payload ?? ""]); + var result = await tx.Get("SELECT powersync_control(?, ?) AS r", [op, payload]); return result.r!; }); } diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index f1992b3..aed8e3c 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -141,6 +141,8 @@ public class StreamingSyncImplementation : EventStream { - if (!SyncStatus.Connected) + if (!SyncStatus.Connected || isUploadingCrud) { return; } - notifyCompletedUploads?.Invoke(); - - Task.Run(async () => await InternalUploadAllCrud()); + isUploadingCrud = true; + Task.Run(async () => + { + await InternalUploadAllCrud(); + notifyCompletedUploads?.Invoke(); + isUploadingCrud = false; + }); }; } diff --git a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs index 3e7507a..cdda36a 100644 --- a/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs +++ b/PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs @@ -33,7 +33,7 @@ public class SyncDataFlowStatus public class SyncPriorityStatus { - [JsonProperty("uploading")] public int Priority { get; set; } + [JsonProperty("priority")] public int Priority { get; set; } [JsonProperty("lastSyncedAt")] public DateTime? LastSyncedAt { get; set; } diff --git a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs index baa2a8d..6e2da0e 100644 --- a/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs +++ b/Tests/PowerSync/PowerSync.Common.IntegrationTests/SyncIntegrationTests.cs @@ -49,7 +49,7 @@ public async Task InitializeAsync() { await db.Connect(connector, new PowerSyncConnectionOptions { - ClientImplementation = SyncClientImplementation.C_SHARP, + ClientImplementation = SyncClientImplementation.RUST, }); await db.WaitForFirstSync(); } From 6cda500466bfaa2c535e5a5bcfba7e180ec15396 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 14:49:41 +0200 Subject: [PATCH 17/22] Updating setup for latest versions. --- Tools/Setup/Setup.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Tools/Setup/Setup.cs b/Tools/Setup/Setup.cs index 80d454c..81fd9c3 100644 --- a/Tools/Setup/Setup.cs +++ b/Tools/Setup/Setup.cs @@ -52,10 +52,10 @@ private static Dictionary GetDesktopRuntimeConfigs() { return new Dictionary { - { "osx-x64", new RuntimeConfig("libpowersync_x64.dylib", "libpowersync.dylib") }, - { "osx-arm64", new RuntimeConfig("libpowersync_aarch64.dylib", "libpowersync.dylib") }, - { "linux-x64", new RuntimeConfig("libpowersync_x64.so", "libpowersync.so") }, - { "linux-arm64", new RuntimeConfig("libpowersync_aarch64.so", "libpowersync.so") }, + { "osx-x64", new RuntimeConfig("libpowersync_x64.macos.dylib", "libpowersync.dylib") }, + { "osx-arm64", new RuntimeConfig("libpowersync_aarch64.macos.dylib", "libpowersync.dylib") }, + { "linux-x64", new RuntimeConfig("libpowersync_x64.linux.so", "libpowersync.so") }, + { "linux-arm64", new RuntimeConfig("libpowersync_aarch64.linux.so", "libpowersync.so") }, { "win-x64", new RuntimeConfig("powersync_x64.dll", "powersync.dll") } }; } From d516eae72f08f4b2137287f86dd052922b43ddcc Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 15:28:50 +0200 Subject: [PATCH 18/22] Further improvements on rust sync implementation. Triggering crud upload on first line read. Sending NOTIFY_TOKEN_REFRESH control on non-expired fetch credentials instruction. --- .../Stream/StreamingSyncImplementation.cs | 61 ++++++++++++++++--- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index aed8e3c..f143356 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -2,6 +2,7 @@ namespace PowerSync.Common.Client.Sync.Stream; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -425,7 +426,8 @@ protected async Task StreamingSyncIteration(Cancel protected async Task RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions) { Task? receivingLines = null; - var hideDisconnectOnRestart = false; + bool hadSyncLine = false; + bool hideDisconnectOnRestart = false; var nestedCts = new CancellationTokenSource(); @@ -448,12 +450,23 @@ async Task Connect(EstablishSyncStream instruction) try { stream?.Close(); } catch { } }); - string? line; + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true + }); + string? line; while ((line = await reader.ReadLineAsync()) != null) { - logger.LogDebug("Parsing line for rust sync stream {message}", "xx"); await Control(PowerSyncControlCommand.PROCESS_TEXT_LINE, line); + + // Triggers a local CRUD upload when the first sync line has been received. + // This allows uploading local changes that have been made while offline or disconnected. + if (!hadSyncLine) + { + TriggerCrudUpload(); + hadSyncLine = true; + } } } @@ -464,21 +477,20 @@ async Task Stop() async Task Control(string op, object? payload = null) { - logger.LogTrace("Control call {message}", op); - var rawResponse = await Options.Adapter.Control(op, payload); + logger.LogTrace("powersync_control {op}, {payload}, {rawResponse}", op, payload, rawResponse); HandleInstructions(Instruction.ParseInstructions(rawResponse)); } - void HandleInstructions(Instruction[] instructions) + async void HandleInstructions(Instruction[] instructions) { foreach (var instruction in instructions) { - HandleInstruction(instruction); + await HandleInstruction(instruction); } } - void HandleInstruction(Instruction instruction) + async Task HandleInstruction(Instruction instruction) { switch (instruction) { @@ -529,7 +541,26 @@ void HandleInstruction(Instruction instruction) receivingLines = Connect(establishSyncStream); break; case FetchCredentials fetchCredentials: - Options.Remote.InvalidateCredentials(); + if (fetchCredentials.DidExpire) + { + Options.Remote.InvalidateCredentials(); + } + else + { + Options.Remote.InvalidateCredentials(); + + // Restart iteration after the credentials have been refreshed. + try + { + await Options.Remote.FetchCredentials(); + await Control(PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED); + } + catch (Exception err) + { + logger.LogWarning("Could not prefetch credentials: {message}", err.Message); + } + + } break; case CloseSyncStream: nestedCts.Cancel(); @@ -550,7 +581,17 @@ void HandleInstruction(Instruction instruction) try { await Control(PowerSyncControlCommand.START, JsonConvert.SerializeObject(new { parameters = resolvedOptions.Params })); - notifyCompletedUploads = () => { Task.Run(async () => await Control(PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED)); }; + + notifyCompletedUploads = () => + { + Task.Run(async () => + { + if (!nestedCts.IsCancellationRequested) + { + await Control(PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED); + } + }); + }; if (receivingLines != null) { From 9710cbaef2c0f710e049fb07531019d913d63634 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 17:10:22 +0200 Subject: [PATCH 19/22] Using `main.sqlite_sequence` instead of `sqlite_sequence`. --- .../Client/Sync/Bucket/SqliteBucketStorage.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs index 7dfff74..78c549d 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs @@ -314,7 +314,7 @@ public async Task UpdateLocalTarget(Func> callback) } var rs = await db.GetAll( - "SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'" + "SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'" ); if (rs.Length == 0) @@ -338,7 +338,7 @@ public async Task UpdateLocalTarget(Func> callback) } var rsAfter = await tx.GetAll( - "SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'" + "SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'" ); if (rsAfter.Length == 0) From 8bdff421798a958a70f2ec66645768ce7a4b01ad Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 3 Dec 2025 17:23:37 +0200 Subject: [PATCH 20/22] Hardcoded MAUI version for dependencies. --- demos/MAUITodo/MAUITodo.csproj | 2 ++ 1 file changed, 2 insertions(+) diff --git a/demos/MAUITodo/MAUITodo.csproj b/demos/MAUITodo/MAUITodo.csproj index cee3763..50f596e 100644 --- a/demos/MAUITodo/MAUITodo.csproj +++ b/demos/MAUITodo/MAUITodo.csproj @@ -28,6 +28,8 @@ None true true + + 8.0.90 11.0 21.0 From a6b8b1f30d6250f8c2dee5ee133e5fd14586b578 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 4 Dec 2025 14:25:41 +0200 Subject: [PATCH 21/22] Using event stream to interface with central control call. --- .../Stream/StreamingSyncImplementation.cs | 78 +++++++++++++------ .../PowerSync.Common/Utils/EventStream.cs | 3 + 2 files changed, 59 insertions(+), 22 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index f143356..bd52ca9 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -396,6 +396,12 @@ protected record StreamingSyncIterationResult public bool? ImmediateRestart { get; init; } } + protected record EnqueuedCommand + { + public string Command { get; init; } = null!; + public object? Payload { get; init; } + } + protected async Task StreamingSyncIteration(CancellationToken signal, PowerSyncConnectionOptions? options) { @@ -423,12 +429,14 @@ protected async Task StreamingSyncIteration(Cancel }); } + protected async Task RustStreamingSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions) { Task? receivingLines = null; bool hadSyncLine = false; bool hideDisconnectOnRestart = false; + var controlInvocations = (EventStream)null!; var nestedCts = new CancellationTokenSource(); signal?.Register(() => { nestedCts.Cancel(); }); @@ -442,32 +450,52 @@ async Task Connect(EstablishSyncStream instruction) Data = instruction.Request }; - var stream = await Options.Remote.PostStreamRaw(syncOptions); - using var reader = new StreamReader(stream, Encoding.UTF8); - - syncOptions.CancellationToken.Register(() => + controlInvocations = new EventStream(); + try { - try { stream?.Close(); } catch { } - }); + controlInvocations?.RunListenerAsync(async (line) => + { + await Control(line.Command, line.Payload); - UpdateSyncStatus(new SyncStatusOptions - { - Connected = true - }); + // Triggers a local CRUD upload when the first sync line has been received. + // This allows uploading local changes that have been made while offline or disconnected. + if (!hadSyncLine) + { + TriggerCrudUpload(); + hadSyncLine = true; + } + }); - string? line; - while ((line = await reader.ReadLineAsync()) != null) - { - await Control(PowerSyncControlCommand.PROCESS_TEXT_LINE, line); + var stream = await Options.Remote.PostStreamRaw(syncOptions); + using var reader = new StreamReader(stream, Encoding.UTF8); - // Triggers a local CRUD upload when the first sync line has been received. - // This allows uploading local changes that have been made while offline or disconnected. - if (!hadSyncLine) + syncOptions.CancellationToken.Register(() => { - TriggerCrudUpload(); - hadSyncLine = true; + try { stream?.Close(); } catch { } + }); + + UpdateSyncStatus(new SyncStatusOptions + { + Connected = true + }); + + string? line; + while ((line = await reader.ReadLineAsync()) != null) + { + controlInvocations?.Emit(new EnqueuedCommand + { + Command = PowerSyncControlCommand.PROCESS_TEXT_LINE, + Payload = line + }); + } } + finally + { + var activeInstructions = controlInvocations; + controlInvocations = null; + activeInstructions?.Close(); + } } async Task Stop() @@ -553,7 +581,10 @@ async Task HandleInstruction(Instruction instruction) try { await Options.Remote.FetchCredentials(); - await Control(PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED); + controlInvocations?.Emit(new EnqueuedCommand + { + Command = PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED + }); } catch (Exception err) { @@ -586,9 +617,12 @@ async Task HandleInstruction(Instruction instruction) { Task.Run(async () => { - if (!nestedCts.IsCancellationRequested) + if (controlInvocations != null && !controlInvocations.Closed) { - await Control(PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED); + controlInvocations?.Emit(new EnqueuedCommand + { + Command = PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED + }); } }); }; diff --git a/PowerSync/PowerSync.Common/Utils/EventStream.cs b/PowerSync/PowerSync.Common/Utils/EventStream.cs index 671abcd..3bf93cd 100644 --- a/PowerSync/PowerSync.Common/Utils/EventStream.cs +++ b/PowerSync/PowerSync.Common/Utils/EventStream.cs @@ -25,6 +25,8 @@ CancellationTokenSource RunListenerAsync( public class EventStream : IEventStream { + public bool Closed = false; + // Closest implementation to a ConcurrentSet in .Net private readonly ConcurrentDictionary, byte> subscribers = new(); @@ -158,6 +160,7 @@ public void Close() subscriber.Writer.TryComplete(); RemoveSubscriber(subscriber); } + Closed = true; } private void RemoveSubscriber(Channel channel) From 848d78c42f2cb7c71e09e70d94161f43cb93834c Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 4 Dec 2025 15:04:23 +0200 Subject: [PATCH 22/22] Warning cleanup. --- PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs | 2 +- .../Client/Sync/Stream/StreamingSyncImplementation.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs index 7dc0b77..9f17882 100644 --- a/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs +++ b/PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs @@ -88,7 +88,7 @@ public class PowerSyncDatabase : EventStream, IPowerSyncDataba private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30; private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled); - public bool Closed; + public new bool Closed; public bool Ready; protected Task isReadyTask; diff --git a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs index bd52ca9..62466cc 100644 --- a/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs +++ b/PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs @@ -615,7 +615,7 @@ async Task HandleInstruction(Instruction instruction) notifyCompletedUploads = () => { - Task.Run(async () => + Task.Run(() => { if (controlInvocations != null && !controlInvocations.Closed) {