Skip to content

Commit b9480a5

Browse files
authored
[ServiceBus] receive related methods/constructor raise error if given max_wait_time <= 0 (Azure#14925)
* raise error if max_wait_time <= 0 * add test for subscription receiver * Update sdk/servicebus/azure-servicebus/CHANGELOG.md
1 parent 215720f commit b9480a5

File tree

10 files changed

+82
-21
lines changed

10 files changed

+82
-21
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ now raise more concrete exception other than `MessageSettleFailed` and `ServiceB
3737
* Exceptions `MessageSendFailed`, `MessageSettleFailed` and `MessageLockExpired`
3838
now inherit from `azure.servicebus.exceptions.MessageError`.
3939
* `get_state` in `ServiceBusSession` now returns `bytes` instead of a `string`.
40+
* `ServiceBusReceiver.receive_messages/get_streaming_message_iter` and
41+
`ServiceBusClient.get_<queue/subscription>_receiver` now raises ValueError if the given `max_wait_time` is less than 0.
4042
* Message settlement methods are moved from `ServiceBusMessage` to `ServiceBusReceiver`:
4143
- Use `ServiceBusReceiver.complete_message` instead of `ServiceBusReceivedMessage.complete` to complete a message.
4244
- Use `ServiceBusReceiver.abandon_message` instead of `ServiceBusReceivedMessage.abandon` to abandon a message.

sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ def _populate_attributes(self, **kwargs):
6969
# In large max_message_count case, like 5000, the pull receive would always return hundreds of messages limited
7070
# by the perf and time.
7171
self._further_pull_receive_timeout_ms = 200
72-
self._max_wait_time = kwargs.get("max_wait_time", None)
72+
max_wait_time = kwargs.get("max_wait_time", None)
73+
if max_wait_time is not None and max_wait_time <= 0:
74+
raise ValueError("The max_wait_time must be greater than 0.")
75+
self._max_wait_time = max_wait_time
7376

7477
def _build_message(self, received, message_type=ServiceBusReceivedMessage):
7578
message = message_type(message=received, receive_mode=self._receive_mode, receiver=self)

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,8 @@ def get_queue_receiver(self, queue_name, **kwargs):
221221
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
222222
the client fails to process the message. The default receive_mode is PeekLock.
223223
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
224-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
225-
automatically stop receiving. The default value is 0, meaning no timeout.
224+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
225+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
226226
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
227227
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
228228
performance but increase the chance that messages will expire while they are cached if they're not
@@ -327,8 +327,8 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
327327
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
328328
the client fails to process the message. The default receive_mode is PeekLock.
329329
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
330-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
331-
automatically stop receiving. The default value is 0, meaning no timeout.
330+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
331+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
332332
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
333333
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
334334
performance but increase the chance that messages will expire while they are cached if they're not

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ class ServiceBusReceiver(BaseHandler, ReceiverMixin): # pylint: disable=too-man
7777
the client connects to.
7878
:keyword str subscription_name: The path of specific Service Bus Subscription under the
7979
specified Topic the client connects to.
80-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
81-
automatically shutdown. The default value is 0, meaning no timeout.
80+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
81+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
8282
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
8383
are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given
8484
lock period before they will be removed from the queue. Messages received with ReceiveAndDelete
@@ -381,15 +381,15 @@ def close(self):
381381
self._message_iter = None # pylint: disable=attribute-defined-outside-init
382382

383383
def get_streaming_message_iter(self, max_wait_time=None):
384-
# type: (float) -> Iterator[ServiceBusReceivedMessage]
384+
# type: (Optional[float]) -> Iterator[ServiceBusReceivedMessage]
385385
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
386386
such a timeout occurs.
387387
388388
:param max_wait_time: Maximum time to wait in seconds for the next message to arrive.
389389
If no messages arrive, and no timeout is specified, this call will not return
390390
until the connection is closed. If specified, and no messages arrive for the
391391
timeout period, the iterator will stop.
392-
:type max_wait_time: float
392+
:type max_wait_time: Optional[float]
393393
:rtype: Iterator[ServiceBusReceivedMessage]
394394
395395
.. admonition:: Example:
@@ -401,6 +401,8 @@ def get_streaming_message_iter(self, max_wait_time=None):
401401
:dedent: 4
402402
:caption: Receive indefinitely from an iterator in streaming fashion.
403403
"""
404+
if max_wait_time is not None and max_wait_time <= 0:
405+
raise ValueError("The max_wait_time must be greater than 0.")
404406
return self._iter_contextual_wrapper(max_wait_time)
405407

406408
@classmethod
@@ -426,8 +428,8 @@ def from_connection_string(
426428
if the client fails to process the message.
427429
The default mode is PeekLock.
428430
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
429-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
430-
automatically shutdown. The default value is 0, meaning no timeout.
431+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
432+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
431433
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
432434
:keyword transport_type: The type of transport protocol that will be used for communicating with
433435
the Service Bus service. Default is `TransportType.Amqp`.
@@ -484,9 +486,9 @@ def receive_messages(self, max_message_count=None, max_wait_time=None):
484486
return as soon as at least one message is received and there is a gap in incoming messages regardless
485487
of the specified batch size.
486488
487-
:param int max_message_count: Maximum number of messages in the batch. Actual number
489+
:param Optional[int] max_message_count: Maximum number of messages in the batch. Actual number
488490
returned will depend on prefetch_count and incoming stream rate.
489-
:param float max_wait_time: Maximum time to wait in seconds for the first message to arrive.
491+
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
490492
If no messages arrive, and no timeout is specified, this call will not return
491493
until the connection is closed. If specified, an no messages arrive within the
492494
timeout period, an empty list will be returned.
@@ -503,6 +505,8 @@ def receive_messages(self, max_message_count=None, max_wait_time=None):
503505
:caption: Receive messages from ServiceBus.
504506
505507
"""
508+
if max_wait_time is not None and max_wait_time <= 0:
509+
raise ValueError("The max_wait_time must be greater than 0.")
506510
self._check_live()
507511
return self._do_retryable_operation(
508512
self._receive,

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,8 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv
214214
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
215215
the client fails to process the message. The default mode is PeekLock.
216216
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
217-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
218-
automatically stop receiving. The default value is 0, meaning no timeout.
217+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
218+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
219219
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
220220
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
221221
performance but increase the chance that messages will expire while they are cached if they're not
@@ -317,8 +317,8 @@ def get_subscription_receiver(self, topic_name: str, subscription_name: str, **k
317317
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
318318
the client fails to process the message. The default mode is PeekLock.
319319
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
320-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
321-
automatically stop receiving. The default value is 0, meaning no timeout.
320+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
321+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
322322
:keyword int prefetch_count: The maximum number of messages to cache with each request to the service.
323323
This setting is only for advanced performance tuning. Increasing this value will improve message throughput
324324
performance but increase the chance that messages will expire while they are cached if they're not

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandler, ReceiverMix
8181
if the client fails to process the message.
8282
The default mode is PeekLock.
8383
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
84-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
85-
automatically shutdown. The default value is 0, meaning no timeout.
84+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver
85+
will automatically stop receiving. The default value is None, meaning no timeout.
8686
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
8787
:keyword transport_type: The type of transport protocol that will be used for communicating with
8888
the Service Bus service. Default is `TransportType.Amqp`.
@@ -402,6 +402,8 @@ def get_streaming_message_iter(
402402
:dedent: 4
403403
:caption: Receive indefinitely from an iterator in streaming fashion.
404404
"""
405+
if max_wait_time is not None and max_wait_time <= 0:
406+
raise ValueError("The max_wait_time must be greater than 0.")
405407
return self._IterContextualWrapper(self, max_wait_time)
406408

407409
@classmethod
@@ -425,8 +427,8 @@ def from_connection_string(
425427
if the client fails to process the message.
426428
The default mode is PeekLock.
427429
:paramtype receive_mode: ~azure.servicebus.ReceiveMode
428-
:keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will
429-
automatically shutdown. The default value is 0, meaning no timeout.
430+
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
431+
receiver will automatically stop receiving. The default value is None, meaning no timeout.
430432
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
431433
:keyword transport_type: The type of transport protocol that will be used for communicating with
432434
the Service Bus service. Default is `TransportType.Amqp`.
@@ -504,6 +506,8 @@ async def receive_messages(
504506
:caption: Receive messages from ServiceBus.
505507
506508
"""
509+
if max_wait_time is not None and max_wait_time <= 0:
510+
raise ValueError("The max_wait_time must be greater than 0.")
507511
self._check_live()
508512
return await self._do_retryable_operation(
509513
self._receive,

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,17 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
6969
with pytest.raises(ServiceBusConnectionError):
7070
await (sb_client.get_queue_receiver(servicebus_queue.name, session_id="test", max_wait_time=5))._open_with_retry()
7171

72+
with pytest.raises(ValueError):
73+
sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=0)
74+
7275
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:
76+
77+
with pytest.raises(ValueError):
78+
await receiver.receive_messages(max_wait_time=0)
79+
80+
with pytest.raises(ValueError):
81+
await receiver.get_streaming_message_iter(max_wait_time=0)
82+
7383
count = 0
7484
async for message in receiver:
7585
print_message(_logger, message)

sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,25 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self,
4747
message = ServiceBusMessage(b"Sample topic message")
4848
await sender.send_messages(message)
4949

50+
with pytest.raises(ValueError):
51+
sb_client.get_subscription_receiver(
52+
topic_name=servicebus_topic.name,
53+
subscription_name=servicebus_subscription.name,
54+
max_wait_time=0
55+
)
56+
5057
async with sb_client.get_subscription_receiver(
5158
topic_name=servicebus_topic.name,
5259
subscription_name=servicebus_subscription.name,
5360
max_wait_time=5
5461
) as receiver:
62+
63+
with pytest.raises(ValueError):
64+
await receiver.receive_messages(max_wait_time=-1)
65+
66+
with pytest.raises(ValueError):
67+
await receiver.get_streaming_message_iter(max_wait_time=0)
68+
5569
count = 0
5670
async for message in receiver:
5771
count += 1

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,17 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, servicebu
131131
message.reply_to = 'reply_to'
132132
sender.send_messages(message)
133133

134+
with pytest.raises(ValueError):
135+
sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=0)
136+
134137
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
138+
139+
with pytest.raises(ValueError):
140+
receiver.receive_messages(max_wait_time=0)
141+
142+
with pytest.raises(ValueError):
143+
receiver.get_streaming_message_iter(max_wait_time=0)
144+
135145
count = 0
136146
for message in receiver:
137147
print_message(_logger, message)

sdk/servicebus/azure-servicebus/tests/test_subscriptions.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,25 @@ def test_subscription_by_subscription_client_conn_str_receive_basic(self, servic
4646
message = ServiceBusMessage(b"Sample topic message")
4747
sender.send_messages(message)
4848

49+
with pytest.raises(ValueError):
50+
sb_client.get_subscription_receiver(
51+
topic_name=servicebus_topic.name,
52+
subscription_name=servicebus_subscription.name,
53+
max_wait_time=0
54+
)
55+
4956
with sb_client.get_subscription_receiver(
5057
topic_name=servicebus_topic.name,
5158
subscription_name=servicebus_subscription.name,
5259
max_wait_time=5
5360
) as receiver:
61+
62+
with pytest.raises(ValueError):
63+
receiver.receive_messages(max_wait_time=-1)
64+
65+
with pytest.raises(ValueError):
66+
receiver.get_streaming_message_iter(max_wait_time=0)
67+
5468
count = 0
5569
for message in receiver:
5670
count += 1

0 commit comments

Comments
 (0)