Skip to content

Commit cc7287c

Browse files
authored
[Perf] Add EventPerfTest to framework, add tests for EventHubs and EventHubs.Processor (Azure#22760)
- Only change to perf framework is adding EventPerfTest - Adds sample event-based perf tests under Azure.Sample.Perf - Adds real-world perf tests for EventHubs and EventHubs.Processor - Builds on Azure#24653 - Fixes Azure#24322
1 parent 5baa6d5 commit cc7287c

30 files changed

+945
-61
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
6+
namespace Azure.Sample.Perf.Event
7+
{
8+
public class MockErrorEventArgs
9+
{
10+
public int Partition { get; }
11+
public Exception Exception { get; }
12+
13+
public MockErrorEventArgs(int partition, Exception exception)
14+
{
15+
Partition = partition;
16+
Exception = exception;
17+
}
18+
}
19+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
namespace Azure.Sample.Perf.Event
5+
{
6+
public class MockEventArgs
7+
{
8+
public int Partition { get; }
9+
public string Data { get; }
10+
11+
public MockEventArgs(int partition, string data)
12+
{
13+
Partition = partition;
14+
Data = data;
15+
}
16+
}
17+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Diagnostics;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Azure.Sample.Perf.Event
10+
{
11+
public class MockEventProcessor
12+
{
13+
public int Partitions { get; }
14+
public int? MaxEventsPerSecond { get; }
15+
private double MaxEventsPerSecondPerPartition => ((double)MaxEventsPerSecond) / Partitions;
16+
17+
public TimeSpan? ErrorAfter { get; }
18+
private bool _errorRaised;
19+
private object _errorLock = new object();
20+
21+
private readonly MockEventArgs[] _eventArgs;
22+
private readonly int[] _eventsRaised;
23+
private readonly Stopwatch _sw = new Stopwatch();
24+
25+
private Func<MockEventArgs, Task> _processEventAsync;
26+
private Func<MockErrorEventArgs, Task> _processErrorAsync;
27+
28+
public MockEventProcessor(int partitions, int? maxEventsPerSecond, TimeSpan? errorAfter)
29+
{
30+
Partitions = partitions;
31+
MaxEventsPerSecond = maxEventsPerSecond;
32+
ErrorAfter = errorAfter;
33+
34+
_eventArgs = new MockEventArgs[partitions];
35+
for (var i=0; i < partitions; i++)
36+
{
37+
_eventArgs[i] = new MockEventArgs(partition: i, data: "hello");
38+
}
39+
40+
_eventsRaised = new int[partitions];
41+
}
42+
43+
public event Func<MockEventArgs, Task> ProcessEventAsync
44+
{
45+
add
46+
{
47+
_processEventAsync = value;
48+
}
49+
remove
50+
{
51+
_processEventAsync = default;
52+
}
53+
}
54+
55+
public event Func<MockErrorEventArgs, Task> ProcessErrorAsync
56+
{
57+
add
58+
{
59+
_processErrorAsync = value;
60+
}
61+
remove
62+
{
63+
_processErrorAsync = default;
64+
}
65+
}
66+
67+
public Task StartProcessingAsync()
68+
{
69+
_sw.Start();
70+
71+
for (var i=0; i < Partitions; i++)
72+
{
73+
var j = i;
74+
Task.Run(() => Process(j));
75+
}
76+
77+
return Task.CompletedTask;
78+
}
79+
80+
private void Process(int partition)
81+
{
82+
var eventArgs = _eventArgs[partition];
83+
84+
if (MaxEventsPerSecond.HasValue)
85+
{
86+
while (true)
87+
{
88+
if (ErrorAfter.HasValue && !_errorRaised && _sw.Elapsed > ErrorAfter)
89+
{
90+
lock (_errorLock)
91+
{
92+
if (!_errorRaised)
93+
{
94+
_processErrorAsync(new MockErrorEventArgs(partition, new InvalidOperationException("test exception")));
95+
_errorRaised = true;
96+
}
97+
}
98+
}
99+
else
100+
{
101+
var eventsRaised = _eventsRaised[partition];
102+
var targetEventsRaised = _sw.Elapsed.TotalSeconds * MaxEventsPerSecondPerPartition;
103+
104+
if (eventsRaised < targetEventsRaised)
105+
{
106+
_processEventAsync(eventArgs).Wait();
107+
_eventsRaised[partition]++;
108+
}
109+
else
110+
{
111+
Thread.Sleep(TimeSpan.FromSeconds(1 / MaxEventsPerSecondPerPartition));
112+
}
113+
}
114+
}
115+
}
116+
else
117+
{
118+
while (true)
119+
{
120+
if (ErrorAfter.HasValue && !_errorRaised && _sw.Elapsed > ErrorAfter)
121+
{
122+
lock (_errorLock)
123+
{
124+
if (!_errorRaised)
125+
{
126+
_processErrorAsync(new MockErrorEventArgs(partition, new InvalidOperationException("test exception")));
127+
_errorRaised = true;
128+
}
129+
}
130+
}
131+
else
132+
{
133+
_processEventAsync(eventArgs).Wait();
134+
_eventsRaised[partition]++;
135+
}
136+
}
137+
}
138+
}
139+
}
140+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using Azure.Test.Perf;
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Diagnostics;
8+
using System.Linq;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace Azure.Sample.Perf.Event
13+
{
14+
public class MockEventProcessorBaseTest : PerfTestBase<MockEventProcessorOptions>
15+
{
16+
private readonly Stopwatch _stopwatch;
17+
private readonly MockEventProcessor _eventProcessor;
18+
19+
private long[] _eventsProcessed;
20+
public override long CompletedOperations => _eventsProcessed.Sum();
21+
22+
public override IList<TimeSpan> Latencies => throw new NotImplementedException();
23+
public override IList<TimeSpan> CorrectedLatencies => throw new NotImplementedException();
24+
25+
public MockEventProcessorBaseTest(MockEventProcessorOptions options) : base(options)
26+
{
27+
_stopwatch = new Stopwatch();
28+
29+
_eventProcessor = new MockEventProcessor(options.Partitions, options.MaxEventsPerSecond,
30+
options.ErrorAfterSeconds.HasValue ? TimeSpan.FromSeconds(options.ErrorAfterSeconds.Value) : null);
31+
32+
_eventProcessor.ProcessEventAsync += ProcessEventAsync;
33+
34+
_eventsProcessed = new long[options.Partitions];
35+
}
36+
37+
private Task ProcessEventAsync(MockEventArgs arg)
38+
{
39+
Interlocked.Increment(ref _eventsProcessed[arg.Partition]);
40+
LastCompletionTime = _stopwatch.Elapsed;
41+
42+
return Task.CompletedTask;
43+
}
44+
45+
public override async Task SetupAsync()
46+
{
47+
await base.SetupAsync();
48+
await _eventProcessor.StartProcessingAsync();
49+
}
50+
51+
public override void RunAll(CancellationToken cancellationToken)
52+
{
53+
for (var i = 0; i < _eventsProcessed.Length; i++)
54+
{
55+
Interlocked.Exchange(ref _eventsProcessed[i], 0);
56+
}
57+
LastCompletionTime = default;
58+
59+
RunAllAsync(cancellationToken).Wait();
60+
}
61+
62+
public override async Task RunAllAsync(CancellationToken cancellationToken)
63+
{
64+
for (var i = 0; i < _eventsProcessed.Length; i++)
65+
{
66+
Interlocked.Exchange(ref _eventsProcessed[i], 0);
67+
}
68+
LastCompletionTime = default;
69+
70+
_stopwatch.Restart();
71+
72+
try
73+
{
74+
await Task.Delay(Timeout.Infinite, cancellationToken);
75+
}
76+
catch (OperationCanceledException)
77+
{
78+
}
79+
}
80+
}
81+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using Azure.Test.Perf;
5+
using System;
6+
using System.Threading.Tasks;
7+
8+
namespace Azure.Sample.Perf.Event
9+
{
10+
public class MockEventProcessorEventTest : EventPerfTest<MockEventProcessorOptions>
11+
{
12+
private readonly MockEventProcessor _eventProcessor;
13+
14+
public MockEventProcessorEventTest(MockEventProcessorOptions options) : base(options)
15+
{
16+
_eventProcessor = new MockEventProcessor(options.Partitions, options.MaxEventsPerSecond,
17+
options.ErrorAfterSeconds.HasValue ? TimeSpan.FromSeconds(options.ErrorAfterSeconds.Value) : null);
18+
19+
_eventProcessor.ProcessEventAsync += ProcessEventAsync;
20+
_eventProcessor.ProcessErrorAsync += ProcessErrorAsync;
21+
}
22+
23+
private Task ProcessEventAsync(MockEventArgs arg)
24+
{
25+
EventRaised();
26+
return Task.CompletedTask;
27+
}
28+
29+
private Task ProcessErrorAsync(MockErrorEventArgs arg)
30+
{
31+
ErrorRaised(arg.Exception);
32+
return Task.CompletedTask;
33+
}
34+
35+
public override async Task SetupAsync()
36+
{
37+
await base.SetupAsync();
38+
await _eventProcessor.StartProcessingAsync();
39+
}
40+
}
41+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using Azure.Test.Perf;
5+
using CommandLine;
6+
7+
namespace Azure.Sample.Perf.Event
8+
{
9+
public class MockEventProcessorOptions : PerfOptions
10+
{
11+
[Option("error-after-seconds", HelpText = "Raise error after this number of seconds")]
12+
public int? ErrorAfterSeconds { get; set; }
13+
14+
[Option("max-events-per-second", HelpText = "Maximum events per second across all partitions")]
15+
public int? MaxEventsPerSecond { get; set; }
16+
17+
[Option("partitions", Default = 8)]
18+
public int Partitions { get; set; }
19+
}
20+
}

0 commit comments

Comments
 (0)