Skip to content

Commit f4d16eb

Browse files
[azservicebus] Removing the idle tracking code. (Azure#19581)
Client-side idle tracking was added in v1.1.2, but it seems be causing issues for customers and is actually making their receiving _less_ reliable. I had previously removed it for just sessions, but I'm now also removing it for non-session receivers as well. I'm leaving the idle check code, just not hooked up for now, because it still seems like a good defensive feature but it requires more testing and stressing to be fit for production. This rolls back the changes in Azure#19465 (simple idle timer), Azure#19492 (ignore ctx on link close) and Azure#19506 (track idle across multiple receives).
1 parent 614f503 commit f4d16eb

File tree

15 files changed

+33
-421
lines changed

15 files changed

+33
-421
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
# Release History
22

3-
## 1.1.3 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
3+
## 1.1.3 (2022-11-16)
84

95
### Bugs Fixed
106

11-
- Disable the idle link tracking on session links (#19571)
12-
13-
### Other Changes
7+
- Removing changes for client-side idle timer and closing without timeout. Combined these are
8+
causing issues with links not properly recovering or closing. Investigating an alternative
9+
for a future release.
1410

1511
## 1.1.2 (2022-11-08)
1612

sdk/messaging/azservicebus/client.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,9 @@ func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOpt
167167
id, cleanupOnClose := client.getCleanupForCloseable()
168168
receiver, err := newReceiver(newReceiverArgs{
169169
cleanupOnClose: cleanupOnClose,
170+
ns: client.namespace,
170171
entity: entity{Queue: queueName},
171172
getRecoveryKindFunc: internal.GetRecoveryKind,
172-
idleTimeout: 0,
173-
ns: client.namespace,
174173
retryOptions: client.retryOptions,
175174
}, options)
176175

@@ -187,10 +186,9 @@ func (client *Client) NewReceiverForSubscription(topicName string, subscriptionN
187186
id, cleanupOnClose := client.getCleanupForCloseable()
188187
receiver, err := newReceiver(newReceiverArgs{
189188
cleanupOnClose: cleanupOnClose,
189+
ns: client.namespace,
190190
entity: entity{Topic: topicName, Subscription: subscriptionName},
191191
getRecoveryKindFunc: internal.GetRecoveryKind,
192-
idleTimeout: 0,
193-
ns: client.namespace,
194192
retryOptions: client.retryOptions,
195193
}, options)
196194

sdk/messaging/azservicebus/go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ module github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
22

33
go 1.18
44

5+
retract (
6+
v1.1.2 // Breaks customers in situations where close is slow/infinite.
7+
)
8+
59
require (
610
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0
711
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0

sdk/messaging/azservicebus/internal/amqpLinks.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,9 +468,7 @@ func (l *AMQPLinksImpl) initWithoutLocking(ctx context.Context) error {
468468

469469
// close closes the link.
470470
// NOTE: No locking is done in this function, call `Close` if you require locking.
471-
func (l *AMQPLinksImpl) closeWithoutLocking(_ context.Context, permanent bool) error {
472-
ctx := context.Background()
473-
471+
func (l *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent bool) error {
474472
if l.closedPermanently {
475473
return nil
476474
}

sdk/messaging/azservicebus/internal/amqp_test_utils.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,6 @@ type FakeAMQPLinks struct {
4343

4444
Closed int
4545
CloseIfNeededCalled int
46-
CloseIfNeededArgs []error
47-
48-
GetFn func(ctx context.Context) (*LinksWithID, error)
4946

5047
// values to be returned for each `Get` call
5148
Revision LinkID
@@ -191,10 +188,6 @@ func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error) {
191188
case <-ctx.Done():
192189
return nil, ctx.Err()
193190
default:
194-
if l.GetFn != nil {
195-
return l.GetFn(ctx)
196-
}
197-
198191
return &LinksWithID{
199192
Sender: l.Sender,
200193
Receiver: l.Receiver,
@@ -225,7 +218,6 @@ func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error {
225218

226219
func (l *FakeAMQPLinks) CloseIfNeeded(ctx context.Context, err error) RecoveryKind {
227220
l.CloseIfNeededCalled++
228-
l.CloseIfNeededArgs = append(l.CloseIfNeededArgs, err)
229221
return GetRecoveryKind(err)
230222
}
231223

sdk/messaging/azservicebus/internal/errors.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,6 @@ func GetRecoveryKind(err error) RecoveryKind {
169169
return RecoveryKindFatal
170170
}
171171

172-
if IsLocalIdleError(err) {
173-
return RecoveryKindLink
174-
}
175-
176172
var netErr net.Error
177173

178174
// these are errors that can flow from the go-amqp connection to

sdk/messaging/azservicebus/internal/errors_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,3 @@ func Test_TransformError(t *testing.T) {
286286
// and it's okay, for convenience, to pass a nil.
287287
require.Nil(t, TransformError(nil))
288288
}
289-
290-
func Test_IdleError(t *testing.T) {
291-
require.Equal(t, RecoveryKindLink, GetRecoveryKind(localIdleError))
292-
}

sdk/messaging/azservicebus/internal/idle_check.go

Lines changed: 0 additions & 71 deletions
This file was deleted.

sdk/messaging/azservicebus/internal/idle_check_test.go

Lines changed: 0 additions & 112 deletions
This file was deleted.

sdk/messaging/azservicebus/internal/rpc.go

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ import (
1111
"sync"
1212
"time"
1313

14-
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1514
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1615
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
1716
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
18-
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1917
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
2018
)
2119

@@ -100,14 +98,6 @@ type RPCLinkArgs struct {
10098
LogEvent azlog.Event
10199
}
102100

103-
func closeOrLog(name string, closeable interface {
104-
Close(ctx context.Context) error
105-
}) {
106-
if err := closeable.Close(context.Background()); err != nil {
107-
log.Writef(exported.EventAuth, "Failed closing %s for RPC Link: %s", name, err.Error())
108-
}
109-
}
110-
111101
// NewRPCLink will build a new request response link
112102
func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
113103
session, err := args.Client.NewSession(ctx, nil)
@@ -118,7 +108,6 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
118108

119109
linkID, err := uuid.New()
120110
if err != nil {
121-
closeOrLog("session", session)
122111
return nil, err
123112
}
124113

@@ -140,7 +129,6 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
140129
nil,
141130
)
142131
if err != nil {
143-
closeOrLog("session", session)
144132
return nil, err
145133
}
146134

@@ -161,8 +149,11 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
161149

162150
receiver, err := session.NewReceiver(ctx, args.Address, receiverOpts)
163151
if err != nil {
164-
closeOrLog("sender", sender)
165-
closeOrLog("session", session)
152+
// make sure we close the sender
153+
clsCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
154+
defer cancel()
155+
156+
_ = sender.Close(clsCtx)
166157
return nil, err
167158
}
168159

@@ -331,11 +322,7 @@ func (l *rpcLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, err
331322
}
332323

333324
// Close the link receiver, sender and session
334-
func (l *rpcLink) Close(_ context.Context) error {
335-
// we're finding, in practice, that allowing cancellations when cleaning up state
336-
// just results in inconsistencies. We'll cut cancellation off here for now.
337-
ctx := context.Background()
338-
325+
func (l *rpcLink) Close(ctx context.Context) error {
339326
l.rpcLinkCtxCancel()
340327

341328
if err := l.closeReceiver(ctx); err != nil {

0 commit comments

Comments
 (0)