Skip to content

Commit d5e4a4c

Browse files
authored
[ServiceBus&EventHubs] api fixes (Azure#22418)
* api updates * fix mypy 3.7 errors * Annas comments * mypy literal variable error * lint * update type hint session_id * docstring * lint * add SBSessionFilter to public API * add typing_extensions to setup.py/add back kwargs with warnings * add kwargs + warn to amqp message * typing-extensions add to shared reqs * lint * add bool to body ED type * anna + adam comments * move to extras require * add back to install reqs
1 parent cdeb608 commit d5e4a4c

File tree

14 files changed

+124
-39
lines changed

14 files changed

+124
-39
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_common.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import json
88
import logging
9+
import uuid
910
from typing import (
1011
Union,
1112
Dict,
@@ -59,6 +60,17 @@
5960
if TYPE_CHECKING:
6061
import datetime
6162

63+
PrimitiveTypes = Optional[Union[
64+
int,
65+
float,
66+
bytes,
67+
bool,
68+
str,
69+
Dict,
70+
List,
71+
uuid.UUID,
72+
]]
73+
6274
_LOGGER = logging.getLogger(__name__)
6375

6476
# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each
@@ -319,7 +331,7 @@ def system_properties(self):
319331

320332
@property
321333
def body(self):
322-
# type: () -> Any
334+
# type: () -> PrimitiveTypes
323335
"""The body of the Message. The format may vary depending on the body type:
324336
For :class:`azure.eventhub.amqp.AmqpMessageBodyType.DATA<azure.eventhub.amqp.AmqpMessageBodyType.DATA>`,
325337
the body could be bytes or Iterable[bytes].
@@ -328,7 +340,7 @@ def body(self):
328340
For :class:`azure.eventhub.amqp.AmqpMessageBodyType.VALUE<azure.eventhub.amqp.AmqpMessageBodyType.VALUE>`,
329341
the body could be any type.
330342
331-
:rtype: Any
343+
:rtype: int or bool or float or bytes or str or dict or list or uuid.UUID
332344
"""
333345
try:
334346
return self._raw_amqp_message.body

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This version and all future versions will require Python 3.7+. Python 2.7 and 3.
88

99
- Added support for fixed (linear) retry backoff:
1010
- Sync/async `ServiceBusClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
11+
- Added new enum class `ServiceBusSessionFilter`, which is the type of existing `NEXT_AVAILABLE_SESSION` value.
1112

1213
### Breaking Changes
1314

sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ._common.constants import (
2222
ServiceBusReceiveMode,
2323
ServiceBusSubQueue,
24+
ServiceBusSessionFilter,
2425
NEXT_AVAILABLE_SESSION,
2526
)
2627
from ._common.auto_lock_renewer import AutoLockRenewer
@@ -37,6 +38,7 @@
3738
"ServiceBusReceivedMessage",
3839
"NEXT_AVAILABLE_SESSION",
3940
"ServiceBusSubQueue",
41+
"ServiceBusSessionFilter",
4042
"ServiceBusReceiveMode",
4143
"ServiceBusClient",
4244
"ServiceBusReceiver",

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing import Any, Union, Optional, TYPE_CHECKING
66
import logging
77
from weakref import WeakSet
8+
from typing_extensions import Literal
89

910
import uamqp
1011

@@ -25,6 +26,7 @@
2526
from ._common.constants import (
2627
ServiceBusSubQueue,
2728
ServiceBusReceiveMode,
29+
ServiceBusSessionFilter,
2830
)
2931

3032
if TYPE_CHECKING:
@@ -34,6 +36,8 @@
3436
AzureNamedKeyCredential,
3537
)
3638

39+
NextAvailableSessionType = Literal[ServiceBusSessionFilter.NEXT_AVAILABLE]
40+
3741

3842
_LOGGER = logging.getLogger(__name__)
3943

@@ -269,7 +273,7 @@ def get_queue_receiver(
269273
self,
270274
queue_name: str,
271275
*,
272-
session_id: Optional[str] = None,
276+
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
273277
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
274278
receive_mode: Union[
275279
ServiceBusReceiveMode, str
@@ -285,13 +289,14 @@ def get_queue_receiver(
285289
:keyword session_id: A specific session from which to receive. This must be specified for a
286290
sessionful queue, otherwise it must be None. In order to receive messages from the next available
287291
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
288-
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
289-
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
292+
:paramtype session_id: str or ~azure.servicebus.NEXT_AVAILABLE_SESSION
293+
:keyword sub_queue: If specified, the subqueue this receiver will
290294
connect to.
291295
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
292296
receiver or messages that can't be processed.
293297
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
294298
enum or equivalent string values "deadletter" and "transferdeadletter".
299+
:paramtype sub_queue: str or ~azure.servicebus.ServiceBusSubQueue
295300
:keyword receive_mode: The receive_mode with which messages will be retrieved from the entity. The two options
296301
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
297302
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
@@ -421,7 +426,7 @@ def get_subscription_receiver(
421426
topic_name: str,
422427
subscription_name: str,
423428
*,
424-
session_id: Optional[str] = None,
429+
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
425430
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
426431
receive_mode: Union[
427432
ServiceBusReceiveMode, str
@@ -439,13 +444,14 @@ def get_subscription_receiver(
439444
:keyword session_id: A specific session from which to receive. This must be specified for a
440445
sessionful subscription, otherwise it must be None. In order to receive messages from the next available
441446
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
442-
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
443-
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
447+
:paramtype session_id: str or ~azure.servicebus.NEXT_AVAILABLE_SESSION
448+
:keyword sub_queue: If specified, the subqueue this receiver will
444449
connect to.
445450
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
446451
receiver or messages that can't be processed.
447452
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
448453
enum or equivalent string values "deadletter" and "transferdeadletter".
454+
:paramtype sub_queue: str or ~azure.servicebus.ServiceBusSubQueue
449455
:keyword receive_mode: The receive_mode with which messages will be retrieved from the entity. The two options
450456
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
451457
lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import functools
88
import uuid
99
import datetime
10+
import warnings
1011
from typing import Any, List, Optional, Dict, Iterator, Union, TYPE_CHECKING
1112

1213
import six
@@ -660,7 +661,8 @@ def receive_deferred_messages(
660661
self,
661662
sequence_numbers: Union[int, List[int]],
662663
*,
663-
timeout: Optional[float] = None
664+
timeout: Optional[float] = None,
665+
**kwargs: Any
664666
) -> List[ServiceBusReceivedMessage]:
665667
"""Receive messages that have previously been deferred.
666668
@@ -683,6 +685,8 @@ def receive_deferred_messages(
683685
:caption: Receive deferred messages from ServiceBus.
684686
685687
"""
688+
if kwargs:
689+
warnings.warn(f"Unsupported keyword args: {kwargs}")
686690
self._check_live()
687691
if timeout is not None and timeout <= 0:
688692
raise ValueError("The timeout must be greater than 0.")
@@ -734,7 +738,8 @@ def peek_messages(
734738
max_message_count: int = 1,
735739
*,
736740
sequence_number: int = 0,
737-
timeout: Optional[float] = None
741+
timeout: Optional[float] = None,
742+
**kwargs: Any
738743
) -> List[ServiceBusReceivedMessage]:
739744
"""Browse messages currently pending in the queue.
740745
@@ -759,6 +764,8 @@ def peek_messages(
759764
:caption: Look at pending messages in the queue.
760765
761766
"""
767+
if kwargs:
768+
warnings.warn(f"Unsupported keyword args: {kwargs}")
762769
self._check_live()
763770
if timeout is not None and timeout <= 0:
764771
raise ValueError("The timeout must be greater than 0.")
@@ -899,7 +906,8 @@ def renew_message_lock(
899906
self,
900907
message: ServiceBusReceivedMessage,
901908
*,
902-
timeout: Optional[float] = None
909+
timeout: Optional[float] = None,
910+
**kwargs: Any
903911
) -> datetime.datetime:
904912
# pylint: disable=protected-access,no-member
905913
"""Renew the message lock.
@@ -933,6 +941,8 @@ def renew_message_lock(
933941
:caption: Renew the lock on a received message.
934942
935943
"""
944+
if kwargs:
945+
warnings.warn(f"Unsupported keyword args: {kwargs}")
936946
# type: ignore
937947
try:
938948
if self.session:

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import time
77
import uuid
88
import datetime
9+
import warnings
910
from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast
1011

1112
import uamqp
@@ -278,7 +279,8 @@ def schedule_messages(
278279
messages: "MessageTypes",
279280
schedule_time_utc: datetime.datetime,
280281
*,
281-
timeout: Optional[float] = None
282+
timeout: Optional[float] = None,
283+
**kwargs: Any
282284
) -> List[int]:
283285
"""Send Message or multiple Messages to be enqueued at a specific time.
284286
Returns a list of the sequence numbers of the enqueued messages.
@@ -301,6 +303,8 @@ def schedule_messages(
301303
:dedent: 4
302304
:caption: Schedule a message to be sent in future
303305
"""
306+
if kwargs:
307+
warnings.warn(f"Unsupported keyword args: {kwargs}")
304308
# pylint: disable=protected-access
305309

306310
self._check_live()
@@ -332,7 +336,8 @@ def cancel_scheduled_messages(
332336
self,
333337
sequence_numbers: Union[int, List[int]],
334338
*,
335-
timeout: Optional[float] = None
339+
timeout: Optional[float] = None,
340+
**kwargs: Any
336341
) -> None:
337342
"""
338343
Cancel one or more messages that have previously been scheduled and are still pending.
@@ -354,6 +359,8 @@ def cancel_scheduled_messages(
354359
:dedent: 4
355360
:caption: Cancelling messages scheduled to be sent in future
356361
"""
362+
if kwargs:
363+
warnings.warn(f"Unsupported keyword args: {kwargs}")
357364
self._check_live()
358365
if timeout is not None and timeout <= 0:
359366
raise ValueError("The timeout must be greater than 0.")
@@ -375,7 +382,8 @@ def send_messages(
375382
self,
376383
message: Union["MessageTypes", ServiceBusMessageBatch],
377384
*,
378-
timeout: Optional[float] = None
385+
timeout: Optional[float] = None,
386+
**kwargs: Any
379387
) -> None:
380388
"""Sends message and blocks until acknowledgement is received or operation times out.
381389
@@ -407,6 +415,8 @@ def send_messages(
407415
:caption: Send message.
408416
409417
"""
418+
if kwargs:
419+
warnings.warn(f"Unsupported keyword args: {kwargs}")
410420
self._check_live()
411421
if timeout is not None and timeout <= 0:
412422
raise ValueError("The timeout must be greater than 0.")

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
# --------------------------------------------------------------------------------------------
55
import logging
66
import datetime
7-
from typing import TYPE_CHECKING, Union, Optional
7+
import warnings
8+
from typing import TYPE_CHECKING, Any, Union, Optional
89
import six
910

1011
from ._common.utils import utc_from_timestamp, utc_now
@@ -88,15 +89,15 @@ class ServiceBusSession(BaseSession):
8889
:caption: Get session from a receiver
8990
"""
9091

91-
def get_state(self, *, timeout: Optional[float] = None) -> bytes:
92+
def get_state(self, *, timeout: Optional[float] = None, **kwargs: Any) -> bytes:
9293
# pylint: disable=protected-access
9394
"""Get the session state.
9495
9596
Returns None if no state has been set.
9697
9798
:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
9899
greater than 0 if specified. The default value is None, meaning no timeout.
99-
:rtype: str
100+
:rtype: bytes
100101
101102
.. admonition:: Example:
102103
@@ -107,6 +108,8 @@ def get_state(self, *, timeout: Optional[float] = None) -> bytes:
107108
:dedent: 4
108109
:caption: Get the session state
109110
"""
111+
if kwargs:
112+
warnings.warn(f"Unsupported keyword args: {kwargs}")
110113
self._receiver._check_live() # pylint: disable=protected-access
111114
if timeout is not None and timeout <= 0:
112115
raise ValueError("The timeout must be greater than 0.")
@@ -119,7 +122,7 @@ def get_state(self, *, timeout: Optional[float] = None) -> bytes:
119122
session_state = response.get(MGMT_RESPONSE_SESSION_STATE) # type: ignore
120123
return session_state
121124

122-
def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[float] = None) -> None:
125+
def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[float] = None, **kwargs: Any) -> None:
123126
# pylint: disable=protected-access
124127
"""Set the session state.
125128
@@ -137,6 +140,8 @@ def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[fl
137140
:dedent: 4
138141
:caption: Set the session state
139142
"""
143+
if kwargs:
144+
warnings.warn(f"Unsupported keyword args: {kwargs}")
140145
self._receiver._check_live() # pylint: disable=protected-access
141146
if timeout is not None and timeout <= 0:
142147
raise ValueError("The timeout must be greater than 0.")
@@ -153,7 +158,7 @@ def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[fl
153158
timeout=timeout,
154159
)
155160

156-
def renew_lock(self, *, timeout: Optional[float] = None) -> datetime.datetime:
161+
def renew_lock(self, *, timeout: Optional[float] = None, **kwargs: Any) -> datetime.datetime:
157162
# pylint: disable=protected-access
158163
"""Renew the session lock.
159164
@@ -179,6 +184,8 @@ def renew_lock(self, *, timeout: Optional[float] = None) -> datetime.datetime:
179184
:dedent: 4
180185
:caption: Renew the session lock before it expires
181186
"""
187+
if kwargs:
188+
warnings.warn(f"Unsupported keyword args: {kwargs}")
182189
self._receiver._check_live() # pylint: disable=protected-access
183190
if timeout is not None and timeout <= 0:
184191
raise ValueError("The timeout must be greater than 0.")

0 commit comments

Comments
 (0)