Skip to content

Commit 76388be

Browse files
[azservicebus] Update go-amqp (Azure#18345)
Updating our internally vendored go-amqp to 18d378101570d5469d237799a76b77c0bf0c6fe3 This mostly impacted us because the connection error is no longer a sentinel error.
1 parent 194c59a commit 76388be

File tree

12 files changed

+76
-56
lines changed

12 files changed

+76
-56
lines changed

sdk/messaging/azservicebus/internal/amqpLinks_test.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,22 @@ func (pe fakeNetError) Timeout() bool { return pe.timeout }
3333
func (pe fakeNetError) Temporary() bool { return pe.temp }
3434
func (pe fakeNetError) Error() string { return "Fake but very permanent error" }
3535

36-
func assertFailedLinks(t *testing.T, lwid *LinksWithID, expectedErr error, expectedRPCError error) {
36+
func assertFailedLinks[T error, T2 error](t *testing.T, lwid *LinksWithID, expectedErr T, expectedRPCError T2) {
3737
err := lwid.Sender.Send(context.TODO(), &amqp.Message{
3838
Data: [][]byte{
3939
{0},
4040
},
4141
})
42+
43+
require.True(t, errors.Is(err, expectedErr) || errors.As(err, &expectedErr))
4244
require.ErrorIs(t, err, expectedErr)
4345

4446
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
45-
require.ErrorIs(t, err, expectedRPCError)
47+
require.True(t, errors.Is(err, expectedRPCError) || errors.As(err, &expectedRPCError))
4648

4749
msg, err := lwid.Receiver.Receive(context.TODO())
4850
require.ErrorIs(t, err, expectedErr)
51+
require.True(t, errors.Is(err, expectedErr) || errors.As(err, &expectedErr))
4952
require.Nil(t, msg)
5053

5154
}
@@ -126,7 +129,7 @@ func TestAMQPLinksLive(t *testing.T) {
126129
}()
127130

128131
require.EqualValues(t, 0, createLinksCalled)
129-
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, amqp.ErrConnClosed))
132+
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{}))
130133
require.EqualValues(t, 1, createLinksCalled)
131134

132135
lwr, err := links.Get(context.Background())
@@ -138,10 +141,10 @@ func TestAMQPLinksLive(t *testing.T) {
138141
require.NoError(t, amqpClient.Close())
139142

140143
// all the links are dead because the connection is dead.
141-
assertFailedLinks(t, lwr, amqp.ErrConnClosed, amqp.ErrConnClosed)
144+
assertFailedLinks(t, lwr, &amqp.ConnectionError{}, &amqp.ConnectionError{})
142145

143146
// now we'll recover, which should recreate everything
144-
require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, amqp.ErrConnClosed))
147+
require.NoError(t, links.RecoverIfNeeded(context.Background(), lwr.ID, &amqp.ConnectionError{}))
145148
require.EqualValues(t, 2, createLinksCalled)
146149

147150
lwr, err = links.Get(context.Background())
@@ -199,7 +202,7 @@ func TestAMQPLinksLiveRecoverLink(t *testing.T) {
199202
}()
200203

201204
require.EqualValues(t, 0, createLinksCalled)
202-
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, amqp.ErrConnClosed))
205+
require.NoError(t, links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{}))
203206
require.EqualValues(t, 1, createLinksCalled)
204207

205208
lwr, err := links.Get(context.Background())
@@ -242,7 +245,7 @@ func TestAMQPLinksLiveRace(t *testing.T) {
242245
wg.Add(1)
243246
go func() {
244247
defer wg.Done()
245-
err := links.RecoverIfNeeded(context.Background(), LinkID{}, amqp.ErrConnClosed)
248+
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{})
246249
require.NoError(t, err)
247250
}()
248251
}
@@ -334,7 +337,7 @@ func TestAMQPLinksRetry(t *testing.T) {
334337

335338
err = links.Retry(context.Background(), log.Event("NotUsed"), "NotUsed", func(ctx context.Context, lwid *LinksWithID, args *utils.RetryFnArgs) error {
336339
// force recoveries
337-
return amqp.ErrConnClosed
340+
return &amqp.ConnectionError{}
338341
}, exported.RetryOptions{
339342
MaxRetries: 2,
340343
// note: omitting MaxRetries just to give a sanity check that
@@ -343,7 +346,8 @@ func TestAMQPLinksRetry(t *testing.T) {
343346
MaxRetryDelay: time.Millisecond,
344347
})
345348

346-
require.ErrorIs(t, err, amqp.ErrConnClosed)
349+
var connErr *amqp.ConnectionError
350+
require.ErrorAs(t, err, &connErr)
347351
require.EqualValues(t, 3, createLinksCalled)
348352
}
349353

@@ -530,7 +534,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
530534
_, err := links.Get(context.Background())
531535
require.NoError(t, err)
532536

533-
rk := links.CloseIfNeeded(context.Background(), amqp.ErrConnClosed)
537+
rk := links.CloseIfNeeded(context.Background(), &amqp.ConnectionError{})
534538
require.Equal(t, RecoveryKindConn, rk)
535539
require.Equal(t, 1, receiver.Closed)
536540
require.Equal(t, 1, sender.Closed)
@@ -577,7 +581,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) {
577581
{Err: nil, Attempts: []int32{0}},
578582

579583
// connection related or unknown failures happen, all attempts exhausted
580-
{Err: amqp.ErrConnClosed, Attempts: []int32{0, 1, 2, 3}},
584+
{Err: &amqp.ConnectionError{}, Attempts: []int32{0, 1, 2, 3}},
581585
{Err: errors.New("unknown error"), Attempts: []int32{0, 1, 2, 3}},
582586

583587
// fatal errors don't retry at all.
@@ -698,7 +702,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
698702
endCapture := test.CaptureLogsForTest()
699703
defer endCapture()
700704

701-
err := links.RecoverIfNeeded(context.Background(), LinkID{}, amqp.ErrConnClosed)
705+
err := links.RecoverIfNeeded(context.Background(), LinkID{}, &amqp.ConnectionError{})
702706
require.NoError(t, err)
703707

704708
messages := endCapture()

sdk/messaging/azservicebus/internal/errors.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ func GetRecoveryKind(err error) RecoveryKind {
173173
return RecoveryKindLink
174174
}
175175

176-
if errors.Is(err, amqp.ErrConnClosed) ||
176+
var connErr *amqp.ConnectionError
177+
178+
if errors.As(err, &connErr) ||
177179
// session closures appear to leak through when the connection itself is going down.
178180
errors.Is(err, amqp.ErrSessionClosed) {
179181
return RecoveryKindConn

sdk/messaging/azservicebus/internal/errors_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func Test_recoveryKind(t *testing.T) {
115115
}
116116

117117
t.Run("sentinel errors", func(t *testing.T) {
118-
rk := GetRecoveryKind(amqp.ErrConnClosed)
118+
rk := GetRecoveryKind(&amqp.ConnectionError{})
119119
require.EqualValues(t, RecoveryKindConn, rk)
120120
})
121121
})
@@ -188,7 +188,7 @@ func Test_ServiceBusError_ConnectionRecoveryNeeded(t *testing.T) {
188188
var connErrors = []error{
189189
&amqp.Error{Condition: amqp.ErrorConnectionForced},
190190
&amqp.Error{Condition: amqp.ErrorInternalError},
191-
amqp.ErrConnClosed,
191+
&amqp.ConnectionError{},
192192
amqp.ErrSessionClosed,
193193
io.EOF,
194194
fakeNetError{temp: true},
@@ -274,7 +274,7 @@ func Test_TransformError(t *testing.T) {
274274
require.ErrorAs(t, err, &asExportedErr)
275275
require.Equal(t, exported.CodeConnectionLost, asExportedErr.Code)
276276

277-
err = TransformError(amqp.ErrConnClosed)
277+
err = TransformError(&amqp.ConnectionError{})
278278
require.ErrorAs(t, err, &asExportedErr)
279279
require.Equal(t, exported.CodeConnectionLost, asExportedErr.Code)
280280

sdk/messaging/azservicebus/internal/go-amqp/conn.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"encoding/binary"
99
"errors"
1010
"fmt"
11-
"io"
1211
"math"
1312
"net"
1413
"net/url"
@@ -362,7 +361,10 @@ func (c *conn) Start() error {
362361
func (c *conn) Close() error {
363362
c.closeMuxOnce.Do(func() { close(c.closeMux) })
364363
err := c.Err()
365-
if err == ErrConnClosed {
364+
var connErr *ConnectionError
365+
if errors.As(err, &connErr) && connErr.inner == nil {
366+
// an empty ConnectionError means the connection was closed by the caller
367+
// or as requested by the peer and no error was provided in the close frame.
366368
return nil
367369
}
368370
return err
@@ -388,6 +390,7 @@ func (c *conn) close() {
388390
err := c.net.Close()
389391
switch {
390392
// conn.err already set
393+
// TODO: err info is lost, log it?
391394
case c.err != nil:
392395

393396
// conn.err not set and c.net.Close() returned a non-nil error
@@ -396,7 +399,6 @@ func (c *conn) close() {
396399

397400
// no errors
398401
default:
399-
c.err = ErrConnClosed
400402
}
401403

402404
// check rxDone after closing net, otherwise may block
@@ -409,7 +411,7 @@ func (c *conn) close() {
409411
func (c *conn) Err() error {
410412
c.errMu.Lock()
411413
defer c.errMu.Unlock()
412-
return c.err
414+
return &ConnectionError{inner: c.err}
413415
}
414416

415417
// mux is started in it's own goroutine after initial connection establishment.
@@ -456,8 +458,6 @@ func (c *conn) mux() {
456458
case *frames.PerformClose:
457459
if body.Error != nil {
458460
c.err = body.Error
459-
} else {
460-
c.err = ErrConnClosed
461461
}
462462
return
463463

@@ -481,7 +481,7 @@ func (c *conn) mux() {
481481
case *frames.PerformEnd:
482482
session, ok = sessionsByRemoteChannel[fr.Channel]
483483
if !ok {
484-
c.err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel", fr.Body, fr.Channel)
484+
c.err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel (PerformEnd)", fr.Body, fr.Channel)
485485
break
486486
}
487487
// we MUST remove the remote channel from our map as soon as we receive
@@ -657,7 +657,7 @@ func (c *conn) connReader() {
657657
// parse the frame
658658
b, ok := buf.Next(bodySize)
659659
if !ok {
660-
c.connErr <- io.EOF
660+
c.connErr <- fmt.Errorf("buffer EOF; requested bytes: %d, actual size: %d", bodySize, buf.Len())
661661
return
662662
}
663663

@@ -703,6 +703,7 @@ func (c *conn) connWriter() {
703703
var err error
704704
for {
705705
if err != nil {
706+
debug(1, "connWriter error: %v", err)
706707
c.connErr <- err
707708
return
708709
}
@@ -868,7 +869,7 @@ func (c *conn) readProtoHeader() (protoHeader, error) {
868869
case fr := <-c.rxFrame:
869870
return p, fmt.Errorf("readProtoHeader: unexpected frame %#v", fr)
870871
case <-deadline:
871-
return p, ErrTimeout
872+
return p, errors.New("amqp: timeout waiting for response")
872873
}
873874
}
874875

@@ -1037,7 +1038,7 @@ func (c *conn) readFrame() (frames.Frame, error) {
10371038
case p := <-c.rxProto:
10381039
return fr, fmt.Errorf("unexpected protocol header %#v", p)
10391040
case <-deadline:
1040-
return fr, ErrTimeout
1041+
return fr, errors.New("amqp: timeout waiting for response")
10411042
}
10421043
}
10431044

sdk/messaging/azservicebus/internal/go-amqp/errors.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,6 @@ func (e *DetachError) Error() string {
6363

6464
// Errors
6565
var (
66-
ErrTimeout = errors.New("amqp: timeout waiting for response")
67-
68-
// ErrConnClosed is propagated to Session and Senders/Receivers
69-
// when Client.Close() is called or the server closes the connection
70-
// without specifying an error.
71-
ErrConnClosed = errors.New("amqp: connection closed")
72-
7366
// ErrSessionClosed is propagated to Sender/Receivers
7467
// when Session.Close() is called.
7568
ErrSessionClosed = errors.New("amqp: session closed")
@@ -79,6 +72,19 @@ var (
7972
ErrLinkClosed = errors.New("amqp: link closed")
8073
)
8174

75+
// ConnectionError is propagated to Session and Senders/Receivers
76+
// when the connection has been closed or is no longer functional.
77+
type ConnectionError struct {
78+
inner error
79+
}
80+
81+
func (c *ConnectionError) Error() string {
82+
if c.inner == nil {
83+
return "amqp: connection closed"
84+
}
85+
return c.inner.Error()
86+
}
87+
8288
// Default link options
8389
const (
8490
DefaultLinkCredit = 1

sdk/messaging/azservicebus/internal/go-amqp/internal/frames/frames.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1353,7 +1353,7 @@ func (si *SASLInit) frameBody() {}
13531353
func (si *SASLInit) Marshal(wr *buffer.Buffer) error {
13541354
return encoding.MarshalComposite(wr, encoding.TypeCodeSASLInit, []encoding.MarshalField{
13551355
{Value: &si.Mechanism, Omit: false},
1356-
{Value: &si.InitialResponse, Omit: len(si.InitialResponse) == 0},
1356+
{Value: &si.InitialResponse, Omit: false},
13571357
{Value: &si.Hostname, Omit: len(si.Hostname) == 0},
13581358
})
13591359
}

sdk/messaging/azservicebus/internal/go-amqp/link.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,9 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
237237
l.Messages = make(chan Message, l.receiver.maxCredit)
238238
l.unsettledMessages = map[string]struct{}{}
239239
// copy the received filter values
240-
l.Source.Filter = resp.Source.Filter
240+
if resp.Source != nil {
241+
l.Source.Filter = resp.Source.Filter
242+
}
241243
} else {
242244
if l.Target == nil {
243245
l.Target = new(frames.Target)
@@ -681,7 +683,7 @@ func (l *link) muxHandleFrame(fr frames.FrameBody) error {
681683
if !fr.Echo {
682684
// if the 'drain' flag has been set in the frame sent to the _receiver_ then
683685
// we signal whomever is waiting (the service has seen and acknowledged our drain)
684-
if fr.Drain && l.receiver.manualCreditor != nil {
686+
if fr.Drain && l.receiver != nil && l.receiver.manualCreditor != nil {
685687
l.linkCredit = 0 // we have no active credits at this point.
686688
l.receiver.manualCreditor.EndDrain()
687689
}

sdk/messaging/azservicebus/internal/go-amqp/link_options.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ func LinkTargetAddress(addr string) LinkOption {
102102
}
103103
}
104104

105+
// LinkTargetCapabilities sets the target capabilities.
106+
func LinkTargetCapabilities(capabilities ...string) LinkOption {
107+
return func(l *link) error {
108+
if l.Target == nil {
109+
l.Target = new(frames.Target)
110+
}
111+
112+
// Convert string to symbol
113+
symbolCapabilities := make([]encoding.Symbol, len(capabilities))
114+
for i, v := range capabilities {
115+
symbolCapabilities[i] = encoding.Symbol(v)
116+
}
117+
118+
l.Target.Capabilities = append(l.Target.Capabilities, symbolCapabilities...)
119+
return nil
120+
}
121+
}
122+
105123
// LinkAddressDynamic requests a dynamically created address from the server.
106124
func LinkAddressDynamic() LinkOption {
107125
return func(l *link) error {

sdk/messaging/azservicebus/internal/go-amqp/receiver.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (r *Receiver) Prefetched(ctx context.Context) (*Message, error) {
6262
case msg := <-r.link.Messages:
6363
debug(3, "Receive() non blocking %d", msg.deliveryID)
6464
msg.link = r.link
65-
return acceptIfModeFirst(ctx, r, &msg)
65+
return &msg, nil
6666
case <-ctx.Done():
6767
return nil, ctx.Err()
6868
default:
@@ -89,27 +89,14 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
8989
case msg := <-r.link.Messages:
9090
debug(3, "Receive() blocking %d", msg.deliveryID)
9191
msg.link = r.link
92-
return acceptIfModeFirst(ctx, r, &msg)
92+
return &msg, nil
9393
case <-r.link.Detached:
9494
return nil, r.link.err
9595
case <-ctx.Done():
9696
return nil, ctx.Err()
9797
}
9898
}
9999

100-
// acceptIfModeFirst auto-accepts a message if we are in mode first, otherwise it no-ops.
101-
func acceptIfModeFirst(ctx context.Context, r *Receiver, msg *Message) (*Message, error) {
102-
// for ModeFirst, auto-accept the message
103-
if receiverSettleModeValue(r.link.ReceiverSettleMode) == ModeSecond {
104-
return msg, nil
105-
}
106-
if err := r.AcceptMessage(ctx, msg); err != nil {
107-
return nil, err
108-
}
109-
return msg, nil
110-
111-
}
112-
113100
// Accept notifies the server that the message has been
114101
// accepted and does not require redelivery.
115102
func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error {

sdk/messaging/azservicebus/receiver_unit_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,13 +438,13 @@ func TestReceiver_UserFacingErrors(t *testing.T) {
438438
require.ErrorAs(t, err, &asSBError)
439439
require.Equal(t, CodeConnectionLost, asSBError.Code)
440440

441-
fakeAMQPLinks.Err = amqp.ErrConnClosed
441+
fakeAMQPLinks.Err = &amqp.ConnectionError{}
442442
messages, err = receiver.ReceiveDeferredMessages(context.Background(), []int64{1}, nil)
443443
require.Empty(t, messages)
444444
require.ErrorAs(t, err, &asSBError)
445445
require.Equal(t, CodeConnectionLost, asSBError.Code)
446446

447-
fakeAMQPLinks.Err = amqp.ErrConnClosed
447+
fakeAMQPLinks.Err = &amqp.ConnectionError{}
448448
messages, err = receiver.ReceiveMessages(context.Background(), 1, nil)
449449
require.Empty(t, messages)
450450
require.ErrorAs(t, err, &asSBError)

0 commit comments

Comments
 (0)