Skip to content

Commit fd331bf

Browse files
[azservicebus] RetryOptions were not being propagated throughout all the clients Azure#17695
RetryOptions were not being propagated throughout all the clients (mismatch in old vs new system). Fixed and added tests for the various spots that inherit these options to make sure they're properly getting copied over: - Sender - All the session receiver functions (AcceptNext, Accept) - Receivers - Namespace Fixes Azure#17686
1 parent 16caf89 commit fd331bf

File tree

9 files changed

+220
-42
lines changed

9 files changed

+220
-42
lines changed

sdk/messaging/azservicebus/client.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ func newClientImpl(creds clientCreds, options *ClientOptions) (*Client, error) {
133133
}
134134

135135
if options != nil {
136+
client.retryOptions = options.RetryOptions
137+
136138
if options.TLSConfig != nil {
137139
nsOptions = append(nsOptions, internal.NamespaceWithTLSConfig(options.TLSConfig))
138140
}
@@ -160,6 +162,7 @@ func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOpt
160162
ns: client.namespace,
161163
entity: entity{Queue: queueName},
162164
getRecoveryKindFunc: internal.GetRecoveryKind,
165+
retryOptions: client.retryOptions,
163166
}, options)
164167

165168
if err != nil {
@@ -178,6 +181,7 @@ func (client *Client) NewReceiverForSubscription(topicName string, subscriptionN
178181
ns: client.namespace,
179182
entity: entity{Topic: topicName, Subscription: subscriptionName},
180183
getRecoveryKindFunc: internal.GetRecoveryKind,
184+
retryOptions: client.retryOptions,
181185
}, options)
182186

183187
if err != nil {
@@ -200,7 +204,8 @@ func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions)
200204
ns: client.namespace,
201205
queueOrTopic: queueOrTopic,
202206
cleanupOnClose: cleanupOnClose,
203-
}, client.retryOptions)
207+
retryOptions: client.retryOptions,
208+
})
204209

205210
if err != nil {
206211
return nil, err
@@ -216,11 +221,13 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin
216221
id, cleanupOnClose := client.getCleanupForCloseable()
217222
sessionReceiver, err := newSessionReceiver(
218223
ctx,
219-
&sessionID,
220-
client.namespace,
221-
entity{Queue: queueName},
222-
cleanupOnClose,
223-
toReceiverOptions(options))
224+
newSessionReceiverArgs{
225+
sessionID: &sessionID,
226+
ns: client.namespace,
227+
entity: entity{Queue: queueName},
228+
cleanupOnClose: cleanupOnClose,
229+
retryOptions: client.retryOptions,
230+
}, toReceiverOptions(options))
224231

225232
if err != nil {
226233
return nil, err
@@ -240,10 +247,13 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam
240247
id, cleanupOnClose := client.getCleanupForCloseable()
241248
sessionReceiver, err := newSessionReceiver(
242249
ctx,
243-
&sessionID,
244-
client.namespace,
245-
entity{Topic: topicName, Subscription: subscriptionName},
246-
cleanupOnClose,
250+
newSessionReceiverArgs{
251+
sessionID: &sessionID,
252+
ns: client.namespace,
253+
entity: entity{Topic: topicName, Subscription: subscriptionName},
254+
cleanupOnClose: cleanupOnClose,
255+
retryOptions: client.retryOptions,
256+
},
247257
toReceiverOptions(options))
248258

249259
if err != nil {
@@ -264,11 +274,13 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName s
264274
id, cleanupOnClose := client.getCleanupForCloseable()
265275
sessionReceiver, err := newSessionReceiver(
266276
ctx,
267-
nil,
268-
client.namespace,
269-
entity{Queue: queueName},
270-
cleanupOnClose,
271-
toReceiverOptions(options))
277+
newSessionReceiverArgs{
278+
sessionID: nil,
279+
ns: client.namespace,
280+
entity: entity{Queue: queueName},
281+
cleanupOnClose: cleanupOnClose,
282+
retryOptions: client.retryOptions,
283+
}, toReceiverOptions(options))
272284

273285
if err != nil {
274286
return nil, err
@@ -288,11 +300,13 @@ func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topi
288300
id, cleanupOnClose := client.getCleanupForCloseable()
289301
sessionReceiver, err := newSessionReceiver(
290302
ctx,
291-
nil,
292-
client.namespace,
293-
entity{Topic: topicName, Subscription: subscriptionName},
294-
cleanupOnClose,
295-
toReceiverOptions(options))
303+
newSessionReceiverArgs{
304+
sessionID: nil,
305+
ns: client.namespace,
306+
entity: entity{Topic: topicName, Subscription: subscriptionName},
307+
cleanupOnClose: cleanupOnClose,
308+
retryOptions: client.retryOptions,
309+
}, toReceiverOptions(options))
296310

297311
if err != nil {
298312
return nil, err

sdk/messaging/azservicebus/client_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,76 @@ func TestClientNewSessionReceiverCancel(t *testing.T) {
283283
require.Nil(t, receiver)
284284
}
285285

286+
func TestClientPropagatesRetryOptionsForSessions(t *testing.T) {
287+
connectionString := test.GetConnectionString(t)
288+
289+
queue, cleanupQueue := createQueue(t, connectionString, &admin.QueueProperties{
290+
RequiresSession: to.Ptr(true),
291+
})
292+
293+
defer cleanupQueue()
294+
295+
topic, cleanupTopic := createSubscription(t, connectionString, nil, &admin.SubscriptionProperties{
296+
RequiresSession: to.Ptr(true),
297+
})
298+
299+
defer cleanupTopic()
300+
301+
expectedRetryOptions := RetryOptions{
302+
MaxRetries: 1,
303+
RetryDelay: time.Second,
304+
MaxRetryDelay: time.Millisecond,
305+
}
306+
307+
client, err := NewClientFromConnectionString(connectionString, &ClientOptions{
308+
RetryOptions: expectedRetryOptions,
309+
})
310+
require.NoError(t, err)
311+
312+
actualNS := client.namespace.(*internal.Namespace)
313+
require.Equal(t, expectedRetryOptions, actualNS.RetryOptions)
314+
315+
queueSender, err := client.NewSender(queue, nil)
316+
require.NoError(t, err)
317+
318+
topicSender, err := client.NewSender(topic, nil)
319+
require.NoError(t, err)
320+
321+
err = queueSender.SendMessage(context.Background(), &Message{
322+
SessionID: to.Ptr("hello"),
323+
}, nil)
324+
require.NoError(t, err)
325+
326+
err = topicSender.SendMessage(context.Background(), &Message{
327+
SessionID: to.Ptr("hello"),
328+
}, nil)
329+
require.NoError(t, err)
330+
331+
sessionReceiver, err := client.AcceptSessionForQueue(context.Background(), queue, "hello", nil)
332+
require.NoError(t, err)
333+
require.NoError(t, sessionReceiver.Close(context.Background()))
334+
335+
require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)
336+
337+
sessionReceiver, err = client.AcceptSessionForSubscription(context.Background(), topic, "sub", "hello", nil)
338+
require.NoError(t, err)
339+
require.NoError(t, sessionReceiver.Close(context.Background()))
340+
341+
require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)
342+
343+
sessionReceiver, err = client.AcceptNextSessionForQueue(context.Background(), queue, nil)
344+
require.NoError(t, err)
345+
require.NoError(t, sessionReceiver.Close(context.Background()))
346+
347+
require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)
348+
349+
sessionReceiver, err = client.AcceptNextSessionForSubscription(context.Background(), topic, "sub", nil)
350+
require.NoError(t, err)
351+
require.NoError(t, sessionReceiver.Close(context.Background()))
352+
353+
require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)
354+
}
355+
286356
func TestNewClientUnitTests(t *testing.T) {
287357
t.Run("WithTokenCredential", func(t *testing.T) {
288358
fakeTokenCredential := struct{ azcore.TokenCredential }{}
@@ -361,6 +431,67 @@ func TestNewClientUnitTests(t *testing.T) {
361431
require.Empty(t, client.links)
362432
require.EqualValues(t, 1, ns.AMQPLinks.Closed)
363433
})
434+
435+
t.Run("RetryOptionsArePropagated", func(t *testing.T) {
436+
// retry options are passed and copied along several routes, just make sure it's properly propagated.
437+
// NOTE: session receivers are checked in a separate test because they require actual SB access.
438+
client, err := NewClient("fake.something", struct{ azcore.TokenCredential }{}, &ClientOptions{
439+
RetryOptions: RetryOptions{
440+
MaxRetries: 101,
441+
RetryDelay: 6 * time.Hour,
442+
MaxRetryDelay: 12 * time.Hour,
443+
},
444+
})
445+
446+
client.namespace = &internal.FakeNS{
447+
AMQPLinks: &internal.FakeAMQPLinks{
448+
Receiver: &internal.FakeAMQPReceiver{},
449+
},
450+
}
451+
452+
require.NoError(t, err)
453+
454+
require.Equal(t, RetryOptions{
455+
MaxRetries: 101,
456+
RetryDelay: 6 * time.Hour,
457+
MaxRetryDelay: 12 * time.Hour,
458+
}, client.retryOptions)
459+
460+
sender, err := client.NewSender("hello", nil)
461+
require.NoError(t, err)
462+
463+
require.Equal(t, RetryOptions{
464+
MaxRetries: 101,
465+
RetryDelay: 6 * time.Hour,
466+
MaxRetryDelay: 12 * time.Hour,
467+
}, sender.retryOptions)
468+
469+
receiver, err := client.NewReceiverForQueue("hello", nil)
470+
require.NoError(t, err)
471+
472+
require.Equal(t, RetryOptions{
473+
MaxRetries: 101,
474+
RetryDelay: 6 * time.Hour,
475+
MaxRetryDelay: 12 * time.Hour,
476+
}, receiver.retryOptions)
477+
478+
actualSettler := receiver.settler.(*messageSettler)
479+
480+
require.Equal(t, RetryOptions{
481+
MaxRetries: 101,
482+
RetryDelay: 6 * time.Hour,
483+
MaxRetryDelay: 12 * time.Hour,
484+
}, actualSettler.retryOptions)
485+
486+
subscriptionReceiver, err := client.NewReceiverForSubscription("hello", "world", nil)
487+
require.NoError(t, err)
488+
489+
require.Equal(t, RetryOptions{
490+
MaxRetries: 101,
491+
RetryDelay: 6 * time.Hour,
492+
MaxRetryDelay: 12 * time.Hour,
493+
}, subscriptionReceiver.retryOptions)
494+
})
364495
}
365496

366497
func assertRPCNotFound(t *testing.T, err error) {

sdk/messaging/azservicebus/internal/namespace.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ type (
4545

4646
newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error)
4747

48-
retryOptions exported.RetryOptions
48+
// NOTE: exported only so it can be checked in a test
49+
RetryOptions exported.RetryOptions
4950

5051
clientMu sync.RWMutex
5152
client *amqp.Client
@@ -133,7 +134,7 @@ func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredentia
133134

134135
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption {
135136
return func(ns *Namespace) error {
136-
ns.retryOptions = retryOptions
137+
ns.RetryOptions = retryOptions
137138
return nil
138139
}
139140
}
@@ -407,7 +408,7 @@ func (ns *Namespace) startNegotiateClaimRenewer(ctx context.Context,
407408

408409
expiresOn = tmpExpiresOn
409410
return nil
410-
}, IsFatalSBError, ns.retryOptions)
411+
}, IsFatalSBError, ns.RetryOptions)
411412

412413
if err == nil {
413414
break

sdk/messaging/azservicebus/internal/namespace_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestNamespaceNegotiateClaim(t *testing.T) {
3333
expires := time.Now().Add(24 * time.Hour)
3434

3535
ns := &Namespace{
36-
retryOptions: retryOptionsOnlyOnce,
36+
RetryOptions: retryOptionsOnlyOnce,
3737
TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{expires: expires}),
3838
}
3939

@@ -78,7 +78,7 @@ func TestNamespaceNegotiateClaimRenewal(t *testing.T) {
7878
expires := time.Now().Add(24 * time.Hour)
7979

8080
ns := &Namespace{
81-
retryOptions: retryOptionsOnlyOnce,
81+
RetryOptions: retryOptionsOnlyOnce,
8282
TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{expires: expires}),
8383
}
8484

@@ -156,7 +156,7 @@ func TestNamespaceNegotiateClaimFailsToGetClient(t *testing.T) {
156156

157157
func TestNamespaceNegotiateClaimNonRenewableToken(t *testing.T) {
158158
ns := &Namespace{
159-
retryOptions: retryOptionsOnlyOnce,
159+
RetryOptions: retryOptionsOnlyOnce,
160160
TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{
161161
// credentials that don't renew return a zero-initialized time.
162162
expires: time.Time{},

sdk/messaging/azservicebus/liveTestHelpers_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,35 @@ func createQueue(t *testing.T, connectionString string, queueProperties *admin.Q
6262
}
6363
}
6464

65+
// createSubscription creates a queue, automatically setting it to delete on idle in 5 minutes.
66+
func createSubscription(t *testing.T, connectionString string, topicProperties *admin.TopicProperties, subscriptionProperties *admin.SubscriptionProperties) (string, func()) {
67+
nanoSeconds := time.Now().UnixNano()
68+
topicName := fmt.Sprintf("topic-%X", nanoSeconds)
69+
70+
adminClient, err := admin.NewClientFromConnectionString(connectionString, nil)
71+
require.NoError(t, err)
72+
73+
if topicProperties == nil {
74+
topicProperties = &admin.TopicProperties{}
75+
}
76+
77+
autoDeleteOnIdle := "PT5M"
78+
topicProperties.AutoDeleteOnIdle = &autoDeleteOnIdle
79+
80+
_, err = adminClient.CreateTopic(context.Background(), topicName, &admin.CreateTopicOptions{
81+
Properties: topicProperties,
82+
})
83+
require.NoError(t, err)
84+
85+
_, err = adminClient.CreateSubscription(context.Background(), topicName, "sub", &admin.CreateSubscriptionOptions{Properties: subscriptionProperties})
86+
require.NoError(t, err)
87+
88+
return topicName, func() {
89+
_, err := adminClient.DeleteTopic(context.Background(), topicName, nil)
90+
require.NoError(t, err)
91+
}
92+
}
93+
6594
func deleteQueue(t *testing.T, ac *admin.Client, queueName string) {
6695
_, err := ac.DeleteQueue(context.Background(), queueName, nil)
6796
require.NoError(t, err)

sdk/messaging/azservicebus/receiver.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,11 @@ type ReceiverOptions struct {
8181
// SubQueue should be set to connect to the sub queue (ex: dead letter queue)
8282
// of the queue or subscription.
8383
SubQueue SubQueue
84-
85-
retryOptions RetryOptions
8684
}
8785

8886
const defaultLinkRxBuffer = 2048
8987

9088
func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverOptions) error {
91-
9289
if options == nil {
9390
receiver.receiveMode = ReceiveModePeekLock
9491
} else {
@@ -101,8 +98,6 @@ func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverO
10198
if err := entity.SetSubQueue(options.SubQueue); err != nil {
10299
return err
103100
}
104-
105-
receiver.retryOptions = options.retryOptions
106101
}
107102

108103
entityPath, err := entity.String()
@@ -121,6 +116,7 @@ type newReceiverArgs struct {
121116
cleanupOnClose func()
122117
getRecoveryKindFunc func(err error) internal.RecoveryKind
123118
newLinkFn func(ctx context.Context, session amqpwrap.AMQPSession) (internal.AMQPSenderCloser, internal.AMQPReceiverCloser, error)
119+
retryOptions RetryOptions
124120
}
125121

126122
func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, error) {
@@ -133,6 +129,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
133129
cleanupOnClose: args.cleanupOnClose,
134130
defaultDrainTimeout: time.Second,
135131
defaultTimeAfterFirstMsg: 20 * time.Millisecond,
132+
retryOptions: args.retryOptions,
136133
}
137134

138135
if err := applyReceiverOptions(receiver, &args.entity, options); err != nil {

0 commit comments

Comments
 (0)