Skip to content

Commit 2f74e20

Browse files
authored
[Perf] Added EventPerfTest base class + sample (Azure#22002)
* Added Event base class * Added event sample test * Updated event counter * Some import cleanup * Removed extra import
1 parent 396fb73 commit 2f74e20

File tree

8 files changed

+354
-5
lines changed

8 files changed

+354
-5
lines changed

tools/azure-devtools/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
"GitPython",
5959
"requests>=2.0",
6060
],
61-
"systemperf": ["aiohttp>=3.0", "requests>=2.0", "tornado==6.0.3", "pycurl==7.43.0.5", "httpx>=0.21", "azure-core"],
61+
"systemperf": ["aiohttp>=3.0", "requests>=2.0", "tornado==6.0.3", "httpx>=0.21", "azure-core"],
6262
},
6363
package_dir={"": "src"},
6464
install_requires=DEPENDENCIES,

tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
from ._random_stream import RandomStream, WriteStream, get_random_bytes
1212
from ._async_random_stream import AsyncRandomStream
1313
from ._batch_perf_test import BatchPerfTest
14+
from ._event_perf_test import EventPerfTest
1415

1516
__all__ = [
1617
"PerfStressTest",
1718
"BatchPerfTest",
19+
"EventPerfTest",
1820
"RandomStream",
1921
"WriteStream",
2022
"AsyncRandomStream",

tools/azure-devtools/src/azure_devtools/perfstress_tests/_batch_perf_test.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import cProfile
77
import os
8-
import threading
98
import aiohttp
109
import time
1110
from typing import Optional, Any, Dict, List
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
import asyncio
7+
import threading
8+
import time
9+
10+
from ._repeated_timer import AtomicCounter
11+
from ._perf_stress_base import _PerfTestBase
12+
13+
14+
class EventPerfTest(_PerfTestBase):
15+
16+
def __init__(self, arguments):
17+
super().__init__(arguments)
18+
if self.args.profile:
19+
raise NotImplementedError("Profiler support for event tests pending.")
20+
21+
if self.args.sync:
22+
self._condition = threading.Condition()
23+
else:
24+
self._condition = asyncio.Condition()
25+
self._start_time = time.time()
26+
self._error = None
27+
self._processing = None
28+
self._completed_operations = AtomicCounter()
29+
30+
@property
31+
def completed_operations(self) -> int:
32+
"""
33+
Total number of operations completed by run_all().
34+
Reset after warmup.
35+
"""
36+
return self._completed_operations.value()
37+
38+
@property
39+
def last_completion_time(self) -> float:
40+
"""
41+
Elapsed time between start of warmup/run and last completed operation.
42+
Reset after warmup.
43+
"""
44+
return self._last_completion_time - self._start_time
45+
46+
def event_raised_sync(self):
47+
self._completed_operations.increment()
48+
self._last_completion_time = time.time()
49+
50+
def error_raised_sync(self, error):
51+
with self._condition:
52+
self._error = error
53+
self._condition.notify_all()
54+
55+
async def event_raised_async(self):
56+
self._completed_operations.increment()
57+
self._last_completion_time = time.time()
58+
59+
async def error_raised_async(self, error):
60+
async with self._condition:
61+
self._error = error
62+
self._condition.notify_all()
63+
64+
async def setup(self) -> None:
65+
"""
66+
Setup called once per parallel test instance.
67+
Used to setup state specific to this test instance.
68+
"""
69+
if self.args.sync:
70+
self._processing = threading.Thread(target=self.start_events_sync)
71+
self._processing.daemon = True
72+
self._processing.start()
73+
else:
74+
self._processing = asyncio.ensure_future(self.start_events_async())
75+
76+
async def cleanup(self) -> None:
77+
"""
78+
Cleanup called once per parallel test instance.
79+
Used to cleanup state specific to this test instance.
80+
"""
81+
if self.args.sync:
82+
self.stop_events_sync()
83+
self._processing.join()
84+
else:
85+
await self.stop_events_async()
86+
await self._processing
87+
try:
88+
raise self._error
89+
except TypeError:
90+
pass
91+
92+
def run_all_sync(self, duration: int) -> None:
93+
"""
94+
Run all sync tests, including both warmup and duration.
95+
"""
96+
with self._condition:
97+
self._completed_operations.reset()
98+
self._last_completion_time = 0.0
99+
self._start_time = time.time()
100+
self._condition.wait(timeout=duration)
101+
102+
async def run_all_async(self, duration: int) -> None:
103+
"""
104+
Run all async tests, including both warmup and duration.
105+
"""
106+
async with self._condition:
107+
self._completed_operations.reset()
108+
self._last_completion_time = 0.0
109+
self._start_time = time.time()
110+
try:
111+
await asyncio.wait_for(self._condition.wait(), timeout=duration)
112+
except asyncio.TimeoutError:
113+
pass
114+
115+
def start_events_sync(self) -> None:
116+
"""
117+
Start the process for receiving events.
118+
"""
119+
raise NotImplementedError("start_events_sync must be implemented for {}".format(self.__class__.__name__))
120+
121+
def stop_events_sync(self) -> None:
122+
"""
123+
Stop the process for receiving events.
124+
"""
125+
raise NotImplementedError("stop_events_sync must be implemented for {}".format(self.__class__.__name__))
126+
127+
async def start_events_async(self) -> None:
128+
"""
129+
Start the process for receiving events.
130+
"""
131+
raise NotImplementedError("start_events_async must be implemented for {}".format(self.__class__.__name__))
132+
133+
async def stop_events_async(self) -> None:
134+
"""
135+
Stop the process for receiving events.
136+
"""
137+
raise NotImplementedError("stop_events_async must be implemented for {}".format(self.__class__.__name__))

tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ async def setup(self) -> None:
155155
"""
156156
return
157157

158-
async def cleanup(self):
158+
async def cleanup(self) -> None:
159159
"""
160160
Cleanup called once per parallel test instance.
161161
Used to cleanup state specific to this test instance.

tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from ._perf_stress_base import _PerfTestABC
1818
from ._batch_perf_test import BatchPerfTest
19+
from ._event_perf_test import EventPerfTest
1920
from ._perf_stress_test import PerfStressTest
2021
from ._repeated_timer import RepeatedTimer
2122

@@ -120,6 +121,7 @@ def _parse_args(self):
120121
self.logger.info("")
121122

122123
def _discover_tests(self, test_folder_path):
124+
base_classes = [PerfStressTest, BatchPerfTest, EventPerfTest]
123125
self._test_classes = {}
124126
if os.path.isdir(os.path.join(test_folder_path, 'tests')):
125127
test_folder_path = os.path.join(test_folder_path, 'tests')
@@ -136,7 +138,7 @@ def _discover_tests(self, test_folder_path):
136138
if name.startswith("_"):
137139
continue
138140
if inspect.isclass(value):
139-
if issubclass(value, _PerfTestABC) and value not in [PerfStressTest, BatchPerfTest]:
141+
if issubclass(value, _PerfTestABC) and value not in base_classes:
140142
self.logger.info("Loaded test class: {}".format(name))
141143
self._test_classes[name] = value
142144

tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
55

6-
from threading import Timer
6+
import itertools
7+
from threading import Timer, Lock
78

89
# Credit to https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
910
class RepeatedTimer(object):
@@ -32,3 +33,26 @@ def start(self):
3233
def stop(self):
3334
self._timer.cancel()
3435
self.is_running = False
36+
37+
38+
# Credit to https://julien.danjou.info/atomic-lock-free-counters-in-python/
39+
class AtomicCounter(object):
40+
41+
def __init__(self):
42+
self._number_of_read = 0
43+
self._counter = itertools.count()
44+
self._read_lock = Lock()
45+
46+
def increment(self):
47+
next(self._counter)
48+
49+
def reset(self):
50+
with self._read_lock:
51+
self._number_of_read = 0
52+
self._counter = itertools.count()
53+
54+
def value(self):
55+
with self._read_lock:
56+
value = next(self._counter) - self._number_of_read
57+
self._number_of_read += 1
58+
return value

0 commit comments

Comments
 (0)