Skip to content

Commit 4474d1e

Browse files
[azeventhubs] Adding in prefetch value for the Processor, updating stress tests (Azure#19786)
- Added the option to configure prefetch count for ProcessorPartitionClients created from the Processor - Fixed some issues with the stress tests. Batch tests were using a really large batch size, which was too large for the amount of memory we're using in our test containers. - Processor tests weren't configuring a prefetch value (prior to this PR the setting wasn't exposed) Also, just generally made the stress tests a bit cleaner, leveraging the scenario-matrix more for configuring values. Fixes Azure#19770
1 parent a975191 commit 4474d1e

File tree

8 files changed

+103
-44
lines changed

8 files changed

+103
-44
lines changed

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Adds ProcessorOptions.Prefetch field, allowing configuration of Prefetch values for PartitionClients created using the Processor. (PR#19786)
8+
79
### Breaking Changes
810

911
### Bugs Fixed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
dependencies:
22
- name: stress-test-addons
33
repository: https://stresstestcharts.blob.core.windows.net/helm/
4-
version: 0.1.21
5-
digest: sha256:ac7f0861fd54ebba0e3fec92c1fecc058e96f58df9094641605dd4250a7a423f
6-
generated: "2022-10-26T23:28:36.470982569Z"
4+
version: 0.2.0
5+
digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0
6+
generated: "2023-01-06T23:57:35.96426915Z"

sdk/messaging/azeventhubs/internal/eh/stress/scenarios-matrix.yaml

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,39 @@ matrix:
66
scenarios:
77
batch:
88
testTarget: batch
9+
type: "batch"
10+
rounds: 100
11+
prefetch: 0
12+
verbose: ""
13+
sleepAfter: "5m"
914
batchprefetchoff:
10-
testTarget: batchprefetchoff
15+
testTarget: batch
16+
rounds: 100
17+
prefetch: -1
18+
verbose: ""
19+
sleepAfter: "5m"
1120
batchinfinite:
12-
testTarget: batchinfinite
21+
testTarget: batch
22+
type: "batch"
23+
rounds: 100
24+
prefetch: 0
25+
verbose: ""
26+
sleepAfter: "5m"
1327
processor:
1428
testTarget: processor
29+
rounds: 100
30+
prefetch: 0
31+
verbose: ""
32+
sleepAfter: "5m"
1533
processorprefetchoff:
16-
testTarget: processorprefetchoff
34+
testTarget: processor
35+
rounds: 100
36+
prefetch: -1
37+
verbose: ""
38+
sleepAfter: "5m"
1739
processorinfinite:
18-
testTarget: processorinfinite
40+
testTarget: processor
41+
rounds: 100
42+
prefetch: 0
43+
verbose: ""
44+
sleepAfter: "5m"

sdk/messaging/azeventhubs/internal/eh/stress/templates/deploy-job.yaml

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,37 +29,14 @@ spec:
2929
# NOTE: -verbose activates _all_ the Azure internal logging, which can get quite large.
3030
# so it's not enabled for every target in here. We also have an issue filed to whittle it
3131
# down (https://github.com/Azure/azure-sdk-for-go/issues/19459)
32-
{{ if (eq .Stress.testTarget "batch") }}
33-
- batch
32+
- "{{.Stress.testTarget}}"
3433
- "-rounds"
35-
- "100"
36-
- "-verbose"
37-
{{ else if (eq .Stress.testTarget "batchprefetchoff") }}
38-
- batch
39-
- "-rounds"
40-
- "100"
41-
- "-prefetch"
42-
- "-1"
43-
- "-verbose"
44-
{{ else if (eq .Stress.testTarget "batchinfinite") }}
45-
- batch
46-
- "-rounds"
47-
- "-1"
48-
{{ else if (eq .Stress.testTarget "processor") }}
49-
- processor
50-
- "-rounds"
51-
- "100"
52-
{{ else if (eq .Stress.testTarget "processorprefetchoff") }}
53-
- processor
54-
- "-rounds"
55-
- "100"
34+
- "{{.Stress.rounds}}"
5635
- "-prefetch"
57-
- "-1"
58-
{{ else if (eq .Stress.testTarget "processorinfinite") }}
59-
- processor
60-
- "-rounds"
61-
- "-1"
62-
{{- end -}}
36+
- "{{.Stress.prefetch}}"
37+
- "{{.Stress.verbose}}"
38+
- "-sleepAfter"
39+
- "{{.Stress.sleepAfter}}"
6340
{{- include "stress-test-addons.container-env" . | nindent 6 }}
6441
{{- end -}}
6542

sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) {
3939
// Look in ../templates/deploy-job.yaml for some of the other parameter variations we use in stress/longevity
4040
// testing.
4141
fs.IntVar(&params.numToSend, "send", 1000000, "Number of events to send.")
42-
fs.IntVar(&params.batchSize, "receive", 1000000, "Size to request each time we call ReceiveEvents()")
42+
fs.IntVar(&params.batchSize, "receive", 1000, "Size to request each time we call ReceiveEvents(). Higher batch sizes will require higher amounts of memory for this test.")
4343
fs.StringVar(&batchDurationStr, "timeout", "60s", "Time to wait for each batch (ie: 1m, 30s, etc..)")
4444
prefetch := fs.Int("prefetch", 0, "Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 0 uses the default for the package.")
4545

@@ -48,8 +48,9 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) {
4848
fs.StringVar(&params.partitionID, "partition", "0", "Partition ID to send and receive events to")
4949
fs.IntVar(&params.maxDeadlineExceeded, "maxtimeouts", 10, "Number of consecutive receive timeouts allowed before quitting")
5050
fs.BoolVar(&params.enableVerboseLogging, "verbose", false, "enable verbose azure sdk logging")
51+
sleepAfterFn := addSleepAfterFlag(fs)
5152

52-
if err := fs.Parse(os.Args[2:]); errors.Is(err, flag.ErrHelp) {
53+
if err := fs.Parse(os.Args[2:]); err != nil {
5354
fs.PrintDefaults()
5455
return batchTesterParams{}, err
5556
}
@@ -68,6 +69,7 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) {
6869
}
6970

7071
params.batchDuration = batchDuration
72+
params.sleepAfterFn = sleepAfterFn
7173

7274
return params, nil
7375
}
@@ -81,6 +83,8 @@ func BatchStressTester(ctx context.Context) error {
8183
return err
8284
}
8385

86+
defer params.sleepAfterFn()
87+
8488
testData, err := newStressTestData("batch", params.enableVerboseLogging, map[string]string{
8589
"BatchDuration": params.batchDuration.String(),
8690
"BatchSize": fmt.Sprintf("%d", params.batchSize),
@@ -160,6 +164,7 @@ type batchTesterParams struct {
160164
prefetch int32
161165
maxDeadlineExceeded int
162166
enableVerboseLogging bool
167+
sleepAfterFn func()
163168
}
164169

165170
func consumeForBatchTester(ctx context.Context, round int64, cc *azeventhubs.ConsumerClient, sp azeventhubs.StartPosition, params batchTesterParams, testData *stressTestData) error {

sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type processorStressTest struct {
3939
eventsPerRound int
4040
rounds int64
4141

42+
prefetch int32
43+
sleepAfterFn func()
44+
4245
checkpointStore azeventhubs.CheckpointStore
4346
}
4447

@@ -48,9 +51,11 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) {
4851
numProcessors := fs.Int("processors", 1, "Number of processors to run, concurrently")
4952
eventsPerRound := fs.Int("send", 5000, "Number of events to send per round")
5053
rounds := fs.Int64("rounds", 100, "Number of rounds. -1 means math.MaxInt64")
54+
prefetch := fs.Int("prefetch", 0, "Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 0 uses the default for the package.")
5155
enableVerboseLogging := fs.Bool("verbose", false, "enable verbose azure sdk logging")
56+
sleepAfterFn := addSleepAfterFlag(fs)
5257

53-
if err := fs.Parse(args); errors.Is(err, flag.ErrHelp) {
58+
if err := fs.Parse(args); err != nil {
5459
fs.PrintDefaults()
5560
return nil, err
5661
}
@@ -63,6 +68,7 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) {
6368
"Processors": fmt.Sprintf("%d", numProcessors),
6469
"EventsPerRound": fmt.Sprintf("%d", eventsPerRound),
6570
"Rounds": fmt.Sprintf("%d", rounds),
71+
"Prefetch": fmt.Sprintf("%d", *prefetch),
6672
})
6773

6874
if err != nil {
@@ -84,6 +90,8 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) {
8490
eventsPerRound: *eventsPerRound,
8591
rounds: *rounds,
8692
checkpointStore: blobStore,
93+
prefetch: int32(*prefetch),
94+
sleepAfterFn: sleepAfterFn,
8795
}, nil
8896
}
8997

@@ -93,6 +101,8 @@ func (inf *processorStressTest) Run(ctx context.Context) error {
93101
inf.eventsPerRound,
94102
inf.containerName)
95103

104+
defer inf.sleepAfterFn()
105+
96106
checkpoints, err := initCheckpointStore(ctx, inf.containerName, inf.stressTestData)
97107

98108
if err != nil {
@@ -123,7 +133,7 @@ func (inf *processorStressTest) Run(ctx context.Context) error {
123133
}
124134

125135
go func() {
126-
if err := inf.receiveForever(ctx, partClient, logger); err != nil {
136+
if err := inf.receiveForever(ctx, partClient, logger, inf.eventsPerRound); err != nil {
127137
inf.TC.TrackException(err)
128138
panic(err)
129139
}
@@ -194,7 +204,8 @@ func (inf *processorStressTest) Run(ctx context.Context) error {
194204
// start checking the checkpoint store to see how far along we are, and when
195205
// we're at the end.
196206
for {
197-
header := fmt.Sprintf("round %d, elapsed %s", round, time.Since(start)/time.Second)
207+
var elapsed = time.Since(start) / time.Second
208+
header := fmt.Sprintf("round %d, elapsed %d seconds", round, elapsed)
198209
output, done, err := inf.report(ctx, header, endPositions)
199210

200211
if err != nil {
@@ -219,7 +230,7 @@ func (inf *processorStressTest) Run(ctx context.Context) error {
219230
return nil
220231
}
221232

222-
func (inf *processorStressTest) receiveForever(ctx context.Context, partClient *azeventhubs.ProcessorPartitionClient, logger logf) error {
233+
func (inf *processorStressTest) receiveForever(ctx context.Context, partClient *azeventhubs.ProcessorPartitionClient, logger logf, eventsPerRound int) error {
223234
defer func() {
224235
logger("Closing")
225236

@@ -233,9 +244,11 @@ func (inf *processorStressTest) receiveForever(ctx context.Context, partClient *
233244

234245
logger("Starting receive loop")
235246

247+
batchSize := int(math.Min(float64(eventsPerRound), 100))
248+
236249
for {
237250
receiveCtx, cancelReceive := context.WithCancel(ctx)
238-
events, err := partClient.ReceiveEvents(receiveCtx, 100, nil)
251+
events, err := partClient.ReceiveEvents(receiveCtx, batchSize, nil)
239252
cancelReceive()
240253

241254
if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil {
@@ -359,7 +372,9 @@ func (inf *processorStressTest) newProcessorForTest(ctx context.Context) (*azeve
359372
return nil, nil, err
360373
}
361374

362-
processor, err := azeventhubs.NewProcessor(cc, cps, nil)
375+
processor, err := azeventhubs.NewProcessor(cc, cps, &azeventhubs.ProcessorOptions{
376+
Prefetch: inf.prefetch,
377+
})
363378

364379
if err != nil {
365380
return nil, nil, err

sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package tests
55
import (
66
"context"
77
"errors"
8+
"flag"
89
"fmt"
910
"log"
1011
"os"
@@ -381,3 +382,23 @@ func enableVerboseLogging() {
381382
log.Printf("[%s] %s", e, s)
382383
})
383384
}
385+
386+
// addSleepAfterFlag registers a "-sleepAfter" string flag on fs and returns a
// function that sleeps for the duration held by that flag.
//
// The flag value is read when the returned function is called, not when the
// flag is registered, so the returned function should only be invoked after fs
// has been parsed. The default value is "0m", which results in no sleep. If the
// value is rejected by time.ParseDuration, a message is logged and the function
// returns without sleeping.
func addSleepAfterFlag(fs *flag.FlagSet) func() {
	var durationStr string
	fs.StringVar(&durationStr, "sleepAfter", "0m", "Time to sleep after test completes")

	return func() {
		sleepAfter, err := time.ParseDuration(durationStr)

		if err != nil {
			// Log the raw flag text and the parse error: sleepAfter is the zero
			// Duration when parsing fails, so printing it would hide the bad input.
			log.Printf("Invalid sleepAfter duration given: %q: %s", durationStr, err)
			return
		}

		if sleepAfter > 0 {
			log.Printf("Sleeping for %s", sleepAfter)
			time.Sleep(sleepAfter)
			log.Printf("Done sleeping for %s", sleepAfter)
		}
	}
}

sdk/messaging/azeventhubs/processor.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ type ProcessorOptions struct {
5555
// from partition clients with a lower OwnerLevel.
5656
// Default is 0.
5757
OwnerLevel int64
58+
59+
// Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient
60+
// created by this Processor. When set, this client will attempt to always maintain
61+
// an internal cache of events of this size, asynchronously, increasing the odds that
62+
// ReceiveEvents() will use a locally stored cache of events, rather than having to
63+
// wait for events to arrive from the network.
64+
//
65+
// Defaults to 300 events if Prefetch == 0.
66+
// Disabled if Prefetch < 0.
67+
Prefetch int32
5868
}
5969

6070
// StartPositions are used if there is no checkpoint for a partition in
@@ -82,6 +92,7 @@ type Processor struct {
8292
defaultStartPositions StartPositions
8393
checkpointStore CheckpointStore
8494
ownerLevel int64
95+
prefetch int32
8596

8697
// consumerClient is actually a *azeventhubs.ConsumerClient
8798
// it's an interface here to make testing easier.
@@ -156,6 +167,7 @@ func newProcessorImpl(consumerClient consumerClientForProcessor, checkpointStore
156167
PerPartition: startPosPerPartition,
157168
Default: options.StartPositions.Default,
158169
},
170+
prefetch: options.Prefetch,
159171
consumerClientDetails: consumerClient.getDetails(),
160172
runCalled: make(chan struct{}),
161173
lb: newProcessorLoadBalancer(checkpointStore, consumerClient.getDetails(), strategy, partitionDurationExpiration),
@@ -327,6 +339,7 @@ func (p *Processor) addPartitionClient(ctx context.Context, ownership Ownership,
327339
partClient, err := p.consumerClient.NewPartitionClient(ownership.PartitionID, &PartitionClientOptions{
328340
StartPosition: sp,
329341
OwnerLevel: &p.ownerLevel,
342+
Prefetch: p.prefetch,
330343
})
331344

332345
if err != nil {

0 commit comments

Comments
 (0)