Skip to content

Commit 1cdb007

Browse files
[azservicebus] Adding in raw AMQP support for sending and receiving messages. (Azure#18413)
For sending AMQP messages you can use either Sender.SendAMQPMessages() or schedule them using Sender.ScheduleAMQPMessages(). For receiving, each ReceivedMessage now has a RawAMQPMessage property. I've also added in a way to export the TLS pre-master key so you can easily wireshark and trace the actual AMQP interaction, which can be helpful in validating that things are encoding properly. Fixes Azure#15260
1 parent d1bca58 commit 1cdb007

File tree

14 files changed

+789
-38
lines changed

14 files changed

+789
-38
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
# Release History
22

3-
## 1.0.2 (Unreleased)
3+
## 1.0.2-beta.0 (Unreleased)
44

55
### Features Added
66

7+
- Full access to send and receive all AMQP message properties. (#18413)
8+
- Send AMQP messages using the new `AMQPMessage` type and `Sender.SendAMQPMessage()` (AMQP messages can be added to MessageBatch's as well using MessageBatch.AddAMQPMessage()).
9+
- Access the full set of AMQP message properties when receiving using the `ReceivedMessage.RawAMQPMessage` property.
10+
711
### Breaking Changes
812

913
### Bugs Fixed
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azservicebus
5+
6+
import (
7+
"time"
8+
9+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
10+
)
11+
12+
// AMQPMessage represents the AMQP message, as received from Service Bus.
13+
// For details about these properties, refer to the AMQP specification:
14+
// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
15+
//
16+
// Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some
17+
// cases slices and maps.
18+
//
19+
// AMQP simple types include:
20+
// - int (any size), uint (any size)
21+
// - float (any size)
22+
// - string
23+
// - bool
24+
// - time.Time
25+
type AMQPMessage struct {
26+
// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
27+
//
28+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
29+
ApplicationProperties map[string]any
30+
31+
// Body represents the body of an AMQP message.
32+
Body AMQPMessageBody
33+
34+
// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
35+
//
36+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
37+
DeliveryAnnotations map[any]any
38+
39+
// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
40+
// for this message.
41+
DeliveryTag []byte
42+
43+
// Footer is the transport footers for this AMQP message.
44+
//
45+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
46+
Footer map[any]any
47+
48+
// Header is the transport headers for this AMQP message.
49+
Header *AMQPMessageHeader
50+
51+
// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
52+
//
53+
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
54+
MessageAnnotations map[any]any
55+
56+
// Properties corresponds to the properties section of an AMQP message.
57+
Properties *AMQPMessageProperties
58+
59+
linkName string
60+
61+
// inner is the AMQP message we originally received, which contains some hidden
62+
// data that's needed to settle with go-amqp. We strip out most of the underlying
63+
// data so it's fairly minimal.
64+
inner *amqp.Message
65+
}
66+
67+
// AMQPMessageProperties represents the properties of an AMQP message.
68+
// See here for more details:
69+
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties
70+
type AMQPMessageProperties struct {
71+
// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
72+
AbsoluteExpiryTime *time.Time
73+
74+
// ContentEncoding corresponds to the 'content-encoding' property.
75+
ContentEncoding *string
76+
77+
// ContentType corresponds to the 'content-type' property
78+
ContentType *string
79+
80+
// CorrelationID corresponds to the 'correlation-id' property.
81+
// The type of CorrelationID can be a uint64, UUID, []byte, or a string
82+
CorrelationID any
83+
84+
// CreationTime corresponds to the 'creation-time' property.
85+
CreationTime *time.Time
86+
87+
// GroupID corresponds to the 'group-id' property.
88+
GroupID *string
89+
90+
// GroupSequence corresponds to the 'group-sequence' property.
91+
GroupSequence *uint32
92+
93+
// MessageID corresponds to the 'message-id' property.
94+
// The type of MessageID can be a uint64, UUID, []byte, or string
95+
MessageID any
96+
97+
// ReplyTo corresponds to the 'reply-to' property.
98+
ReplyTo *string
99+
100+
// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
101+
ReplyToGroupID *string
102+
103+
// Subject corresponds to the 'subject' property.
104+
Subject *string
105+
106+
// To corresponds to the 'to' property.
107+
To *string
108+
109+
// UserID corresponds to the 'user-id' property.
110+
UserID []byte
111+
}
112+
113+
// AMQPMessageBody represents the body of an AMQP message.
114+
// Only one of these fields can be used a a time. They are mutually exclusive.
115+
type AMQPMessageBody struct {
116+
// Data is encoded/decoded as multiple data sections in the body.
117+
Data [][]byte
118+
119+
// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
120+
//
121+
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPMessage.
122+
Sequence [][]any
123+
124+
// Value is encoded/decoded as the amqp-value section in the body.
125+
//
126+
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPMessage,
127+
// as well as slices or maps of AMQP simple types.
128+
Value any
129+
}
130+
131+
// AMQPMessageHeader carries standard delivery details about the transfer
132+
// of a message.
133+
// See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header
134+
// for more details.
135+
type AMQPMessageHeader struct {
136+
// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
137+
// It corresponds to the 'delivery-count' property.
138+
DeliveryCount uint32
139+
140+
// Durable corresponds to the 'durable' property.
141+
Durable bool
142+
143+
// FirstAcquirer corresponds to the 'first-acquirer' property.
144+
FirstAcquirer bool
145+
146+
// Priority corresponds to the 'priority' property.
147+
Priority uint8
148+
149+
// TTL corresponds to the 'ttl' property.
150+
TTL time.Duration
151+
}
152+
153+
// toAMQPMessage converts between our (azservicebus) AMQP message
154+
// to the underlying message used by go-amqp.
155+
func (am *AMQPMessage) toAMQPMessage() *amqp.Message {
156+
var header *amqp.MessageHeader
157+
158+
if am.Header != nil {
159+
header = &amqp.MessageHeader{
160+
DeliveryCount: am.Header.DeliveryCount,
161+
Durable: am.Header.Durable,
162+
FirstAcquirer: am.Header.FirstAcquirer,
163+
Priority: am.Header.Priority,
164+
TTL: am.Header.TTL,
165+
}
166+
}
167+
168+
var properties *amqp.MessageProperties
169+
170+
if am.Properties != nil {
171+
properties = &amqp.MessageProperties{
172+
AbsoluteExpiryTime: am.Properties.AbsoluteExpiryTime,
173+
ContentEncoding: am.Properties.ContentEncoding,
174+
ContentType: am.Properties.ContentType,
175+
CorrelationID: am.Properties.CorrelationID,
176+
CreationTime: am.Properties.CreationTime,
177+
GroupID: am.Properties.GroupID,
178+
GroupSequence: am.Properties.GroupSequence,
179+
MessageID: am.Properties.MessageID,
180+
ReplyTo: am.Properties.ReplyTo,
181+
ReplyToGroupID: am.Properties.ReplyToGroupID,
182+
Subject: am.Properties.Subject,
183+
To: am.Properties.To,
184+
UserID: am.Properties.UserID,
185+
}
186+
} else {
187+
properties = &amqp.MessageProperties{}
188+
}
189+
190+
var footer amqp.Annotations
191+
192+
if am.Footer != nil {
193+
footer = (amqp.Annotations)(am.Footer)
194+
}
195+
196+
return &amqp.Message{
197+
Annotations: copyAnnotations(am.MessageAnnotations),
198+
ApplicationProperties: am.ApplicationProperties,
199+
Data: am.Body.Data,
200+
DeliveryAnnotations: amqp.Annotations(am.DeliveryAnnotations),
201+
DeliveryTag: am.DeliveryTag,
202+
Footer: footer,
203+
Header: header,
204+
Properties: properties,
205+
Sequence: am.Body.Sequence,
206+
Value: am.Body.Value,
207+
}
208+
}
209+
210+
func copyAnnotations(src map[any]any) amqp.Annotations {
211+
if src == nil {
212+
return amqp.Annotations{}
213+
}
214+
215+
dest := amqp.Annotations{}
216+
217+
for k, v := range src {
218+
dest[k] = v
219+
}
220+
221+
return dest
222+
}
223+
224+
func newAMQPMessage(goAMQPMessage *amqp.Message) *AMQPMessage {
225+
var header *AMQPMessageHeader
226+
227+
if goAMQPMessage.Header != nil {
228+
header = &AMQPMessageHeader{
229+
DeliveryCount: goAMQPMessage.Header.DeliveryCount,
230+
Durable: goAMQPMessage.Header.Durable,
231+
FirstAcquirer: goAMQPMessage.Header.FirstAcquirer,
232+
Priority: goAMQPMessage.Header.Priority,
233+
TTL: goAMQPMessage.Header.TTL,
234+
}
235+
}
236+
237+
var properties *AMQPMessageProperties
238+
239+
if goAMQPMessage.Properties != nil {
240+
properties = &AMQPMessageProperties{
241+
AbsoluteExpiryTime: goAMQPMessage.Properties.AbsoluteExpiryTime,
242+
ContentEncoding: goAMQPMessage.Properties.ContentEncoding,
243+
ContentType: goAMQPMessage.Properties.ContentType,
244+
CorrelationID: goAMQPMessage.Properties.CorrelationID,
245+
CreationTime: goAMQPMessage.Properties.CreationTime,
246+
GroupID: goAMQPMessage.Properties.GroupID,
247+
GroupSequence: goAMQPMessage.Properties.GroupSequence,
248+
MessageID: goAMQPMessage.Properties.MessageID,
249+
ReplyTo: goAMQPMessage.Properties.ReplyTo,
250+
ReplyToGroupID: goAMQPMessage.Properties.ReplyToGroupID,
251+
Subject: goAMQPMessage.Properties.Subject,
252+
To: goAMQPMessage.Properties.To,
253+
UserID: goAMQPMessage.Properties.UserID,
254+
}
255+
}
256+
257+
var footer map[any]any
258+
259+
if goAMQPMessage.Footer != nil {
260+
footer = (map[any]any)(goAMQPMessage.Footer)
261+
}
262+
263+
return &AMQPMessage{
264+
MessageAnnotations: map[any]any(goAMQPMessage.Annotations),
265+
ApplicationProperties: goAMQPMessage.ApplicationProperties,
266+
Body: AMQPMessageBody{
267+
Data: goAMQPMessage.Data,
268+
Sequence: goAMQPMessage.Sequence,
269+
Value: goAMQPMessage.Value,
270+
},
271+
DeliveryAnnotations: map[any]any(goAMQPMessage.DeliveryAnnotations),
272+
DeliveryTag: goAMQPMessage.DeliveryTag,
273+
Footer: footer,
274+
Header: header,
275+
linkName: goAMQPMessage.LinkName(),
276+
Properties: properties,
277+
inner: goAMQPMessage,
278+
}
279+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azservicebus
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestAMQPMessageUnitTest(t *testing.T) {
13+
t.Run("Default", func(t *testing.T) {
14+
msg := &AMQPMessage{}
15+
amqpMessage := msg.toAMQPMessage()
16+
17+
// we duplicate/inflate these since we modify them
18+
// in various parts of the API.
19+
require.NotNil(t, amqpMessage.Properties)
20+
require.NotNil(t, amqpMessage.Annotations)
21+
})
22+
}

sdk/messaging/azservicebus/example_receiver_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,41 @@ func ExampleReceiver_ReceiveMessages() {
116116
fmt.Printf("Received and completed the message\n")
117117
}
118118
}
119+
120+
func ExampleReceiver_ReceiveMessages_amqpMessage() {
121+
// AMQP is the underlying protocol for all interaction with Service Bus.
122+
// You can, if needed, send and receive messages that have a 1:1 correspondence
123+
// with an AMQP message. This gives you full control over details that are not
124+
// exposed via the azservicebus.ReceivedMessage type.
125+
126+
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
127+
128+
if err != nil {
129+
panic(err)
130+
}
131+
132+
// NOTE: For this example we'll assume we received at least one message.
133+
134+
// Every received message carries a RawAMQPMessage.
135+
rawAMQPMessage := messages[0].RawAMQPMessage
136+
137+
// All the various body encodings available for AMQP messages are exposed via Body
138+
_ = rawAMQPMessage.Body.Data
139+
_ = rawAMQPMessage.Body.Value
140+
_ = rawAMQPMessage.Body.Sequence
141+
142+
// delivery and message annotations
143+
_ = rawAMQPMessage.DeliveryAnnotations
144+
_ = rawAMQPMessage.MessageAnnotations
145+
146+
// headers and footers
147+
_ = rawAMQPMessage.Header
148+
_ = rawAMQPMessage.Footer
149+
150+
// Settlement (if in azservicebus.ReceiveModePeekLockMode) stil works on the ReceivedMessage.
151+
err = receiver.CompleteMessage(context.TODO(), messages[0], nil)
152+
153+
if err != nil {
154+
panic(err)
155+
}
156+
}

0 commit comments

Comments
 (0)