Skip to content

Commit a8700a4

Browse files
[azeventhubs] Fix credit requesting errors (Azure#20015)
The current go-amqp can have issues if your active credits are greater than the session window size. We have a blocking-release bug here: Azure/go-amqp#240. I've also added validation to prevent asking for <= 0 credits as well, which isn't supported. I've also updated the stress tests to build with Go1.20 - it doesn't change the requirements for customer code.
1 parent 3909a45 commit a8700a4

File tree

4 files changed

+132
-6
lines changed

4 files changed

+132
-6
lines changed

sdk/messaging/azeventhubs/internal/eh/stress/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM mcr.microsoft.com/oss/go/microsoft/golang:1.18 as build
1+
FROM mcr.microsoft.com/oss/go/microsoft/golang:1.20 as build
22
# you'll need to run this build from the root of the azeventhubs module
33
ENV GOOS=linux
44
ENV GOARCH=amd64

sdk/messaging/azeventhubs/internal/eh/stress/stress.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ package main
55
import (
66
"context"
77
"fmt"
8-
"math/rand"
98
"os"
109
"sort"
11-
"time"
1210

1311
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/eh/stress/tests"
1412
)
@@ -42,8 +40,6 @@ func main() {
4240

4341
for _, test := range tests {
4442
if test.name == testName {
45-
rand.Seed(time.Now().UnixNano())
46-
4743
if err := test.fn(context.Background()); err != nil {
4844
fmt.Printf("ERROR: %s\n", err)
4945
os.Exit(1)

sdk/messaging/azeventhubs/partition_client.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ import (
2121
const DefaultConsumerGroup = "$Default"
2222

2323
const defaultPrefetchSize = uint32(300)
24-
const defaultMaxCreditSize = uint32(2048)
24+
25+
// defaultLinkRxBuffer is the maximum number of transfer frames we can handle
26+
// on the Receiver. This matches the current default window size that go-amqp
27+
// uses for sessions.
28+
const defaultMaxCreditSize = uint32(5000)
2529

2630
// StartPosition indicates the position to start receiving events within a partition.
2731
// The default position is Latest.
@@ -100,6 +104,14 @@ func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options
100104

101105
prefetchDisabled := pc.prefetch < 0
102106

107+
if count <= 0 {
108+
return nil, internal.NewErrNonRetriable("count should be greater than 0")
109+
}
110+
111+
if prefetchDisabled && count > int(defaultMaxCreditSize) {
112+
return nil, internal.NewErrNonRetriable(fmt.Sprintf("count cannot exceed %d", defaultMaxCreditSize))
113+
}
114+
103115
err := pc.links.Retry(ctx, EventConsumer, "ReceiveEvents", pc.partitionID, pc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPReceiverCloser]) error {
104116
events = nil
105117

@@ -259,6 +271,11 @@ func newPartitionClient(args partitionClientArgs, options *PartitionClientOption
259271
return nil, err
260272
}
261273

274+
if options.Prefetch > int32(defaultMaxCreditSize) {
275+
// don't allow them to set the prefetch above the session window size.
276+
return nil, internal.NewErrNonRetriable(fmt.Sprintf("options.Prefetch cannot exceed %d", defaultMaxCreditSize))
277+
}
278+
262279
client := &PartitionClient{
263280
eventHub: args.eventHub,
264281
partitionID: args.partitionID,

sdk/messaging/azeventhubs/partition_client_unit_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ package azeventhubs
55

66
import (
77
"context"
8+
"fmt"
89
"testing"
910
"time"
1011

1112
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
1213
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
14+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
1315
"github.com/stretchr/testify/require"
1416
)
1517

@@ -41,6 +43,43 @@ func TestUnit_PartitionClient_PrefetchOff(t *testing.T) {
4143
require.True(t, ns.Receiver.ManualCreditsSetFromOptions)
4244
}
4345

46+
func TestUnit_PartitionClient_PrefetchOff_CreditLimits(t *testing.T) {
47+
ns := &internal.FakeNSForPartClient{
48+
Receiver: &internal.FakeAMQPReceiver{
49+
Messages: fakeMessages(int(defaultMaxCreditSize)),
50+
},
51+
}
52+
53+
client, err := newPartitionClient(partitionClientArgs{
54+
namespace: ns,
55+
}, &PartitionClientOptions{
56+
Prefetch: -1,
57+
})
58+
require.NoError(t, err)
59+
60+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
61+
defer cancel()
62+
63+
// can't request over the max
64+
events, err := client.ReceiveEvents(ctx, int(defaultMaxCreditSize+1), nil)
65+
require.EqualError(t, err, "count cannot exceed 5000")
66+
require.Empty(t, events)
67+
68+
// can't request negative/0 credits
69+
events, err = client.ReceiveEvents(ctx, 0, nil)
70+
require.EqualError(t, err, "count should be greater than 0")
71+
require.Empty(t, events)
72+
73+
events, err = client.ReceiveEvents(ctx, -1, nil)
74+
require.EqualError(t, err, "count should be greater than 0")
75+
require.Empty(t, events)
76+
77+
// can request the max
78+
events, err = client.ReceiveEvents(ctx, int(defaultMaxCreditSize), nil)
79+
require.NoError(t, err)
80+
require.NotEmpty(t, events)
81+
}
82+
4483
func TestUnit_PartitionClient_PrefetchOffOnlyBackfillsCredits(t *testing.T) {
4584
testData := []struct {
4685
Name string
@@ -116,3 +155,77 @@ func TestUnit_PartitionClient_PrefetchOn(t *testing.T) {
116155
require.Equal(t, uint32(td.initialCredits-3), ns.Receiver.ActiveCredits, "All messages should have been received")
117156
}
118157
}
158+
159+
func TestUnit_PartitionClient_PrefetchLimit(t *testing.T) {
160+
newPartitionClient := func(prefetch int32) (*PartitionClient, error) {
161+
ns := &internal.FakeNSForPartClient{
162+
Receiver: &internal.FakeAMQPReceiver{
163+
Messages: fakeMessages(int(defaultMaxCreditSize) + 1),
164+
},
165+
}
166+
167+
client, err := newPartitionClient(partitionClientArgs{namespace: ns}, &PartitionClientOptions{
168+
Prefetch: prefetch,
169+
})
170+
171+
return client, err
172+
}
173+
174+
t.Run("max allowed credits is defaultMaxCreditSize", func(t *testing.T) {
175+
client, err := newPartitionClient(int32(defaultMaxCreditSize))
176+
require.NoError(t, err)
177+
require.NotNil(t, client)
178+
179+
test.RequireClose(t, client)
180+
})
181+
182+
t.Run("can't request zero or negative credits", func(t *testing.T) {
183+
client, err := newPartitionClient(int32(defaultMaxCreditSize))
184+
require.NoError(t, err)
185+
require.NotNil(t, client)
186+
187+
events, err := client.ReceiveEvents(context.Background(), 0, nil)
188+
require.EqualError(t, err, "count should be greater than 0")
189+
require.Empty(t, events)
190+
191+
events, err = client.ReceiveEvents(context.Background(), -1, nil)
192+
require.EqualError(t, err, "count should be greater than 0")
193+
require.Empty(t, events)
194+
195+
test.RequireClose(t, client)
196+
})
197+
198+
t.Run("can receive more than defaultMaxCreditSize in prefetch mode", func(t *testing.T) {
199+
client, err := newPartitionClient(0)
200+
require.NoError(t, err)
201+
require.NotNil(t, client)
202+
203+
// if you're using prefetch it's fine to ask for more than the `defaultMaxCreditSize`
204+
// since it doesn't actually cause to request more credits than is allowed.
205+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
206+
defer cancel()
207+
208+
events, err := client.ReceiveEvents(ctx, int(defaultMaxCreditSize+1), nil)
209+
require.NoError(t, err)
210+
require.NotEmpty(t, events)
211+
212+
test.RequireClose(t, client)
213+
})
214+
215+
t.Run("can't set a option.Prefetch > defaultMaxCreditSize", func(t *testing.T) {
216+
// and you can't create a PartitionClient that uses more credits than allowed.
217+
client, err := newPartitionClient(int32(defaultMaxCreditSize) + 1)
218+
require.EqualError(t, err, fmt.Sprintf("options.Prefetch cannot exceed %d", defaultMaxCreditSize))
219+
require.Nil(t, client)
220+
})
221+
}
222+
223+
func fakeMessages(count int) []*amqp.Message {
224+
var messages []*amqp.Message
225+
226+
for i := 0; i < count; i++ {
227+
messages = append(messages, &amqp.Message{})
228+
}
229+
230+
return messages
231+
}

0 commit comments

Comments
 (0)