Skip to content

Commit 9c9ec82

Browse files
Adding MessageState property to ServiceBusReceivedMessage (Azure#25446)
* Adding MessageState property to ServiceBusReceivedMessage * Rename enum * Hide previous method * Augment a few tests * Update API spec * Apply suggestions from code review Co-authored-by: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> * Update ServiceBusModelFactory.cs Co-authored-by: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
1 parent 44b2474 commit 9c9ec82

File tree

10 files changed

+107
-3
lines changed

10 files changed

+107
-3
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,12 @@ internal ServiceBusMessageBatch() { }
185185
public void Dispose() { }
186186
public bool TryAddMessage(Azure.Messaging.ServiceBus.ServiceBusMessage message) { throw null; }
187187
}
188+
public enum ServiceBusMessageState
189+
{
190+
Active = 0,
191+
Deferred = 1,
192+
Scheduled = 2,
193+
}
188194
public static partial class ServiceBusModelFactory
189195
{
190196
public static Azure.Messaging.ServiceBus.Administration.QueueProperties QueueProperties(string name, System.TimeSpan lockDuration, long maxSizeInMegabytes, bool requiresDuplicateDetection, bool requiresSession, System.TimeSpan defaultMessageTimeToLive, System.TimeSpan autoDeleteOnIdle, bool deadLetteringOnMessageExpiration, System.TimeSpan duplicateDetectionHistoryTimeWindow, int maxDeliveryCount, bool enableBatchedOperations, Azure.Messaging.ServiceBus.Administration.EntityStatus status, string forwardTo, string forwardDeadLetteredMessagesTo, string userMetadata, bool enablePartitioning) { throw null; }
@@ -193,7 +199,9 @@ public static partial class ServiceBusModelFactory
193199
public static Azure.Messaging.ServiceBus.Administration.QueueRuntimeProperties QueueRuntimeProperties(string name, long activeMessageCount = (long)0, long scheduledMessageCount = (long)0, long deadLetterMessageCount = (long)0, long transferDeadLetterMessageCount = (long)0, long transferMessageCount = (long)0, long totalMessageCount = (long)0, long sizeInBytes = (long)0, System.DateTimeOffset createdAt = default(System.DateTimeOffset), System.DateTimeOffset updatedAt = default(System.DateTimeOffset), System.DateTimeOffset accessedAt = default(System.DateTimeOffset)) { throw null; }
194200
public static Azure.Messaging.ServiceBus.Administration.RuleProperties RuleProperties(string name, Azure.Messaging.ServiceBus.Administration.RuleFilter filter = null, Azure.Messaging.ServiceBus.Administration.RuleAction action = null) { throw null; }
195201
public static Azure.Messaging.ServiceBus.ServiceBusMessageBatch ServiceBusMessageBatch(long batchSizeBytes, System.Collections.Generic.IList<Azure.Messaging.ServiceBus.ServiceBusMessage> batchMessageStore, Azure.Messaging.ServiceBus.CreateMessageBatchOptions batchOptions = null, System.Func<Azure.Messaging.ServiceBus.ServiceBusMessage, bool> tryAddCallback = null) { throw null; }
196-
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body = null, string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary<string, object> properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset)) { throw null; }
202+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
203+
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body, string messageId, string partitionKey, string viaPartitionKey, string sessionId, string replyToSessionId, System.TimeSpan timeToLive, string correlationId, string subject, string to, string contentType, string replyTo, System.DateTimeOffset scheduledEnqueueTime, System.Collections.Generic.IDictionary<string, object> properties, System.Guid lockTokenGuid, int deliveryCount, System.DateTimeOffset lockedUntil, long sequenceNumber, string deadLetterSource, long enqueuedSequenceNumber, System.DateTimeOffset enqueuedTime) { throw null; }
204+
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ServiceBusReceivedMessage(System.BinaryData body = null, string messageId = null, string partitionKey = null, string viaPartitionKey = null, string sessionId = null, string replyToSessionId = null, System.TimeSpan timeToLive = default(System.TimeSpan), string correlationId = null, string subject = null, string to = null, string contentType = null, string replyTo = null, System.DateTimeOffset scheduledEnqueueTime = default(System.DateTimeOffset), System.Collections.Generic.IDictionary<string, object> properties = null, System.Guid lockTokenGuid = default(System.Guid), int deliveryCount = 0, System.DateTimeOffset lockedUntil = default(System.DateTimeOffset), long sequenceNumber = (long)-1, string deadLetterSource = null, long enqueuedSequenceNumber = (long)0, System.DateTimeOffset enqueuedTime = default(System.DateTimeOffset), Azure.Messaging.ServiceBus.ServiceBusMessageState serviceBusMessageState = Azure.Messaging.ServiceBus.ServiceBusMessageState.Active) { throw null; }
197205
public static Azure.Messaging.ServiceBus.Administration.SubscriptionProperties SubscriptionProperties(string topicName, string subscriptionName, System.TimeSpan lockDuration = default(System.TimeSpan), bool requiresSession = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), bool deadLetteringOnMessageExpiration = false, int maxDeliveryCount = 0, bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Administration.EntityStatus status = default(Azure.Messaging.ServiceBus.Administration.EntityStatus), string forwardTo = null, string forwardDeadLetteredMessagesTo = null, string userMetadata = null) { throw null; }
198206
public static Azure.Messaging.ServiceBus.Administration.SubscriptionRuntimeProperties SubscriptionRuntimeProperties(string topicName, string subscriptionName, long activeMessageCount = (long)0, long deadLetterMessageCount = (long)0, long transferDeadLetterMessageCount = (long)0, long transferMessageCount = (long)0, long totalMessageCount = (long)0, System.DateTimeOffset createdAt = default(System.DateTimeOffset), System.DateTimeOffset updatedAt = default(System.DateTimeOffset), System.DateTimeOffset accessedAt = default(System.DateTimeOffset)) { throw null; }
199207
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -270,6 +278,7 @@ internal ServiceBusReceivedMessage() { }
270278
public System.DateTimeOffset ScheduledEnqueueTime { get { throw null; } }
271279
public long SequenceNumber { get { throw null; } }
272280
public string SessionId { get { throw null; } }
281+
public Azure.Messaging.ServiceBus.ServiceBusMessageState State { get { throw null; } }
273282
public string Subject { get { throw null; } }
274283
public System.TimeSpan TimeToLive { get { throw null; } }
275284
public string To { get { throw null; } }

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal class AmqpMessageConstants
2020
internal const string PartitionIdName = "x-opt-partition-id";
2121
internal const string ViaPartitionKeyName = "x-opt-via-partition-key";
2222
internal const string DeadLetterSourceName = "x-opt-deadletter-source";
23+
internal const string MessageStateName = "x-opt-message-state";
2324
internal const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
2425
internal const string UriName = AmqpConstants.Vendor + ":uri";
2526
internal const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset";

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,13 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM
469469
case AmqpMessageConstants.DeadLetterSourceName:
470470
annotatedMessage.MessageAnnotations[AmqpMessageConstants.DeadLetterSourceName] = pair.Value;
471471
break;
472+
case AmqpMessageConstants.MessageStateName:
473+
int enumValue = (int)pair.Value;
474+
if (Enum.IsDefined(typeof(ServiceBusMessageState), enumValue))
475+
{
476+
annotatedMessage.MessageAnnotations[AmqpMessageConstants.MessageStateName] = (ServiceBusMessageState)enumValue;
477+
}
478+
break;
472479
default:
473480
if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
474481
{

sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusMessage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage)
115115
{
116116
if (kvp.Key == AmqpMessageConstants.LockedUntilName || kvp.Key == AmqpMessageConstants.SequenceNumberName ||
117117
kvp.Key == AmqpMessageConstants.DeadLetterSourceName || kvp.Key == AmqpMessageConstants.EnqueueSequenceNumberName ||
118-
kvp.Key == AmqpMessageConstants.EnqueuedTimeUtcName)
118+
kvp.Key == AmqpMessageConstants.EnqueuedTimeUtcName || kvp.Key == AmqpMessageConstants.MessageStateName)
119119
{
120120
continue;
121121
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
namespace Azure.Messaging.ServiceBus
5+
{
6+
/// <summary>Represents the message state of the <see cref="ServiceBusReceivedMessage"/></summary>
7+
public enum ServiceBusMessageState
8+
{
9+
/// <summary>Specifies an active message state.</summary>
10+
Active = 0,
11+
12+
/// <summary>Specifies a deferred message state.</summary>
13+
Deferred = 1,
14+
15+
/// <summary>Specifies the scheduled message state.</summary>
16+
Scheduled = 2
17+
}
18+
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,36 @@ namespace Azure.Messaging.ServiceBus
1717
/// </summary>
1818
public static class ServiceBusModelFactory
1919
{
20+
/// <summary>
21+
/// Creates a new ServiceBusReceivedMessage instance for mocking.
22+
/// </summary>
23+
[EditorBrowsable(EditorBrowsableState.Never)]
24+
public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
25+
BinaryData body,
26+
string messageId,
27+
string partitionKey,
28+
string viaPartitionKey,
29+
string sessionId,
30+
string replyToSessionId,
31+
TimeSpan timeToLive,
32+
string correlationId,
33+
string subject,
34+
string to,
35+
string contentType,
36+
string replyTo,
37+
DateTimeOffset scheduledEnqueueTime,
38+
IDictionary<string, object> properties,
39+
Guid lockTokenGuid,
40+
int deliveryCount,
41+
DateTimeOffset lockedUntil,
42+
long sequenceNumber,
43+
string deadLetterSource,
44+
long enqueuedSequenceNumber,
45+
DateTimeOffset enqueuedTime) =>
46+
ServiceBusReceivedMessage(body, messageId, partitionKey, viaPartitionKey, sessionId, replyToSessionId,
47+
timeToLive, correlationId, subject, to, contentType, replyTo, scheduledEnqueueTime, properties,
48+
lockTokenGuid, deliveryCount, lockedUntil, sequenceNumber, deadLetterSource, enqueuedSequenceNumber, enqueuedTime, ServiceBusMessageState.Active);
49+
2050
/// <summary>
2151
/// Creates a new ServiceBusReceivedMessage instance for mocking.
2252
/// </summary>
@@ -41,7 +71,8 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
4171
long sequenceNumber = -1,
4272
string deadLetterSource = default,
4373
long enqueuedSequenceNumber = default,
44-
DateTimeOffset enqueuedTime = default)
74+
DateTimeOffset enqueuedTime = default,
75+
ServiceBusMessageState serviceBusMessageState = default)
4576
{
4677
var amqpMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(new ReadOnlyMemory<byte>[] { body }));
4778

@@ -98,6 +129,7 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
98129
amqpMessage.MessageAnnotations[AmqpMessageConstants.DeadLetterSourceName] = deadLetterSource;
99130
amqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueueSequenceNumberName] = enqueuedSequenceNumber;
100131
amqpMessage.MessageAnnotations[AmqpMessageConstants.EnqueuedTimeUtcName] = enqueuedTime.UtcDateTime;
132+
amqpMessage.MessageAnnotations[AmqpMessageConstants.MessageStateName] = serviceBusMessageState;
101133

102134
return new ServiceBusReceivedMessage(amqpMessage)
103135
{

sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusReceivedMessage.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,31 @@ public string DeadLetterErrorDescription
408408
}
409409
}
410410

411+
/// <summary>Gets the state of the message.</summary>
412+
/// <value>The state of the message. </value>
413+
/// <remarks>
414+
/// The state of the message can be Active, Deferred, or Scheduled. Deferred messages have Deferred state,
415+
/// scheduled messages have Scheduled state, all other messages have Active state.
416+
/// </remarks>
417+
public ServiceBusMessageState State
418+
{
419+
get
420+
{
421+
if (AmqpMessage.MessageAnnotations.TryGetValue(
422+
AmqpMessageConstants.MessageStateName,
423+
out object val))
424+
{
425+
return (ServiceBusMessageState)val;
426+
}
427+
428+
return default;
429+
}
430+
internal set
431+
{
432+
AmqpMessage.MessageAnnotations[AmqpMessageConstants.MessageStateName] = value;
433+
}
434+
}
435+
411436
/// <summary>Returns a string that represents the current message.</summary>
412437
/// <returns>The string representation of the current message.</returns>
413438
public override string ToString()

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ public async Task CreateFromReceivedMessageCopiesProperties()
152152
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueueSequenceNumberName));
153153
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueuedTimeUtcName));
154154
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.DeadLetterSourceName));
155+
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.MessageStateName));
155156
Assert.IsFalse(toSend.ApplicationProperties.ContainsKey(AmqpMessageConstants.DeadLetterReasonHeader));
156157
Assert.IsFalse(toSend.ApplicationProperties.ContainsKey(AmqpMessageConstants.DeadLetterErrorDescriptionHeader));
157158

@@ -209,6 +210,7 @@ public async Task CreateFromReceivedMessageCopiesPropertiesTopic()
209210
Assert.IsTrue(rawReceived.MessageAnnotations.ContainsKey(AmqpMessageConstants.SequenceNumberName));
210211
Assert.IsTrue(rawReceived.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueueSequenceNumberName));
211212
Assert.IsTrue(rawReceived.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueuedTimeUtcName));
213+
Assert.IsTrue(rawReceived.MessageAnnotations.ContainsKey(AmqpMessageConstants.MessageStateName));
212214

213215
AssertMessagesEqual(msg, received);
214216
var toSend = new ServiceBusMessage(received);
@@ -222,6 +224,7 @@ public async Task CreateFromReceivedMessageCopiesPropertiesTopic()
222224
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueueSequenceNumberName));
223225
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueuedTimeUtcName));
224226
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.DeadLetterSourceName));
227+
Assert.IsFalse(rawSend.MessageAnnotations.ContainsKey(AmqpMessageConstants.MessageStateName));
225228
Assert.IsFalse(toSend.ApplicationProperties.ContainsKey(AmqpMessageConstants.DeadLetterReasonHeader));
226229
Assert.IsFalse(toSend.ApplicationProperties.ContainsKey(AmqpMessageConstants.DeadLetterErrorDescriptionHeader));
227230

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ public async Task DeferMessagesList()
490490
{
491491
Assert.AreEqual(messageList[i].MessageId, deferredMessages[i].MessageId);
492492
Assert.AreEqual(messageList[i].Body.ToArray(), deferredMessages[i].Body.ToArray());
493+
Assert.AreEqual(ServiceBusMessageState.Deferred, deferredMessages[i]);
493494
}
494495

495496
// verify that looking up a non-existent sequence number will throw
@@ -544,6 +545,7 @@ public async Task DeferMessagesArray()
544545
{
545546
Assert.AreEqual(messageList[i].MessageId, deferredMessages[i].MessageId);
546547
Assert.AreEqual(messageList[i].Body.ToArray(), deferredMessages[i].Body.ToArray());
548+
Assert.AreEqual(ServiceBusMessageState.Deferred, deferredMessages[i]);
547549
}
548550

549551
// verify that an empty array can be passed
@@ -600,6 +602,7 @@ IEnumerable<long> GetEnumerable()
600602
{
601603
Assert.AreEqual(messageList[i].MessageId, deferredMessages[i].MessageId);
602604
Assert.AreEqual(messageList[i].Body.ToArray(), deferredMessages[i].Body.ToArray());
605+
Assert.AreEqual(ServiceBusMessageState.Deferred, deferredMessages[i]);
603606
}
604607

605608
// verify that an empty enumerable can be passed
@@ -626,10 +629,12 @@ public async Task CanPeekADeferredMessage()
626629
var peekedMsg = await receiver.PeekMessageAsync();
627630
Assert.AreEqual(receivedMsg.MessageId, peekedMsg.MessageId);
628631
Assert.AreEqual(receivedMsg.SequenceNumber, peekedMsg.SequenceNumber);
632+
Assert.AreEqual(ServiceBusMessageState.Deferred, receivedMsg.State);
629633

630634
var deferredMsg = await receiver.ReceiveDeferredMessageAsync(peekedMsg.SequenceNumber);
631635
Assert.AreEqual(peekedMsg.MessageId, deferredMsg.MessageId);
632636
Assert.AreEqual(peekedMsg.SequenceNumber, deferredMsg.SequenceNumber);
637+
Assert.AreEqual(peekedMsg.State, receivedMsg.State);
633638
}
634639
}
635640

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderLiveTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ public async Task Schedule()
190190
await using var receiver = client.CreateReceiver(scope.QueueName);
191191
ServiceBusReceivedMessage msg = await receiver.PeekMessageAsync(seq);
192192
Assert.AreEqual(0, Convert.ToInt32(new TimeSpan(scheduleTime.Ticks - msg.ScheduledEnqueueTime.Ticks).TotalSeconds));
193+
Assert.AreEqual(ServiceBusMessageState.Scheduled, msg.State);
193194

194195
await sender.CancelScheduledMessageAsync(seq);
195196
msg = await receiver.PeekMessageAsync(seq);
@@ -211,6 +212,7 @@ public async Task ScheduleMultipleArray()
211212
{
212213
ServiceBusReceivedMessage msg = await receiver.PeekMessageAsync(seq);
213214
Assert.AreEqual(0, Convert.ToInt32(new TimeSpan(scheduleTime.Ticks - msg.ScheduledEnqueueTime.Ticks).TotalSeconds));
215+
Assert.AreEqual(ServiceBusMessageState.Scheduled, msg.State);
214216
}
215217
await sender.CancelScheduledMessagesAsync(sequenceNumbers: sequenceNums);
216218

@@ -244,6 +246,7 @@ public async Task ScheduleMultipleList()
244246
{
245247
ServiceBusReceivedMessage msg = await receiver.PeekMessageAsync(seq);
246248
Assert.AreEqual(0, Convert.ToInt32(new TimeSpan(scheduleTime.Ticks - msg.ScheduledEnqueueTime.Ticks).TotalSeconds));
249+
Assert.AreEqual(ServiceBusMessageState.Scheduled, msg.State);
247250
}
248251
await sender.CancelScheduledMessagesAsync(sequenceNumbers: new List<long>(sequenceNums));
249252

@@ -277,6 +280,7 @@ public async Task ScheduleMultipleEnumerable()
277280
{
278281
ServiceBusReceivedMessage msg = await receiver.PeekMessageAsync(seq);
279282
Assert.AreEqual(0, Convert.ToInt32(new TimeSpan(scheduleTime.Ticks - msg.ScheduledEnqueueTime.Ticks).TotalSeconds));
283+
Assert.AreEqual(ServiceBusMessageState.Scheduled, msg.State);
280284
}
281285

282286
// use an enumerable

0 commit comments

Comments
 (0)