Skip to content

Commit 6837144

Browse files
[azservicebus] Adding in the associated-link-name to all management link operations. (Azure#18291)
The associated-link-name property let's Service Bus know that a link is active, even if there is no direct activity on it. For instance, when doing lock renewals you use the management link, but the associated receiver link is what you want to keep alive. We now pass this for all management related operations - scheduling messages, "backup" message settlement, lock renewal for messages and sessions, and session-state operations. Fixes Azure#16205
1 parent 3e704d4 commit 6837144

File tree

10 files changed

+59
-24
lines changed

10 files changed

+59
-24
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
### Bugs Fixed
66

77
- Handle a missing CountDetails node in the returned responses for Get<Entity>RuntimeProperties which could cause a panic. (#18213)
8+
- Adding the `associated-link-name` property to management operations (RenewLock, settlement and others), which
9+
can help extend link lifetime (#18291)
810

911
## 1.0.0 (2022-05-16)
1012

sdk/messaging/azservicebus/internal/amqpLinks_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func assertFailedLinks(t *testing.T, lwid *LinksWithID, expectedErr error, expec
4141
})
4242
require.ErrorIs(t, err, expectedErr)
4343

44-
_, err = PeekMessages(context.TODO(), lwid.RPC, 0, 1)
44+
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
4545
require.ErrorIs(t, err, expectedRPCError)
4646

4747
msg, err := lwid.Receiver.Receive(context.TODO())
@@ -58,7 +58,7 @@ func assertLinks(t *testing.T, lwid *LinksWithID) {
5858
})
5959
require.NoError(t, err)
6060

61-
_, err = PeekMessages(context.TODO(), lwid.RPC, 0, 1)
61+
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
6262
require.NoError(t, err)
6363

6464
require.NoError(t, lwid.Receiver.IssueCredit(1))

sdk/messaging/azservicebus/internal/amqp_test_utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ func (r *FakeRPCLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse,
9595
return r.Resp, r.Error
9696
}
9797

98+
func (r *FakeAMQPReceiver) LinkName() string {
99+
return "fakelink"
100+
}
101+
98102
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error {
99103
r.RequestedCredits += credit
100104

sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type AMQPReceiverCloser interface {
3838
type AMQPSender interface {
3939
Send(ctx context.Context, msg *amqp.Message) error
4040
MaxMessageSize() uint64
41+
LinkName() string
4142
}
4243

4344
// AMQPSenderCloser is implemented by *amqp.Sender

sdk/messaging/azservicebus/internal/mgmt.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const (
3030
DeferredDisposition DispositionStatus = "defered"
3131
)
3232

33-
func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) {
33+
func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, linkName string, mode exported.ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) {
3434
const messagesField, messageField = "messages", "message"
3535

3636
backwardsMode := uint32(0)
@@ -50,6 +50,8 @@ func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.Receive
5050
Value: values,
5151
}
5252

53+
addAssociatedLinkName(linkName, msg)
54+
5355
rsp, err := rpcLink.RPC(ctx, msg)
5456
if err != nil {
5557
return nil, err
@@ -108,7 +110,7 @@ func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.Receive
108110
return transformedMessages, nil
109111
}
110112

111-
func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) {
113+
func PeekMessages(ctx context.Context, rpcLink RPCLink, linkName string, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) {
112114
const messagesField, messageField = "messages", "message"
113115

114116
msg := &amqp.Message{
@@ -121,6 +123,8 @@ func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64
121123
},
122124
}
123125

126+
addAssociatedLinkName(linkName, msg)
127+
124128
if deadline, ok := ctx.Deadline(); ok {
125129
msg.ApplicationProperties["server-timeout"] = uint(time.Until(deadline) / time.Millisecond)
126130
}
@@ -218,9 +222,7 @@ func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockToken
218222
},
219223
}
220224

221-
if linkName != "" {
222-
renewRequestMsg.ApplicationProperties["associated-link-name"] = linkName
223-
}
225+
addAssociatedLinkName(linkName, renewRequestMsg)
224226

225227
response, err := rpcLink.RPC(ctx, renewRequestMsg)
226228

@@ -257,7 +259,7 @@ func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockToken
257259
}
258260

259261
// RenewSessionLocks renews a session lock.
260-
func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (time.Time, error) {
262+
func RenewSessionLock(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string) (time.Time, error) {
261263
body := map[string]interface{}{
262264
"session-id": sessionID,
263265
}
@@ -269,6 +271,8 @@ func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (t
269271
},
270272
}
271273

274+
addAssociatedLinkName(linkName, msg)
275+
272276
resp, err := rpcLink.RPC(ctx, msg)
273277

274278
if err != nil {
@@ -291,7 +295,7 @@ func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (t
291295
}
292296

293297
// GetSessionState retrieves state associated with the session.
294-
func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]byte, error) {
298+
func GetSessionState(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string) ([]byte, error) {
295299
amqpMsg := &amqp.Message{
296300
Value: map[string]interface{}{
297301
"session-id": sessionID,
@@ -301,6 +305,8 @@ func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]
301305
},
302306
}
303307

308+
addAssociatedLinkName(linkName, amqpMsg)
309+
304310
resp, err := rpcLink.RPC(ctx, amqpMsg)
305311

306312
if err != nil {
@@ -334,7 +340,7 @@ func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]
334340
}
335341

336342
// SetSessionState sets the state associated with the session.
337-
func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, state []byte) error {
343+
func SetSessionState(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string, state []byte) error {
338344
uuid, err := uuid.New()
339345

340346
if err != nil {
@@ -352,6 +358,8 @@ func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, sta
352358
},
353359
}
354360

361+
addAssociatedLinkName(linkName, amqpMsg)
362+
355363
resp, err := rpcLink.RPC(ctx, amqpMsg)
356364

357365
if err != nil {
@@ -368,7 +376,7 @@ func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, sta
368376
// SendDisposition allows you settle a message using the management link, rather than via your
369377
// *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated
370378
// with a link (ex: deferred messages).
371-
func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error {
379+
func SendDisposition(ctx context.Context, rpcLink RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error {
372380
if lockToken == nil {
373381
err := errors.New("lock token on the message is not set, thus cannot send disposition")
374382
return err
@@ -398,6 +406,8 @@ func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID,
398406
Value: value,
399407
}
400408

409+
addAssociatedLinkName(linkName, msg)
410+
401411
// no error, then it was successful
402412
_, err := rpcLink.RPC(ctx, msg)
403413
if err != nil {
@@ -409,7 +419,7 @@ func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID,
409419

410420
// ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers
411421
// that can be used to cancel each message.
412-
func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error) {
422+
func ScheduleMessages(ctx context.Context, rpcLink RPCLink, linkName string, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error) {
413423
if len(messages) <= 0 {
414424
return nil, errors.New("expected one or more messages")
415425
}
@@ -470,6 +480,8 @@ func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Tim
470480
},
471481
}
472482

483+
addAssociatedLinkName(linkName, msg)
484+
473485
if deadline, ok := ctx.Deadline(); ok {
474486
msg.ApplicationProperties["com.microsoft:server-timeout"] = uint(time.Until(deadline) / time.Millisecond)
475487
}
@@ -502,7 +514,7 @@ func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Tim
502514

503515
// CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery,
504516
// but have not yet ben enqueued.
505-
func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) error {
517+
func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, linkName string, seq []int64) error {
506518
msg := &amqp.Message{
507519
ApplicationProperties: map[string]interface{}{
508520
"operation": "com.microsoft:cancel-scheduled-message",
@@ -512,6 +524,8 @@ func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64)
512524
},
513525
}
514526

527+
addAssociatedLinkName(linkName, msg)
528+
515529
if deadline, ok := ctx.Deadline(); ok {
516530
msg.ApplicationProperties["com.microsoft:server-timeout"] = uint(time.Until(deadline) / time.Millisecond)
517531
}
@@ -527,3 +541,13 @@ func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64)
527541

528542
return nil
529543
}
544+
545+
// addAssociatedLinkName adds the 'associated-link-name' application
546+
// property to the AMQP message. Setting this property associates
547+
// management link activity with a sender or receiver link, which can
548+
// prevent it from idling out.
549+
func addAssociatedLinkName(linkName string, msg *amqp.Message) {
550+
if linkName != "" {
551+
msg.ApplicationProperties["associated-link-name"] = linkName
552+
}
553+
}

sdk/messaging/azservicebus/internal/rpc_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ func (tester *rpcTester) Close(ctx context.Context) error {
207207
return nil
208208
}
209209

210+
func (tester *rpcTester) LinkName() string {
211+
return "hello"
212+
}
213+
210214
// receiver functions
211215

212216
func (tester *rpcTester) AcceptMessage(ctx context.Context, msg *amqp.Message) error {

sdk/messaging/azservicebus/messageSettler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type CompleteMessageOptions struct {
6464
func (s *messageSettler) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error {
6565
return s.settleWithRetries(ctx, message, func(receiver internal.AMQPReceiver, rpcLink internal.RPCLink) error {
6666
if s.useManagementLink(message, receiver) {
67-
return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil)
67+
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil)
6868
} else {
6969
return receiver.AcceptMessage(ctx, message.rawAMQPMessage)
7070
}
@@ -93,7 +93,7 @@ func (s *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedMe
9393
propertiesToModify = options.PropertiesToModify
9494
}
9595

96-
return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
96+
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
9797
}
9898

9999
var annotations amqp.Annotations
@@ -127,7 +127,7 @@ func (s *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMess
127127
propertiesToModify = options.PropertiesToModify
128128
}
129129

130-
return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
130+
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
131131
}
132132

133133
var annotations amqp.Annotations
@@ -184,7 +184,7 @@ func (s *messageSettler) DeadLetterMessage(ctx context.Context, message *Receive
184184
propertiesToModify = options.PropertiesToModify
185185
}
186186

187-
return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
187+
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
188188
}
189189

190190
info := map[string]interface{}{

sdk/messaging/azservicebus/receiver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers
209209
var receivedMessages []*ReceivedMessage
210210

211211
err := r.amqpLinks.Retry(ctx, EventReceiver, "receiveDeferredMessages", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
212-
amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, r.receiveMode, sequenceNumbers)
212+
amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, lwid.Receiver.LinkName(), r.receiveMode, sequenceNumbers)
213213

214214
if err != nil {
215215
return err
@@ -257,7 +257,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option
257257
updateInternalSequenceNumber = false
258258
}
259259

260-
messages, err := internal.PeekMessages(ctx, links.RPC, sequenceNumber, int32(maxMessageCount))
260+
messages, err := internal.PeekMessages(ctx, links.RPC, links.Receiver.LinkName(), sequenceNumber, int32(maxMessageCount))
261261

262262
if err != nil {
263263
return err

sdk/messaging/azservicebus/sender.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ type CancelScheduledMessagesOptions struct {
117117
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
118118
func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumbers []int64, options *CancelScheduledMessagesOptions) error {
119119
err := s.links.Retry(ctx, EventSender, "CancelScheduledMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
120-
return internal.CancelScheduledMessages(ctx, lwv.RPC, sequenceNumbers)
120+
return internal.CancelScheduledMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), sequenceNumbers)
121121
}, s.retryOptions)
122122

123123
return internal.TransformError(err)
@@ -133,7 +133,7 @@ func (s *Sender) scheduleAMQPMessages(ctx context.Context, messages []*amqp.Mess
133133
var sequenceNumbers []int64
134134

135135
err := s.links.Retry(ctx, EventSender, "ScheduleMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
136-
sn, err := internal.ScheduleMessages(ctx, lwv.RPC, scheduledEnqueueTime, messages)
136+
sn, err := internal.ScheduleMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), scheduledEnqueueTime, messages)
137137

138138
if err != nil {
139139
return err

sdk/messaging/azservicebus/session_receiver.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSess
199199
var sessionState []byte
200200

201201
err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "GetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
202-
s, err := internal.GetSessionState(ctx, lwv.RPC, sr.SessionID())
202+
s, err := internal.GetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID())
203203

204204
if err != nil {
205205
return err
@@ -221,7 +221,7 @@ type SetSessionStateOptions struct {
221221
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
222222
func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error {
223223
err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
224-
return internal.SetSessionState(ctx, lwv.RPC, sr.SessionID(), state)
224+
return internal.SetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID(), state)
225225
}, sr.inner.retryOptions)
226226

227227
return internal.TransformError(err)
@@ -237,7 +237,7 @@ type RenewSessionLockOptions struct {
237237
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
238238
func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error {
239239
err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
240-
newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, *sr.sessionID)
240+
newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, lwv.Receiver.LinkName(), *sr.sessionID)
241241

242242
if err != nil {
243243
return err

0 commit comments

Comments
 (0)