Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 300ecc8

Browse files
committed
add on error
1 parent 6047e23 commit 300ecc8

File tree

5 files changed

+24
-3
lines changed

5 files changed

+24
-3
lines changed

Analogy.Implementation.KafkaProvider.Example/AnalogyKafkaExampleDataProvider.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ public void StartReceiving()
3434

3535
public void StopReceiving()
3636
{
37+
sim.Stop();
3738
Consumer.StopConsuming();
39+
Consumer.OnMessageReady -= Consumer_OnMessageReady;
40+
Consumer.OnError -= Consumer_OnError;
41+
3842
}
3943

4044
public void InitDataProvider()
@@ -43,11 +47,18 @@ public void InitDataProvider()
4347
Producer = new KafkaProducer<AnalogyLogMessage>(kafkaUrl, topic, new KafkaSerializer<AnalogyLogMessage>());
4448
Consumer = new KafkaConsumer<AnalogyLogMessage>(groupId, kafkaUrl, topic);
4549
Consumer.OnMessageReady += Consumer_OnMessageReady;
50+
Consumer.OnError += Consumer_OnError;
4651
sim = new TimerMessagesSimulator(async m => { await Producer.PublishAsync(m); });
4752
IsConnected = true;
4853

4954
}
5055

56+
private void Consumer_OnError(object sender, KafkaMessageArgs<string> e)
57+
{
58+
AnalogyLogMessage error = new AnalogyLogMessage(e.Message, AnalogyLogLevel.Error, AnalogyLogClass.General, Environment.MachineName);
59+
OnMessageReady?.Invoke(sender, new AnalogyLogMessageArgs(error, Environment.MachineName, Environment.MachineName, ID));
60+
}
61+
5162
private void Consumer_OnMessageReady(object sender, KafkaMessageArgs<AnalogyLogMessage> e)
5263
{
5364
OnMessageReady?.Invoke(sender, new AnalogyLogMessageArgs(e.Message, Environment.MachineName, Environment.MachineName, ID));

Analogy.Implementation.KafkaProvider.Example/TimerMessagesSimulator.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@ public TimerMessagesSimulator(Action<AnalogyLogMessage> action)
2727
}
2828

2929
public void Start() => SimulateOnlineMessages.Start();
30+
public void Stop() => SimulateOnlineMessages.Stop();
3031
}
3132
}

Analogy.Implementation.KafkaProvider/Analogy.Implementation.KafkaProvider.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
5-
<Version>0.1.1</Version>
5+
<Version>0.1.2</Version>
66
<Authors>Lior Banai</Authors>
77
<Company>Lior Banai</Company>
88
<Product>Analogy.Implementation.KafkaProvider</Product>

Analogy.Implementation.KafkaProvider/Analogy.Implementation.KafkaProvider.nuspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
33
<metadata>
44
<id>Analogy.LogViewer.KafkaProvider</id>
5-
<version>0.1.1</version>
5+
<version>0.1.2</version>
66
<title>Analogy Log Viewer Kafka Provider</title>
77
<authors>Lior Banai</authors>
88
<owners>Lior Banai</owners>

Analogy.Implementation.KafkaProvider/KafkaConsumer.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class KafkaConsumer<T>
1414
private string Topic { get; set; }
1515
private ConsumerConfig Config { get; set; }
1616
public event EventHandler<KafkaMessageArgs<T>> OnMessageReady;
17+
public event EventHandler<KafkaMessageArgs<string>> OnError;
1718
public BlockingCollectionQueue<T> Queue;
1819
public BlockingCollectionQueue<string> ErrorsQueue;
1920
private readonly KafkaSerializer<T> serializer;
@@ -54,12 +55,19 @@ private Task ConsumeAsync()
5455
}
5556
catch (TaskCanceledException ce)
5657
{
58+
string error = $"TaskCanceledException occurred. Exception: {ce}";
59+
ErrorsQueue.Enqueue(error);
60+
OnError?.Invoke(this, new KafkaMessageArgs<string>(error));
5761
Queue.CompleteAdding();
5862
return;
5963
}
6064
catch (ConsumeException e)
6165
{
62-
ErrorsQueue.Enqueue($"Error occurred: {e.Error.Reason}");
66+
string error = $"Error occurred: {e.Error.Reason}. Exception: {e}";
67+
ErrorsQueue.Enqueue(error);
68+
OnError?.Invoke(this, new KafkaMessageArgs<string>(error));
69+
Queue.CompleteAdding();
70+
return;
6371
}
6472
}
6573
}
@@ -84,6 +92,7 @@ private Task ReadAsync() => Task.Factory.StartNew(() =>
8492
public void StopConsuming()
8593
{
8694
cts.Cancel();
95+
Queue.CompleteAdding();
8796
}
8897

8998

0 commit comments

Comments
 (0)