Skip to content

Commit 63195b9

Browse files
[azservicebus] Allow link creation to be cancelled (Azure#17598)
AcceptNextSessionFor(Queue|Subscription) can block for a long time (server dependent) if there are no available sessions. HOWEVER, it was intended to be cancellable, which wasn't working. This is a simpler workaround until we get context support plumbed through go-amqp itself. Fixes Azure#17565
1 parent 4413113 commit 63195b9

File tree

11 files changed

+151
-43
lines changed

11 files changed

+151
-43
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
### Bugs Fixed
1414

15+
- Fixing issue where the AcceptNextSessionForQueue and AcceptNextSessionForSubscription
16+
couldn't be cancelled, forcing the user to wait for the service to timeout. (#17598)
17+
1518
### Other Changes
1619

1720
## 0.4.0 (2022-04-06)

sdk/messaging/azservicebus/client_test.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"time"
1414

1515
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
16+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1617
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
18+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
1719
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
1820
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
1921
"github.com/Azure/azure-sdk-for-go/sdk/messaging/internal/sas"
@@ -204,7 +206,7 @@ func TestNewClientNewReceiverNotFound(t *testing.T) {
204206
assertRPCNotFound(t, err)
205207
}
206208

207-
func TestNewClientNewSessionReceiverNotFound(t *testing.T) {
209+
func TestClientNewSessionReceiverNotFound(t *testing.T) {
208210
connectionString := test.GetConnectionString(t)
209211
client, err := NewClientFromConnectionString(connectionString, nil)
210212
require.NoError(t, err)
@@ -258,6 +260,29 @@ func TestClientCloseVsClosePermanently(t *testing.T) {
258260
require.Nil(t, sessionReceiver)
259261
}
260262

263+
func TestClientNewSessionReceiverCancel(t *testing.T) {
264+
// Both the session APIs create the receiver immediately however AcceptNextSession() has a quirk
265+
// where it takes an excessively long time.
266+
connectionString := test.GetConnectionString(t)
267+
268+
queue, cleanup := createQueue(t, connectionString, &admin.QueueProperties{
269+
RequiresSession: to.Ptr(true),
270+
})
271+
272+
defer cleanup()
273+
274+
client, err := NewClientFromConnectionString(connectionString, nil)
275+
require.NoError(t, err)
276+
277+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
278+
defer cancel()
279+
280+
// non-cancelled version
281+
receiver, err := client.AcceptNextSessionForQueue(ctx, queue, nil)
282+
require.ErrorIs(t, err, context.DeadlineExceeded)
283+
require.Nil(t, receiver)
284+
}
285+
261286
func TestNewClientUnitTests(t *testing.T) {
262287
t.Run("WithTokenCredential", func(t *testing.T) {
263288
fakeTokenCredential := struct{ azcore.TokenCredential }{}

sdk/messaging/azservicebus/internal/amqpLinks.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync"
1212

1313
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
14+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1415
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1516
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing"
1617
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
@@ -85,7 +86,7 @@ type AMQPLinksImpl struct {
8586
RPCLink RPCLink
8687

8788
// the AMQP session for either the 'sender' or 'receiver' link
88-
session AMQPSessionCloser
89+
session amqpwrap.AMQPSession
8990

9091
// these are populated by your `createLinkFunc` when you construct
9192
// the amqpLinks
@@ -104,7 +105,7 @@ type AMQPLinksImpl struct {
104105

105106
// CreateLinkFunc creates the links, using the given session. Typically you'll only create either an
106107
// *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
107-
type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
108+
type CreateLinkFunc func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
108109

109110
type NewAMQPLinksArgs struct {
110111
NS NamespaceForAMQPLinks

sdk/messaging/azservicebus/internal/amqpLinks_test.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
15+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1516
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1617
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
1718
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
@@ -77,7 +78,7 @@ func TestAMQPLinksBasic(t *testing.T) {
7778
links := NewAMQPLinks(NewAMQPLinksArgs{
7879
NS: ns,
7980
EntityPath: entityPath,
80-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
81+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
8182
return newLinksForAMQPLinksTest(entityPath, session)
8283
},
8384
GetRecoveryKindFunc: GetRecoveryKind,
@@ -112,7 +113,7 @@ func TestAMQPLinksLive(t *testing.T) {
112113
links := NewAMQPLinks(NewAMQPLinksArgs{
113114
NS: ns,
114115
EntityPath: entityPath,
115-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
116+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
116117
createLinksCalled++
117118
return newLinksForAMQPLinksTest(entityPath, session)
118119
},
@@ -185,7 +186,7 @@ func TestAMQPLinksLiveRecoverLink(t *testing.T) {
185186
links := NewAMQPLinks(NewAMQPLinksArgs{
186187
NS: ns,
187188
EntityPath: entityPath,
188-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
189+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
189190
createLinksCalled++
190191
return newLinksForAMQPLinksTest(entityPath, session)
191192
},
@@ -223,7 +224,7 @@ func TestAMQPLinksLiveRace(t *testing.T) {
223224
links := NewAMQPLinks(NewAMQPLinksArgs{
224225
NS: ns,
225226
EntityPath: entityPath,
226-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
227+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
227228
createLinksCalled++
228229
return newLinksForAMQPLinksTest(entityPath, session)
229230
},
@@ -275,7 +276,7 @@ func TestAMQPLinksLiveRaceLink(t *testing.T) {
275276
links := NewAMQPLinks(NewAMQPLinksArgs{
276277
NS: ns,
277278
EntityPath: entityPath,
278-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
279+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
279280
createLinksCalled++
280281
return newLinksForAMQPLinksTest(entityPath, session)
281282
},
@@ -319,7 +320,7 @@ func TestAMQPLinksRetry(t *testing.T) {
319320
links := NewAMQPLinks(NewAMQPLinksArgs{
320321
NS: ns,
321322
EntityPath: entityPath,
322-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
323+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
323324
createLinksCalled++
324325
return newLinksForAMQPLinksTest(entityPath, session)
325326
},
@@ -361,7 +362,7 @@ func TestAMQPLinksMultipleWithSameConnection(t *testing.T) {
361362
links := NewAMQPLinks(NewAMQPLinksArgs{
362363
NS: ns,
363364
EntityPath: entityPath,
364-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
365+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
365366
createLinksCalled++
366367
return newLinksForAMQPLinksTest(entityPath, session)
367368
},
@@ -377,7 +378,7 @@ func TestAMQPLinksMultipleWithSameConnection(t *testing.T) {
377378
links2 := NewAMQPLinks(NewAMQPLinksArgs{
378379
NS: ns,
379380
EntityPath: entityPath,
380-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
381+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
381382
createLinksCalled2++
382383
return newLinksForAMQPLinksTest(entityPath, session)
383384
},
@@ -456,7 +457,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
456457
links := NewAMQPLinks(NewAMQPLinksArgs{
457458
NS: ns,
458459
EntityPath: "entityPath",
459-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
460+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
460461
return sender, receiver, nil
461462
},
462463
GetRecoveryKindFunc: GetRecoveryKind,
@@ -486,7 +487,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
486487
links := NewAMQPLinks(NewAMQPLinksArgs{
487488
NS: ns,
488489
EntityPath: "entityPath",
489-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
490+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
490491
return sender, receiver, nil
491492
},
492493
GetRecoveryKindFunc: GetRecoveryKind,
@@ -515,7 +516,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
515516
links := NewAMQPLinks(NewAMQPLinksArgs{
516517
NS: ns,
517518
EntityPath: "entityPath",
518-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
519+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
519520
return sender, receiver, nil
520521
},
521522
GetRecoveryKindFunc: GetRecoveryKind,
@@ -544,7 +545,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
544545
links := NewAMQPLinks(NewAMQPLinksArgs{
545546
NS: ns,
546547
EntityPath: "entityPath",
547-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
548+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
548549
return sender, receiver, nil
549550
},
550551
GetRecoveryKindFunc: GetRecoveryKind,
@@ -607,7 +608,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) {
607608
links := NewAMQPLinks(NewAMQPLinksArgs{
608609
NS: ns,
609610
EntityPath: "entityPath",
610-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
611+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
611612
return sender, receiver, nil
612613
},
613614
GetRecoveryKindFunc: GetRecoveryKind,
@@ -651,7 +652,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
651652
links := NewAMQPLinks(NewAMQPLinksArgs{
652653
NS: ns,
653654
EntityPath: "entityPath",
654-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
655+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
655656
return nil, receiver, nil
656657
},
657658
GetRecoveryKindFunc: GetRecoveryKind,
@@ -684,7 +685,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
684685
links := NewAMQPLinks(NewAMQPLinksArgs{
685686
NS: ns,
686687
EntityPath: "entityPath",
687-
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
688+
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
688689
return nil, receiver, nil
689690
}, GetRecoveryKindFunc: GetRecoveryKind,
690691
})
@@ -710,7 +711,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
710711
})
711712
}
712713

713-
func newLinksForAMQPLinksTest(entityPath string, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
714+
func newLinksForAMQPLinksTest(entityPath string, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
714715
receiveMode := amqp.ModeSecond
715716

716717
opts := []amqp.LinkOption{

sdk/messaging/azservicebus/internal/amqp_test_utils.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99

1010
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
11+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1112
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1213
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
1314
"github.com/Azure/go-amqp"
@@ -18,7 +19,7 @@ type FakeNS struct {
1819
recovered uint64
1920
clientRevisions []uint64
2021
RPCLink RPCLink
21-
Session AMQPSessionCloser
22+
Session amqpwrap.AMQPSession
2223
AMQPLinks *FakeAMQPLinks
2324

2425
CloseCalled int
@@ -30,7 +31,10 @@ type FakeAMQPSender struct {
3031
}
3132

3233
type FakeAMQPSession struct {
33-
AMQPSessionCloser
34+
amqpwrap.AMQPSession
35+
36+
NewReceiverFn func(opts ...amqp.LinkOption) (AMQPReceiverCloser, error)
37+
3438
closed int
3539
}
3640

@@ -54,7 +58,8 @@ type FakeAMQPLinks struct {
5458

5559
type FakeAMQPReceiver struct {
5660
AMQPReceiver
57-
Closed int
61+
Closed int
62+
CloseFn func(ctx context.Context) error
5863

5964
DrainCalled int
6065
DrainCreditImpl func(ctx context.Context) error
@@ -139,6 +144,11 @@ func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error
139144

140145
func (r *FakeAMQPReceiver) Close(ctx context.Context) error {
141146
r.Closed++
147+
148+
if r.CloseFn != nil {
149+
return r.CloseFn(ctx)
150+
}
151+
142152
return nil
143153
}
144154

@@ -189,6 +199,10 @@ func (s *FakeAMQPSender) Close(ctx context.Context) error {
189199
return nil
190200
}
191201

202+
func (s *FakeAMQPSession) NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error) {
203+
return s.NewReceiverFn(opts...)
204+
}
205+
192206
func (s *FakeAMQPSession) Close(ctx context.Context) error {
193207
s.closed++
194208
return nil
@@ -207,7 +221,7 @@ func (ns *FakeNS) GetEntityAudience(entityPath string) string {
207221
return fmt.Sprintf("audience: %s", entityPath)
208222
}
209223

210-
func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) {
224+
func (ns *FakeNS) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) {
211225
return ns.Session, ns.recovered + 100, nil
212226
}
213227

sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,22 +78,22 @@ func (w *AMQPClientWrapper) NewSession(opts ...amqp.SessionOption) (AMQPSession,
7878
}
7979

8080
return &AMQPSessionWrapper{
81-
inner: sess,
81+
Inner: sess,
8282
}, nil
8383
}
8484

8585
type AMQPSessionWrapper struct {
86-
inner *amqp.Session
86+
Inner *amqp.Session
8787
}
8888

8989
func (w *AMQPSessionWrapper) Close(ctx context.Context) error {
90-
return w.inner.Close(ctx)
90+
return w.Inner.Close(ctx)
9191
}
9292

9393
func (w *AMQPSessionWrapper) NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error) {
94-
return w.inner.NewReceiver(opts...)
94+
return w.Inner.NewReceiver(opts...)
9595
}
9696

9797
func (w *AMQPSessionWrapper) NewSender(opts ...amqp.LinkOption) (AMQPSenderCloser, error) {
98-
return w.inner.NewSender(opts...)
98+
return w.Inner.NewSender(opts...)
9999
}

sdk/messaging/azservicebus/internal/namespace.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type NamespaceWithNewAMQPLinks interface {
7070
// NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
7171
type NamespaceForAMQPLinks interface {
7272
NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
73-
NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
73+
NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
7474
NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
7575
GetEntityAudience(entityPath string) string
7676
Recover(ctx context.Context, clientRevision uint64) (bool, error)
@@ -192,7 +192,7 @@ func (ns *Namespace) newClient(ctx context.Context) (*amqp.Client, error) {
192192

193193
// NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client.
194194
// Returns a closeable AMQP session and the current client revision.
195-
func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) {
195+
func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) {
196196
client, clientRevision, err := ns.GetAMQPClientImpl(ctx)
197197

198198
if err != nil {
@@ -205,7 +205,7 @@ func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uin
205205
return nil, 0, err
206206
}
207207

208-
return session, clientRevision, err
208+
return &amqpwrap.AMQPSessionWrapper{Inner: session}, clientRevision, err
209209
}
210210

211211
// NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.

0 commit comments

Comments
 (0)