Skip to content

Commit cab3ab6

Browse files
Amqp api updates (Azure#16796)
1 parent 52fb9b7 commit cab3ab6

14 files changed

+81
-72
lines changed

sdk/core/Azure.Core.Amqp/api/Azure.Core.Amqp.netstandard2.0.cs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public partial struct AmqpAddress : System.IEquatable<Azure.Core.Amqp.AmqpAddres
1818
}
1919
public partial class AmqpAnnotatedMessage
2020
{
21-
public AmqpAnnotatedMessage(System.Collections.Generic.IEnumerable<System.ReadOnlyMemory<byte>> dataBody) { }
21+
public AmqpAnnotatedMessage(Azure.Core.Amqp.AmqpMessageBody body) { }
2222
public System.Collections.Generic.IDictionary<string, object> ApplicationProperties { get { throw null; } }
2323
public Azure.Core.Amqp.AmqpMessageBody Body { get { throw null; } set { } }
2424
public System.Collections.Generic.IDictionary<string, object> DeliveryAnnotations { get { throw null; } }
@@ -27,16 +27,11 @@ public AmqpAnnotatedMessage(System.Collections.Generic.IEnumerable<System.ReadOn
2727
public System.Collections.Generic.IDictionary<string, object> MessageAnnotations { get { throw null; } }
2828
public Azure.Core.Amqp.AmqpMessageProperties Properties { get { throw null; } }
2929
}
30-
public partial class AmqpDataMessageBody : Azure.Core.Amqp.AmqpMessageBody
30+
public partial class AmqpMessageBody
3131
{
32-
public AmqpDataMessageBody(System.Collections.Generic.IEnumerable<System.ReadOnlyMemory<byte>> data) { }
33-
public override Azure.Core.Amqp.AmqpMessageBodyType BodyType { get { throw null; } }
34-
public virtual System.Collections.Generic.IEnumerable<System.ReadOnlyMemory<byte>> Data { get { throw null; } }
35-
}
36-
public abstract partial class AmqpMessageBody
37-
{
38-
protected AmqpMessageBody() { }
39-
public abstract Azure.Core.Amqp.AmqpMessageBodyType BodyType { get; }
32+
public AmqpMessageBody(System.Collections.Generic.IEnumerable<System.ReadOnlyMemory<byte>> data) { }
33+
public Azure.Core.Amqp.AmqpMessageBodyType BodyType { get { throw null; } }
34+
public bool TryGetData(out System.Collections.Generic.IEnumerable<System.ReadOnlyMemory<byte>>? data) { throw null; }
4035
}
4136
public enum AmqpMessageBodyType
4237
{

sdk/core/Azure.Core.Amqp/src/AmqpAnnotatedMessage.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ public class AmqpAnnotatedMessage
1515
/// <summary>
1616
/// Initializes a new Data body <see cref="AmqpAnnotatedMessage"/>.
1717
/// </summary>
18-
/// <param name="dataBody">The data sections comprising the message body.
18+
/// <param name="body">The data sections comprising the message body.
1919
/// <seealso href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-data"/>
2020
/// </param>
21-
public AmqpAnnotatedMessage(IEnumerable<ReadOnlyMemory<byte>> dataBody)
21+
public AmqpAnnotatedMessage(AmqpMessageBody body)
2222
{
23-
Body = new AmqpDataMessageBody(dataBody);
23+
Body = body;
2424
}
2525

2626
/// <summary>

sdk/core/Azure.Core.Amqp/src/AmqpDataMessageBody.cs

Lines changed: 0 additions & 36 deletions
This file was deleted.
Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,53 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
48
namespace Azure.Core.Amqp
59
{
610
/// <summary>
711
/// Represents an AMQP message body.
812
/// <seealso href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format" />
913
/// </summary>
10-
public abstract class AmqpMessageBody
14+
public class AmqpMessageBody
1115
{
16+
/// <summary>
17+
/// The data sections for the AMQP message body.
18+
/// </summary>
19+
private readonly IEnumerable<ReadOnlyMemory<byte>> _data;
20+
1221
/// <summary>
1322
/// Gets the type of the message body.
1423
/// </summary>
15-
public abstract AmqpMessageBodyType BodyType { get; }
24+
public AmqpMessageBodyType BodyType { get; }
25+
26+
/// <summary>
27+
/// Initializes a new <see cref="AmqpMessageBody"/> instance with the
28+
/// passed in data sections.
29+
/// </summary>
30+
/// <param name="data">The data sections.</param>
31+
public AmqpMessageBody(IEnumerable<ReadOnlyMemory<byte>> data)
32+
{
33+
_data = data ?? Enumerable.Empty<ReadOnlyMemory<byte>>();
34+
BodyType = AmqpMessageBodyType.Data;
35+
}
36+
37+
/// <summary>
38+
/// Try to get the data sections for the AMQP message body.
39+
/// </summary>
40+
/// <param name="data"></param>
41+
/// <returns></returns>
42+
public bool TryGetData(out IEnumerable<ReadOnlyMemory<byte>>? data)
43+
{
44+
if (BodyType == AmqpMessageBodyType.Data)
45+
{
46+
data = _data;
47+
return true;
48+
}
49+
data = null;
50+
return false;
51+
}
1652
}
1753
}

sdk/core/Azure.Core.Amqp/tests/AmqpAnnotatedMessageTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class AmqpAnnotatedMessageTests
1414
[Test]
1515
public void CanCreateAnnotatedMessage()
1616
{
17-
var message = new AmqpAnnotatedMessage(new ReadOnlyMemory<byte>[] { Encoding.UTF8.GetBytes("some data") });
17+
var message = new AmqpAnnotatedMessage(new AmqpMessageBody(new ReadOnlyMemory<byte>[] { Encoding.UTF8.GetBytes("some data") }));
1818
message.ApplicationProperties.Add("applicationKey", "applicationValue");
1919
message.DeliveryAnnotations.Add("deliveryKey", "deliveryValue");
2020
message.MessageAnnotations.Add("messageKey", "messageValue");
@@ -40,7 +40,8 @@ public void CanCreateAnnotatedMessage()
4040
message.Properties.UserId = Encoding.UTF8.GetBytes("userId");
4141

4242
Assert.AreEqual(AmqpMessageBodyType.Data, message.Body.BodyType);
43-
Assert.AreEqual("some data", Encoding.UTF8.GetString(((AmqpDataMessageBody)message.Body).Data.First().ToArray()));
43+
Assert.IsTrue(message.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> body));
44+
Assert.AreEqual("some data", Encoding.UTF8.GetString(body.First().ToArray()));
4445
Assert.AreEqual("applicationValue", message.ApplicationProperties["applicationKey"]);
4546
Assert.AreEqual("deliveryValue", message.DeliveryAnnotations["deliveryKey"]);
4647
Assert.AreEqual("messageValue", message.MessageAnnotations["messageKey"]);

sdk/core/Azure.Core.Amqp/tests/AmqpDataMessageBodyTests.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using System.Collections.Generic;
56
using NUnit.Framework;
67

78
namespace Azure.Core.Amqp.Tests
@@ -11,8 +12,15 @@ public class AmqpDataMessageBodyTests
1112
[Test]
1213
public void CanCreateDataBody()
1314
{
14-
var body = new AmqpDataMessageBody(Array.Empty<ReadOnlyMemory<byte>>());
15+
var body = new AmqpMessageBody(Array.Empty<ReadOnlyMemory<byte>>());
1516
Assert.AreEqual(AmqpMessageBodyType.Data, body.BodyType);
17+
Assert.IsTrue(body.TryGetData(out var data));
18+
Assert.NotNull(data);
19+
20+
body = new AmqpMessageBody(null);
21+
Assert.AreEqual(AmqpMessageBodyType.Data, body.BodyType);
22+
Assert.IsTrue(body.TryGetData(out data));
23+
Assert.NotNull(data);
1624
}
1725
}
1826
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,11 @@ public static ServiceBusReceivedMessage AmqpMessageToSBMessage(AmqpMessage amqpM
214214

215215
if ((amqpMessage.BodyType & SectionFlag.Data) != 0 && amqpMessage.DataBody != null)
216216
{
217-
annotatedMessage = new AmqpAnnotatedMessage(amqpMessage.GetDataViaDataBody());
217+
annotatedMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(amqpMessage.GetDataViaDataBody()));
218218
}
219219
else
220220
{
221-
annotatedMessage = new AmqpAnnotatedMessage(new ReadOnlyMemory<byte>[] { Array.Empty<byte>() });
221+
annotatedMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(Enumerable.Empty<ReadOnlyMemory<byte>>()));
222222
}
223223
ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage);
224224

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ internal static class AmqpMessageExtensions
1515
{
1616
public static AmqpMessage ToAmqpMessage(this ServiceBusMessage message)
1717
{
18-
if (message.AmqpMessage.Body is AmqpDataMessageBody dataBody)
18+
if (message.AmqpMessage.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
1919
{
20-
return AmqpMessage.Create(dataBody.Data.AsAmqpData());
20+
return AmqpMessage.Create(dataBody.AsAmqpData());
2121
}
2222
throw new NotSupportedException($"{message.AmqpMessage.Body.GetType()} is not a supported message body type.");
2323
}
@@ -130,9 +130,9 @@ public static DateTimeOffset GetScheduledEnqueueTime(this AmqpAnnotatedMessage m
130130

131131
public static BinaryData GetBody(this AmqpAnnotatedMessage message)
132132
{
133-
if (message.Body is AmqpDataMessageBody dataBody)
133+
if (message.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody))
134134
{
135-
return dataBody.Data.ConvertAndFlattenData();
135+
return dataBody.ConvertAndFlattenData();
136136
}
137137
throw new NotSupportedException($"{message.Body.GetType()} is not a supported message body type.");
138138
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Azure.Messaging.ServiceBus.csproj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<Description>Azure Service Bus is a fully managed enterprise integration message broker. Service Bus can decouple applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data and state. This client library allows for both sending and receiving messages using Azure Service Bus. For more information about Service Bus, see https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview</Description>
44
<Version>7.0.0-preview.10</Version>
@@ -12,7 +12,6 @@
1212
</ItemGroup>
1313

1414
<ItemGroup>
15-
<PackageReference Include="Azure.Core.Amqp" />
1615
<PackageReference Include="Microsoft.Azure.Amqp" />
1716
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
1817
<!--This can be removed once this dependency is added to Core in the next release.-->
@@ -36,6 +35,9 @@
3635
<Compile Include="$(AzureCoreSharedSources)PageResponseEnumerator.cs" Link="SharedSource\Azure.Core\PageResponseEnumerator.cs" />
3736
<Compile Include="$(AzureCoreSharedSources)AzureResourceProviderNamespaceAttribute.cs" Link="SharedSource\Azure.Core\AzureResourceProviderNamespaceAttribute.cs" />
3837
</ItemGroup>
38+
<ItemGroup>
39+
<ProjectReference Include="..\..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj" />
40+
</ItemGroup>
3941
<ItemGroup>
4042
<Compile Update="Resources.Designer.cs">
4143
<DesignTime>True</DesignTime>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Plugins/ServiceBusPlugin.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public virtual ValueTask AfterMessageReceiveAsync(ServiceBusReceivedMessage mess
3939
#pragma warning disable CA1822 // Mark members as static
4040
protected void SetBody(ServiceBusReceivedMessage message, BinaryData body)
4141
{
42-
message.AmqpMessage.Body = new AmqpDataMessageBody(new ReadOnlyMemory<byte>[] { body });
42+
message.AmqpMessage.Body = new AmqpMessageBody(new ReadOnlyMemory<byte>[] { body });
4343
}
4444

4545
/// <summary>

0 commit comments

Comments
 (0)