Skip to content

Commit 31e6898

Browse files
authored
fix bug in session id being empty string and partition key fails to parse empty string (Azure#21160)
1 parent ec2292e commit 31e6898

File tree

6 files changed

+93
-6
lines changed

6 files changed

+93
-6
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
### Bugs Fixed
1010

11+
- Fixed bug that `ServiceBusReceiver` can not connect to sessionful entity with session id being empty string.
12+
- Fixed bug that `ServiceBusMessage.partition_key` can not parse empty string properly.
13+
1114
### Other Changes
1215

1316
## 7.3.4 (2021-10-06)

sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,15 @@ def partition_key(self):
293293
"""
294294
p_key = None
295295
try:
296-
p_key = self._raw_amqp_message.annotations.get( # type: ignore
297-
_X_OPT_PARTITION_KEY # type: ignore
298-
) or self._raw_amqp_message.annotations.get(ANNOTATION_SYMBOL_PARTITION_KEY) # type: ignore
296+
# opt_p_key is used on the incoming message
297+
opt_p_key = self._raw_amqp_message.annotations.get(_X_OPT_PARTITION_KEY) # type: ignore
298+
if opt_p_key is not None:
299+
p_key = opt_p_key
300+
# symbol_p_key is used on the outgoing message
301+
symbol_p_key = self._raw_amqp_message.annotations.get(ANNOTATION_SYMBOL_PARTITION_KEY) # type: ignore
302+
if symbol_p_key is not None:
303+
p_key = symbol_p_key
304+
299305
return p_key.decode("UTF-8") # type: ignore
300306
except (AttributeError, UnicodeDecodeError):
301307
return p_key
@@ -310,7 +316,7 @@ def partition_key(self, value):
310316
)
311317
)
312318

313-
if value and self.session_id and value != self.session_id:
319+
if value and self.session_id is not None and value != self.session_id:
314320
raise ValueError(
315321
"partition_key:{} cannot be set to a different value than session_id:{}".format(
316322
value, self.session_id

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs):
156156

157157
self._populate_attributes(**kwargs)
158158
self._session = (
159-
ServiceBusSession(self._session_id, self) if self._session_id else None
159+
None if self._session_id is None else ServiceBusSession(self._session_id, self)
160160
)
161161

162162
def __iter__(self):

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def __init__(
160160

161161
self._populate_attributes(**kwargs)
162162
self._session = (
163-
ServiceBusSession(self._session_id, self) if self._session_id else None
163+
None if self._session_id is None else ServiceBusSession(self._session_id, self)
164164
)
165165

166166
# Python 3.5 does not allow for yielding from a coroutine, so instead of the try-finally functional wrapper

sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,27 @@ async def test_async_session_by_session_client_conn_str_receive_handler_peeklock
7878

7979
assert count == 3
8080

81+
session_id = ""
82+
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
83+
for i in range(3):
84+
message = ServiceBusMessage("Handler message no. {}".format(i), session_id=session_id)
85+
await sender.send_messages(message)
86+
87+
with pytest.raises(ServiceBusError):
88+
await sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)._open_with_retry()
89+
90+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id, max_wait_time=5)
91+
count = 0
92+
async for message in receiver:
93+
print_message(_logger, message)
94+
assert message.session_id == session_id
95+
count += 1
96+
await receiver.complete_message(message)
97+
98+
await receiver.close()
99+
100+
assert count == 3
101+
81102
@pytest.mark.liveTest
82103
@pytest.mark.live_test_only
83104
@CachedResourceGroupPreparer(name_prefix='servicebustest')

sdk/servicebus/azure-servicebus/tests/test_sessions.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,63 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi
113113
assert received_cnt_dic['0'] == 2 and received_cnt_dic['1'] == 2 and received_cnt_dic['2'] == 2
114114
assert count == 6
115115

116+
session_id = ""
117+
sender = sb_client.get_queue_sender(servicebus_queue.name)
118+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id, max_wait_time=5)
119+
120+
with sender, receiver:
121+
for i in range(3):
122+
message = ServiceBusMessage("Handler message no. {}".format(i))
123+
124+
message.partition_key = 'pkey'
125+
126+
message.session_id = session_id
127+
message.partition_key = session_id
128+
message.application_properties = {'key': 'value'}
129+
message.subject = 'label'
130+
message.content_type = 'application/text'
131+
message.correlation_id = 'cid'
132+
message.message_id = str(i)
133+
message.to = 'to'
134+
message.reply_to = 'reply_to'
135+
message.reply_to_session_id = 'reply_to_session_id'
136+
137+
with pytest.raises(ValueError):
138+
message.partition_key = 'pkey'
139+
140+
sender.send_messages(message)
141+
142+
with pytest.raises(ServiceBusError):
143+
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)._open_with_retry()
144+
145+
count = 0
146+
received_cnt_dic = {}
147+
for message in receiver:
148+
print_message(_logger, message)
149+
assert message.delivery_count == 0
150+
assert message.application_properties
151+
assert message.application_properties[b'key'] == b'value'
152+
assert message.subject == 'label'
153+
assert message.content_type == 'application/text'
154+
assert message.correlation_id == 'cid'
155+
assert message.partition_key == session_id
156+
assert message.to == 'to'
157+
assert message.reply_to == 'reply_to'
158+
assert message.sequence_number
159+
assert message.enqueued_time_utc
160+
assert message.session_id == session_id
161+
assert message.reply_to_session_id == 'reply_to_session_id'
162+
count += 1
163+
receiver.complete_message(message)
164+
if message.message_id not in received_cnt_dic:
165+
received_cnt_dic[message.message_id] = 1
166+
sender.send_messages(message)
167+
else:
168+
received_cnt_dic[message.message_id] += 1
169+
170+
assert received_cnt_dic['0'] == 2 and received_cnt_dic['1'] == 2 and received_cnt_dic['2'] == 2
171+
assert count == 6
172+
116173
@pytest.mark.liveTest
117174
@pytest.mark.live_test_only
118175
@CachedResourceGroupPreparer(name_prefix='servicebustest')

0 commit comments

Comments
 (0)