Skip to content

Commit 05c65a8

Browse files
[ServiceBus] Initial commit of auto-auto-lock-renewal. (Azure#14956)
* Initial commit of auto-auto-lock-renewal. Adds parameter to get_*_receiver calls, and autoregisters at message receipt (and session at open). * Add tests for auto autolockrenewal * Shorten runtime of long-running autolockrenew tests * make clock skew margin slightly smaller as a lazy workaround to not break tests. * add changelog note for auto-autolockrenewer * add changelog stanza for max_lock_renew_duration change. * Make more clear error for when non-renewable messages (e.g. ReceiveAndDelete) are registered to autorenewer, and add validation for this scenario into tests. * Add check during receiver-getting to prevent attempting to autolockrenew a ReceiveAndDelete receiver (which would not work properly as those messages can't be renewed) and unit tests to validate this. * Normalize this failure mode to ValueError as technically the Message type is correct, and add a changelog note documenting this. Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
1 parent 32df120 commit 05c65a8

19 files changed

+535
-73
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
1111
* `azure.servicebus.exceptions.ServiceBusError` now inherits from `azure.core.exceptions.AzureError`.
1212
* Added a `parse_connection_string` method which parses a connection string into a properties bag containing its component parts
13+
* Add support for `auto_lock_renewer` parameter on `get_queue_receiver` and `get_subscription_receiver` calls to allow auto-registration of messages and sessions for auto-renewal.
1314

1415
**Breaking Changes**
1516

@@ -55,6 +56,8 @@ now raise more concrete exception other than `MessageSettleFailed` and `ServiceB
5556
* Sub-client (`ServiceBusSender` and `ServiceBusReceiver`) `from_connection_string` initializers have been made internal until needed. Clients should be initialized from root `ServiceBusClient`.
5657
* `ServiceBusMessage.label` has been renamed to `ServiceBusMessage.subject`.
5758
* `ServiceBusMessage.amqp_annotated_message` has had its type renamed from `AMQPMessage` to `AMQPAnnotatedMessage`
59+
* `AutoLockRenewer` `timeout` parameter is renamed to `max_lock_renew_duration`
60+
* Attempting to autorenew a non-renewable message, such as one received in `ReceiveAndDelete` mode, or configure auto-autorenewal on a `ReceiveAndDelete` receiver, will raise a `ValueError`.
5861

5962
**BugFixes**
6063

sdk/servicebus/azure-servicebus/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,8 @@ session_id = os.environ['SERVICE_BUS_SESSION_ID']
358358
renewer = AutoLockRenewer()
359359
with ServiceBusClient.from_connection_string(connstr) as client:
360360
with client.get_queue_receiver(session_queue_name, session_id=session_id) as receiver:
361-
renewer.register(receiver, receiver.session, timeout=300) # Timeout for how long to maintain the lock for, in seconds.
361+
renewer.register(receiver, receiver.session, max_lock_renewal_duration=300) # Duration for how long to maintain the lock for, in seconds.
362+
362363
for msg in receiver.receive_messages():
363364
# Do your application logic here
364365
receiver.complete_message(msg)

sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,29 @@
1515
from .._servicebus_session import ServiceBusSession
1616
from .message import ServiceBusReceivedMessage
1717
from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
18-
from .utils import renewable_start_time, utc_now
18+
from .utils import get_renewable_start_time, utc_now, get_renewable_lock_duration
1919

2020
if TYPE_CHECKING:
21-
from typing import Callable, Union, Optional, Awaitable
22-
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ServiceBusReceivedMessage],
23-
Optional[Exception]], None]
21+
from typing import Callable, Union, Optional
2422
Renewable = Union[ServiceBusSession, ServiceBusReceivedMessage]
23+
LockRenewFailureCallback = Callable[[Renewable,
24+
Optional[Exception]], None]
2525

2626
_log = logging.getLogger(__name__)
2727

28+
SHORT_RENEW_OFFSET = .5 # Seconds that if a renew period is longer than lock duration + offset, it's "too long"
29+
SHORT_RENEW_SCALING_FACTOR = .75 # In this situation we need a "Short renew" and should scale by this factor.
30+
2831

2932
class AutoLockRenewer(object):
3033
"""Auto renew locks for messages and sessions using a background thread pool.
3134
35+
:param max_lock_renewal_duration: A time in seconds that locks registered to this renewer
36+
should be maintained for. Default value is 300 (5 minutes).
37+
:type max_lock_renewal_duration: float
38+
:param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable
39+
that is being registered. Default value is None (no callback).
40+
:type on_lock_renew_failure: Optional[LockRenewFailureCallback]
3241
:param executor: A user-specified thread pool. This cannot be combined with
3342
setting `max_workers`.
3443
:type executor: ~concurrent.futures.ThreadPoolExecutor
@@ -55,10 +64,16 @@ class AutoLockRenewer(object):
5564
5665
"""
5766

58-
def __init__(self, executor=None, max_workers=None):
59-
# type: (ThreadPoolExecutor, int) -> None
67+
def __init__(self, max_lock_renewal_duration=300, on_lock_renew_failure=None, executor=None, max_workers=None):
68+
# type: (float, Optional[LockRenewFailureCallback], ThreadPoolExecutor, int) -> None
6069
"""Auto renew locks for messages and sessions using a background thread pool.
6170
71+
:param max_lock_renewal_duration: A time in seconds that locks registered to this renewer
72+
should be maintained for. Default value is 300 (5 minutes).
73+
:type max_lock_renewal_duration: float
74+
:param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable
75+
that is being registered. Default value is None (no callback).
76+
:type on_lock_renew_failure: Optional[LockRenewFailureCallback]
6277
:param executor: A user-specified thread pool. This cannot be combined with
6378
setting `max_workers`.
6479
:type executor: ~concurrent.futures.ThreadPoolExecutor
@@ -71,6 +86,8 @@ def __init__(self, executor=None, max_workers=None):
7186
self._shutdown = threading.Event()
7287
self._sleep_time = 1
7388
self._renew_period = 10
89+
self._max_lock_renewal_duration = max_lock_renewal_duration
90+
self._on_lock_renew_failure = on_lock_renew_failure
7491

7592
def __enter__(self):
7693
if self._shutdown.is_set():
@@ -93,18 +110,26 @@ def _renewable(self, renewable):
93110
return False
94111
return True
95112

96-
def _auto_lock_renew(self, receiver, renewable, starttime, timeout, on_lock_renew_failure=None):
113+
def _auto_lock_renew(self,
114+
receiver,
115+
renewable,
116+
starttime,
117+
max_lock_renewal_duration,
118+
on_lock_renew_failure=None,
119+
renew_period_override=None):
97120
# pylint: disable=protected-access
98-
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
121+
_log.debug("Running lock auto-renew thread for %r seconds", max_lock_renewal_duration)
99122
error = None
100123
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
124+
renew_period = renew_period_override or self._renew_period
101125
try:
102126
while self._renewable(renewable):
103-
if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout):
104-
_log.debug("Reached auto lock renew timeout - letting lock expire.")
105-
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
106-
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period):
107-
_log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period)
127+
if (utc_now() - starttime) >= datetime.timedelta(seconds=max_lock_renewal_duration):
128+
_log.debug("Reached max auto lock renew duration - letting lock expire.")
129+
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(
130+
max_lock_renewal_duration))
131+
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=renew_period):
132+
_log.debug("%r seconds or less until lock expires - auto renewing.", renew_period)
108133
try:
109134
# Renewable is a session
110135
renewable.renew_lock() # type: ignore
@@ -127,17 +152,18 @@ def _auto_lock_renew(self, receiver, renewable, starttime, timeout, on_lock_rene
127152
if on_lock_renew_failure and not clean_shutdown:
128153
on_lock_renew_failure(renewable, error)
129154

130-
def register(self, receiver, renewable, timeout=300, on_lock_renew_failure=None):
131-
# type: (ServiceBusReceiver, Renewable, float, Optional[LockRenewFailureCallback]) -> None
155+
def register(self, receiver, renewable, max_lock_renewal_duration=None, on_lock_renew_failure=None):
156+
# type: (ServiceBusReceiver, Renewable, Optional[float], Optional[LockRenewFailureCallback]) -> None
132157
"""Register a renewable entity for automatic lock renewal.
133158
134159
:param receiver: The ServiceBusReceiver instance that is associated with the message or the session to
135160
be auto-lock-renewed.
136161
:type receiver: ~azure.servicebus.ServiceBusReceiver
137162
:param renewable: A locked entity that needs to be renewed.
138163
:type renewable: Union[~azure.servicebus.ServiceBusReceivedMessage, ~azure.servicebus.ServiceBusSession]
139-
:param timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes).
140-
:type timeout: float
164+
:param max_lock_renewal_duration: A time in seconds that the lock should be maintained for.
165+
Default value is 300 (5 minutes).
166+
:type max_lock_renewal_duration: Optional[float]
141167
:param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable
142168
that is being registered. Default value is None (no callback).
143169
:type on_lock_renew_failure: Optional[LockRenewFailureCallback]
@@ -151,8 +177,27 @@ def register(self, receiver, renewable, timeout=300, on_lock_renew_failure=None)
151177
if self._shutdown.is_set():
152178
raise ServiceBusError("The AutoLockRenewer has already been shutdown. Please create a new instance for"
153179
" auto lock renewing.")
154-
starttime = renewable_start_time(renewable)
155-
self._executor.submit(self._auto_lock_renew, receiver, renewable, starttime, timeout, on_lock_renew_failure)
180+
if renewable.locked_until_utc is None:
181+
raise ValueError("Only azure.servicebus.ServiceBusReceivedMessage objects in PeekLock receive mode may"
182+
"be lock-renewed. (E.g. only messages received via receive() or the receiver iterator,"
183+
"not using ReceiveAndDelete receive mode, and not returned from Peek)")
184+
185+
starttime = get_renewable_start_time(renewable)
186+
187+
# This is a heuristic to compensate if it appears the user has a lock duration less than our base renew period
188+
time_until_expiry = get_renewable_lock_duration(renewable)
189+
renew_period_override = None
190+
# Default is 10 seconds, but let's leave ourselves a small margin of error because clock skew is a real problem
191+
if time_until_expiry <= datetime.timedelta(seconds=self._renew_period + SHORT_RENEW_OFFSET):
192+
renew_period_override = time_until_expiry.seconds * SHORT_RENEW_SCALING_FACTOR
193+
194+
self._executor.submit(self._auto_lock_renew,
195+
receiver,
196+
renewable,
197+
starttime,
198+
max_lock_renewal_duration or self._max_lock_renewal_duration,
199+
on_lock_renew_failure or self._on_lock_renew_failure,
200+
renew_period_override)
156201

157202
def close(self, wait=True):
158203
"""Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads.

sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ def _populate_attributes(self, **kwargs):
7474
raise ValueError("The max_wait_time must be greater than 0.")
7575
self._max_wait_time = max_wait_time
7676

77+
self._auto_lock_renewer = kwargs.get("auto_lock_renewer", None)
78+
if self._auto_lock_renewer \
79+
and self._receive_mode == ReceiveMode.ReceiveAndDelete \
80+
and self._session_id is None:
81+
raise ValueError("Messages received in ReceiveAndDelete receive mode cannot have their locks removed "
82+
"as they have been deleted, providing an AutoLockRenewer in this mode is invalid.")
83+
7784
def _build_message(self, received, message_type=ServiceBusReceivedMessage):
7885
message = message_type(message=received, receive_mode=self._receive_mode, receiver=self)
7986
self._last_received_sequenced_number = message.sequence_number

sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import logging
1010
import functools
1111
import platform
12-
from typing import Optional, Dict, Tuple
12+
from typing import Optional, Dict, Tuple, Union, TYPE_CHECKING
1313
from msrest.serialization import UTC
1414

1515
try:
@@ -29,6 +29,10 @@
2929
USER_AGENT_PREFIX,
3030
)
3131

32+
if TYPE_CHECKING:
33+
from .message import ServiceBusReceivedMessage
34+
from .._servicebus_session import BaseSession
35+
3236
_log = logging.getLogger(__name__)
3337

3438

@@ -81,15 +85,26 @@ def create_properties(user_agent=None):
8185
return properties
8286

8387

84-
def renewable_start_time(renewable):
88+
def get_renewable_start_time(renewable):
8589
try:
8690
return renewable._received_timestamp_utc # pylint: disable=protected-access
8791
except AttributeError:
8892
pass
8993
try:
9094
return renewable._session_start # pylint: disable=protected-access
9195
except AttributeError:
92-
raise TypeError("Registered object is not renewable.")
96+
raise TypeError("Registered object is not renewable, renewable must be" +
97+
"a ServiceBusReceivedMessage or a ServiceBusSession from a sessionful ServiceBusReceiver.")
98+
99+
100+
def get_renewable_lock_duration(renewable):
101+
# type: (Union[ServiceBusReceivedMessage, BaseSession]) -> datetime.timedelta
102+
# pylint: disable=protected-access
103+
try:
104+
return max(renewable.locked_until_utc - utc_now(), datetime.timedelta(seconds=0))
105+
except AttributeError:
106+
raise TypeError("Registered object is not renewable, renewable must be" +
107+
"a ServiceBusReceivedMessage or a ServiceBusSession from a sessionful ServiceBusReceiver.")
93108

94109

95110
def create_authentication(client):

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
if TYPE_CHECKING:
2222
from azure.core.credentials import TokenCredential
23+
from ._common.auto_lock_renewer import AutoLockRenewer
2324

2425
_LOGGER = logging.getLogger(__name__)
2526

@@ -223,6 +224,9 @@ def get_queue_receiver(self, queue_name, **kwargs):
223224
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
224225
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
225226
receiver will automatically stop receiving. The default value is None, meaning no timeout.
227+
:keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are
228+
automatically registered on receipt. If the receiver is a session receiver, it will apply to the session
229+
instead.
226230
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
227231
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
228232
performance but increase the chance that messages will expire while they are cached if they're not
@@ -329,6 +333,9 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
329333
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
330334
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
331335
receiver will automatically stop receiving. The default value is None, meaning no timeout.
336+
:keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are
337+
automatically registered on receipt. If the receiver is a session receiver, it will apply to the session
338+
instead.
332339
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
333340
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
334341
performance but increase the chance that messages will expire while they are cached if they're not

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
if TYPE_CHECKING:
5050
import datetime
5151
from azure.core.credentials import TokenCredential
52+
from ._common.auto_lock_renewer import AutoLockRenewer
5253

5354
_LOGGER = logging.getLogger(__name__)
5455

@@ -94,6 +95,9 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man
9495
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
9596
Additionally the following keys may also be present: `'username', 'password'`.
9697
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
98+
:keyword Optional[AutoLockRenewer] auto_lock_renewer: An AutoLockRenewer can be provided such that messages are
99+
automatically registered on receipt. If the receiver is a session receiver, it will apply to the session
100+
instead.
97101
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
98102
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
99103
performance but increase the chance that messages will expire while they are cached if they're not
@@ -187,6 +191,8 @@ def _iter_next(self):
187191
self._message_iter = self._handler.receive_messages_iter()
188192
uamqp_message = next(self._message_iter)
189193
message = self._build_message(uamqp_message)
194+
if self._auto_lock_renewer and not self._session:
195+
self._auto_lock_renewer.register(self, message)
190196
return message
191197

192198
def _create_handler(self, auth):
@@ -227,6 +233,9 @@ def _open(self):
227233
self.close()
228234
raise
229235

236+
if self._auto_lock_renewer and self._session:
237+
self._auto_lock_renewer.register(self, self.session)
238+
230239
def _receive(self, max_message_count=None, timeout=None):
231240
# type: (Optional[int], Optional[float]) -> List[ServiceBusReceivedMessage]
232241
# pylint: disable=protected-access
@@ -510,12 +519,16 @@ def receive_messages(self, max_message_count=None, max_wait_time=None):
510519
if max_wait_time is not None and max_wait_time <= 0:
511520
raise ValueError("The max_wait_time must be greater than 0.")
512521
self._check_live()
513-
return self._do_retryable_operation(
522+
messages = self._do_retryable_operation(
514523
self._receive,
515524
max_message_count=max_message_count,
516525
timeout=max_wait_time,
517526
operation_requires_timeout=True
518527
)
528+
if self._auto_lock_renewer and not self._session:
529+
for message in messages:
530+
self._auto_lock_renewer.register(self, message)
531+
return messages
519532

520533
def receive_deferred_messages(self, sequence_numbers, **kwargs):
521534
# type: (Union[int,List[int]], Any) -> List[ServiceBusReceivedMessage]
@@ -567,6 +580,9 @@ def receive_deferred_messages(self, sequence_numbers, **kwargs):
567580
handler,
568581
timeout=timeout
569582
)
583+
if self._auto_lock_renewer and not self._session:
584+
for message in messages:
585+
self._auto_lock_renewer.register(self, message)
570586
return messages
571587

572588
def peek_messages(self, max_message_count=1, **kwargs):

0 commit comments

Comments
 (0)