Skip to content

Commit 49c0750

Browse files
author
Anouar Hassine
authored
Updates related to XComponent V 5.1 (#83)
Migrating to string state machine ids in order to support xcomponent versions > 5.0 and adding support for chunks in snapshots
1 parent c67c62c commit 49c0750

25 files changed

+506
-211
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace ReactiveXComponent.Common
5+
{
6+
public class ChunkCountdown : IDisposable
7+
{
8+
private readonly ManualResetEvent _completionResetEvent = new ManualResetEvent(false);
9+
10+
private const int ValueNotSet = int.MinValue;
11+
private int _countdown = ValueNotSet;
12+
13+
public void SetValueIfNotInitialized(int countdown)
14+
{
15+
Interlocked.CompareExchange(ref _countdown, countdown, ValueNotSet);
16+
}
17+
18+
public void Decrement()
19+
{
20+
bool isCompleted = Interlocked.Decrement(ref _countdown) <= 0;
21+
if (isCompleted)
22+
{
23+
_completionResetEvent.Set();
24+
}
25+
}
26+
27+
public WaitHandle CompletionResetEvent => _completionResetEvent;
28+
29+
public int Countdown => _countdown;
30+
31+
public void Dispose()
32+
{
33+
_completionResetEvent?.Dispose();
34+
}
35+
}
36+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
3+
namespace ReactiveXComponent.Common
4+
{
5+
public class ChunkedSnapshotEvent
6+
{
7+
public string RequestId { get; set; }
8+
9+
public SnapshotResponseChunk SnapshotResponseChunk { get; set; }
10+
}
11+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Globalization;
4+
using System.IO;
5+
using System.IO.Compression;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading.Tasks;
9+
using Newtonsoft.Json;
10+
11+
namespace ReactiveXComponent.Common
12+
{
13+
public class GZipSnapshotItemArrayJsonConverter<T> : JsonConverter
14+
{
15+
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
16+
{
17+
var stringWriter = new StringWriter(new StringBuilder(256), CultureInfo.InvariantCulture);
18+
using (var jsonTextWriter = new JsonTextWriter(stringWriter))
19+
{
20+
jsonTextWriter.Formatting = serializer.Formatting;
21+
serializer.Serialize(jsonTextWriter, value);
22+
var input = Encoding.UTF8.GetBytes(stringWriter.ToString());
23+
using (var ms = new MemoryStream())
24+
{
25+
using (var compressedStream = new GZipStream(ms, CompressionMode.Compress))
26+
{
27+
compressedStream.Write(input, 0, input.Length);
28+
compressedStream.Close();
29+
}
30+
var compressedBase64 = Convert.ToBase64String(ms.ToArray());
31+
serializer.Serialize(writer, compressedBase64, typeof(string));
32+
}
33+
}
34+
}
35+
36+
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
37+
{
38+
var input = reader.Value as string;
39+
if (input != null)
40+
{
41+
var compressedInput = Convert.FromBase64String(input);
42+
using (var ms = new MemoryStream(compressedInput))
43+
{
44+
using (var decompressedStream = new GZipStream(ms, CompressionMode.Decompress))
45+
{
46+
using (var streamReader = new StreamReader(decompressedStream))
47+
{
48+
return serializer.Deserialize(streamReader, typeof(T));
49+
}
50+
}
51+
}
52+
}
53+
54+
return null;
55+
}
56+
57+
public override bool CanConvert(Type objectType)
58+
{
59+
return typeof(T) == objectType;
60+
}
61+
}
62+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace ReactiveXComponent.Common
5+
{
6+
public class SnapshotEvent
7+
{
8+
public string RequestId { get; set; }
9+
10+
public SnapshotResponse SnapshotResponse { get; set; }
11+
}
12+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace ReactiveXComponent.Common
8+
{
9+
public class SnapshotItem
10+
{
11+
public const int DefaultIntValue = -1;
12+
public const string DefaultStringValue = "";
13+
14+
public string StateMachineId { get; set; }
15+
16+
public int WorkerId { get; set; }
17+
18+
public int StateMachineCode { get; set; }
19+
20+
public int StateCode { get; set; }
21+
22+
public int ComponentCode { get; set; }
23+
24+
public object PublicMember { get; set; }
25+
26+
public SnapshotItem()
27+
{
28+
StateMachineId = DefaultStringValue;
29+
WorkerId = DefaultIntValue;
30+
StateMachineCode = DefaultIntValue;
31+
StateCode = DefaultIntValue;
32+
ComponentCode = DefaultIntValue;
33+
PublicMember = null;
34+
}
35+
}
36+
}

ReactiveXComponent/Common/SnapshotItems.cs

Lines changed: 0 additions & 11 deletions
This file was deleted.

ReactiveXComponent/Common/SnapshotMessage.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ public class SnapshotMessage
1313
public TimeSpan? Timeout { get; set; } = DefaultTimeout;
1414
public List<string> CallerPrivateTopic { get; set; }
1515
public string ReplyTopic { get; set; }
16+
public int? ChunkSize { get; set; }
1617
}
1718
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+

2+
using System;
3+
using System.Collections.Generic;
4+
using Newtonsoft.Json;
5+
6+
namespace ReactiveXComponent.Common
7+
{
8+
[Serializable]
9+
public class SnapshotResponse
10+
{
11+
[JsonConverter(typeof(GZipSnapshotItemArrayJsonConverter<List<SnapshotItem>>))]
12+
public List<SnapshotItem> Items { get; set; }
13+
14+
public SnapshotResponse()
15+
{
16+
Items = new List<SnapshotItem>();
17+
}
18+
}
19+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace ReactiveXComponent.Common
5+
{
6+
public class SnapshotResponseChunk
7+
{
8+
public SnapshotResponseChunk()
9+
{
10+
}
11+
12+
public SnapshotResponseChunk(string runtimeId, List<string> knownRuntimeIds)
13+
{
14+
RuntimeId = runtimeId;
15+
KnownRuntimeIds = knownRuntimeIds;
16+
}
17+
18+
public int ChunkCount { get; set; }
19+
20+
public int ChunkId { get; set; }
21+
22+
public string RuntimeId { get; set; }
23+
24+
public List<string> KnownRuntimeIds { get; set; }
25+
26+
public SnapshotResponse Response { get; set; }
27+
}
28+
}

ReactiveXComponent/Common/StateMachineInstance.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace ReactiveXComponent.Common
55
[Serializable]
66
public class StateMachineInstance
77
{
8-
public long StateMachineId { get; set; }
8+
public string StateMachineId { get; set; }
99

1010
public int AgentId { get; set; }
1111

0 commit comments

Comments
 (0)