Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 83694ae

Browse files
committed
unit tests for kafka data provider
1 parent 24e234c commit 83694ae

16 files changed

+290
-64
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,6 @@
55
/.vs/Analogy.LogViewer.KafkaProvider/v16
66
/Analogy.Implementation.KafkaProvider/bin/Debug/netstandard2.0
77
/Analogy.Implementation.KafkaProvider/obj
8+
/Analogy.Implementation.KafkaProvider.UnitTests/bin/Debug
9+
/Analogy.Implementation.KafkaProvider.UnitTests/obj/Debug
10+
/packages
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project ToolsVersion="15.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3+
<Import Project="..\packages\librdkafka.redist.1.2.1\build\librdkafka.redist.props" Condition="Exists('..\packages\librdkafka.redist.1.2.1\build\librdkafka.redist.props')" />
4+
<Import Project="..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.props" Condition="Exists('..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.props')" />
5+
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
6+
<PropertyGroup>
7+
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
8+
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
9+
<ProjectGuid>{6A9A54EE-2F10-4EC6-8340-E923AD027137}</ProjectGuid>
10+
<OutputType>Library</OutputType>
11+
<AppDesignerFolder>Properties</AppDesignerFolder>
12+
<RootNamespace>Analogy.Implementation.KafkaProvider.UnitTests</RootNamespace>
13+
<AssemblyName>Analogy.Implementation.KafkaProvider.UnitTests</AssemblyName>
14+
<TargetFrameworkVersion>v4.7.2</TargetFrameworkVersion>
15+
<FileAlignment>512</FileAlignment>
16+
<ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
17+
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">15.0</VisualStudioVersion>
18+
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
19+
<ReferencePath>$(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages</ReferencePath>
20+
<IsCodedUITest>False</IsCodedUITest>
21+
<TestProjectType>UnitTest</TestProjectType>
22+
<NuGetPackageImportStamp>
23+
</NuGetPackageImportStamp>
24+
</PropertyGroup>
25+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
26+
<DebugSymbols>true</DebugSymbols>
27+
<DebugType>full</DebugType>
28+
<Optimize>false</Optimize>
29+
<OutputPath>bin\Debug\</OutputPath>
30+
<DefineConstants>DEBUG;TRACE</DefineConstants>
31+
<ErrorReport>prompt</ErrorReport>
32+
<WarningLevel>4</WarningLevel>
33+
</PropertyGroup>
34+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
35+
<DebugType>pdbonly</DebugType>
36+
<Optimize>true</Optimize>
37+
<OutputPath>bin\Release\</OutputPath>
38+
<DefineConstants>TRACE</DefineConstants>
39+
<ErrorReport>prompt</ErrorReport>
40+
<WarningLevel>4</WarningLevel>
41+
</PropertyGroup>
42+
<ItemGroup>
43+
<Reference Include="Analogy.Interfaces, Version=2.1.2.0, Culture=neutral, processorArchitecture=MSIL">
44+
<HintPath>..\packages\Analogy.LogViewer.Interfaces.2.1.2\lib\netstandard2.0\Analogy.Interfaces.dll</HintPath>
45+
</Reference>
46+
<Reference Include="Confluent.Kafka, Version=1.2.1.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e, processorArchitecture=MSIL">
47+
<HintPath>..\packages\Confluent.Kafka.1.2.1\lib\net46\Confluent.Kafka.dll</HintPath>
48+
</Reference>
49+
<Reference Include="Microsoft.VisualStudio.TestPlatform.TestFramework, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
50+
<HintPath>..\packages\MSTest.TestFramework.1.3.2\lib\net45\Microsoft.VisualStudio.TestPlatform.TestFramework.dll</HintPath>
51+
</Reference>
52+
<Reference Include="Microsoft.VisualStudio.TestPlatform.TestFramework.Extensions, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
53+
<HintPath>..\packages\MSTest.TestFramework.1.3.2\lib\net45\Microsoft.VisualStudio.TestPlatform.TestFramework.Extensions.dll</HintPath>
54+
</Reference>
55+
<Reference Include="System" />
56+
<Reference Include="System.Buffers, Version=4.0.2.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
57+
<HintPath>..\packages\System.Buffers.4.4.0\lib\netstandard2.0\System.Buffers.dll</HintPath>
58+
</Reference>
59+
<Reference Include="System.Core" />
60+
<Reference Include="System.Memory, Version=4.0.1.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
61+
<HintPath>..\packages\System.Memory.4.5.0\lib\netstandard2.0\System.Memory.dll</HintPath>
62+
</Reference>
63+
<Reference Include="System.Numerics" />
64+
<Reference Include="System.Numerics.Vectors, Version=4.1.3.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
65+
<HintPath>..\packages\System.Numerics.Vectors.4.4.0\lib\net46\System.Numerics.Vectors.dll</HintPath>
66+
</Reference>
67+
<Reference Include="System.Runtime.CompilerServices.Unsafe, Version=4.0.4.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
68+
<HintPath>..\packages\System.Runtime.CompilerServices.Unsafe.4.5.0\lib\netstandard2.0\System.Runtime.CompilerServices.Unsafe.dll</HintPath>
69+
</Reference>
70+
</ItemGroup>
71+
<ItemGroup>
72+
<Compile Include="CreationTests.cs" />
73+
<Compile Include="Properties\AssemblyInfo.cs" />
74+
</ItemGroup>
75+
<ItemGroup>
76+
<None Include="packages.config" />
77+
</ItemGroup>
78+
<ItemGroup>
79+
<ProjectReference Include="..\Analogy.Implementation.KafkaProvider\Analogy.Implementation.KafkaProvider.csproj">
80+
<Project>{512d664a-7f6f-412c-b47c-78c4f6d7bdaa}</Project>
81+
<Name>Analogy.Implementation.KafkaProvider</Name>
82+
</ProjectReference>
83+
</ItemGroup>
84+
<Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
85+
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
86+
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
87+
<PropertyGroup>
88+
<ErrorText>This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
89+
</PropertyGroup>
90+
<Error Condition="!Exists('..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.props')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.props'))" />
91+
<Error Condition="!Exists('..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets'))" />
92+
<Error Condition="!Exists('..\packages\librdkafka.redist.1.2.1\build\librdkafka.redist.props')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\librdkafka.redist.1.2.1\build\librdkafka.redist.props'))" />
93+
</Target>
94+
<Import Project="..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets" Condition="Exists('..\packages\MSTest.TestAdapter.1.3.2\build\net45\MSTest.TestAdapter.targets')" />
95+
</Project>
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Reflection;
5+
using Analogy.Interfaces.Factories;
6+
using Microsoft.VisualStudio.TestTools.UnitTesting;
7+
8+
namespace Analogy.Implementation.KafkaProvider.UnitTests
9+
{
10+
[TestClass]
11+
public class CreationTests
12+
{
13+
14+
15+
16+
[TestMethod]
17+
public void CreationTest()
18+
{
19+
20+
var factories = GetFactories();
21+
Assert.IsTrue(factories != null);
22+
}
23+
24+
private List<IAnalogyFactory> GetFactories()
25+
{
26+
List<IAnalogyFactory> factories = new List<IAnalogyFactory>();
27+
try
28+
{
29+
Assembly assembly = Assembly.LoadFile(Path.Combine(Environment.CurrentDirectory, "Analogy.Implementation.KafkaProvider.dll"));
30+
Type[] types = assembly.GetTypes();
31+
foreach (Type aType in types)
32+
{
33+
try
34+
{
35+
if (aType.GetInterface(nameof(IAnalogyFactory)) != null)
36+
{
37+
if (!(Activator.CreateInstance(aType) is IAnalogyFactory factory)) continue;
38+
factories.Add(factory);
39+
foreach (var provider in factory.DataProviders.Items)
40+
{
41+
provider.InitDataProvider();
42+
}
43+
44+
}
45+
}
46+
catch (Exception e)
47+
{
48+
Assert.Fail("Failed with error: " + e);
49+
}
50+
51+
}
52+
53+
return factories;
54+
}
55+
catch (Exception e)
56+
{
57+
Assert.Fail("Failed with error: " + e);
58+
return new List<IAnalogyFactory>(0);
59+
}
60+
}
61+
62+
public void TestStartConsuming()
63+
{
64+
65+
}
66+
}
67+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System.Reflection;
2+
using System.Runtime.CompilerServices;
3+
using System.Runtime.InteropServices;
4+
5+
[assembly: AssemblyTitle("Analogy.Implementation.KafkaProvider.UnitTests")]
6+
[assembly: AssemblyDescription("")]
7+
[assembly: AssemblyConfiguration("")]
8+
[assembly: AssemblyCompany("")]
9+
[assembly: AssemblyProduct("Analogy.Implementation.KafkaProvider.UnitTests")]
10+
[assembly: AssemblyCopyright("Copyright © 2019")]
11+
[assembly: AssemblyTrademark("")]
12+
[assembly: AssemblyCulture("")]
13+
14+
[assembly: ComVisible(false)]
15+
16+
[assembly: Guid("6a9a54ee-2f10-4ec6-8340-e923ad027137")]
17+
18+
// [assembly: AssemblyVersion("1.0.*")]
19+
[assembly: AssemblyVersion("1.0.0.0")]
20+
[assembly: AssemblyFileVersion("1.0.0.0")]
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<packages>
3+
<package id="Analogy.LogViewer.Interfaces" version="2.1.2" targetFramework="net472" />
4+
<package id="Confluent.Kafka" version="1.2.1" targetFramework="net472" />
5+
<package id="librdkafka.redist" version="1.2.1" targetFramework="net472" />
6+
<package id="MSTest.TestAdapter" version="1.3.2" targetFramework="net472" />
7+
<package id="MSTest.TestFramework" version="1.3.2" targetFramework="net472" />
8+
<package id="System.Buffers" version="4.4.0" targetFramework="net472" />
9+
<package id="System.Memory" version="4.5.0" targetFramework="net472" />
10+
<package id="System.Numerics.Vectors" version="4.4.0" targetFramework="net472" />
11+
<package id="System.Runtime.CompilerServices.Unsafe" version="4.5.0" targetFramework="net472" />
12+
</packages>

Analogy.Implementation.KafkaProvider/Analogy.Implementation.KafkaProvider.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
5-
<Version>0.1.0</Version>
5+
<Version>0.1.1</Version>
66
<Authors>Lior Banai</Authors>
77
<Company>Lior Banai</Company>
88
<Product>Analogy.Implementation.KafkaProvider</Product>
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<PackageReference Include="Analogy.LogViewer.Interfaces" Version="2.1.1" />
12+
<PackageReference Include="Analogy.LogViewer.Interfaces" Version="2.1.2" />
1313
<PackageReference Include="Confluent.Kafka" Version="1.2.1" />
1414
</ItemGroup>
1515

Analogy.Implementation.KafkaProvider/AnalogyKafkaDataProvider.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,19 @@ public class AnalogyKafkaDataProvider : IAnalogyRealTimeDataProvider
1414
public event EventHandler<AnalogyLogMessagesArgs> OnManyMessagesReady;
1515
public IAnalogyOfflineDataProvider FileOperationsHandler { get; }
1616
public bool IsConnected { get; private set; }
17-
public KafkaConsumer Consumer { get; set; }
17+
public KafkaConsumer<AnalogyLogMessage> Consumer { get; set; }
18+
public string groupId = "AnalogyKafkaLogin";
1819
public string topic = "KafkaLog";
1920
public string kafkaUrl = "localhost:9092";
2021
public Task<bool> CanStartReceiving() => Task.FromResult(IsConnected);
2122
private Task Consuming;
22-
private Task Reading;
2323
public AnalogyKafkaDataProvider()
2424
{
2525

2626
}
2727
public void StartReceiving()
2828
{
2929
Consuming = Consumer.StartConsuming();
30-
Reading = Consumer.ReadMessages();
31-
3230
}
3331

3432
public void StopReceiving()
@@ -39,13 +37,13 @@ public void StopReceiving()
3937
public void InitDataProvider()
4038
{
4139

42-
Consumer = new KafkaConsumer(kafkaUrl, topic);
40+
Consumer = new KafkaConsumer<AnalogyLogMessage>(groupId, kafkaUrl, topic);
4341
Consumer.OnMessageReady += Consumer_OnMessageReady;
4442
IsConnected = true;
4543

4644
}
4745

48-
private void Consumer_OnMessageReady(object sender, AnalogyKafkaLogMessageArgs e)
46+
private void Consumer_OnMessageReady(object sender, KafkaMessageArgs<AnalogyLogMessage> e)
4947
{
5048
OnMessageReady?.Invoke(sender, new AnalogyLogMessageArgs(e.Message, Environment.MachineName, Environment.MachineName, ID));
5149
}

Analogy.Implementation.KafkaProvider/AnalogyKafkaFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class AnalogyKafkaFactory : IAnalogyFactory
1717

1818
public IEnumerable<IAnalogyChangeLog> ChangeLog { get; } = new List<AnalogyChangeLog>
1919
{
20+
new AnalogyChangeLog("Add multi topic subscription",AnalogChangeLogType.None, "Lior Banai",new DateTime(2019, 10, 31)),
2021
new AnalogyChangeLog("Create Initial implementation",AnalogChangeLogType.None, "Lior Banai",new DateTime(2019, 10, 19))
2122
};
2223
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using Analogy.Interfaces;
5+
6+
namespace Analogy.Implementation.KafkaProvider
7+
{
8+
class AnalogyKafkaProducer : KafkaProducer<AnalogyLogMessage>
9+
{
10+
public AnalogyKafkaProducer(string kafkaServerURL, string topic) : base(kafkaServerURL, topic, new KafkaSerializer<AnalogyLogMessage>())
11+
{
12+
}
13+
}
14+
}

Analogy.Implementation.KafkaProvider/Example/AnalogyKafkaExampleDataProvider.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ public class AnalogyKafkaExampleDataProvider : IAnalogyRealTimeDataProvider
1515
public event EventHandler<AnalogyLogMessagesArgs> OnManyMessagesReady;
1616
public IAnalogyOfflineDataProvider FileOperationsHandler { get; }
1717
public bool IsConnected { get; private set; }
18-
public KafkaConsumer Consumer { get; set; }
19-
public KafkaProducer Producer { get; set; }
18+
public KafkaConsumer<AnalogyLogMessage> Consumer { get; set; }
19+
public KafkaProducer<AnalogyLogMessage> Producer { get; set; }
20+
public string groupId = "AnalogyKafkaExample";
2021
public string topic = "KafkaLog";
2122
public string kafkaUrl = "localhost:9092";
2223
public Task<bool> CanStartReceiving() => Task.FromResult(IsConnected);
2324
private TimerMessagesSimulator sim;
2425
private Task Consuming;
25-
private Task Reading;
2626
public AnalogyKafkaExampleDataProvider()
2727
{
2828

@@ -31,7 +31,6 @@ public void StartReceiving()
3131
{
3232
sim.Start();
3333
Consuming = Consumer.StartConsuming();
34-
Reading = Consumer.ReadMessages();
3534

3635
}
3736

@@ -43,15 +42,15 @@ public void StopReceiving()
4342
public void InitDataProvider()
4443
{
4544

46-
Producer = new KafkaProducer(kafkaUrl, topic);
47-
Consumer = new KafkaConsumer(kafkaUrl, topic);
45+
Producer = new KafkaProducer<AnalogyLogMessage>(kafkaUrl, topic, new KafkaSerializer<AnalogyLogMessage>());
46+
Consumer = new KafkaConsumer<AnalogyLogMessage>(groupId, kafkaUrl, topic);
4847
Consumer.OnMessageReady += Consumer_OnMessageReady;
4948
sim = new TimerMessagesSimulator(async m => { await Producer.PublishAsync(m); });
5049
IsConnected = true;
5150

5251
}
5352

54-
private void Consumer_OnMessageReady(object sender, AnalogyKafkaLogMessageArgs e)
53+
private void Consumer_OnMessageReady(object sender, KafkaMessageArgs<AnalogyLogMessage> e)
5554
{
5655
OnMessageReady?.Invoke(sender, new AnalogyLogMessageArgs(e.Message, Environment.MachineName, Environment.MachineName, ID));
5756
}

0 commit comments

Comments
 (0)