Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
434ec8f
Added integration test shell.
Chriztiaan Nov 24, 2025
563012f
Added initial integration tests.
Chriztiaan Nov 24, 2025
89f5181
Support 0.4.0 for C_SHARP sync implementation.
Chriztiaan Dec 1, 2025
f731d70
Initial rust sync support.
Chriztiaan Dec 1, 2025
e372cf6
Fixed crud tests. Removed boolean parameter from BucketStorageTests.
Chriztiaan Dec 1, 2025
88006ec
Revert SyncDataBucket changes.
Chriztiaan Dec 1, 2025
b6599c9
Fixed bucket storage tests related to core extension changes.
Chriztiaan Dec 1, 2025
0fbc7e4
Introduced PowerSyncControlCommand constants.
Chriztiaan Dec 1, 2025
b258570
Removed old package.
Chriztiaan Dec 2, 2025
1037191
Tracer fix for csharp sync.
Chriztiaan Dec 2, 2025
a310bac
Cleanup. Final iteration result handling that works for both Rust and…
Chriztiaan Dec 2, 2025
08b4ee7
Ignoring NET-iOS8 warning.
Chriztiaan Dec 3, 2025
077ee4c
Changelog and cleanup entries.
Chriztiaan Dec 3, 2025
bbdcd27
MAUI workloads for test restore.
Chriztiaan Dec 3, 2025
0e012b8
Removed unused function.
Chriztiaan Dec 3, 2025
9fb2ec7
Passing null payloads to control (if exists). Invoke notifyCompletedU…
Chriztiaan Dec 3, 2025
6cda500
Updating setup for latest versions.
Chriztiaan Dec 3, 2025
d516eae
Further improvements on rust sync implementation. Triggering crud upl…
Chriztiaan Dec 3, 2025
9710cba
Using `main.sqlite_sequence` instead of `sqlite_sequence`.
Chriztiaan Dec 3, 2025
8bdff42
Hardcoded MAUI version for dependencies.
Chriztiaan Dec 3, 2025
a6b8b1f
Using event stream to interface with central control call.
Chriztiaan Dec 4, 2025
848d78c
Warning cleanup.
Chriztiaan Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ jobs:
with:
dotnet-version: '8.0'

- name: Install MAUI Workloads
run: dotnet workload restore

- name: Download PowerSync extension
run: dotnet run --project Tools/Setup

Expand Down
3 changes: 3 additions & 0 deletions Directory.build.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<Project>
<PropertyGroup>
<MSBuildWarningsAsMessages>$(MSBuildWarningsAsMessages);NETSDK1202</MSBuildWarningsAsMessages>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(MSBuildThisFileDirectory)IsExternalInit.cs" Visible="false" />
</ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# PowerSync.Common Changelog

## 0.0.5-alpha.1
- Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`.

## 0.0.4-alpha.1
- Fixed MAUI issues related to extension loading when installing package outside of the monorepo.

Expand Down
8 changes: 4 additions & 4 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>

Task<NonQueryResult> Execute(string query, object[]? parameters = null);

Task<T[]> GetAll<T>(string sql, params object[]? parameters);
Task<T[]> GetAll<T>(string sql, object[]? parameters = null);

Task<T?> GetOptional<T>(string sql, params object[]? parameters);
Task<T?> GetOptional<T>(string sql, object[]? parameters = null);

Task<T> Get<T>(string sql, params object[]? parameters);
Task<T> Get<T>(string sql, object[]? parameters = null);

Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null);

Expand Down Expand Up @@ -236,7 +236,7 @@ protected async Task UpdateHasSynced()
);

DateTime? lastCompleteSync = null;

// TODO: Will be altered/extended when reporting individual sync priority statuses is supported
foreach (var result in results)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ namespace PowerSync.Common.Client.Sync.Bucket;
using PowerSync.Common.Utils;
using Newtonsoft.Json;

public static class PowerSyncControlCommand
{
public const string PROCESS_TEXT_LINE = "line_text";
public const string PROCESS_BSON_LINE = "line_binary";
public const string STOP = "stop";
public const string START = "start";
public const string NOTIFY_TOKEN_REFRESHED = "refreshed_token";
public const string NOTIFY_CRUD_UPLOAD_COMPLETED = "completed_upload";
public const string UPDATE_SUBSCRIPTIONS = "update_subscriptions";
}

public class Checkpoint
{
[JsonProperty("last_op_id")]
Expand Down Expand Up @@ -113,4 +124,9 @@ public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
/// Get a unique client ID.
/// </summary>
Task<string> GetClientId();

/// <summary>
/// Invokes the `powersync_control` function for the sync client.
/// </summary>
Task<string> Control(string op, object? payload);
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ await db.WriteTransaction(async tx =>
foreach (var b in batch.Buckets)
{
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
["save", JsonConvert.SerializeObject(new { buckets = new[] { b.ToJSON() } })]);
["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]);
logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result));
count += b.Data.Length;
}
Expand Down Expand Up @@ -351,16 +351,18 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)

if (seqAfter != seqBefore)
{
logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, seqBefore);
logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter,
seqBefore);
return false;
}

var response = await tx.Execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
[opId]
);
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
[opId]
);

logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", JsonConvert.SerializeObject(response));
logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}",
JsonConvert.SerializeObject(response));
return true;
});
}
Expand Down Expand Up @@ -388,33 +390,33 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
var last = all[all.Length - 1];

return new CrudBatch(
Crud: all,
HaveMore: true,
CompleteCallback: async (string? writeCheckpoint) =>
{
await db.WriteTransaction(async tx =>
Crud: all,
HaveMore: true,
CompleteCallback: async (string? writeCheckpoint) =>
{
await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]);

if (!string.IsNullOrEmpty(writeCheckpoint))
await db.WriteTransaction(async tx =>
{
var crudResult = await tx.GetAll<object>("SELECT 1 FROM ps_crud LIMIT 1");
if (crudResult?.Length > 0)
await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]);

if (!string.IsNullOrEmpty(writeCheckpoint))
{
var crudResult = await tx.GetAll<object>("SELECT 1 FROM ps_crud LIMIT 1");
if (crudResult?.Length > 0)
{
await tx.Execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
[writeCheckpoint]);
}
}
else
{
await tx.Execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
[writeCheckpoint]);
[GetMaxOpId()]);
}
}
else
{
await tx.Execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
[GetMaxOpId()]);
}
});
}
);
});
}
);
}

public async Task<CrudEntry?> NextCrudItem()
Expand All @@ -434,4 +436,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint)
// No Op
await Task.CompletedTask;
}

record ControlResult(string? r);

public async Task<string> Control(string op, object? payload = null)
{
return await db.WriteTransaction(async tx =>
{
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload]);
return result.r!;
});
}
}
129 changes: 129 additions & 0 deletions PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;

namespace PowerSync.Common.Client.Sync.Stream;

/// <summary>
/// An internal instruction emitted by the sync client in the core extension in response to the
/// SDK passing sync data into the extension.
/// </summary>
public abstract class Instruction
{

public static Instruction[] ParseInstructions(string rawResponse)
{
var jsonArray = JArray.Parse(rawResponse);
List<Instruction> instructions = [];

foreach (JObject item in jsonArray)
{
var instruction = ParseInstruction(item);
if (instruction == null)
{
throw new JsonSerializationException("Failed to parse instruction from JSON.");
}
instructions.Add(instruction);
}

return instructions.ToArray();
}

public static Instruction? ParseInstruction(JObject json)
{
if (json.ContainsKey("LogLine"))
return json["LogLine"]!.ToObject<LogLine>();
if (json.ContainsKey("UpdateSyncStatus"))
return json["UpdateSyncStatus"]!.ToObject<UpdateSyncStatus>();
if (json.ContainsKey("EstablishSyncStream"))
return json["EstablishSyncStream"]!.ToObject<EstablishSyncStream>();
if (json.ContainsKey("FetchCredentials"))
return json["FetchCredentials"]!.ToObject<FetchCredentials>();
if (json.ContainsKey("CloseSyncStream"))
return new CloseSyncStream();
if (json.ContainsKey("FlushFileSystem"))
return new FlushFileSystem();
if (json.ContainsKey("DidCompleteSync"))
return new DidCompleteSync();

throw new JsonSerializationException("Unknown Instruction type.");
}
}

public class LogLine : Instruction
{
[JsonProperty("severity")]
public string Severity { get; set; } = null!; // "DEBUG", "INFO", "WARNING"

[JsonProperty("line")]
public string Line { get; set; } = null!;
}

public class EstablishSyncStream : Instruction
{
[JsonProperty("request")]
public StreamingSyncRequest Request { get; set; } = null!;
}

public class UpdateSyncStatus : Instruction
{
[JsonProperty("status")]
public CoreSyncStatus Status { get; set; } = null!;
}

public class CoreSyncStatus
{
[JsonProperty("connected")]
public bool Connected { get; set; }

[JsonProperty("connecting")]
public bool Connecting { get; set; }

[JsonProperty("priority_status")]
public List<SyncPriorityStatus> PriorityStatus { get; set; } = [];

[JsonProperty("downloading")]
public DownloadProgress? Downloading { get; set; }
}

public class SyncPriorityStatus
{
[JsonProperty("priority")]
public int Priority { get; set; }

[JsonProperty("last_synced_at")]
public long LastSyncedAt { get; set; }

[JsonProperty("has_synced")]
public bool? HasSynced { get; set; }
}

public class DownloadProgress
{
[JsonProperty("buckets")]
public Dictionary<string, BucketProgress> Buckets { get; set; } = null!;
}

public class BucketProgress
{
[JsonProperty("priority")]
public int Priority { get; set; }

[JsonProperty("at_last")]
public int AtLast { get; set; }

[JsonProperty("since_last")]
public int SinceLast { get; set; }

[JsonProperty("target_count")]
public int TargetCount { get; set; }
}

public class FetchCredentials : Instruction
{
[JsonProperty("did_expire")]
public bool DidExpire { get; set; }
}

public class CloseSyncStream : Instruction { }
public class FlushFileSystem : Instruction { }
public class DidCompleteSync : Instruction { }
31 changes: 31 additions & 0 deletions PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,37 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
return JsonConvert.DeserializeObject<T>(responseData)!;
}

/// <summary>
/// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
/// </summary>
public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
{
var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);

if (response.Content == null)
{
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
}

if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
{
InvalidateCredentials();
}

if (!response.IsSuccessStatusCode)
{
var errorText = await response.Content.ReadAsStringAsync();
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
}

return await response.Content.ReadAsStreamAsync();
}


/// <summary>
/// Originally used for the C# streaming sync implementation.
/// </summary>
public async IAsyncEnumerable<StreamingSyncLine?> PostStream(SyncStreamOptions options)
{
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
Expand Down
Loading