Skip to content

Commit 2fce666

Browse files
authored
[EventHubs] add fixed backoff retry mode (Azure#21884)
* sync retry mode impl * async impl * add test * pylint * lint * adams comments * lint * adams comments * fix test
1 parent 811cf03 commit 2fce666

File tree

14 files changed

+223
-66
lines changed

14 files changed

+223
-66
lines changed

sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
### Features Added
66

7+
- Added support for fixed (linear) retry backoff:
8+
- Sync/async `EventHubProducerClient` and `EventHubConsumerClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
9+
- `RetryMode` enum has been added to `azure.eventhub`, with values `FIXED` and `EXPONENTIAL`.
10+
711
### Breaking Changes
812

913
### Bugs Fixed

sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
parse_connection_string,
1919
EventHubConnectionStringProperties
2020
)
21+
from ._retry import RetryMode
2122

2223
TransportType = constants.TransportType
2324

@@ -33,5 +34,6 @@
3334
"LoadBalancingStrategy",
3435
"PartitionContext",
3536
"parse_connection_string",
36-
"EventHubConnectionStringProperties"
37+
"EventHubConnectionStringProperties",
38+
"RetryMode"
3739
]

sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@
2020

2121
from uamqp import AMQPClient, Message, authentication, constants, errors, compat, utils
2222
import six
23-
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
23+
from azure.core.credentials import (
24+
AccessToken,
25+
AzureSasCredential,
26+
AzureNamedKeyCredential,
27+
)
2428
from azure.core.utils import parse_connection_string as core_parse_connection_string
2529

2630

2731
from .exceptions import _handle_exception, ClientClosedError, ConnectError
2832
from ._configuration import Configuration
33+
from ._retry import RetryMode
2934
from ._utils import utc_from_timestamp, parse_sas_credential
3035
from ._connection_manager import get_connection_manager
3136
from ._constants import (
@@ -34,7 +39,7 @@
3439
MGMT_OPERATION,
3540
MGMT_PARTITION_OPERATION,
3641
MGMT_STATUS_CODE,
37-
MGMT_STATUS_DESC
42+
MGMT_STATUS_DESC,
3843
)
3944

4045
if TYPE_CHECKING:
@@ -52,9 +57,11 @@ def _parse_conn_str(conn_str, **kwargs):
5257
entity_path = None # type: Optional[str]
5358
shared_access_signature = None # type: Optional[str]
5459
shared_access_signature_expiry = None
55-
eventhub_name = kwargs.pop("eventhub_name", None) # type: Optional[str]
56-
check_case = kwargs.pop("check_case", False) # type: bool
57-
conn_settings = core_parse_connection_string(conn_str, case_sensitive_keys=check_case)
60+
eventhub_name = kwargs.pop("eventhub_name", None) # type: Optional[str]
61+
check_case = kwargs.pop("check_case", False) # type: bool
62+
conn_settings = core_parse_connection_string(
63+
conn_str, case_sensitive_keys=check_case
64+
)
5865
if check_case:
5966
shared_access_key = conn_settings.get("SharedAccessKey")
6067
shared_access_key_name = conn_settings.get("SharedAccessKeyName")
@@ -79,7 +86,7 @@ def _parse_conn_str(conn_str, **kwargs):
7986
try:
8087
# Expiry can be stored in the "se=<timestamp>" clause of the token. ('&'-separated key-value pairs)
8188
shared_access_signature_expiry = int(
82-
shared_access_signature.split("se=")[1].split("&")[0] # type: ignore
89+
shared_access_signature.split("se=")[1].split("&")[0] # type: ignore
8390
)
8491
except (
8592
IndexError,
@@ -117,12 +124,14 @@ def _parse_conn_str(conn_str, **kwargs):
117124
"At least one of the SharedAccessKey or SharedAccessSignature must be present."
118125
)
119126

120-
return (host,
121-
str(shared_access_key_name) if shared_access_key_name else None,
122-
str(shared_access_key) if shared_access_key else None,
123-
entity,
124-
str(shared_access_signature) if shared_access_signature else None,
125-
shared_access_signature_expiry)
127+
return (
128+
host,
129+
str(shared_access_key_name) if shared_access_key_name else None,
130+
str(shared_access_key) if shared_access_key else None,
131+
entity,
132+
str(shared_access_signature) if shared_access_signature else None,
133+
shared_access_signature_expiry,
134+
)
126135

127136

128137
def _generate_sas_token(uri, policy, key, expiry=None):
@@ -154,6 +163,14 @@ def _build_uri(address, entity):
154163
return address
155164

156165

166+
def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
167+
if retry_mode == RetryMode.FIXED:
168+
backoff_value = backoff_factor
169+
else:
170+
backoff_value = backoff_factor * (2 ** retried_times)
171+
return min(backoff_max, backoff_value)
172+
173+
157174
class EventHubSharedKeyCredential(object):
158175
"""The shared access key credential used for authentication.
159176
@@ -200,6 +217,7 @@ class EventHubSASTokenCredential(object):
200217
:param str token: The shared access token string
201218
:param int expiry: The epoch timestamp
202219
"""
220+
203221
def __init__(self, token, expiry):
204222
# type: (str, int) -> None
205223
"""
@@ -225,6 +243,7 @@ class EventhubAzureSasTokenCredential(object):
225243
:param azure_sas_credential: The credential to be used for authentication.
226244
:type azure_sas_credential: ~azure.core.credentials.AzureSasCredential
227245
"""
246+
228247
def __init__(self, azure_sas_credential):
229248
# type: (AzureSasCredential) -> None
230249
"""The shared access token credential used for authentication
@@ -257,9 +276,9 @@ def __init__(self, fully_qualified_namespace, eventhub_name, credential, **kwarg
257276
if isinstance(credential, AzureSasCredential):
258277
self._credential = EventhubAzureSasTokenCredential(credential)
259278
elif isinstance(credential, AzureNamedKeyCredential):
260-
self._credential = EventhubAzureNamedKeyTokenCredential(credential) # type: ignore
279+
self._credential = EventhubAzureNamedKeyTokenCredential(credential) # type: ignore
261280
else:
262-
self._credential = credential #type: ignore
281+
self._credential = credential # type: ignore
263282
self._keep_alive = kwargs.get("keep_alive", 30)
264283
self._auto_reconnect = kwargs.get("auto_reconnect", True)
265284
self._mgmt_target = "amqps://{}/{}".format(
@@ -274,7 +293,9 @@ def __init__(self, fully_qualified_namespace, eventhub_name, credential, **kwarg
274293
@staticmethod
275294
def _from_connection_string(conn_str, **kwargs):
276295
# type: (str, Any) -> Dict[str, Any]
277-
host, policy, key, entity, token, token_expiry = _parse_conn_str(conn_str, **kwargs)
296+
host, policy, key, entity, token, token_expiry = _parse_conn_str(
297+
conn_str, **kwargs
298+
)
278299
kwargs["fully_qualified_namespace"] = host
279300
kwargs["eventhub_name"] = entity
280301
if token and token_expiry:
@@ -291,7 +312,7 @@ def _create_auth(self):
291312
"""
292313
try:
293314
# ignore mypy's warning because token_type is Optional
294-
token_type = self._credential.token_type # type: ignore
315+
token_type = self._credential.token_type # type: ignore
295316
except AttributeError:
296317
token_type = b"jwt"
297318
if token_type == b"servicebus.windows.net:sastoken":
@@ -305,7 +326,7 @@ def _create_auth(self):
305326
transport_type=self._config.transport_type,
306327
custom_endpoint_hostname=self._config.custom_endpoint_hostname,
307328
port=self._config.connection_port,
308-
verify=self._config.connection_verify
329+
verify=self._config.connection_verify,
309330
)
310331
auth.update_token()
311332
return auth
@@ -319,7 +340,7 @@ def _create_auth(self):
319340
transport_type=self._config.transport_type,
320341
custom_endpoint_hostname=self._config.custom_endpoint_hostname,
321342
port=self._config.connection_port,
322-
verify=self._config.connection_verify
343+
verify=self._config.connection_verify,
323344
)
324345

325346
def _close_connection(self):
@@ -331,7 +352,12 @@ def _backoff(
331352
):
332353
# type: (int, Exception, Optional[int], Optional[str]) -> None
333354
entity_name = entity_name or self._container_id
334-
backoff = self._config.backoff_factor * 2 ** retried_times
355+
backoff = _get_backoff_time(
356+
self._config.retry_mode,
357+
self._config.backoff_factor,
358+
self._config.backoff_max,
359+
retried_times,
360+
)
335361
if backoff <= self._config.backoff_max and (
336362
timeout_time is None or time.time() + backoff <= timeout_time
337363
): # pylint:disable=no-else-return
@@ -360,7 +386,7 @@ def _management_request(self, mgmt_msg, op_type):
360386
self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing
361387
)
362388
try:
363-
conn = self._conn_manager.get_connection(
389+
conn = self._conn_manager.get_connection( # pylint:disable=assignment-from-none
364390
self._address.hostname, mgmt_auth
365391
)
366392
mgmt_client.open(connection=conn)
@@ -373,29 +399,28 @@ def _management_request(self, mgmt_msg, op_type):
373399
description_fields=MGMT_STATUS_DESC,
374400
)
375401
status_code = int(response.application_properties[MGMT_STATUS_CODE])
376-
description = response.application_properties.get(MGMT_STATUS_DESC) # type: Optional[Union[str, bytes]]
402+
description = response.application_properties.get(
403+
MGMT_STATUS_DESC
404+
) # type: Optional[Union[str, bytes]]
377405
if description and isinstance(description, six.binary_type):
378-
description = description.decode('utf-8')
406+
description = description.decode("utf-8")
379407
if status_code < 400:
380408
return response
381409
if status_code in [401]:
382410
raise errors.AuthenticationException(
383411
"Management authentication failed. Status code: {}, Description: {!r}".format(
384-
status_code,
385-
description
412+
status_code, description
386413
)
387414
)
388415
if status_code in [404]:
389416
raise ConnectError(
390417
"Management connection failed. Status code: {}, Description: {!r}".format(
391-
status_code,
392-
description
418+
status_code, description
393419
)
394420
)
395421
raise errors.AMQPConnectionError(
396422
"Management request error. Status code: {}, Description: {!r}".format(
397-
status_code,
398-
description
423+
status_code, description
399424
)
400425
)
401426
except Exception as exception: # pylint: disable=broad-except
@@ -491,9 +516,7 @@ def _check_closed(self):
491516
)
492517

493518
def _open(self):
494-
"""Open the EventHubConsumer/EventHubProducer using the supplied connection.
495-
496-
"""
519+
"""Open the EventHubConsumer/EventHubProducer using the supplied connection."""
497520
# pylint: disable=protected-access
498521
if not self.running:
499522
if self._handler:

sdk/eventhub/azure-eventhub/azure/eventhub/_common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ def _from_message(cls, message, raw_amqp_message=None):
182182
"""
183183
event_data = cls(body="")
184184
event_data.message = message
185+
# pylint: disable=protected-access
185186
event_data._raw_amqp_message = raw_amqp_message if raw_amqp_message else AmqpAnnotatedMessage(message=message)
186187
return event_data
187188

sdk/eventhub/azure-eventhub/azure/eventhub/_configuration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
from urllib.parse import urlparse
1111

1212
from uamqp.constants import TransportType, DEFAULT_AMQPS_PORT, DEFAULT_AMQP_WSS_PORT
13+
from ._retry import RetryMode
1314

1415

1516
class Configuration(object): # pylint:disable=too-many-instance-attributes
1617
def __init__(self, **kwargs):
1718
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
1819
self.retry_total = kwargs.get("retry_total", 3) # type: int
1920
self.max_retries = self.retry_total # type: int
21+
self.retry_mode = kwargs.get("retry_mode", RetryMode.EXPONENTIAL)
2022
self.backoff_factor = kwargs.get("retry_backoff_factor", 0.8) # type: float
2123
self.backoff_max = kwargs.get("retry_backoff_max", 120) # type: int
2224
self.network_tracing = kwargs.get("network_tracing", False) # type: bool

sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ class EventHubConsumerClient(ClientBase):
7272
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
7373
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
7474
receiving.
75+
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
76+
(most errors are resolved immediately by a second try without a delay).
77+
In fixed mode, retry policy will always sleep for {backoff factor}.
78+
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
79+
seconds. If the backoff_factor is 0.1, then the retry will sleep
80+
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
81+
:keyword float retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
82+
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
83+
:paramtype retry_mode: ~azure.eventhub.RetryMode
7584
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
7685
if there is no further activity. By default the value is None, meaning that the client will not shutdown due to
7786
inactivity unless initiated by the service.
@@ -219,6 +228,15 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs):
219228
information. The failed internal partition consumer will be closed (`on_partition_close` will be called
220229
if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if
221230
provided) to resume receiving.
231+
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
232+
(most errors are resolved immediately by a second try without a delay).
233+
In fixed mode, retry policy will always sleep for {backoff factor}.
234+
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
235+
seconds. If the backoff_factor is 0.1, then the retry will sleep
236+
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
237+
:keyword float retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
238+
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
239+
:paramtype retry_mode: ~azure.eventhub.RetryMode
222240
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
223241
if there is no furthur activity. By default the value is None, meaning that the client will not shutdown due
224242
to inactivity unless initiated by the service.

sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ class EventHubProducerClient(ClientBase):
4747
:keyword str user_agent: If specified, this will be added in front of the user agent string.
4848
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
4949
value is 3.
50+
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
51+
(most errors are resolved immediately by a second try without a delay).
52+
In fixed mode, retry policy will always sleep for {backoff factor}.
53+
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
54+
seconds. If the backoff_factor is 0.1, then the retry will sleep
55+
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
56+
:keyword float retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
57+
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
58+
:paramtype retry_mode: ~azure.eventhub.RetryMode
5059
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
5160
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
5261
unless initiated by the service.
@@ -185,6 +194,15 @@ def from_connection_string(cls, conn_str, **kwargs):
185194
:keyword str user_agent: If specified, this will be added in front of the user agent string.
186195
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
187196
Default value is 3.
197+
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
198+
(most errors are resolved immediately by a second try without a delay).
199+
In fixed mode, retry policy will always sleep for {backoff factor}.
200+
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
201+
seconds. If the backoff_factor is 0.1, then the retry will sleep
202+
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
203+
:keyword float retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
204+
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
205+
:paramtype retry_mode: ~azure.eventhub.RetryMode
188206
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
189207
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
190208
inactivity unless initiated by the service.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
from typing import Optional, Dict, Any
6+
7+
from enum import Enum
8+
9+
class RetryMode(str, Enum):
10+
EXPONENTIAL = 'exponential'
11+
FIXED = 'fixed'

sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,14 +311,14 @@ def decode_with_recurse(data, encoding="UTF-8"):
311311
return data
312312
if isinstance(data, six.binary_type):
313313
return data.decode(encoding)
314-
if isinstance(data, Mapping):
314+
if isinstance(data, Mapping): # pylint:disable=isinstance-second-argument-not-valid-type
315315
decoded_mapping = {}
316316
for k, v in data.items():
317317
decoded_key = decode_with_recurse(k, encoding)
318318
decoded_val = decode_with_recurse(v, encoding)
319319
decoded_mapping[decoded_key] = decoded_val
320320
return decoded_mapping
321-
if isinstance(data, Iterable):
321+
if isinstance(data, Iterable): # pylint:disable=isinstance-second-argument-not-valid-type
322322
decoded_list = []
323323
for d in data:
324324
decoded_list.append(decode_with_recurse(d, encoding))

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_async_utils.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import sys
88

9-
109
def get_dict_with_loop_if_needed(loop):
1110
if sys.version_info >= (3, 10):
1211
if loop:

0 commit comments

Comments
 (0)