Skip to content

Commit c0eab07

Browse files
[ServiceBus] Make sub-client initializers internal (add '_' to from_conn_str for receiver/sender) (Azure#14502)
* Make sub-client initializers internal (add '_' to from_conn_str for receiver/sender) Remove via_partition_key until we add transactions as a whole. Rename properties to application_properties Rename amqp_message to amqp_annotated_message for consistency * Fix remaining unit tests (an amqp_annotated_message and internal conn str rename were missed) * fix test_sb_client from_connection_string naming * PR fixes; remove debug prints from a test, and add a line to changelog for label vs subject * lint fix, unneeded import * Update sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py * Apply suggestions from code review Rename Message->ServiceBusMessage in documentation to align with concurrent changes in master. Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com> * Unify Breaking Changes sections in changelog for b8 Rename AMQPMessage to AMQPAnnotatedMessage to match property name and other SDKs. Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
1 parent 57110ff commit c0eab07

17 files changed

+138
-184
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ now raise more concrete exception other than `MessageSettleFailed` and `ServiceB
4848
- Changed `ServiceBusReceivedMessage.renew_lock` to `ServiceBusReceiver.renew_message_lock`
4949
* `AutoLockRenewer.register` now takes `ServiceBusReceiver` as a positional parameter.
5050
* Removed `encoding` support from `ServiceBusMessage`.
51+
* `ServiceBusMessage.amqp_message` has been renamed to `ServiceBusMessage.amqp_annotated_message` for cross-sdk consistency.
52+
* All `name` parameters in `ServiceBusAdministrationClient` are now precisely specified ala `queue_name` or `rule_name`
53+
* `ServiceBusMessage.via_partition_key` is no longer exposed, this is pending a full implementation of transactions as it has no external use. If needed, the underlying value can still be accessed in `ServiceBusMessage.amqp_annotated_message.annotations`.
54+
* `ServiceBusMessage.properties` has been renamed to `ServiceBusMessage.application_properties` for consistency with service verbiage.
55+
* Sub-client (`ServiceBusSender` and `ServiceBusReceiver`) `from_connection_string` initializers have been made internal until needed. Clients should be initialized from root `ServiceBusClient`.
56+
* `ServiceBusMessage.label` has been renamed to `ServiceBusMessage.subject`.
57+
* `ServiceBusMessage.amqp_annotated_message` has had its type renamed from `AMQPMessage` to `AMQPAnnotatedMessage`
5158

5259
**BugFixes**
5360

sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py

Lines changed: 20 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@
2222
_X_OPT_SEQUENCE_NUMBER,
2323
_X_OPT_ENQUEUE_SEQUENCE_NUMBER,
2424
_X_OPT_PARTITION_KEY,
25-
_X_OPT_VIA_PARTITION_KEY,
2625
_X_OPT_LOCKED_UNTIL,
2726
_X_OPT_LOCK_TOKEN,
2827
_X_OPT_SCHEDULED_ENQUEUE_TIME,
2928
_X_OPT_DEAD_LETTER_SOURCE,
3029
PROPERTIES_DEAD_LETTER_REASON,
3130
PROPERTIES_DEAD_LETTER_ERROR_DESCRIPTION,
3231
ANNOTATION_SYMBOL_PARTITION_KEY,
33-
ANNOTATION_SYMBOL_VIA_PARTITION_KEY,
3432
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME,
3533
ANNOTATION_SYMBOL_KEY_MAP
3634
)
@@ -49,22 +47,21 @@ class ServiceBusMessage(object): # pylint: disable=too-many-public-methods,too-
4947
:param body: The data to send in a single message.
5048
:type body: Union[str, bytes]
5149
52-
:keyword dict properties: The user defined properties on the message.
50+
:keyword dict application_properties: The user defined properties on the message.
5351
:keyword str session_id: The session identifier of the message for a sessionful entity.
5452
:keyword str message_id: The id to identify the message.
5553
:keyword datetime.datetime scheduled_enqueue_time_utc: The utc scheduled enqueue time to the message.
5654
:keyword datetime.timedelta time_to_live: The life duration of a message.
5755
:keyword str content_type: The content type descriptor.
5856
:keyword str correlation_id: The correlation identifier.
59-
:keyword str label: The application specific label.
57+
:keyword str subject: The application specific subject, sometimes referred to as label.
6058
:keyword str partition_key: The partition key for sending a message to a partitioned entity.
61-
:keyword str via_partition_key: The partition key for sending a message into an entity via a partitioned
62-
transfer queue.
6359
:keyword str to: The `to` address used for auto_forward chaining scenarios.
6460
:keyword str reply_to: The address of an entity to send replies to.
6561
:keyword str reply_to_session_id: The session identifier augmenting the `reply_to` address.
6662
67-
:ivar AMQPMessage amqp_message: Advanced use only. The internal AMQP message payload that is sent or received.
63+
:ivar AMQPAnnotatedMessage amqp_annotated_message: Advanced use only.
64+
The internal AMQP message payload that is sent or received.
6865
6966
.. admonition:: Example:
7067
@@ -92,21 +89,21 @@ def __init__(self, body, **kwargs):
9289
self._amqp_header = self.message.header
9390
else:
9491
self._build_message(body)
95-
self.properties = kwargs.pop("properties", None)
92+
self.application_properties = kwargs.pop("application_properties", None)
9693
self.session_id = kwargs.pop("session_id", None)
9794
self.message_id = kwargs.get("message_id", None)
9895
self.content_type = kwargs.pop("content_type", None)
9996
self.correlation_id = kwargs.pop("correlation_id", None)
10097
self.to = kwargs.pop("to", None)
10198
self.reply_to = kwargs.pop("reply_to", None)
10299
self.reply_to_session_id = kwargs.pop("reply_to_session_id", None)
103-
self.label = kwargs.pop("label", None)
100+
self.subject = kwargs.pop("subject", None)
104101
self.scheduled_enqueue_time_utc = kwargs.pop("scheduled_enqueue_time_utc", None)
105102
self.time_to_live = kwargs.pop("time_to_live", None)
106103
self.partition_key = kwargs.pop("partition_key", None)
107-
self.via_partition_key = kwargs.pop("via_partition_key", None)
108-
# If message is the full message, amqp_message is the "public facing interface" for what we expose.
109-
self.amqp_message = AMQPMessage(self.message) # type: AMQPMessage
104+
105+
# If message is the full message, amqp_annotated_message is the "public facing interface" for what we expose.
106+
self.amqp_annotated_message = AMQPAnnotatedMessage(self.message) # type: AMQPAnnotatedMessage
110107

111108
def __str__(self):
112109
return str(self.message)
@@ -167,16 +164,16 @@ def session_id(self, value):
167164
self._amqp_properties.group_id = value
168165

169166
@property
170-
def properties(self):
167+
def application_properties(self):
171168
# type: () -> dict
172169
"""The user defined properties on the message.
173170
174171
:rtype: dict
175172
"""
176173
return self.message.application_properties
177174

178-
@properties.setter
179-
def properties(self, value):
175+
@application_properties.setter
176+
def application_properties(self, value):
180177
# type: (dict) -> None
181178
self.message.application_properties = value
182179

@@ -207,33 +204,6 @@ def partition_key(self, value):
207204
# type: (str) -> None
208205
self._set_message_annotations(_X_OPT_PARTITION_KEY, value)
209206

210-
@property
211-
def via_partition_key(self):
212-
# type: () -> Optional[str]
213-
""" The partition key for sending a message into an entity via a partitioned transfer queue.
214-
215-
If a message is sent via a transfer queue in the scope of a transaction, this value selects the transfer
216-
queue partition: This is functionally equivalent to `partition_key` and ensures that messages are kept
217-
together and in order as they are transferred.
218-
219-
See Transfers and Send Via in
220-
`https://docs.microsoft.com/azure/service-bus-messaging/service-bus-transactions#transfers-and-send-via`.
221-
222-
:rtype: str
223-
"""
224-
via_p_key = None
225-
try:
226-
via_p_key = self.message.annotations.get(_X_OPT_VIA_PARTITION_KEY) or \
227-
self.message.annotations.get(ANNOTATION_SYMBOL_VIA_PARTITION_KEY)
228-
return via_p_key.decode('UTF-8')
229-
except (AttributeError, UnicodeDecodeError):
230-
return via_p_key
231-
232-
@via_partition_key.setter
233-
def via_partition_key(self, value):
234-
# type: (str) -> None
235-
self._set_message_annotations(_X_OPT_VIA_PARTITION_KEY, value)
236-
237207
@property
238208
def time_to_live(self):
239209
# type: () -> Optional[datetime.timedelta]
@@ -349,9 +319,9 @@ def correlation_id(self, val):
349319
self._amqp_properties.correlation_id = val
350320

351321
@property
352-
def label(self):
322+
def subject(self):
353323
# type: () -> str
354-
"""The application specific label.
324+
"""The application specific subject, sometimes referred to as a label.
355325
356326
This property enables the application to indicate the purpose of the message to the receiver in a standardized
357327
fashion, similar to an email subject line.
@@ -363,8 +333,8 @@ def label(self):
363333
except (AttributeError, UnicodeDecodeError):
364334
return self._amqp_properties.subject
365335

366-
@label.setter
367-
def label(self, val):
336+
@subject.setter
337+
def subject(self, val):
368338
# type: (str) -> None
369339
self._amqp_properties.subject = val
370340

@@ -624,17 +594,16 @@ def _to_outgoing_message(self):
624594
body=body,
625595
content_type=self.content_type,
626596
correlation_id=self.correlation_id,
627-
label=self.label,
597+
subject=self.subject,
628598
message_id=self.message_id,
629599
partition_key=self.partition_key,
630-
properties=self.properties,
600+
application_properties=self.application_properties,
631601
reply_to=self.reply_to,
632602
reply_to_session_id=self.reply_to_session_id,
633603
session_id=self.session_id,
634604
scheduled_enqueue_time_utc=self.scheduled_enqueue_time_utc,
635605
time_to_live=self.time_to_live,
636-
to=self.to,
637-
via_partition_key=self.via_partition_key
606+
to=self.to
638607
)
639608

640609
@property
@@ -798,7 +767,7 @@ def locked_until_utc(self):
798767
return self._expiry
799768

800769

801-
class AMQPMessage(object):
770+
class AMQPAnnotatedMessage(object):
802771
"""
803772
The internal AMQP message that this ServiceBusMessage represents. Is read-only.
804773
"""

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5-
import datetime
65
import time
76
import logging
87
import functools
@@ -48,6 +47,7 @@
4847

4948

5049
if TYPE_CHECKING:
50+
import datetime
5151
from azure.core.credentials import TokenCredential
5252

5353
_LOGGER = logging.getLogger(__name__)
@@ -152,6 +152,8 @@ def __iter__(self):
152152
return self._iter_contextual_wrapper()
153153

154154
def _iter_contextual_wrapper(self, max_wait_time=None):
155+
"""The purpose of this wrapper is to allow both state restoration (for multiple concurrent iteration)
156+
and per-iter argument passing that requires the former."""
155157
# pylint: disable=protected-access
156158
original_timeout = None
157159
while True:
@@ -406,7 +408,7 @@ def get_streaming_message_iter(self, max_wait_time=None):
406408
return self._iter_contextual_wrapper(max_wait_time)
407409

408410
@classmethod
409-
def from_connection_string(
411+
def _from_connection_string(
410412
cls,
411413
conn_str,
412414
**kwargs

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@
2727
MGMT_REQUEST_MESSAGE,
2828
MGMT_REQUEST_MESSAGES,
2929
MGMT_REQUEST_MESSAGE_ID,
30-
MGMT_REQUEST_PARTITION_KEY,
31-
MGMT_REQUEST_VIA_PARTITION_KEY
30+
MGMT_REQUEST_PARTITION_KEY
3231
)
3332

3433
if TYPE_CHECKING:
@@ -76,8 +75,6 @@ def _build_schedule_request(cls, schedule_time_utc, *messages):
7675
message_data[MGMT_REQUEST_SESSION_ID] = message.session_id
7776
if message.partition_key:
7877
message_data[MGMT_REQUEST_PARTITION_KEY] = message.partition_key
79-
if message.via_partition_key:
80-
message_data[MGMT_REQUEST_VIA_PARTITION_KEY] = message.via_partition_key
8178
message_data[MGMT_REQUEST_MESSAGE] = bytearray(message.message.encode_message())
8279
request_body[MGMT_REQUEST_MESSAGES].append(message_data)
8380
return request_body
@@ -271,7 +268,7 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs):
271268
)
272269

273270
@classmethod
274-
def from_connection_string(
271+
def _from_connection_string(
275272
cls,
276273
conn_str,
277274
**kwargs

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ def get_streaming_message_iter(
407407
return self._IterContextualWrapper(self, max_wait_time)
408408

409409
@classmethod
410-
def from_connection_string(
410+
def _from_connection_string(
411411
cls,
412412
conn_str: str,
413413
**kwargs: Any

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ async def cancel_scheduled_messages(self, sequence_numbers: Union[int, List[int]
217217
)
218218

219219
@classmethod
220-
def from_connection_string(
220+
def _from_connection_string(
221221
cls,
222222
conn_str: str,
223223
**kwargs: Any

sdk/servicebus/azure-servicebus/migration_guide.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ semantics with the sender or receiver lifetime.
8484
|---|---|---|
8585
| `azure.servicebus.AutoLockRenew().shutdown()` | `azure.servicebus.AutoLockRenewer().close()` | [Close an auto-lock-renewer](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py) |
8686

87+
### Working with Message properties
88+
| In v0.50 | Equivalent in v7 | Sample |
89+
|---|---|---|
90+
| `azure.servicebus.Message.user_properties` | `azure.servicebus.ServiceBusMessage.application_properties` | Some message properties have been renamed, e.g. accessing the application specific properties of a message. |
91+
8792

8893
## Migration samples
8994

sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def example_create_servicebus_sender_async():
5757
from azure.servicebus.aio import ServiceBusSender
5858
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
5959
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
60-
queue_sender = ServiceBusSender.from_connection_string(
60+
queue_sender = ServiceBusSender._from_connection_string(
6161
conn_str=servicebus_connection_str,
6262
queue_name=queue_name
6363
)
@@ -111,7 +111,7 @@ async def example_create_servicebus_receiver_async():
111111
from azure.servicebus.aio import ServiceBusReceiver
112112
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
113113
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
114-
queue_receiver = ServiceBusReceiver.from_connection_string(
114+
queue_receiver = ServiceBusReceiver._from_connection_string(
115115
conn_str=servicebus_connection_str,
116116
queue_name=queue_name
117117
)

sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def example_create_servicebus_sender_sync():
5353
from azure.servicebus import ServiceBusSender
5454
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
5555
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
56-
queue_sender = ServiceBusSender.from_connection_string(
56+
queue_sender = ServiceBusSender._from_connection_string(
5757
conn_str=servicebus_connection_str,
5858
queue_name=queue_name
5959
)
@@ -107,7 +107,7 @@ def example_create_servicebus_receiver_sync():
107107
from azure.servicebus import ServiceBusReceiver
108108
servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR']
109109
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
110-
queue_receiver = ServiceBusReceiver.from_connection_string(
110+
queue_receiver = ServiceBusReceiver._from_connection_string(
111111
conn_str=servicebus_connection_str,
112112
queue_name=queue_name
113113
)
@@ -244,7 +244,7 @@ def example_send_and_receive_sync():
244244
print("Sequence number: {}".format(message.sequence_number))
245245
print("Enqueued Sequence numger: {}".format(message.enqueued_sequence_number))
246246
print("Partition Key: {}".format(message.partition_key))
247-
print("Properties: {}".format(message.properties))
247+
print("Application Properties: {}".format(message.application_properties))
248248
print("Delivery count: {}".format(message.delivery_count))
249249
print("Message ID: {}".format(message.message_id))
250250
print("Locked until: {}".format(message.locked_until_utc))

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -432,8 +432,8 @@ async def test_async_queue_by_servicebus_client_iter_messages_with_retrieve_defe
432432
print_message(_logger, message)
433433
assert message.dead_letter_reason == 'Testing reason'
434434
assert message.dead_letter_error_description == 'Testing description'
435-
assert message.properties[b'DeadLetterReason'] == b'Testing reason'
436-
assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description'
435+
assert message.application_properties[b'DeadLetterReason'] == b'Testing reason'
436+
assert message.application_properties[b'DeadLetterErrorDescription'] == b'Testing description'
437437
await receiver.complete_message(message)
438438
assert count == 10
439439

@@ -549,8 +549,8 @@ async def test_async_queue_by_servicebus_client_receive_batch_with_deadletter(se
549549
count += 1
550550
assert message.dead_letter_reason == 'Testing reason'
551551
assert message.dead_letter_error_description == 'Testing description'
552-
assert message.properties[b'DeadLetterReason'] == b'Testing reason'
553-
assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description'
552+
assert message.application_properties[b'DeadLetterReason'] == b'Testing reason'
553+
assert message.application_properties[b'DeadLetterErrorDescription'] == b'Testing description'
554554
assert count == 10
555555

556556
@pytest.mark.liveTest
@@ -591,8 +591,8 @@ async def test_async_queue_by_servicebus_client_receive_batch_with_retrieve_dead
591591
print_message(_logger, message)
592592
assert message.dead_letter_reason == 'Testing reason'
593593
assert message.dead_letter_error_description == 'Testing description'
594-
assert message.properties[b'DeadLetterReason'] == b'Testing reason'
595-
assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description'
594+
assert message.application_properties[b'DeadLetterReason'] == b'Testing reason'
595+
assert message.application_properties[b'DeadLetterErrorDescription'] == b'Testing description'
596596
await receiver.complete_message(message)
597597
count += 1
598598
assert count == 10
@@ -1265,7 +1265,7 @@ def message_content():
12651265
for i in range(20):
12661266
yield ServiceBusMessage(
12671267
body="ServiceBusMessage no. {}".format(i),
1268-
label='1st'
1268+
subject='1st'
12691269
)
12701270

12711271
sender = sb_client.get_queue_sender(servicebus_queue.name)
@@ -1291,12 +1291,12 @@ def message_content():
12911291
receive_counter += 1
12921292
for message in messages:
12931293
print_message(_logger, message)
1294-
if message.label == '1st':
1294+
if message.subject == '1st':
12951295
message_1st_received_cnt += 1
12961296
await receiver.complete_message(message)
1297-
message.label = '2nd'
1297+
message.subject = '2nd'
12981298
await sender.send_messages(message) # resending received message
1299-
elif message.label == '2nd':
1299+
elif message.subject == '2nd':
13001300
message_2nd_received_cnt += 1
13011301
await receiver.complete_message(message)
13021302

0 commit comments

Comments
 (0)