Skip to content

Commit 356d974

Browse files
[ServiceBus] improve memory usage of service bus client (Azure#19915)
* improve memory usage of service bus client * remove set * Update sdk/servicebus/azure-servicebus/CHANGELOG.md Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * revert set * update impl to avoid access private ivars * fix bug * use weakref * fix pylint Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>
1 parent c935c3d commit 356d974

File tree

5 files changed

+121
-27
lines changed

5 files changed

+121
-27
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
# Release History
22

3-
## 7.3.3 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
3+
## 7.3.3 (2021-09-08)
84

95
### Bugs Fixed
106

11-
### Other Changes
12-
7+
- Improved memory usage of `ServiceBusClient` to automatically discard spawned `ServiceBusSender` or `ServiceBusReceiver` from its handler set when no strong reference to the sender or receiver exists anymore.
138
- Reduced CPU load of `azure.servicebus.AutoLockRenewer` during lock renewal.
149

1510
## 7.3.2 (2021-08-10)

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5-
from typing import Any, List, Union, TYPE_CHECKING
5+
from typing import Any, Union, TYPE_CHECKING
66
import logging
7+
from weakref import WeakSet
78

89
import uamqp
910

1011
from ._base_handler import (
1112
_parse_conn_str,
1213
ServiceBusSharedKeyCredential,
1314
ServiceBusSASTokenCredential,
14-
BaseHandler,
1515
)
1616
from ._servicebus_sender import ServiceBusSender
1717
from ._servicebus_receiver import ServiceBusReceiver
@@ -89,7 +89,7 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs):
8989
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
9090
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
9191
self._connection_sharing = False
92-
self._handlers = [] # type: List[BaseHandler]
92+
self._handlers = WeakSet() # type: WeakSet
9393

9494
def __enter__(self):
9595
if self._connection_sharing:
@@ -124,7 +124,8 @@ def close(self):
124124
handler._container_id, # pylint: disable=protected-access
125125
exception,
126126
)
127-
del self._handlers[:]
127+
128+
self._handlers.clear()
128129

129130
if self._connection_sharing and self._connection:
130131
self._connection.destroy()
@@ -216,7 +217,7 @@ def get_queue_sender(self, queue_name, **kwargs):
216217
retry_backoff_max=self._config.retry_backoff_max,
217218
**kwargs
218219
)
219-
self._handlers.append(handler)
220+
self._handlers.add(handler)
220221
return handler
221222

222223
def get_queue_receiver(self, queue_name, **kwargs):
@@ -307,7 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
307308
retry_backoff_max=self._config.retry_backoff_max,
308309
**kwargs
309310
)
310-
self._handlers.append(handler)
311+
self._handlers.add(handler)
311312
return handler
312313

313314
def get_topic_sender(self, topic_name, **kwargs):
@@ -348,7 +349,7 @@ def get_topic_sender(self, topic_name, **kwargs):
348349
retry_backoff_max=self._config.retry_backoff_max,
349350
**kwargs
350351
)
351-
self._handlers.append(handler)
352+
self._handlers.add(handler)
352353
return handler
353354

354355
def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
@@ -457,5 +458,5 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
457458
retry_backoff_max=self._config.retry_backoff_max,
458459
**kwargs
459460
)
460-
self._handlers.append(handler)
461+
self._handlers.add(handler)
461462
return handler

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5-
from typing import Any, List, Union, TYPE_CHECKING
5+
from typing import Any, Union, TYPE_CHECKING
66
import logging
7+
from weakref import WeakSet
78

89
import uamqp
910
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
@@ -12,7 +13,6 @@
1213
from ._base_handler_async import (
1314
ServiceBusSharedKeyCredential,
1415
ServiceBusSASTokenCredential,
15-
BaseHandler,
1616
)
1717
from ._servicebus_sender_async import ServiceBusSender
1818
from ._servicebus_receiver_async import ServiceBusReceiver
@@ -90,7 +90,7 @@ def __init__(
9090
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
9191
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
9292
self._connection_sharing = False
93-
self._handlers = [] # type: List[BaseHandler]
93+
self._handlers = WeakSet() # type: WeakSet
9494

9595
async def __aenter__(self):
9696
if self._connection_sharing:
@@ -171,7 +171,7 @@ async def close(self) -> None:
171171
handler._container_id, # pylint: disable=protected-access
172172
exception,
173173
)
174-
del self._handlers[:]
174+
self._handlers.clear()
175175

176176
if self._connection_sharing and self._connection:
177177
await self._connection.destroy_async()
@@ -214,7 +214,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
214214
retry_backoff_max=self._config.retry_backoff_max,
215215
**kwargs
216216
)
217-
self._handlers.append(handler)
217+
self._handlers.add(handler)
218218
return handler
219219

220220
def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiver:
@@ -303,7 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv
303303
retry_backoff_max=self._config.retry_backoff_max,
304304
**kwargs
305305
)
306-
self._handlers.append(handler)
306+
self._handlers.add(handler)
307307
return handler
308308

309309
def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
@@ -343,7 +343,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
343343
retry_backoff_max=self._config.retry_backoff_max,
344344
**kwargs
345345
)
346-
self._handlers.append(handler)
346+
self._handlers.add(handler)
347347
return handler
348348

349349
def get_subscription_receiver(
@@ -453,5 +453,5 @@ def get_subscription_receiver(
453453
retry_backoff_max=self._config.retry_backoff_max,
454454
**kwargs
455455
)
456-
self._handlers.append(handler)
456+
self._handlers.add(handler)
457457
return handler

sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
ServiceBusQueuePreparer,
2626
ServiceBusNamespaceAuthorizationRulePreparer,
2727
ServiceBusQueueAuthorizationRulePreparer,
28-
CachedServiceBusQueuePreparer
28+
CachedServiceBusQueuePreparer,
29+
CachedServiceBusTopicPreparer,
30+
CachedServiceBusSubscriptionPreparer
2931
)
3032
from utilities import get_logger
3133

@@ -137,7 +139,9 @@ async def test_sb_client_writeonly_credentials_async(self, servicebus_authorizat
137139
@CachedResourceGroupPreparer()
138140
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
139141
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
140-
async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
142+
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
143+
@CachedServiceBusSubscriptionPreparer(name_prefix='servicebustest')
144+
async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, servicebus_topic, servicebus_subscription, **kwargs):
141145
client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string)
142146

143147
await client.close()
@@ -174,6 +178,51 @@ async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace
174178
assert not receiver._handler and not receiver._running
175179
assert len(client._handlers) == 0
176180

181+
queue_sender = client.get_queue_sender(servicebus_queue.name)
182+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
183+
assert len(client._handlers) == 2
184+
queue_sender = client.get_queue_sender(servicebus_queue.name)
185+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
186+
# the previous sender/receiver can not longer be referenced, there might be a delay in CPython
187+
# to remove the reference, so len of handlers should be less than 4
188+
assert len(client._handlers) < 4
189+
await client.close()
190+
191+
queue_sender = client.get_queue_sender(servicebus_queue.name)
192+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
193+
assert len(client._handlers) == 2
194+
queue_sender = None
195+
queue_receiver = None
196+
assert len(client._handlers) < 2
197+
198+
await client.close()
199+
topic_sender = client.get_topic_sender(servicebus_topic.name)
200+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
201+
assert len(client._handlers) == 2
202+
topic_sender = None
203+
subscription_receiver = None
204+
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
205+
assert len(client._handlers) < 4
206+
207+
await client.close()
208+
topic_sender = client.get_topic_sender(servicebus_topic.name)
209+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
210+
assert len(client._handlers) == 2
211+
topic_sender = client.get_topic_sender(servicebus_topic.name)
212+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
213+
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
214+
assert len(client._handlers) < 4
215+
216+
await client.close()
217+
for _ in range(5):
218+
queue_sender = client.get_queue_sender(servicebus_queue.name)
219+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
220+
topic_sender = client.get_topic_sender(servicebus_topic.name)
221+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name,
222+
servicebus_subscription.name)
223+
assert len(client._handlers) < 15
224+
await client.close()
225+
177226
@pytest.mark.liveTest
178227
@pytest.mark.live_test_only
179228
@CachedResourceGroupPreparer(name_prefix='servicebustest')

sdk/servicebus/azure-servicebus/tests/test_sb_client.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
ServiceBusQueuePreparer,
3030
ServiceBusNamespaceAuthorizationRulePreparer,
3131
ServiceBusQueueAuthorizationRulePreparer,
32-
CachedServiceBusQueuePreparer
32+
CachedServiceBusQueuePreparer,
33+
CachedServiceBusTopicPreparer,
34+
CachedServiceBusSubscriptionPreparer
3335
)
3436

3537
class ServiceBusClientTests(AzureMgmtTestCase):
@@ -201,7 +203,9 @@ def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization
201203
@CachedResourceGroupPreparer()
202204
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
203205
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
204-
def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
206+
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
207+
@CachedServiceBusSubscriptionPreparer(name_prefix='servicebustest')
208+
def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, servicebus_topic, servicebus_subscription, **kwargs):
205209
client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string)
206210

207211
client.close()
@@ -238,6 +242,51 @@ def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_
238242
assert not receiver._handler and not receiver._running
239243
assert len(client._handlers) == 0
240244

245+
queue_sender = client.get_queue_sender(servicebus_queue.name)
246+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
247+
assert len(client._handlers) == 2
248+
queue_sender = client.get_queue_sender(servicebus_queue.name)
249+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
250+
# the previous sender/receiver can not longer be referenced, there might be a delay in CPython
251+
# to remove the reference, so len of handlers should be less than 4
252+
assert len(client._handlers) < 4
253+
client.close()
254+
255+
queue_sender = client.get_queue_sender(servicebus_queue.name)
256+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
257+
assert len(client._handlers) == 2
258+
queue_sender = None
259+
queue_receiver = None
260+
assert len(client._handlers) < 2
261+
262+
client.close()
263+
topic_sender = client.get_topic_sender(servicebus_topic.name)
264+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
265+
assert len(client._handlers) == 2
266+
topic_sender = None
267+
subscription_receiver = None
268+
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
269+
assert len(client._handlers) < 4
270+
271+
client.close()
272+
topic_sender = client.get_topic_sender(servicebus_topic.name)
273+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
274+
assert len(client._handlers) == 2
275+
topic_sender = client.get_topic_sender(servicebus_topic.name)
276+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
277+
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
278+
assert len(client._handlers) < 4
279+
280+
client.close()
281+
for _ in range(5):
282+
queue_sender = client.get_queue_sender(servicebus_queue.name)
283+
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
284+
topic_sender = client.get_topic_sender(servicebus_topic.name)
285+
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name,
286+
servicebus_subscription.name)
287+
assert len(client._handlers) < 15
288+
client.close()
289+
241290
@pytest.mark.liveTest
242291
@pytest.mark.live_test_only
243292
@CachedResourceGroupPreparer()

0 commit comments

Comments
 (0)