Skip to content

Commit 09f0ab0

Browse files
authored
[AMQP] Fix Filter Set Encoding For 2 Char Length Session id (Azure#32860)
* fix for len 2 string * fix for char length * fix pylint * fix to keep the right data value * pylint * switch order * raise error * encode unit tests * get behavior in line with uamqp * modified to add any value * narrow exception * live test * changelog
1 parent 1568f5f commit 09f0ab0

File tree

5 files changed

+72
-22
lines changed

5 files changed

+72
-22
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_encode.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -765,14 +765,19 @@ def encode_filter_set(value):
765765
else:
766766
if isinstance(name, str):
767767
name = name.encode("utf-8") # type: ignore
768-
try:
769-
descriptor, filter_value = data
770-
described_filter = {
771-
TYPE: AMQPTypes.described,
772-
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
773-
}
774-
except ValueError:
775-
described_filter = data
768+
if isinstance(data, (str, bytes)):
769+
described_filter = data # type: ignore
770+
# handle the situation when data is a tuple or list of length 2
771+
else:
772+
try:
773+
descriptor, filter_value = data
774+
described_filter = {
775+
TYPE: AMQPTypes.described,
776+
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
777+
}
778+
# if its not a type that is known, raise the error from the server
779+
except (ValueError, TypeError):
780+
described_filter = data
776781

777782
cast(List, fields[VALUE]).append(
778783
({TYPE: AMQPTypes.symbol, VALUE: name}, described_filter)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import pytest
2+
from azure.eventhub._pyamqp._encode import encode_filter_set
3+
4+
@pytest.mark.parametrize("value,expected", [
5+
({b'com.microsoft:session-filter': 'ababa'}, 'ababa'),
6+
({b'com.microsoft:session-filter': 'abab'}, 'abab'),
7+
({b'com.microsoft:session-filter': 'aba'}, 'aba'),
8+
({b'com.microsoft:session-filter': 'ab'}, 'ab'),
9+
({b'com.microsoft:session-filter': 'a'}, 'a'),
10+
({b'com.microsoft:session-filter': 1}, 1),
11+
])
12+
def test_valid_filter_encode(value, expected):
13+
fields = encode_filter_set(value)
14+
assert len(fields) == 2
15+
assert fields['VALUE'][0][1] == expected
16+
17+

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
# Release History
22

3-
## 7.11.4 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
3+
## 7.11.4 (2023-11-13)
84

95
### Bugs Fixed
106

11-
### Other Changes
7+
- Fixed a bug where a two character count session id was being incorrectly parsed by azure amqp.
128

139
## 7.11.3 (2023-10-11)
1410

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_encode.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -765,14 +765,19 @@ def encode_filter_set(value):
765765
else:
766766
if isinstance(name, str):
767767
name = name.encode("utf-8") # type: ignore
768-
try:
769-
descriptor, filter_value = data
770-
described_filter = {
771-
TYPE: AMQPTypes.described,
772-
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
773-
}
774-
except ValueError:
775-
described_filter = data
768+
if isinstance(data, (str, bytes)):
769+
described_filter = data # type: ignore
770+
# handle the situation when data is a tuple or list of length 2
771+
else:
772+
try:
773+
descriptor, filter_value = data
774+
described_filter = {
775+
TYPE: AMQPTypes.described,
776+
VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value),
777+
}
778+
# if its not a type that is known, raise the error from the server
779+
except (ValueError, TypeError):
780+
described_filter = data
776781

777782
cast(List, fields[VALUE]).append(
778783
({TYPE: AMQPTypes.symbol, VALUE: name}, described_filter)

sdk/servicebus/azure-servicebus/tests/test_sessions.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,3 +1300,30 @@ def test_session_non_session_send_to_session_queue_should_fail(self, uamqp_trans
13001300
message = ServiceBusMessage("This should be an invalid non session message")
13011301
with pytest.raises(ServiceBusError):
13021302
sender.send_messages(message)
1303+
1304+
@pytest.mark.liveTest
1305+
@pytest.mark.live_test_only
1306+
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
1307+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
1308+
@ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
1309+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
1310+
@ArgPasser()
1311+
def test_session_id_str_bytes(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
1312+
1313+
with ServiceBusClient.from_connection_string(
1314+
servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:
1315+
1316+
sessions = []
1317+
start_time = utc_now()
1318+
for i in range(5):
1319+
sessions.append('a' * (i + 1))
1320+
1321+
for session_id in sessions:
1322+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1323+
message = ServiceBusMessage("Test message no. {}".format(i), session_id=session_id)
1324+
sender.send_messages(message)
1325+
for session_id in sessions:
1326+
with sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id) as receiver:
1327+
messages = receiver.receive_messages(max_wait_time=10)
1328+
assert len(messages) == 1
1329+
assert messages[0].session_id == session_id

0 commit comments

Comments
 (0)