Commit e50c6cc

[azeventhubs] Prep for release, fixing tests (Azure#19364)
This is just a simple update for QA, with no fixes to the library itself; it also pushes the changelog date forward since the release got delayed.

- Added some more tests for recovery. There were some incidents in the stress cluster, but they turned out to be red herrings. They did reveal that some more granular tests should exist, and now we have them for both link-level and connection-level recovery.
- Fixed a bug in one of the stress tests. We kept getting an odd (but non-fatal) error about the connection idling out as the test tore down (after passing!). It turns out the error was accurate: the batch test is a little unusual in that it doesn't send events continually; it sends them once at the beginning of the test and the consumers just keep rewinding to pick them up again. So the producer client just needed to be closed after its part in the test was done (see the sketch below).

Fixes Azure#19220
1 parent aeb29e7 commit e50c6cc
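
The stress-test fix described above amounts to closing the producer client as soon as its send phase is over, rather than deferring the close to test teardown, since the consumers only rewind over events that were already sent. Below is a minimal sketch of that pattern, assuming the azeventhubs v0.2.0 API used by the tests in this commit (NewEventDataBatch, AddEventData, SendEventBatch, Close); the sendOnceAndClose helper is illustrative and not part of the SDK or of this commit.

package stresstestsketch

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// sendOnceAndClose sends a single batch to one partition and closes the
// producer client immediately afterwards, so its connection doesn't sit idle
// (and eventually idle out) while consumers keep re-reading the same events.
func sendOnceAndClose(ctx context.Context, pc *azeventhubs.ProducerClient, partitionID string) error {
    // close right after the send phase is done, not at test teardown
    defer pc.Close(ctx)

    batch, err := pc.NewEventDataBatch(ctx, &azeventhubs.EventDataBatchOptions{
        PartitionID: &partitionID,
    })
    if err != nil {
        return err
    }

    if err := batch.AddEventData(&azeventhubs.EventData{
        Body: []byte("event payload"),
    }, nil); err != nil {
        return err
    }

    return pc.SendEventBatch(ctx, batch, nil)
}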

File tree

3 files changed: +297, -3 lines changed

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 # Release History
 
-## 0.2.0 (2022-10-13)
+## 0.2.0 (2022-10-17)
 
 ### Features Added

Lines changed: 294 additions & 0 deletions
@@ -0,0 +1,294 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azeventhubs

import (
    "context"
    "fmt"
    "log"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
    "github.com/stretchr/testify/require"
)

func TestConsumerClient_Recovery(t *testing.T) {
    testParams := test.GetConnectionParamsForTest(t)

    // Uncomment to see the entire recovery playbook run.
    // test.EnableStdoutLogging()

    dac, err := azidentity.NewDefaultAzureCredential(nil)
    require.NoError(t, err)

    // Overview:
    // 1. Send two events to each partition.
    // 2. Receive one event per partition. This ensures the links are live.
    // 3. Reach into the client to get access to its connection and shut it off.
    // 4. Try again; everything should recover.
    producerClient, err := NewProducerClient(testParams.EventHubNamespace, testParams.EventHubName, dac, nil)
    require.NoError(t, err)

    ehProps, err := producerClient.GetEventHubProperties(context.Background(), nil)
    require.NoError(t, err)

    // trim the partition list down so the test executes in reasonable time.
    ehProps.PartitionIDs = ehProps.PartitionIDs[0:3] // the minimum for testing is 3 partitions anyway

    type sendResult struct {
        PartitionID  string
        OffsetBefore int64
    }

    sendResults := make([]sendResult, len(ehProps.PartitionIDs))
    wg := sync.WaitGroup{}

    log.Printf("1. sending 2 events to %d partitions", len(ehProps.PartitionIDs))

    for i, pid := range ehProps.PartitionIDs {
        wg.Add(1)

        go func(i int, pid string) {
            defer wg.Done()

            partProps, err := producerClient.GetPartitionProperties(context.Background(), pid, nil)
            require.NoError(t, err)

            batch, err := producerClient.NewEventDataBatch(context.Background(), &EventDataBatchOptions{
                PartitionID: &pid,
            })
            require.NoError(t, err)

            require.NoError(t, batch.AddEventData(&EventData{
                Body: []byte(fmt.Sprintf("event 1 for partition %s", pid)),
            }, nil))

            require.NoError(t, batch.AddEventData(&EventData{
                Body: []byte(fmt.Sprintf("event 2 for partition %s", pid)),
            }, nil))

            err = producerClient.SendEventBatch(context.Background(), batch, nil)
            require.NoError(t, err)

            sendResults[i] = sendResult{PartitionID: pid, OffsetBefore: partProps.LastEnqueuedOffset}
        }(i, pid)
    }

    wg.Wait()

    test.RequireClose(t, producerClient)

    // now we'll receive an event (so we know each partition client is alive).
    // each partition has two events waiting past the recorded offset.
    consumerClient, err := NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, DefaultConsumerGroup, dac, nil)
    require.NoError(t, err)

    partitionClients := make([]*PartitionClient, len(sendResults))

    log.Printf("2. receiving the first event for each partition")

    for i, sr := range sendResults {
        wg.Add(1)

        go func(i int, sr sendResult) {
            defer wg.Done()

            partClient, err := consumerClient.NewPartitionClient(sr.PartitionID, &PartitionClientOptions{
                StartPosition: StartPosition{Inclusive: false, Offset: &sr.OffsetBefore},
                Prefetch:      -1,
            })
            require.NoError(t, err)

            partitionClients[i] = partClient

            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
            defer cancel()

            events, err := partClient.ReceiveEvents(ctx, 1, nil)
            require.NoError(t, err)
            require.EqualValues(t, 1, len(events))
            require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body))
        }(i, sr)
    }

    wg.Wait()

    defer test.RequireClose(t, consumerClient)

    log.Printf("3. closing connection, which will force recovery for each partition client so they can read the next event")

    // now we'll close the internal connection, simulating a connection break
    require.NoError(t, consumerClient.namespace.Close(context.Background(), false))

    var best int64

    log.Printf("4. try to read the second event, which forces the clients to recover")

    // and try to receive the second event for each client
    for i, pc := range partitionClients {
        wg.Add(1)

        go func(i int, pc *PartitionClient) {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
            defer cancel()

            events, err := pc.ReceiveEvents(ctx, 1, nil)
            require.NoError(t, err)
            require.EqualValues(t, 1, len(events))
            require.Equal(t, fmt.Sprintf("event 2 for partition %s", sendResults[i].PartitionID), string(events[0].Body))

            atomic.AddInt64(&best, 1)
        }(i, pc)
    }

    wg.Wait()
    require.Equal(t, int64(len(ehProps.PartitionIDs)), best)
}

func TestConsumerClient_RecoveryLink(t *testing.T) {
    testParams := test.GetConnectionParamsForTest(t)

    // Uncomment to see the entire recovery playbook run.
    // test.EnableStdoutLogging()

    dac, err := azidentity.NewDefaultAzureCredential(nil)
    require.NoError(t, err)

    // Overview:
    // 1. Send two events to each partition.
    // 2. Receive one event per partition. This ensures the links are live.
    // 3. Reach into each partition client and close its AMQP link, leaving the connection intact.
    // 4. Try again; everything should recover.
    producerClient, err := NewProducerClient(testParams.EventHubNamespace, testParams.EventHubName, dac, nil)
    require.NoError(t, err)

    ehProps, err := producerClient.GetEventHubProperties(context.Background(), nil)
    require.NoError(t, err)

    // trim the partition list down so the test executes in reasonable time.
    ehProps.PartitionIDs = ehProps.PartitionIDs[0:3] // the minimum for testing is 3 partitions anyway

    type sendResult struct {
        PartitionID  string
        OffsetBefore int64
    }

    sendResults := make([]sendResult, len(ehProps.PartitionIDs))
    wg := sync.WaitGroup{}

    log.Printf("== 1. sending 2 events to %d partitions ==", len(ehProps.PartitionIDs))

    for i, pid := range ehProps.PartitionIDs {
        wg.Add(1)

        go func(i int, pid string) {
            defer wg.Done()

            partProps, err := producerClient.GetPartitionProperties(context.Background(), pid, nil)
            require.NoError(t, err)

            batch, err := producerClient.NewEventDataBatch(context.Background(), &EventDataBatchOptions{
                PartitionID: &pid,
            })
            require.NoError(t, err)

            require.NoError(t, batch.AddEventData(&EventData{
                Body: []byte(fmt.Sprintf("event 1 for partition %s", pid)),
            }, nil))

            require.NoError(t, batch.AddEventData(&EventData{
                Body: []byte(fmt.Sprintf("event 2 for partition %s", pid)),
            }, nil))

            err = producerClient.SendEventBatch(context.Background(), batch, nil)
            require.NoError(t, err)

            sendResults[i] = sendResult{PartitionID: pid, OffsetBefore: partProps.LastEnqueuedOffset}
        }(i, pid)
    }

    wg.Wait()

    test.RequireClose(t, producerClient)

    // now we'll receive an event (so we know each partition client is alive).
    // each partition has two events waiting past the recorded offset.
    consumerClient, err := NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, DefaultConsumerGroup, dac, nil)
    require.NoError(t, err)

    partitionClients := make([]*PartitionClient, len(sendResults))

    log.Printf("== 2. receiving the first event for each partition ==")

    for i, sr := range sendResults {
        wg.Add(1)

        go func(i int, sr sendResult) {
            defer wg.Done()

            partClient, err := consumerClient.NewPartitionClient(sr.PartitionID, &PartitionClientOptions{
                StartPosition: StartPosition{Inclusive: false, Offset: &sr.OffsetBefore},
                Prefetch:      -1,
            })
            require.NoError(t, err)

            partitionClients[i] = partClient

            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
            defer cancel()

            events, err := partClient.ReceiveEvents(ctx, 1, nil)
            require.NoError(t, err)
            require.EqualValues(t, 1, len(events))
            require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body))
        }(i, sr)
    }

    wg.Wait()

    defer test.RequireClose(t, consumerClient)

    var best int64

    log.Printf("== 3. closing links, but leaving the connection intact ==")

    for i, pc := range partitionClients {
        links := pc.links.(*internal.Links[amqpwrap.AMQPReceiverCloser])
        lwid, err := links.GetLink(context.Background(), sendResults[i].PartitionID)
        require.NoError(t, err)
        require.NoError(t, lwid.Link.Close(context.Background()))
    }

    log.Printf("== 4. try to read the second event, which forces the clients to recover ==")

    // and try to receive the second event for each client
    for i, pc := range partitionClients {
        wg.Add(1)

        go func(i int, pc *PartitionClient) {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
            defer cancel()

            events, err := pc.ReceiveEvents(ctx, 1, nil)
            require.NoError(t, err)
            require.EqualValues(t, 1, len(events))
            require.Equal(t, fmt.Sprintf("event 2 for partition %s", sendResults[i].PartitionID), string(events[0].Body))

            atomic.AddInt64(&best, 1)
        }(i, pc)
    }

    wg.Wait()
    require.Equal(t, int64(len(ehProps.PartitionIDs)), best)
}

sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go

Lines changed: 2 additions & 2 deletions
@@ -105,8 +105,6 @@ func BatchStressTester(ctx context.Context) error {
         return err
     }
 
-    defer closeOrPanic(producerClient)
-
     // we're going to read (and re-read these events over and over in our tests)
     log.Printf("Sending messages to partition %s", params.partitionID)
 
@@ -118,6 +116,8 @@ func BatchStressTester(ctx context.Context) error {
         testData: testData,
     })
 
+    closeOrPanic(producerClient)
+
     if err != nil {
         log.Fatalf("Failed to send events to partition %s: %s", params.partitionID, err)
     }
