Skip to content

Commit 94de68c

Browse files
[azservicebus] Stress test improvements (Azure#19668)
* Stress test improvements: - Fixing the panic that happens if you're not using a topic! - Adding in a chaos run for infiniteSendAndReceive that can be used as an example for others. Also, fixing some issues in CI that cropped up around areas that probably needed some garbage cleanup: * Fixing the inconsistent logging and concurrency issues with the old style. Just keep the function in place, swap out channels, etc... Also, fix the circular references between test and utils. * Eliminate the non-determinism in a message releaser test by only returning a single element, and then blocking until cancel (more realistic behavior)
1 parent f8f2c90 commit 94de68c

File tree

11 files changed

+142
-121
lines changed

11 files changed

+142
-121
lines changed
-482 Bytes
Binary file not shown.

sdk/messaging/azservicebus/internal/stress/job.yaml

Lines changed: 0 additions & 29 deletions
This file was deleted.

sdk/messaging/azservicebus/internal/stress/sb-network-loss.yaml

Lines changed: 0 additions & 26 deletions
This file was deleted.

sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ matrix:
3434
infiniteSendAndReceive:
3535
testTarget: infiniteSendAndReceive
3636
memory: "1.5Gi"
37+
infiniteSendAndReceiveWithChaos:
38+
testTarget: infiniteSendAndReceive
39+
# this value is injected as a label value in templates/deploy-job.yaml
40+
# this'll activate our standard chaos policy, which is at the bottom of that file.
41+
chaos: "true"
42+
memory: "1.5Gi"
3743
longRunningRenewLock:
3844
testTarget: longRunningRenewLock
3945
memory: "1.5Gi"

sdk/messaging/azservicebus/internal/stress/shared/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNa
8888

8989
var topicOpts *admin.CreateTopicOptions
9090

91-
if options.Topic != nil {
91+
if options != nil && options.Topic != nil {
9292
topicOpts = options.Topic
9393
}
9494

sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
{{- define "stress.deploy-example" -}}
33
metadata:
44
labels:
5-
testName: "gosb"
5+
chaos: "{{ default false .Stress.chaos }}"
66
spec:
77
# uncomment to deploy to the southeastasia region.
88
# nodeSelector:
@@ -22,7 +22,7 @@ spec:
2222
# just uses 'limits' for both.
2323
resources:
2424
limits:
25-
memory: {{.Stress.memory}}
25+
memory: {{.Stress.memory }}
2626
cpu: "1"
2727
args:
2828
- "tests"
@@ -31,3 +31,37 @@ spec:
3131
{{- include "stress-test-addons.container-env" . | nindent 6 }}
3232
{{- end -}}
3333

34+
{{- include "stress-test-addons.chaos-wrapper.tpl" (list . "stress.network-chaos") -}}
35+
{{- define "stress.network-chaos" -}}
36+
# basically: every 5 minutes do 10s of network loss
37+
kind: Schedule
38+
apiVersion: chaos-mesh.org/v1alpha1
39+
spec:
40+
schedule: "*/5 * * * *"
41+
startingDeadlineSeconds: null
42+
concurrencyPolicy: Forbid
43+
historyLimit: 1
44+
type: NetworkChaos
45+
networkChaos:
46+
selector:
47+
namespaces:
48+
- "{{ .Release.Namespace }}"
49+
labelSelectors:
50+
scenario: {{ .Stress.Scenario }}
51+
mode: all
52+
action: loss
53+
duration: 10s
54+
loss:
55+
loss: '100'
56+
correlation: '100'
57+
direction: to
58+
target:
59+
selector:
60+
namespaces:
61+
- {{ .Release.Namespace }}
62+
labelSelectors:
63+
scenario: {{ .Stress.Scenario }}
64+
mode: all
65+
externalTargets:
66+
- {{ .Stress.BaseName }}.servicebus.windows.net
67+
{{- end -}}

sdk/messaging/azservicebus/internal/test/test_helpers.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ import (
1010
"math/rand"
1111
"net/http"
1212
"os"
13-
"sync"
13+
"sync/atomic"
1414
"testing"
1515
"time"
1616

17+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1718
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1819
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom"
19-
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
2020
"github.com/stretchr/testify/require"
2121
)
2222

@@ -25,6 +25,7 @@ var (
2525
)
2626

2727
func init() {
28+
addSwappableLogger()
2829
rand.Seed(time.Now().Unix())
2930
}
3031

@@ -78,8 +79,7 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func(
7879
qd = &atom.QueueDescription{}
7980
}
8081

81-
deleteAfter := 5 * time.Minute
82-
qd.AutoDeleteOnIdle = utils.DurationToStringPtr(&deleteAfter)
82+
qd.AutoDeleteOnIdle = to.Ptr("PT5M")
8383

8484
env := atom.WrapWithQueueEnvelope(qd, em.TokenProvider())
8585

@@ -94,6 +94,23 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func(
9494
}
9595
}
9696

97+
var LoggingChannelValue atomic.Value
98+
99+
func addSwappableLogger() {
100+
azlog.SetListener(func(e azlog.Event, s string) {
101+
ch, ok := LoggingChannelValue.Load().(*chan string)
102+
103+
if !ok || ch == nil {
104+
return
105+
}
106+
107+
select {
108+
case *ch <- fmt.Sprintf("[%s] %s", e, s):
109+
default:
110+
}
111+
})
112+
}
113+
97114
// CaptureLogsForTest adds a logging listener which captures messages to an
98115
// internal channel.
99116
// Returns a function that ends log capturing and returns any captured messages.
@@ -107,22 +124,22 @@ func CreateExpiringQueue(t *testing.T, qd *atom.QueueDescription) (string, func(
107124
// messages := endCapture()
108125
// /* do inspection of log messages */
109126
func CaptureLogsForTest() func() []string {
110-
messagesCh := make(chan string, 10000)
111-
return CaptureLogsForTestWithChannel(messagesCh)
127+
return CaptureLogsForTestWithChannel(nil)
112128
}
113129

114130
func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string {
115-
setAzLogListener(func(e azlog.Event, s string) {
116-
messagesCh <- fmt.Sprintf("[%s] %s", e, s)
117-
})
131+
if messagesCh == nil {
132+
messagesCh = make(chan string, 10000)
133+
}
134+
135+
LoggingChannelValue.Store(&messagesCh)
118136

119137
return func() []string {
120138
if messagesCh == nil {
121139
// already been closed, probably manually.
122140
return nil
123141
}
124142

125-
setAzLogListener(nil)
126143
close(messagesCh)
127144

128145
var messages []string
@@ -137,16 +154,22 @@ func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string {
137154
}
138155

139156
// EnableStdoutLogging turns on logging to stdout for diagnostics.
140-
func EnableStdoutLogging() {
141-
setAzLogListener(func(e azlog.Event, s string) {
142-
log.Printf("%s %s", e, s)
143-
})
144-
}
157+
func EnableStdoutLogging() func() {
158+
ch := make(chan string, 10000)
159+
cleanupLogs := CaptureLogsForTestWithChannel(ch)
145160

146-
var logMu sync.Mutex
161+
doneCh := make(chan struct{})
147162

148-
func setAzLogListener(listener func(e azlog.Event, s string)) {
149-
logMu.Lock()
150-
defer logMu.Unlock()
151-
azlog.SetListener(listener)
163+
go func() {
164+
defer close(doneCh)
165+
166+
for msg := range ch {
167+
log.Printf("%s", msg)
168+
}
169+
}()
170+
171+
return func() {
172+
_ = cleanupLogs()
173+
<-doneCh
174+
}
152175
}

sdk/messaging/azservicebus/internal/utils/retrier_test.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1616
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1717
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
18+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
1819
"github.com/stretchr/testify/require"
1920
)
2021

@@ -290,16 +291,8 @@ func TestCalcDelay(t *testing.T) {
290291
}
291292

292293
func TestRetryLogging(t *testing.T) {
293-
var logs []string
294-
295-
azlog.SetListener(func(e azlog.Event, s string) {
296-
logs = append(logs, fmt.Sprintf("[%-10s] %s", e, s))
297-
})
298-
299-
defer azlog.SetListener(nil)
300-
301294
t.Run("normal error", func(t *testing.T) {
302-
logs = nil
295+
logsFn := test.CaptureLogsForTest()
303296

304297
err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error {
305298
azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I)
@@ -312,25 +305,40 @@ func TestRetryLogging(t *testing.T) {
312305
require.EqualError(t, err, "hello")
313306

314307
require.Equal(t, []string{
315-
"[TestFunc ] Attempt 0, within test func, returning error hello",
308+
"[TestFunc] Attempt 0, within test func, returning error hello",
316309
"[testLogEvent] (my_operation) Retry attempt 0 returned retryable error: hello",
317310

318311
"[testLogEvent] (my_operation) Retry attempt 1 sleeping for <time elided>",
319-
"[TestFunc ] Attempt 1, within test func, returning error hello",
312+
"[TestFunc] Attempt 1, within test func, returning error hello",
320313
"[testLogEvent] (my_operation) Retry attempt 1 returned retryable error: hello",
321314

322315
"[testLogEvent] (my_operation) Retry attempt 2 sleeping for <time elided>",
323-
"[TestFunc ] Attempt 2, within test func, returning error hello",
316+
"[TestFunc] Attempt 2, within test func, returning error hello",
324317
"[testLogEvent] (my_operation) Retry attempt 2 returned retryable error: hello",
325318

326319
"[testLogEvent] (my_operation) Retry attempt 3 sleeping for <time elided>",
327-
"[TestFunc ] Attempt 3, within test func, returning error hello",
320+
"[TestFunc] Attempt 3, within test func, returning error hello",
328321
"[testLogEvent] (my_operation) Retry attempt 3 returned retryable error: hello",
329-
}, normalizeRetryLogLines(logs))
322+
}, normalizeRetryLogLines(logsFn()))
323+
})
324+
325+
t.Run("normal error2", func(t *testing.T) {
326+
cleanup := test.EnableStdoutLogging()
327+
defer cleanup()
328+
329+
err := Retry(context.Background(), testLogEvent, "my_operation", func(ctx context.Context, args *RetryFnArgs) error {
330+
azlog.Writef("TestFunc", "Attempt %d, within test func, returning error hello", args.I)
331+
return errors.New("hello")
332+
}, func(err error) bool {
333+
return false
334+
}, exported.RetryOptions{
335+
RetryDelay: time.Microsecond,
336+
})
337+
require.EqualError(t, err, "hello")
330338
})
331339

332340
t.Run("cancellation error", func(t *testing.T) {
333-
logs = nil
341+
logsFn := test.CaptureLogsForTest()
334342

335343
err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error {
336344
azlog.Writef("TestFunc",
@@ -344,13 +352,13 @@ func TestRetryLogging(t *testing.T) {
344352
require.ErrorIs(t, err, context.Canceled)
345353

346354
require.Equal(t, []string{
347-
"[TestFunc ] Attempt 0, within test func",
355+
"[TestFunc] Attempt 0, within test func",
348356
"[testLogEvent] (test_operation) Retry attempt 0 was cancelled, stopping: context canceled",
349-
}, normalizeRetryLogLines(logs))
357+
}, normalizeRetryLogLines(logsFn()))
350358
})
351359

352360
t.Run("custom fatal error", func(t *testing.T) {
353-
logs = nil
361+
logsFn := test.CaptureLogsForTest()
354362

355363
err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error {
356364
azlog.Writef("TestFunc",
@@ -364,14 +372,13 @@ func TestRetryLogging(t *testing.T) {
364372
require.EqualError(t, err, "custom fatal error")
365373

366374
require.Equal(t, []string{
367-
"[TestFunc ] Attempt 0, within test func",
375+
"[TestFunc] Attempt 0, within test func",
368376
"[testLogEvent] (test_operation) Retry attempt 0 returned non-retryable error: custom fatal error",
369-
}, normalizeRetryLogLines(logs))
377+
}, normalizeRetryLogLines(logsFn()))
370378
})
371379

372380
t.Run("with reset attempts", func(t *testing.T) {
373-
logs = nil
374-
381+
logsFn := test.CaptureLogsForTest()
375382
reset := false
376383

377384
err := Retry(context.Background(), testLogEvent, "test_operation", func(ctx context.Context, args *RetryFnArgs) error {
@@ -399,13 +406,13 @@ func TestRetryLogging(t *testing.T) {
399406
require.Nil(t, err)
400407

401408
require.Equal(t, []string{
402-
"[TestFunc ] Attempt 0, within test func",
403-
"[TestFunc ] Attempt 0, resetting",
409+
"[TestFunc] Attempt 0, within test func",
410+
"[TestFunc] Attempt 0, resetting",
404411
"[testLogEvent] (test_operation) Resetting retry attempts",
405412
"[testLogEvent] (test_operation) Retry attempt -1 returned retryable error: link detached, reason: *Error(nil)",
406-
"[TestFunc ] Attempt 0, within test func",
407-
"[TestFunc ] Attempt 0, return nil",
408-
}, normalizeRetryLogLines(logs))
413+
"[TestFunc] Attempt 0, within test func",
414+
"[TestFunc] Attempt 0, return nil",
415+
}, normalizeRetryLogLines(logsFn()))
409416
})
410417
}
411418

sdk/messaging/azservicebus/receiver_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,9 +380,8 @@ func TestReceiverPeek(t *testing.T) {
380380

381381
func TestReceiverDetachWithPeekLock(t *testing.T) {
382382
// NOTE: uncomment this to see some of the background reconnects
383-
// azlog.SetListener(func(e azlog.Event, s string) {
384-
// log.Printf("%s %s", e, s)
385-
// })
383+
// stopFn := test.EnableStdoutLogging()
384+
// defer stopFn()
386385

387386
serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
388387
defer cleanup()

0 commit comments

Comments
 (0)