Skip to content

Commit 3386082

Browse files
authored
[ServiceBus] Improve auto lock renewer implementation (Azure#20071)
* improve auto lock renewer implementation * small fixes * fix bug * update renewer so that it would not be blocking the main thread * update changelog and test * update code to infer max worker * detect max worker * small fix * update comment * improve max_workers value inferring * fix pylint * revert time setting in test * review feedback * small fix * PR review feedback
1 parent a9e6633 commit 3386082

File tree

4 files changed

+159
-20
lines changed

4 files changed

+159
-20
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
### Bugs Fixed
1010
- Fixed a bug where `azure.servicebus.aio.AutoLockRenewer` crashes on disposal if no messages have been registered (#19642).
11+
- Fixed a bug where `azure.servicebus.AutoLockRenewer` only supported auto lock renewal for at most `max_workers` messages/sessions at a time (#19362).
1112

1213
### Other Changes
1314

sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import logging
99
import threading
1010
import time
11-
from concurrent.futures import ThreadPoolExecutor
11+
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
1212
from typing import TYPE_CHECKING
1313

1414
from .._servicebus_receiver import ServiceBusReceiver
@@ -23,6 +23,11 @@
2323
Renewable = Union[ServiceBusSession, ServiceBusReceivedMessage]
2424
LockRenewFailureCallback = Callable[[Renewable, Optional[Exception]], None]
2525

26+
try:
27+
import queue
28+
except ImportError:
29+
import Queue as queue # type: ignore
30+
2631
_log = logging.getLogger(__name__)
2732

2833
SHORT_RENEW_OFFSET = 0.5 # Seconds that if a renew period is longer than lock duration + offset, it's "too long"
@@ -31,7 +36,7 @@
3136
)
3237

3338

34-
class AutoLockRenewer(object):
39+
class AutoLockRenewer(object): # pylint:disable=too-many-instance-attributes
3540
"""Auto renew locks for messages and sessions using a background thread pool.
3641
3742
:param max_lock_renewal_duration: A time in seconds that locks registered to this renewer
@@ -74,10 +79,12 @@ def __init__(
7479
max_workers=None,
7580
):
7681
# type: (float, Optional[LockRenewFailureCallback], Optional[ThreadPoolExecutor], Optional[int]) -> None
77-
"""Auto renew locks for messages and sessions using a background thread pool.
82+
"""Auto renew locks for messages and sessions using a background thread pool. It is recommended
83+
to set max_workers to a large number, or to pass a ThreadPoolExecutor with a large max_workers value, when
84+
the AutoLockRenewer is supposed to deal with multiple messages or sessions simultaneously.
7885
7986
:param max_lock_renewal_duration: A time in seconds that locks registered to this renewer
80-
should be maintained for. Default value is 300 (5 minutes).
87+
should be maintained for. Default value is 300 (5 minutes).
8188
:type max_lock_renewal_duration: float
8289
:param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable
8390
that is being registered. Default value is None (no callback).
@@ -91,23 +98,54 @@ def __init__(
9198
:type max_workers: Optional[int]
9299
"""
93100
self._executor = executor or ThreadPoolExecutor(max_workers=max_workers)
101+
# None indicates it's unknown whether the provided executor has max workers > 1
102+
self._is_max_workers_greater_than_one = None if executor else (max_workers is None or max_workers > 1)
94103
self._shutdown = threading.Event()
95-
self._sleep_time = 1
104+
self._sleep_time = 0.5
96105
self._renew_period = 10
106+
self._running_dispatcher = threading.Event() # indicate whether the dispatcher is running
107+
self._last_activity_timestamp = None # the last timestamp when the dispatcher is active dealing with tasks
108+
self._dispatcher_timeout = 5 # the idle time that dispatcher should exit if there's no activity
97109
self._max_lock_renewal_duration = max_lock_renewal_duration
98110
self._on_lock_renew_failure = on_lock_renew_failure
111+
self._renew_tasks = queue.Queue() # type: ignore
112+
self._infer_max_workers_time = 1
99113

100114
def __enter__(self):
101115
if self._shutdown.is_set():
102116
raise ServiceBusError(
103117
"The AutoLockRenewer has already been shutdown. Please create a new instance for"
104118
" auto lock renewing."
105119
)
120+
121+
self._init_workers()
106122
return self
107123

108124
def __exit__(self, *args):
109125
self.close()
110126

127+
def _init_workers(self):
128+
if not self._running_dispatcher.is_set():
129+
self._infer_max_workers_greater_than_one_if_needed()
130+
self._running_dispatcher.set()
131+
self._executor.submit(self._dispatch_worker)
132+
133+
def _infer_max_workers_greater_than_one_if_needed(self):
134+
# infer max_workers value if executor is passed in
135+
if self._is_max_workers_greater_than_one is None:
136+
max_wokers_checker = self._executor.submit(self._infer_max_workers_value_worker)
137+
max_wokers_checker.result()
138+
139+
def _infer_max_workers_value_worker(self):
140+
max_workers_checker = self._executor.submit(pow, 1, 1)
141+
# If there is only one worker thread, this will never complete because that thread
142+
# is busy executing this function; the timeout below detects that case.
143+
try:
144+
max_workers_checker.result(timeout=self._infer_max_workers_time)
145+
self._is_max_workers_greater_than_one = True
146+
except FuturesTimeoutError:
147+
self._is_max_workers_greater_than_one = False
148+
111149
def _renewable(self, renewable):
112150
# pylint: disable=protected-access
113151
if self._shutdown.is_set():
@@ -120,7 +158,26 @@ def _renewable(self, renewable):
120158
return False
121159
return True
122160

123-
def _auto_lock_renew(
161+
def _dispatch_worker(self):
162+
self._last_activity_timestamp = time.time()
163+
while not self._shutdown.is_set() and self._running_dispatcher.is_set():
164+
while not self._renew_tasks.empty():
165+
renew_task = self._renew_tasks.get()
166+
if self._is_max_workers_greater_than_one:
167+
self._executor.submit(self._auto_lock_renew_task, *renew_task)
168+
else:
169+
self._auto_lock_renew_task(*renew_task)
170+
self._renew_tasks.task_done()
171+
self._last_activity_timestamp = time.time()
172+
# If there's been no activity in the past self._dispatcher_timeout seconds, exit the method.
173+
# This ensures the dispatching thread can exit, so it does not block the main Python thread;
174+
# the dispatcher can be started again if new tasks get registered.
175+
if time.time() - self._last_activity_timestamp >= self._dispatcher_timeout:
176+
self._running_dispatcher.clear()
177+
self._last_activity_timestamp = None
178+
return
179+
180+
def _auto_lock_renew_task(
124181
self,
125182
receiver,
126183
renewable,
@@ -130,14 +187,11 @@ def _auto_lock_renew(
130187
renew_period_override=None,
131188
):
132189
# pylint: disable=protected-access
133-
_log.debug(
134-
"Running lock auto-renew thread for %r seconds", max_lock_renewal_duration
135-
)
136190
error = None
137191
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
138192
renew_period = renew_period_override or self._renew_period
139193
try:
140-
while self._renewable(renewable):
194+
if self._renewable(renewable):
141195
if (utc_now() - starttime) >= datetime.timedelta(
142196
seconds=max_lock_renewal_duration
143197
):
@@ -163,6 +217,18 @@ def _auto_lock_renew(
163217
# Renewable is a message
164218
receiver.renew_message_lock(renewable) # type: ignore
165219
time.sleep(self._sleep_time)
220+
# enqueue a new task, keeping renewing the renewable
221+
if self._renewable(renewable):
222+
self._renew_tasks.put(
223+
(
224+
receiver,
225+
renewable,
226+
starttime,
227+
max_lock_renewal_duration,
228+
on_lock_renew_failure,
229+
renew_period_override
230+
)
231+
)
166232
clean_shutdown = not renewable._lock_expired
167233
except AutoLockRenewTimeout as e:
168234
error = e
@@ -231,14 +297,21 @@ def register(
231297
time_until_expiry.seconds * SHORT_RENEW_SCALING_FACTOR
232298
)
233299

234-
self._executor.submit(
235-
self._auto_lock_renew,
236-
receiver,
237-
renewable,
238-
starttime,
239-
max_lock_renewal_duration or self._max_lock_renewal_duration,
240-
on_lock_renew_failure or self._on_lock_renew_failure,
241-
renew_period_override,
300+
_log.debug(
301+
"Running lock auto-renew for %r for %r seconds", renewable, max_lock_renewal_duration
302+
)
303+
304+
self._init_workers()
305+
306+
self._renew_tasks.put(
307+
(
308+
receiver,
309+
renewable,
310+
starttime,
311+
max_lock_renewal_duration or self._max_lock_renewal_duration,
312+
on_lock_renew_failure or self._on_lock_renew_failure,
313+
renew_period_override
314+
)
242315
)
243316

244317
def close(self, wait=True):
@@ -249,5 +322,6 @@ def close(self, wait=True):
249322
250323
:rtype: None
251324
"""
325+
self._running_dispatcher.clear()
252326
self._shutdown.set()
253327
self._executor.shutdown(wait=wait)

sdk/servicebus/azure-servicebus/tests/mocks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def renew_message_lock(self, message):
1818
class MockReceivedMessage(ServiceBusReceivedMessage):
1919
def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False):
2020
self._lock_duration = 2
21-
21+
self._raw_amqp_message = None
2222
self._received_timestamp_utc = utc_now()
2323
self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
2424
self._settled = False
@@ -27,7 +27,6 @@ def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False):
2727
self._prevent_renew_lock = prevent_renew_lock
2828
self._exception_on_renew_lock = exception_on_renew_lock
2929

30-
3130
@property
3231
def _lock_expired(self):
3332
if self.locked_until_utc and self.locked_until_utc <= utc_now():

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import logging
88
import sys
99
import os
10+
from concurrent.futures import ThreadPoolExecutor
1011
import types
1112
import pytest
1213
import time
@@ -1002,6 +1003,70 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_autolockrenew(self,
10021003
renewer.close()
10031004
assert len(messages) == 11
10041005

1006+
renewer = AutoLockRenewer(max_workers=8)
1007+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1008+
for i in range(10):
1009+
message = ServiceBusMessage("{}".format(i))
1010+
sender.send_messages(message)
1011+
1012+
with sb_client.get_queue_receiver(servicebus_queue.name,
1013+
max_wait_time=5,
1014+
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
1015+
prefetch_count=10) as receiver:
1016+
received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
1017+
for msg in received_msgs:
1018+
renewer.register(receiver, msg, max_lock_renewal_duration=10)
1019+
time.sleep(10)
1020+
1021+
for msg in received_msgs:
1022+
receiver.complete_message(msg)
1023+
assert len(received_msgs) == 10
1024+
renewer.close()
1025+
1026+
executor = ThreadPoolExecutor(max_workers=1)
1027+
renewer = AutoLockRenewer(executor=executor)
1028+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1029+
for i in range(2):
1030+
message = ServiceBusMessage("{}".format(i))
1031+
sender.send_messages(message)
1032+
1033+
with sb_client.get_queue_receiver(servicebus_queue.name,
1034+
max_wait_time=5,
1035+
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
1036+
prefetch_count=3) as receiver:
1037+
received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=5)
1038+
for msg in received_msgs:
1039+
renewer.register(receiver, msg, max_lock_renewal_duration=10)
1040+
time.sleep(10)
1041+
1042+
for msg in received_msgs:
1043+
receiver.complete_message(msg)
1044+
assert len(received_msgs) == 2
1045+
assert not renewer._is_max_workers_greater_than_one
1046+
renewer.close()
1047+
1048+
executor = ThreadPoolExecutor(max_workers=2)
1049+
renewer = AutoLockRenewer(executor=executor)
1050+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1051+
for i in range(3):
1052+
message = ServiceBusMessage("{}".format(i))
1053+
sender.send_messages(message)
1054+
1055+
with sb_client.get_queue_receiver(servicebus_queue.name,
1056+
max_wait_time=5,
1057+
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
1058+
prefetch_count=3) as receiver:
1059+
received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=5)
1060+
for msg in received_msgs:
1061+
renewer.register(receiver, msg, max_lock_renewal_duration=10)
1062+
time.sleep(10)
1063+
1064+
for msg in received_msgs:
1065+
receiver.complete_message(msg)
1066+
assert len(received_msgs) == 3
1067+
assert renewer._is_max_workers_greater_than_one
1068+
renewer.close()
1069+
10051070
@pytest.mark.liveTest
10061071
@pytest.mark.live_test_only
10071072
@CachedResourceGroupPreparer(name_prefix='servicebustest')

0 commit comments

Comments
 (0)