Skip to content

Commit 52a74ed

Browse files
Add messsage state to SB received message (Azure#22837)
* initial commit * working * lint * tests * comments * Update sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com> * version * typo * int enum * oops Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
1 parent 666f950 commit 52a74ed

File tree

7 files changed

+140
-4
lines changed

7 files changed

+140
-4
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
# Release History
22

3-
## 7.5.1 (Unreleased)
3+
## 7.6.0 (Unreleased)
44

55
### Features Added
66

7+
- Introduce `ServiceBusMessageState` enum that can assume the values of `active`, `scheduled` or `deferred`.
8+
- Add `ServiceBusMessageState` property in `ServiceBusReceivedMessage`.
9+
710
### Breaking Changes
811

912
### Bugs Fixed

sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ._common.constants import (
2222
ServiceBusReceiveMode,
2323
ServiceBusSubQueue,
24+
ServiceBusMessageState,
2425
ServiceBusSessionFilter,
2526
NEXT_AVAILABLE_SESSION,
2627
)
@@ -35,6 +36,7 @@
3536
__all__ = [
3637
"ServiceBusMessage",
3738
"ServiceBusMessageBatch",
39+
"ServiceBusMessageState",
3840
"ServiceBusReceivedMessage",
3941
"NEXT_AVAILABLE_SESSION",
4042
"ServiceBusSubQueue",

sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,16 @@
165165
MAX_DURATION_VALUE = 922337203685477
166166
# equivalent to .NET Date("9999-12-31T07:59:59.000Z").getTime() in ms
167167
MAX_ABSOLUTE_EXPIRY_TIME = 253402243199000
168-
168+
MESSAGE_STATE_NAME = b"x-opt-message-state"
169169

170170
class ServiceBusReceiveMode(str, Enum):
171171
PEEK_LOCK = "peeklock"
172172
RECEIVE_AND_DELETE = "receiveanddelete"
173173

174+
class ServiceBusMessageState(int, Enum):
175+
ACTIVE = 0
176+
DEFERRED = 1
177+
SCHEDULED = 2
174178

175179
# To enable extensible string enums for the public facing parameter, and translate to the "real" uamqp constants.
176180
ServiceBusToAMQPReceiveModeMap = {

sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from .constants import (
2020
_BATCH_MESSAGE_OVERHEAD_COST,
2121
ServiceBusReceiveMode,
22+
ServiceBusMessageState,
2223
_X_OPT_ENQUEUED_TIME,
2324
_X_OPT_SEQUENCE_NUMBER,
2425
_X_OPT_ENQUEUE_SEQUENCE_NUMBER,
@@ -35,6 +36,7 @@
3536
MESSAGE_PROPERTY_MAX_LENGTH,
3637
MAX_ABSOLUTE_EXPIRY_TIME,
3738
MAX_DURATION_VALUE,
39+
MESSAGE_STATE_NAME
3840
)
3941
from ..amqp import (
4042
AmqpAnnotatedMessage,
@@ -967,6 +969,23 @@ def dead_letter_source(self):
967969
pass
968970
return None
969971

972+
@property
973+
def message_state(self):
974+
# type: () -> ServiceBusMessageState
975+
"""
976+
Defaults to Active. Represents the message state of the message. Can be Active, Deferred.
977+
or Scheduled.
978+
979+
:rtype: ~azure.servicebus.ServiceBusMessageState
980+
"""
981+
try:
982+
message_state = self._raw_amqp_message.annotations.get(MESSAGE_STATE_NAME)
983+
if not message_state:
984+
return ServiceBusMessageState.ACTIVE
985+
return ServiceBusMessageState(message_state)
986+
except AttributeError:
987+
return ServiceBusMessageState.ACTIVE
988+
970989
@property
971990
def delivery_count(self):
972991
# type: () -> Optional[int]

sdk/servicebus/azure-servicebus/azure/servicebus/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
# Licensed under the MIT License.
44
# ------------------------------------
55

6-
VERSION = "7.5.1"
6+
VERSION = "7.6.0"

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
TransportType,
2929
ServiceBusReceiveMode,
3030
ServiceBusSubQueue,
31+
ServiceBusMessageState
3132
)
3233
from azure.servicebus.amqp import (
3334
AmqpMessageHeader,
@@ -2123,3 +2124,57 @@ async def test_queue_async_send_amqp_annotated_message(self, servicebus_namespac
21232124
assert recv_sequence_msg == 3
21242125
assert recv_value_msg == 3
21252126
assert normal_msg == 4
2127+
2128+
2129+
@pytest.mark.liveTest
2130+
@pytest.mark.live_test_only
2131+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
2132+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
2133+
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
2134+
async def test_message_state_scheduled_async(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
2135+
async with ServiceBusClient.from_connection_string(
2136+
servicebus_namespace_connection_string) as sb_client:
2137+
2138+
sender = sb_client.get_queue_sender(servicebus_queue.name)
2139+
async with sender:
2140+
for i in range(10):
2141+
message = ServiceBusMessage("message no. {}".format(i))
2142+
scheduled_time_utc = datetime.utcnow() + timedelta(seconds=30)
2143+
sequence_number = await sender.schedule_messages(message, scheduled_time_utc)
2144+
2145+
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
2146+
async with receiver:
2147+
messages = await receiver.peek_messages()
2148+
for msg in messages:
2149+
assert msg.message_state == ServiceBusMessageState.SCHEDULED
2150+
2151+
2152+
@pytest.mark.liveTest
2153+
@pytest.mark.live_test_only
2154+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
2155+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
2156+
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
2157+
async def test_message_state_deferred_async(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
2158+
async with ServiceBusClient.from_connection_string(
2159+
servicebus_namespace_connection_string) as sb_client:
2160+
2161+
sender = sb_client.get_queue_sender(servicebus_queue.name)
2162+
async with sender:
2163+
for i in range(10):
2164+
message = ServiceBusMessage("message no. {}".format(i))
2165+
await sender.send_messages(message)
2166+
2167+
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
2168+
deferred_messages = []
2169+
async with receiver:
2170+
received_msgs = await receiver.receive_messages()
2171+
for message in received_msgs:
2172+
assert message.message_state == ServiceBusMessageState.ACTIVE
2173+
deferred_messages.append(message.sequence_number)
2174+
await receiver.defer_message(message)
2175+
if deferred_messages:
2176+
received_deferred_msg = await receiver.receive_deferred_messages(
2177+
sequence_numbers=deferred_messages
2178+
)
2179+
for message in received_deferred_msg:
2180+
assert message.message_state == ServiceBusMessageState.DEFERRED

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
ServiceBusReceivedMessage,
2929
ServiceBusReceiveMode,
3030
ServiceBusSubQueue,
31+
ServiceBusMessageState
3132
)
3233
from azure.servicebus.amqp import (
3334
AmqpMessageHeader,
@@ -39,7 +40,8 @@
3940
_X_OPT_LOCK_TOKEN,
4041
_X_OPT_PARTITION_KEY,
4142
_X_OPT_VIA_PARTITION_KEY,
42-
_X_OPT_SCHEDULED_ENQUEUE_TIME
43+
_X_OPT_SCHEDULED_ENQUEUE_TIME,
44+
ServiceBusMessageState
4345
)
4446
from azure.servicebus._common.utils import utc_now
4547
from azure.servicebus.management._models import DictMixin
@@ -2581,3 +2583,54 @@ def test_queue_send_amqp_annotated_message(self, servicebus_namespace_connection
25812583
assert recv_data_msg == 3
25822584
assert recv_value_msg == 3
25832585
assert normal_msg == 4
2586+
2587+
2588+
@pytest.mark.liveTest
2589+
@pytest.mark.live_test_only
2590+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
2591+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
2592+
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
2593+
def test_message_state_scheduled(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
2594+
with ServiceBusClient.from_connection_string(
2595+
servicebus_namespace_connection_string) as sb_client:
2596+
2597+
sender = sb_client.get_queue_sender(servicebus_queue.name)
2598+
for i in range(10):
2599+
message = ServiceBusMessage("message no. {}".format(i))
2600+
scheduled_time_utc = datetime.utcnow() + timedelta(seconds=30)
2601+
sequence_number = sender.schedule_messages(message, scheduled_time_utc)
2602+
2603+
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
2604+
with receiver:
2605+
for msg in receiver.peek_messages():
2606+
assert msg.message_state == ServiceBusMessageState.SCHEDULED
2607+
2608+
2609+
@pytest.mark.liveTest
2610+
@pytest.mark.live_test_only
2611+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
2612+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
2613+
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
2614+
def test_message_state_deferred(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
2615+
with ServiceBusClient.from_connection_string(
2616+
servicebus_namespace_connection_string) as sb_client:
2617+
2618+
sender = sb_client.get_queue_sender(servicebus_queue.name)
2619+
for i in range(10):
2620+
message = ServiceBusMessage("message no. {}".format(i))
2621+
sender.send_messages(message)
2622+
2623+
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
2624+
deferred_messages = []
2625+
with receiver:
2626+
received_msgs = receiver.receive_messages()
2627+
for message in received_msgs:
2628+
assert message.message_state == ServiceBusMessageState.ACTIVE
2629+
deferred_messages.append(message.sequence_number)
2630+
receiver.defer_message(message)
2631+
if deferred_messages:
2632+
received_deferred_msg = receiver.receive_deferred_messages(
2633+
sequence_numbers=deferred_messages
2634+
)
2635+
for message in received_deferred_msg:
2636+
assert message.message_state == ServiceBusMessageState.DEFERRED

0 commit comments

Comments
 (0)