Skip to content

Commit a912eec

Browse files
[ServiceBus] Add additional stress test coverage to ensure parity with cross-language priorities (Azure#14437)
* Add additional stress test coverage to ensure parity with cross-language priorities. (Renew, $management, and re-opening scenarios primarily) * Add send_session_id and some additional hooks (batch/post receive, etc) to the stress test harness to allow this. * Fix session_id population in stress test framework (was on send instead of message) * PR fixes; make session test actually test sessions, adjust naming to normalize with our conventions now that this tool is more formalized, and touch up the autorenew logic to be more robust against delays. Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
1 parent 4254203 commit a912eec

File tree

2 files changed

+173
-40
lines changed

2 files changed

+173
-40
lines changed

sdk/servicebus/azure-servicebus/tests/stress_tests/stress_test_base.py

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
class ReceiveType:
2828
push="push"
2929
pull="pull"
30+
none=None
3031

3132

3233
class StressTestResults(object):
@@ -55,7 +56,7 @@ def __init__(self):
5556
def __repr__(self):
5657
return str(vars(self))
5758

58-
def PopulateProcessStats(self):
59+
def populate_process_stats(self):
5960
self.timestamp = datetime.utcnow()
6061
try:
6162
self.cpu_percent = psutil.cpu_percent()
@@ -79,6 +80,7 @@ def __init__(self,
7980
receive_delay = 0,
8081
should_complete_messages = True,
8182
max_message_count = 1,
83+
send_session_id = None,
8284
fail_on_exception = True):
8385
self.senders = senders
8486
self.receivers = receivers
@@ -92,6 +94,7 @@ def __init__(self,
9294
self.should_complete_messages = should_complete_messages
9395
self.max_message_count = max_message_count
9496
self.fail_on_exception = fail_on_exception
97+
self.send_session_id = send_session_id
9598

9699
# Because of pickle we need to create a state object and not just pass around ourselves.
97100
# If we ever require multiple runs of this one after another, just make Run() reset this.
@@ -106,72 +109,82 @@ def __init__(self,
106109

107110

108111
# Plugin functions the caller can override to further tailor the test.
109-
def OnSend(self, state, sent_message):
110-
'''Called on every successful send'''
112+
def on_send(self, state, sent_message, sender):
113+
'''Called on every successful send, per message'''
111114
pass
112115

113-
def OnReceive(self, state, received_message):
114-
'''Called on every successful receive'''
116+
def on_receive(self, state, received_message, receiver):
117+
'''Called on every successful receive, per message'''
115118
pass
116119

120+
def on_receive_batch(self, state, batch, receiver):
121+
'''Called on every successful receive, at the batch or iterator level rather than per-message'''
122+
pass
123+
124+
def post_receive(self, state, receiver):
125+
'''Called after completion of every successful receive'''
126+
pass
117127

118-
def OnComplete(self, send_results=[], receive_results=[]):
128+
def on_complete(self, send_results=[], receive_results=[]):
119129
'''Called on stress test run completion'''
120130
pass
121131

122132

123-
def PreProcessMessage(self, message):
133+
def pre_process_message(self, message):
124134
'''Allows user to transform the message before batching or sending it.'''
125135
pass
126136

127137

128-
def PreProcessMessageBatch(self, message):
138+
def pre_process_message_batch(self, message):
129139
'''Allows user to transform the batch before sending it.'''
130140
pass
131141

132142

133-
def PreProcessMessageBody(self, payload):
143+
def pre_process_message_body(self, payload):
134144
'''Allows user to transform message payload before sending it.'''
135145
return payload
136146

137147

138-
def _ScheduleIntervalLogger(self, end_time, description="", interval_seconds=30):
139-
def _doIntervalLogging():
148+
def _schedule_interval_logger(self, end_time, description="", interval_seconds=30):
149+
def _do_interval_logging():
140150
if end_time > datetime.utcnow() and not self._should_stop:
141-
self._state.PopulateProcessStats()
151+
self._state.populate_process_stats()
142152
_logger.critical("{} RECURRENT STATUS:".format(description))
143153
_logger.critical(self._state)
144-
self._ScheduleIntervalLogger(end_time, description, interval_seconds)
154+
self._schedule_interval_logger(end_time, description, interval_seconds)
145155

146-
t = threading.Timer(interval_seconds, _doIntervalLogging)
156+
t = threading.Timer(interval_seconds, _do_interval_logging)
147157
t.start()
148158

149159

150-
def _ConstructMessage(self):
160+
def _construct_message(self):
151161
if self.send_batch_size != None:
152162
batch = ServiceBusMessageBatch()
153163
for _ in range(self.send_batch_size):
154-
message = ServiceBusMessage(self.PreProcessMessageBody("a" * self.message_size))
155-
self.PreProcessMessage(message)
164+
message = ServiceBusMessage(self.pre_process_message_body("a" * self.message_size))
165+
self.pre_process_message(message)
156166
batch.add_message(message)
157167
self.PreProcessMessageBatch(batch)
158168
return batch
159169
else:
160-
message = ServiceBusMessage(self.PreProcessMessageBody("a" * self.message_size))
161-
self.PreProcessMessage(message)
170+
message = ServiceBusMessage(self.pre_process_message_body("a" * self.message_size))
171+
self.pre_process_message(message)
162172
return message
163173

164-
def _Send(self, sender, end_time):
165-
self._ScheduleIntervalLogger(end_time, "Sender " + str(self))
174+
175+
def _send(self, sender, end_time):
176+
self._schedule_interval_logger(end_time, "Sender " + str(self))
166177
try:
167178
_logger.info("STARTING SENDER")
168179
with sender:
169180
while end_time > datetime.utcnow() and not self._should_stop:
170181
_logger.info("SENDING")
171182
try:
172-
message = self._ConstructMessage()
183+
message = self._construct_message()
184+
if self.send_session_id != None:
185+
message.session_id = self.send_session_id
173186
sender.send_messages(message)
174-
self.OnSend(self._state, message)
187+
self.on_send(self._state, message, sender)
175188
except Exception as e:
176189
_logger.exception("Exception during send: {}".format(e))
177190
self._state.exceptions.append(e)
@@ -186,8 +199,8 @@ def _Send(self, sender, end_time):
186199
self._should_stop = True
187200
raise
188201

189-
def _Receive(self, receiver, end_time):
190-
self._ScheduleIntervalLogger(end_time, "Receiver " + str(self))
202+
def _receive(self, receiver, end_time):
203+
self._schedule_interval_logger(end_time, "Receiver " + str(self))
191204
try:
192205
with receiver:
193206
while end_time > datetime.utcnow() and not self._should_stop:
@@ -199,7 +212,7 @@ def _Receive(self, receiver, end_time):
199212
batch = receiver.get_streaming_message_iter(max_wait_time=self.max_wait_time)
200213

201214
for message in batch:
202-
self.OnReceive(self._state, message)
215+
self.on_receive(self._state, message, receiver)
203216
try:
204217
if self.should_complete_messages:
205218
receiver.complete_message(message)
@@ -210,6 +223,7 @@ def _Receive(self, receiver, end_time):
210223
if end_time <= datetime.utcnow():
211224
break
212225
time.sleep(self.receive_delay)
226+
self.post_receive(self._state, receiver)
213227
except Exception as e:
214228
_logger.exception("Exception during receive: {}".format(e))
215229
self._state.exceptions.append(e)
@@ -223,15 +237,15 @@ def _Receive(self, receiver, end_time):
223237
raise
224238

225239

226-
def Run(self):
240+
def run(self):
227241
start_time = datetime.utcnow()
228242
end_time = start_time + (self._duration_override or self.duration)
229243
sent_messages = 0
230244
received_messages = 0
231245
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as proc_pool:
232246
_logger.info("STARTING PROC POOL")
233-
senders = [proc_pool.submit(self._Send, sender, end_time) for sender in self.senders]
234-
receivers = [proc_pool.submit(self._Receive, receiver, end_time) for receiver in self.receivers]
247+
senders = [proc_pool.submit(self._send, sender, end_time) for sender in self.senders]
248+
receivers = [proc_pool.submit(self._receive, receiver, end_time) for receiver in self.receivers]
235249

236250
result = StressTestResults()
237251
for each in concurrent.futures.as_completed(senders + receivers):

sdk/servicebus/azure-servicebus/tests/stress_tests/test_stress_queues.py

Lines changed: 130 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import sys
1111
import time
1212

13-
from azure.servicebus import ServiceBusClient
13+
from azure.servicebus import ServiceBusClient, AutoLockRenewer
1414
from azure.servicebus._common.constants import ReceiveMode
1515

1616
from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
@@ -35,7 +35,7 @@ def test_stress_queue_send_and_receive(self, servicebus_namespace_connection_str
3535
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
3636
duration=timedelta(seconds=60))
3737

38-
result = stress_test.Run()
38+
result = stress_test.run()
3939
assert(result.total_sent > 0)
4040
assert(result.total_received > 0)
4141

@@ -54,7 +54,7 @@ def test_stress_queue_send_and_pull_receive(self, servicebus_namespace_connectio
5454
receive_type=ReceiveType.pull,
5555
duration=timedelta(seconds=60))
5656

57-
result = stress_test.Run()
57+
result = stress_test.run()
5858
assert(result.total_sent > 0)
5959
assert(result.total_received > 0)
6060

@@ -73,7 +73,7 @@ def test_stress_queue_batch_send_and_receive(self, servicebus_namespace_connecti
7373
duration=timedelta(seconds=60),
7474
send_batch_size=5)
7575

76-
result = stress_test.Run()
76+
result = stress_test.run()
7777
assert(result.total_sent > 0)
7878
assert(result.total_received > 0)
7979

@@ -92,7 +92,7 @@ def test_stress_queue_slow_send_and_receive(self, servicebus_namespace_connectio
9292
duration=timedelta(seconds=3501*3),
9393
send_delay=3500)
9494

95-
result = stress_test.Run()
95+
result = stress_test.run()
9696
assert(result.total_sent > 0)
9797
assert(result.total_received > 0)
9898

@@ -110,7 +110,7 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s
110110
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, receive_mode=ReceiveMode.ReceiveAndDelete)],
111111
duration=timedelta(seconds=60))
112112

113-
result = stress_test.Run()
113+
result = stress_test.run()
114114
assert(result.total_sent > 0)
115115
assert(result.total_received > 0)
116116

@@ -129,7 +129,7 @@ def test_stress_queue_unsettled_messages(self, servicebus_namespace_connection_s
129129
duration = timedelta(seconds=350),
130130
should_complete_messages = False)
131131

132-
result = stress_test.Run()
132+
result = stress_test.run()
133133
# This test is prompted by reports of an issue where enough unsettled messages saturate a service-side cache
134134
# and prevent further receipt.
135135
assert(result.total_sent > 2500)
@@ -150,15 +150,16 @@ def test_stress_queue_receive_large_batch_size(self, servicebus_namespace_connec
150150
duration = timedelta(seconds=60),
151151
max_message_count = 50)
152152

153-
result = stress_test.Run()
153+
result = stress_test.run()
154154
assert(result.total_sent > 0)
155155
assert(result.total_received > 0)
156156

157157
# Cannot be defined at local scope due to pickling into multiproc runner.
158158
class ReceiverTimeoutStressTestRunner(StressTestRunner):
159-
def OnSend(self, state, sent_message):
159+
def on_send(self, state, sent_message, sender):
160160
'''Called on every successful send'''
161161
if state.total_sent % 10 == 0:
162+
# To make receive time out, in push mode this delay would trigger receiver reconnection
162163
time.sleep(self.max_wait_time + 5)
163164

164165
@pytest.mark.liveTest
@@ -177,6 +178,124 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection
177178
receive_type=ReceiveType.pull,
178179
duration=timedelta(seconds=600))
179180

180-
result = stress_test.Run()
181+
result = stress_test.run()
181182
assert(result.total_sent > 0)
182-
assert(result.total_received > 0)
183+
assert(result.total_received > 0)
184+
185+
186+
class LongRenewStressTestRunner(StressTestRunner):
187+
def on_receive(self, state, received_message, receiver):
188+
'''Called on every successful receive'''
189+
renewer = AutoLockRenew()
190+
renewer.register(received_message, timeout=300)
191+
time.sleep(300)
192+
193+
@pytest.mark.liveTest
194+
@pytest.mark.live_test_only
195+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
196+
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
197+
@ServiceBusQueuePreparer(name_prefix='servicebustest')
198+
def test_stress_queue_long_renew_send_and_receive(self, servicebus_namespace_connection_string, servicebus_queue):
199+
sb_client = ServiceBusClient.from_connection_string(
200+
servicebus_namespace_connection_string, debug=False)
201+
202+
stress_test = ServiceBusQueueStressTests.LongRenewStressTestRunner(
203+
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
204+
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
205+
duration=timedelta(seconds=3000),
206+
send_delay=300)
207+
208+
result = stress_test.run()
209+
assert(result.total_sent > 0)
210+
assert(result.total_received > 0)
211+
212+
213+
class LongSessionRenewStressTestRunner(StressTestRunner):
214+
def on_receive(self, state, received_message, receiver):
215+
'''Called on every successful receive'''
216+
renewer = AutoLockRenewer()
217+
def on_fail(renewable, error):
218+
print("FAILED AUTOLOCKRENEW: " + str(error))
219+
renewer.register(receiver.session, timeout=600, on_lock_renew_failure=on_fail)
220+
221+
@pytest.mark.liveTest
222+
@pytest.mark.live_test_only
223+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
224+
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
225+
@ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
226+
def test_stress_queue_long_renew_session_send_and_receive(self, servicebus_namespace_connection_string, servicebus_queue):
227+
sb_client = ServiceBusClient.from_connection_string(
228+
servicebus_namespace_connection_string, debug=False)
229+
230+
session_id = 'test_stress_queue_long_renew_send_and_receive'
231+
232+
stress_test = ServiceBusQueueStressTests.LongSessionRenewStressTestRunner(
233+
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
234+
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id)],
235+
duration=timedelta(seconds=3000),
236+
send_delay=300,
237+
send_session_id=session_id)
238+
239+
result = stress_test.run()
240+
assert(result.total_sent > 0)
241+
assert(result.total_received > 0)
242+
243+
244+
class Peekon_receiveStressTestRunner(StressTestRunner):
245+
def on_receive_batch(self, state, received_message, receiver):
246+
'''Called on every successful receive'''
247+
assert receiver.peek_messages()[0]
248+
249+
@pytest.mark.liveTest
250+
@pytest.mark.live_test_only
251+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
252+
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
253+
@ServiceBusQueuePreparer(name_prefix='servicebustest')
254+
def test_stress_queue_peek_messages(self, servicebus_namespace_connection_string, servicebus_queue):
255+
sb_client = ServiceBusClient.from_connection_string(
256+
servicebus_namespace_connection_string, debug=False)
257+
258+
stress_test = ServiceBusQueueStressTests.Peekon_receiveStressTestRunner(
259+
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
260+
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
261+
duration = timedelta(seconds=300),
262+
receive_delay = 30,
263+
receive_type = ReceiveType.none)
264+
265+
result = stress_test.run()
266+
assert(result.total_sent > 0)
267+
# TODO: This merits better validation, to be implemented alongside full metric spread.
268+
269+
270+
class RestartHandlerStressTestRunner(StressTestRunner):
271+
def post_receive(self, state, receiver):
272+
'''Called after completion of every successful receive'''
273+
if state.total_received % 3 == 0:
274+
receiver.__exit__()
275+
receiver.__enter__()
276+
277+
def on_send(self, state, sent_message, sender):
278+
'''Called after completion of every successful receive'''
279+
if state.total_sent % 3 == 0:
280+
sender.__exit__()
281+
sender.__enter__()
282+
283+
@pytest.mark.liveTest
284+
@pytest.mark.live_test_only
285+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
286+
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
287+
@ServiceBusQueuePreparer(name_prefix='servicebustest')
288+
def test_stress_queue_close_and_reopen(self, servicebus_namespace_connection_string, servicebus_queue):
289+
sb_client = ServiceBusClient.from_connection_string(
290+
servicebus_namespace_connection_string, debug=False)
291+
292+
stress_test = ServiceBusQueueStressTests.RestartHandlerStressTestRunner(
293+
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
294+
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
295+
duration = timedelta(seconds=300),
296+
receive_delay = 30,
297+
send_delay = 10)
298+
299+
result = stress_test.run()
300+
assert(result.total_sent > 0)
301+
assert(result.total_received > 0)

0 commit comments

Comments
 (0)