Skip to content

Commit ac6e370

Browse files
[azservicebus] Some naming changes for raw AMQP message support for consistency with other stacks Azure#18430
Some naming changes to be more consistent with other track 2 SDKs as well as the AMQP spec: Rename the AMQPMessage to be AMQPAnnotatedMessage (track 2 consistency, also matches terminology in the spec) Create an AMQPAnnotatedMessageBody type to house the .Value, .Sequence and .Data fields. This matches more with the logical format of an AMQP message, where the Body is comprised of sections (Data, Value, Sequence), rather than them just being free-floating fields on a message.
1 parent 37b3dad commit ac6e370

File tree

9 files changed

+77
-70
lines changed

9 files changed

+77
-70
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
### Features Added
66

77
- Full access to send and receive all AMQP message properties. (#18413)
8-
- Send AMQP messages using the new `AMQPMessage` type and `Sender.SendAMQPMessage()` (AMQP messages can be added to MessageBatch's as well using MessageBatch.AddAMQPMessage()).
8+
- Send AMQP messages using the new `AMQPAnnotatedMessage` type and `Sender.SendAMQPAnnotatedMessage()`.
9+
- AMQP messages can be added to MessageBatch's as well using `MessageBatch.AddAMQPAnnotatedMessage()`.
10+
- AMQP messages can be scheduled using `Sender.ScheduleAMQPAnnotatedMessages`.
911
- Access the full set of AMQP message properties when receiving using the `ReceivedMessage.RawAMQPMessage` property.
1012

1113
### Breaking Changes

sdk/messaging/azservicebus/amqp_message.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
1010
)
1111

12-
// AMQPMessage represents the AMQP message, as received from Service Bus.
12+
// AMQPAnnotatedMessage represents the AMQP message, as received from Service Bus.
1313
// For details about these properties, refer to the AMQP specification:
1414
// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
1515
//
@@ -22,18 +22,18 @@ import (
2222
// - string
2323
// - bool
2424
// - time.Time
25-
type AMQPMessage struct {
25+
type AMQPAnnotatedMessage struct {
2626
// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
2727
//
28-
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
28+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
2929
ApplicationProperties map[string]any
3030

3131
// Body represents the body of an AMQP message.
32-
Body AMQPMessageBody
32+
Body AMQPAnnotatedMessageBody
3333

3434
// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
3535
//
36-
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
36+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
3737
DeliveryAnnotations map[any]any
3838

3939
// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
@@ -42,19 +42,19 @@ type AMQPMessage struct {
4242

4343
// Footer is the transport footers for this AMQP message.
4444
//
45-
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
45+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
4646
Footer map[any]any
4747

4848
// Header is the transport headers for this AMQP message.
49-
Header *AMQPMessageHeader
49+
Header *AMQPAnnotatedMessageHeader
5050

5151
// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
5252
//
53-
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
53+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
5454
MessageAnnotations map[any]any
5555

5656
// Properties corresponds to the properties section of an AMQP message.
57-
Properties *AMQPMessageProperties
57+
Properties *AMQPAnnotatedMessageProperties
5858

5959
linkName string
6060

@@ -64,10 +64,10 @@ type AMQPMessage struct {
6464
inner *amqp.Message
6565
}
6666

67-
// AMQPMessageProperties represents the properties of an AMQP message.
67+
// AMQPAnnotatedMessageProperties represents the properties of an AMQP message.
6868
// See here for more details:
6969
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
70-
type AMQPMessageProperties struct {
70+
type AMQPAnnotatedMessageProperties struct {
7171
// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
7272
AbsoluteExpiryTime *time.Time
7373

@@ -110,29 +110,29 @@ type AMQPMessageProperties struct {
110110
UserID []byte
111111
}
112112

113-
// AMQPMessageBody represents the body of an AMQP message.
113+
// AMQPAnnotatedMessageBody represents the body of an AMQP message.
114114
// Only one of these fields can be used a a time. They are mutually exclusive.
115-
type AMQPMessageBody struct {
115+
type AMQPAnnotatedMessageBody struct {
116116
// Data is encoded/decoded as multiple data sections in the body.
117117
Data [][]byte
118118

119119
// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
120120
//
121-
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
121+
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
122122
Sequence [][]any
123123

124124
// Value is encoded/decoded as the amqp-value section in the body.
125125
//
126-
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPMessage,
126+
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
127127
// as well as slices or maps of AMQP simple types.
128128
Value any
129129
}
130130

131-
// AMQPMessageHeader carries standard delivery details about the transfer
131+
// AMQPAnnotatedMessageHeader carries standard delivery details about the transfer
132132
// of a message.
133133
// See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header
134134
// for more details.
135-
type AMQPMessageHeader struct {
135+
type AMQPAnnotatedMessageHeader struct {
136136
// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
137137
// It corresponds to the 'delivery-count' property.
138138
DeliveryCount uint32
@@ -152,7 +152,7 @@ type AMQPMessageHeader struct {
152152

153153
// toAMQPMessage converts between our (azservicebus) AMQP message
154154
// to the underlying message used by go-amqp.
155-
func (am *AMQPMessage) toAMQPMessage() *amqp.Message {
155+
func (am *AMQPAnnotatedMessage) toAMQPMessage() *amqp.Message {
156156
var header *amqp.MessageHeader
157157

158158
if am.Header != nil {
@@ -221,11 +221,11 @@ func copyAnnotations(src map[any]any) amqp.Annotations {
221221
return dest
222222
}
223223

224-
func newAMQPMessage(goAMQPMessage *amqp.Message) *AMQPMessage {
225-
var header *AMQPMessageHeader
224+
func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage {
225+
var header *AMQPAnnotatedMessageHeader
226226

227227
if goAMQPMessage.Header != nil {
228-
header = &AMQPMessageHeader{
228+
header = &AMQPAnnotatedMessageHeader{
229229
DeliveryCount: goAMQPMessage.Header.DeliveryCount,
230230
Durable: goAMQPMessage.Header.Durable,
231231
FirstAcquirer: goAMQPMessage.Header.FirstAcquirer,
@@ -234,10 +234,10 @@ func newAMQPMessage(goAMQPMessage *amqp.Message) *AMQPMessage {
234234
}
235235
}
236236

237-
var properties *AMQPMessageProperties
237+
var properties *AMQPAnnotatedMessageProperties
238238

239239
if goAMQPMessage.Properties != nil {
240-
properties = &AMQPMessageProperties{
240+
properties = &AMQPAnnotatedMessageProperties{
241241
AbsoluteExpiryTime: goAMQPMessage.Properties.AbsoluteExpiryTime,
242242
ContentEncoding: goAMQPMessage.Properties.ContentEncoding,
243243
ContentType: goAMQPMessage.Properties.ContentType,
@@ -260,10 +260,10 @@ func newAMQPMessage(goAMQPMessage *amqp.Message) *AMQPMessage {
260260
footer = (map[any]any)(goAMQPMessage.Footer)
261261
}
262262

263-
return &AMQPMessage{
263+
return &AMQPAnnotatedMessage{
264264
MessageAnnotations: map[any]any(goAMQPMessage.Annotations),
265265
ApplicationProperties: goAMQPMessage.ApplicationProperties,
266-
Body: AMQPMessageBody{
266+
Body: AMQPAnnotatedMessageBody{
267267
Data: goAMQPMessage.Data,
268268
Sequence: goAMQPMessage.Sequence,
269269
Value: goAMQPMessage.Value,

sdk/messaging/azservicebus/amqp_message_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"github.com/stretchr/testify/require"
1010
)
1111

12-
func TestAMQPMessageUnitTest(t *testing.T) {
12+
func TestAMQPAnnotatedMessageUnitTest(t *testing.T) {
1313
t.Run("Default", func(t *testing.T) {
14-
msg := &AMQPMessage{}
14+
msg := &AMQPAnnotatedMessage{}
1515
amqpMessage := msg.toAMQPMessage()
1616

1717
// we duplicate/inflate these since we modify them

sdk/messaging/azservicebus/example_receiver_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func ExampleReceiver_ReceiveMessages_amqpMessage() {
132132
// NOTE: For this example we'll assume we received at least one message.
133133

134134
// Every received message carries a RawAMQPMessage.
135-
rawAMQPMessage := messages[0].RawAMQPMessage
135+
var rawAMQPMessage *azservicebus.AMQPAnnotatedMessage = messages[0].RawAMQPMessage
136136

137137
// All the various body encodings available for AMQP messages are exposed via Body
138138
_ = rawAMQPMessage.Body.Data

sdk/messaging/azservicebus/example_sender_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,14 @@ func ExampleSender_ScheduleMessages() {
9191
exitOnError("Failed to schedule messages using SendMessage", err)
9292
}
9393

94-
func ExampleSender_SendAMQPMessage() {
94+
func ExampleSender_SendAMQPAnnotatedMessage() {
9595
// AMQP is the underlying protocol for all interaction with Service Bus.
9696
// You can, if needed, send and receive messages that have a 1:1 correspondence
9797
// with an AMQP message. This gives you full control over details that are not
9898
// exposed via the azservicebus.ReceivedMessage type.
9999

100-
message := &azservicebus.AMQPMessage{
101-
Body: azservicebus.AMQPMessageBody{
100+
message := &azservicebus.AMQPAnnotatedMessage{
101+
Body: azservicebus.AMQPAnnotatedMessageBody{
102102
// there are three kinds of different body encodings
103103
// Data, Value and Sequence. See the azservicebus.AMQPMessageBody
104104
// documentation for more details.
@@ -114,7 +114,7 @@ func ExampleSender_SendAMQPMessage() {
114114
},
115115
}
116116

117-
err := sender.SendAMQPMessage(context.TODO(), message, nil)
117+
err := sender.SendAMQPAnnotatedMessage(context.TODO(), message, nil)
118118

119119
if err != nil {
120120
panic(err)

sdk/messaging/azservicebus/message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ type ReceivedMessage struct {
124124
// to properties that are not exposed by ReceivedMessage such as payloads encoded into the
125125
// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
126126
// and Header fields.
127-
RawAMQPMessage *AMQPMessage
127+
RawAMQPMessage *AMQPAnnotatedMessage
128128

129129
// deferred indicates we received it using ReceiveDeferredMessages. These messages
130130
// will still go through the normal Receiver.Settle functions but internally will
@@ -314,7 +314,7 @@ func (m *Message) toAMQPMessage() *amqp.Message {
314314
// serialized byte array in the Data section of the messsage.
315315
func newReceivedMessage(amqpMsg *amqp.Message) *ReceivedMessage {
316316
msg := &ReceivedMessage{
317-
RawAMQPMessage: newAMQPMessage(amqpMsg),
317+
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
318318
State: MessageStateActive,
319319
}
320320

sdk/messaging/azservicebus/message_batch.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,17 @@ func (mb *MessageBatch) AddMessage(m *Message, options *AddMessageOptions) error
5454
return mb.addAMQPMessage(m)
5555
}
5656

57-
// AddAMQPMessageOptions contains optional parameters for the AddAMQPMessage function.
58-
type AddAMQPMessageOptions struct {
57+
// AddAMQPAnnotatedMessageOptions contains optional parameters for the AddAMQPAnnotatedMessage function.
58+
type AddAMQPAnnotatedMessageOptions struct {
5959
// For future expansion
6060
}
6161

62-
// AddAMQPMessage adds a message to the batch if the message will not exceed the max size of the batch
62+
// AddAMQPAnnotatedMessage adds a message to the batch if the message will not exceed the max size of the batch
6363
// Returns:
6464
// - ErrMessageTooLarge if the message cannot fit
6565
// - a non-nil error for other failures
6666
// - nil, otherwise
67-
func (mb *MessageBatch) AddAMQPMessage(m *AMQPMessage, options *AddAMQPMessageOptions) error {
67+
func (mb *MessageBatch) AddAMQPAnnotatedMessage(m *AMQPAnnotatedMessage, options *AddAMQPAnnotatedMessageOptions) error {
6868
return mb.addAMQPMessage(m)
6969
}
7070

sdk/messaging/azservicebus/sender.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,20 @@ type SendMessageOptions struct {
6363
// SendMessage sends a Message to a queue or topic.
6464
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
6565
func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error {
66-
return s.sendMessage(ctx, message, options)
66+
return s.sendMessage(ctx, message)
6767
}
6868

69-
// SendAMQPMessage sends an AMQPMessage to a queue or topic.
69+
// SendAMQPAnnotatedMessageOptions contains optional parameters for the SendAMQPAnnotatedMessage function.
70+
type SendAMQPAnnotatedMessageOptions struct {
71+
// For future expansion
72+
}
73+
74+
// SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic.
7075
// Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better
7176
// interoperability with pure AMQP clients.
7277
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
73-
func (s *Sender) SendAMQPMessage(ctx context.Context, message *AMQPMessage, options *SendMessageOptions) error {
74-
return s.sendMessage(ctx, message, options)
78+
func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error {
79+
return s.sendMessage(ctx, message)
7580
}
7681

7782
// SendMessageBatchOptions contains optional parameters for the SendMessageBatch function.
@@ -103,16 +108,16 @@ func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, sche
103108
return scheduleMessages(ctx, s.links, s.retryOptions, messages, scheduledEnqueueTime)
104109
}
105110

106-
// ScheduleAMQPMessagesOptions contains optional parameters for the ScheduleAMQPMessages function.
107-
type ScheduleAMQPMessagesOptions struct {
111+
// ScheduleAMQPAnnotatedMessagesOptions contains optional parameters for the ScheduleAMQPAnnotatedMessages function.
112+
type ScheduleAMQPAnnotatedMessagesOptions struct {
108113
// For future expansion
109114
}
110115

111-
// ScheduleAMQPMessages schedules a slice of Messages to appear on Service Bus Queue/Subscription at a later time.
116+
// ScheduleAMQPAnnotatedMessages schedules a slice of Messages to appear on Service Bus Queue/Subscription at a later time.
112117
// Returns the sequence numbers of the messages that were scheduled. Messages that haven't been
113118
// delivered can be cancelled using `Receiver.CancelScheduleMessage(s)`
114119
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
115-
func (s *Sender) ScheduleAMQPMessages(ctx context.Context, messages []*AMQPMessage, scheduledEnqueueTime time.Time, options *ScheduleAMQPMessagesOptions) ([]int64, error) {
120+
func (s *Sender) ScheduleAMQPAnnotatedMessages(ctx context.Context, messages []*AMQPAnnotatedMessage, scheduledEnqueueTime time.Time, options *ScheduleAMQPAnnotatedMessagesOptions) ([]int64, error) {
116121
return scheduleMessages(ctx, s.links, s.retryOptions, messages, scheduledEnqueueTime)
117122
}
118123

@@ -161,7 +166,7 @@ func (s *Sender) Close(ctx context.Context) error {
161166
return s.links.Close(ctx, true)
162167
}
163168

164-
func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage, options *SendMessageOptions) error {
169+
func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage) error {
165170
err := s.links.Retry(ctx, EventSender, "SendMessage", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
166171
return lwid.Sender.Send(ctx, message.toAMQPMessage())
167172
}, RetryOptions(s.retryOptions))

0 commit comments

Comments
 (0)