Skip to content

Commit 269268d

Browse files
Support serializing received message to/from AMQP bytes (Azure#33682)
* Support serializing received message to/from AMQP bytes * Use ToMemory to avoid extra allocation * PR fb * PR fb * Update test * Add back TryRead * Add endianness check * Fix comment * Update WebJobs version and remove nuget config change * Fix build break
1 parent 100701e commit 269268d

File tree

12 files changed

+308
-72
lines changed

12 files changed

+308
-72
lines changed

sdk/core/Azure.Core.TestFramework/src/TestEnvironment.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private async Task ExtendResourceGroupExpirationAsync()
309309
string clientSecret = GetOptionalVariable("CLIENT_SECRET");
310310
string authorityHost = GetOptionalVariable("AZURE_AUTHORITY_HOST");
311311

312-
if (tenantId == null || clientId == null || clientSecret == null || authorityHost == null)
312+
if (tenantId == null || clientId == null || clientSecret == null || authorityHost == null || ResourceManagerUrl == null)
313313
{
314314
return;
315315
}

sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,14 @@ public TrueRuleFilter() : base (default(string)) { }
10071007
public override string ToString() { throw null; }
10081008
}
10091009
}
1010+
namespace Azure.Messaging.ServiceBus.Primitives
1011+
{
1012+
public static partial class ServiceBusAmqpExtensions
1013+
{
1014+
public static Azure.Messaging.ServiceBus.ServiceBusReceivedMessage FromAmqpBytes(System.BinaryData messageBytes, System.BinaryData lockTokenBytes) { throw null; }
1015+
public static System.BinaryData ToAmqpBytes(this Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message) { throw null; }
1016+
}
1017+
}
10101018
namespace Microsoft.Extensions.Azure
10111019
{
10121020
public static partial class ServiceBusClientBuilderExtensions

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs

Lines changed: 78 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -171,26 +171,30 @@ private static ArraySegment<byte> ReadStreamToArraySegment(Stream stream)
171171
}
172172
}
173173

174-
public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
174+
public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) => AmqpAnnotatedMessageToAmqpMessage(sbMessage.AmqpMessage);
175+
176+
public virtual AmqpMessage AmqpAnnotatedMessageToAmqpMessage(AmqpAnnotatedMessage annotatedMessage)
175177
{
178+
Argument.AssertNotNull(annotatedMessage, nameof(annotatedMessage));
179+
176180
// body
177-
var amqpMessage = sbMessage.ToAmqpMessage();
181+
AmqpMessage amqpMessage = annotatedMessage.ToAmqpMessage();
178182

179183
// properties
180-
amqpMessage.Properties.MessageId = sbMessage.MessageId;
181-
amqpMessage.Properties.CorrelationId = sbMessage.CorrelationId;
182-
amqpMessage.Properties.ContentType = sbMessage.ContentType;
183-
amqpMessage.Properties.ContentEncoding = sbMessage.AmqpMessage.Properties.ContentEncoding;
184-
amqpMessage.Properties.Subject = sbMessage.Subject;
185-
amqpMessage.Properties.To = sbMessage.To;
186-
amqpMessage.Properties.ReplyTo = sbMessage.ReplyTo;
187-
amqpMessage.Properties.GroupId = sbMessage.SessionId;
188-
amqpMessage.Properties.ReplyToGroupId = sbMessage.ReplyToSessionId;
189-
amqpMessage.Properties.GroupSequence = sbMessage.AmqpMessage.Properties.GroupSequence;
190-
191-
if (sbMessage.AmqpMessage.Properties.UserId.HasValue)
192-
{
193-
ReadOnlyMemory<byte> userId = sbMessage.AmqpMessage.Properties.UserId.Value;
184+
amqpMessage.Properties.MessageId = annotatedMessage.Properties.MessageId?.ToString();
185+
amqpMessage.Properties.CorrelationId = annotatedMessage.Properties.CorrelationId?.ToString();
186+
amqpMessage.Properties.ContentType = annotatedMessage.Properties.ContentType;
187+
amqpMessage.Properties.ContentEncoding = annotatedMessage.Properties.ContentEncoding;
188+
amqpMessage.Properties.Subject = annotatedMessage.Properties.Subject;
189+
amqpMessage.Properties.To = annotatedMessage.Properties.To?.ToString();
190+
amqpMessage.Properties.ReplyTo = annotatedMessage.Properties.ReplyTo?.ToString();
191+
amqpMessage.Properties.GroupId = annotatedMessage.Properties.GroupId;
192+
amqpMessage.Properties.ReplyToGroupId = annotatedMessage.Properties.ReplyToGroupId;
193+
amqpMessage.Properties.GroupSequence = annotatedMessage.Properties.GroupSequence;
194+
195+
if (annotatedMessage.Properties.UserId.HasValue)
196+
{
197+
ReadOnlyMemory<byte> userId = annotatedMessage.Properties.UserId.Value;
194198
if (MemoryMarshal.TryGetArray(userId, out ArraySegment<byte> segment))
195199
{
196200
amqpMessage.Properties.UserId = segment;
@@ -202,14 +206,15 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
202206
}
203207

204208
// If TTL is set, it is used to calculate AbsoluteExpiryTime and CreationTime
205-
if (sbMessage.TimeToLive != TimeSpan.MaxValue)
209+
TimeSpan ttl = annotatedMessage.GetTimeToLive();
210+
if (ttl != TimeSpan.MaxValue)
206211
{
207-
amqpMessage.Header.Ttl = (uint)sbMessage.TimeToLive.TotalMilliseconds;
212+
amqpMessage.Header.Ttl = (uint)ttl.TotalMilliseconds;
208213
amqpMessage.Properties.CreationTime = DateTime.UtcNow;
209214

210-
if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > sbMessage.TimeToLive)
215+
if (AmqpConstants.MaxAbsoluteExpiryTime - amqpMessage.Properties.CreationTime.Value > ttl)
211216
{
212-
amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + sbMessage.TimeToLive;
217+
amqpMessage.Properties.AbsoluteExpiryTime = amqpMessage.Properties.CreationTime.Value + ttl;
213218
}
214219
else
215220
{
@@ -218,38 +223,41 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
218223
}
219224
else
220225
{
221-
if (sbMessage.AmqpMessage.Properties.CreationTime.HasValue)
226+
if (annotatedMessage.Properties.CreationTime.HasValue)
222227
{
223-
amqpMessage.Properties.CreationTime = sbMessage.AmqpMessage.Properties.CreationTime.Value.UtcDateTime;
228+
amqpMessage.Properties.CreationTime = annotatedMessage.Properties.CreationTime.Value.UtcDateTime;
224229
}
225-
if (sbMessage.AmqpMessage.Properties.AbsoluteExpiryTime.HasValue)
230+
if (annotatedMessage.Properties.AbsoluteExpiryTime.HasValue)
226231
{
227-
amqpMessage.Properties.AbsoluteExpiryTime = sbMessage.AmqpMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
232+
amqpMessage.Properties.AbsoluteExpiryTime = annotatedMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
228233
}
229234
}
230235

231236
// message annotations
232237

233-
foreach (KeyValuePair<string, object> kvp in sbMessage.AmqpMessage.MessageAnnotations)
238+
foreach (KeyValuePair<string, object> kvp in annotatedMessage.MessageAnnotations)
234239
{
235240
switch (kvp.Key)
236241
{
237242
case AmqpMessageConstants.ScheduledEnqueueTimeUtcName:
238-
if ((sbMessage.ScheduledEnqueueTime != null) && sbMessage.ScheduledEnqueueTime > DateTimeOffset.MinValue)
243+
DateTimeOffset scheduledEnqueueTime = annotatedMessage.GetScheduledEnqueueTime();
244+
if (scheduledEnqueueTime != default)
239245
{
240-
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, sbMessage.ScheduledEnqueueTime.UtcDateTime);
246+
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ScheduledEnqueueTimeUtcName, scheduledEnqueueTime.UtcDateTime);
241247
}
242248
break;
243249
case AmqpMessageConstants.PartitionKeyName:
244-
if (sbMessage.PartitionKey != null)
250+
string partitionKey = annotatedMessage.GetPartitionKey();
251+
if (partitionKey != null)
245252
{
246-
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, sbMessage.PartitionKey);
253+
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.PartitionKeyName, partitionKey);
247254
}
248255
break;
249256
case AmqpMessageConstants.ViaPartitionKeyName:
250-
if (sbMessage.TransactionPartitionKey != null)
257+
string viaPartitionKey = annotatedMessage.GetViaPartitionKey();
258+
if (viaPartitionKey != null)
251259
{
252-
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, sbMessage.TransactionPartitionKey);
260+
amqpMessage.MessageAnnotations.Map.Add(AmqpMessageConstants.ViaPartitionKeyName, viaPartitionKey);
253261
}
254262
break;
255263
default:
@@ -260,14 +268,14 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
260268

261269
// application properties
262270

263-
if (sbMessage.ApplicationProperties != null && sbMessage.ApplicationProperties.Count > 0)
271+
if (annotatedMessage.ApplicationProperties.Count > 0)
264272
{
265273
if (amqpMessage.ApplicationProperties == null)
266274
{
267275
amqpMessage.ApplicationProperties = new ApplicationProperties();
268276
}
269277

270-
foreach (KeyValuePair<string, object> pair in sbMessage.ApplicationProperties)
278+
foreach (KeyValuePair<string, object> pair in annotatedMessage.ApplicationProperties)
271279
{
272280
if (TryGetAmqpObjectFromNetObject(pair.Value, MappingType.ApplicationProperty, out var amqpObject))
273281
{
@@ -282,7 +290,7 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
282290

283291
// delivery annotations
284292

285-
foreach (KeyValuePair<string, object> kvp in sbMessage.AmqpMessage.DeliveryAnnotations)
293+
foreach (KeyValuePair<string, object> kvp in annotatedMessage.DeliveryAnnotations)
286294
{
287295
if (TryGetAmqpObjectFromNetObject(kvp.Value, MappingType.ApplicationProperty, out var amqpObject))
288296
{
@@ -292,38 +300,37 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
292300

293301
// header - except for ttl which is set above with the properties
294302

295-
if (sbMessage.AmqpMessage.Header.DeliveryCount != null)
303+
if (annotatedMessage.Header.DeliveryCount != null)
296304
{
297-
amqpMessage.Header.DeliveryCount = sbMessage.AmqpMessage.Header.DeliveryCount;
305+
amqpMessage.Header.DeliveryCount = annotatedMessage.Header.DeliveryCount;
298306
}
299-
if (sbMessage.AmqpMessage.Header.Durable != null)
307+
if (annotatedMessage.Header.Durable != null)
300308
{
301-
amqpMessage.Header.Durable = sbMessage.AmqpMessage.Header.Durable;
309+
amqpMessage.Header.Durable = annotatedMessage.Header.Durable;
302310
}
303-
if (sbMessage.AmqpMessage.Header.FirstAcquirer != null)
311+
if (annotatedMessage.Header.FirstAcquirer != null)
304312
{
305-
amqpMessage.Header.FirstAcquirer = sbMessage.AmqpMessage.Header.FirstAcquirer;
313+
amqpMessage.Header.FirstAcquirer = annotatedMessage.Header.FirstAcquirer;
306314
}
307-
if (sbMessage.AmqpMessage.Header.Priority != null)
315+
if (annotatedMessage.Header.Priority != null)
308316
{
309-
amqpMessage.Header.Priority = sbMessage.AmqpMessage.Header.Priority;
317+
amqpMessage.Header.Priority = annotatedMessage.Header.Priority;
310318
}
311319

312320
// footer
313321

314-
foreach (KeyValuePair<string, object> kvp in sbMessage.AmqpMessage.Footer)
322+
foreach (KeyValuePair<string, object> kvp in annotatedMessage.Footer)
315323
{
316324
amqpMessage.Footer.Map.Add(kvp.Key, kvp.Value);
317325
}
318326

319327
return amqpMessage;
320328
}
321329

322-
public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
330+
private static AmqpAnnotatedMessage AmqpMessageToAnnotatedMessage(AmqpMessage amqpMessage, bool isPeeked)
323331
{
324332
Argument.AssertNotNull(amqpMessage, nameof(amqpMessage));
325333
AmqpAnnotatedMessage annotatedMessage;
326-
327334
// body
328335

329336
if ((amqpMessage.BodyType & SectionFlag.Data) != 0 && amqpMessage.DataBody != null)
@@ -351,7 +358,6 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqp
351358
{
352359
annotatedMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(Enumerable.Empty<ReadOnlyMemory<byte>>()));
353360
}
354-
ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage);
355361

356362
SectionFlag sections = amqpMessage.Sections;
357363

@@ -540,22 +546,39 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqp
540546
}
541547
}
542548

549+
return annotatedMessage;
550+
}
551+
552+
public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMessage amqpMessage, bool isPeeked = false)
553+
{
554+
AmqpAnnotatedMessage annotatedMessage = AmqpMessageToAnnotatedMessage(amqpMessage, isPeeked);
555+
556+
ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage);
557+
543558
// lock token
544559

545-
if (amqpMessage.DeliveryTag.Count == GuidSizeInBytes)
560+
sbMessage.LockTokenGuid = ParseGuidBytes(amqpMessage.DeliveryTag);
561+
562+
amqpMessage.Dispose();
563+
564+
return sbMessage;
565+
}
566+
567+
public virtual Guid ParseGuidBytes(ReadOnlyMemory<byte> bytes)
568+
{
569+
if (bytes.Length == GuidSizeInBytes)
546570
{
547-
Span<byte> guidBytes = stackalloc byte[GuidSizeInBytes];
548-
amqpMessage.DeliveryTag.AsSpan().CopyTo(guidBytes);
549-
if (!MemoryMarshal.TryRead<Guid>(guidBytes, out var lockTokenGuid))
571+
// Use TryRead to avoid allocating an array if we are on a little endian machine.
572+
if (!BitConverter.IsLittleEndian || !MemoryMarshal.TryRead<Guid>(bytes.Span, out var lockTokenGuid))
550573
{
551-
lockTokenGuid = new Guid(guidBytes.ToArray());
574+
// Either we are on a big endian machine or the bytes were not a valid GUID.
575+
// Even if the bytes were not valid, use the Guid constructor to leverage the Guid validation rather than throwing ourselves.
576+
lockTokenGuid = new Guid(bytes.ToArray());
552577
}
553-
sbMessage.LockTokenGuid = lockTokenGuid;
578+
return lockTokenGuid;
554579
}
555580

556-
amqpMessage.Dispose();
557-
558-
return sbMessage;
581+
return default;
559582
}
560583

561584
internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType mappingType, out object amqpObject)

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ namespace Azure.Messaging.ServiceBus.Amqp
1515
{
1616
internal static class AmqpMessageExtensions
1717
{
18-
public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message)
18+
public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message) => ToAmqpMessage(message.AmqpMessage);
19+
20+
public static AmqpMessage ToAmqpMessage(this AmqpAnnotatedMessage message)
1921
{
20-
if (message.AmqpMessage.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
22+
if (message.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
2123
{
2224
return AmqpMessage.Create(dataBody.AsAmqpData());
2325
}
24-
if (message.AmqpMessage.Body.TryGetValue(out object value))
26+
if (message.Body.TryGetValue(out object value))
2527
{
2628
if (AmqpMessageConverter.TryGetAmqpObjectFromNetObject(value, MappingType.MessageBody, out object amqpObject))
2729
{
@@ -32,12 +34,12 @@ public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message)
3234
throw new NotSupportedException(Resources.InvalidAmqpMessageValueBody.FormatForUser(amqpObject?.GetType()));
3335
}
3436
}
35-
if (message.AmqpMessage.Body.TryGetSequence(out IEnumerable<IList<object>> sequence))
37+
if (message.Body.TryGetSequence(out IEnumerable<IList<object>> sequence))
3638
{
3739
return AmqpMessage.Create(sequence.Select(s => new AmqpSequence((IList)s)).ToList());
3840
}
3941

40-
throw new NotSupportedException($"{message.AmqpMessage.Body.GetType()} is not a supported message body type.");
42+
throw new NotSupportedException($"{message.Body.GetType()} is not a supported message body type.");
4143
}
4244

4345
private static IEnumerable<Data> AsAmqpData(this IEnumerable<ReadOnlyMemory<byte>> binaryData)

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
385385
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
386386
}
387387

388-
receivedMessages.Add(_messageConverter.AmqpMessageToSBMessage(message));
388+
receivedMessages.Add(_messageConverter.AmqpMessageToSBReceivedMessage(message));
389389
message.Dispose();
390390
}
391391

@@ -1001,7 +1001,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna
10011001

10021002
var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
10031003
var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
1004-
message = _messageConverter.AmqpMessageToSBMessage(amqpMessage, true);
1004+
message = _messageConverter.AmqpMessageToSBReceivedMessage(amqpMessage, true);
10051005
messages.Add(message);
10061006
}
10071007

@@ -1311,7 +1311,7 @@ internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDef
13111311
var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
13121312
var amqpMessage =
13131313
AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
1314-
var message = _messageConverter.AmqpMessageToSBMessage(amqpMessage);
1314+
var message = _messageConverter.AmqpMessageToSBReceivedMessage(amqpMessage);
13151315
if (entry.TryGetValue<Guid>(ManagementConstants.Properties.LockToken, out var lockToken))
13161316
{
13171317
message.LockTokenGuid = lockToken;

0 commit comments

Comments
 (0)