Skip to content

Commit a14c17b

Browse files
[azservicebus] Adding in a session manager test - create and receive on sessions over and over again. (Azure#19639)
Adding in a test that lets us test sessions by repeatedly creating them (ie, sending a message with a session ID) and then receiving on it. The session tests (since it holds open more receivers) uses more memory, so we account for that now by specifying a memory limit for each test. Part of the fix for Azure#19511.
1 parent 0799479 commit a14c17b

File tree

11 files changed

+266
-12
lines changed

11 files changed

+266
-12
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
dependencies:
22
- name: stress-test-addons
33
repository: https://stresstestcharts.blob.core.windows.net/helm/
4-
version: 0.1.21
5-
digest: sha256:ac7f0861fd54ebba0e3fec92c1fecc058e96f58df9094641605dd4250a7a423f
6-
generated: "2022-10-29T01:11:52.71900634Z"
4+
version: 0.2.0
5+
digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0
6+
generated: "2022-11-19T01:30:02.403917379Z"
Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
displayNames:
2+
# this makes it so these don't show up in the scenario names,
3+
# since they're just clutter.
4+
1.5Gi": ""
5+
4Gi": ""
6+
image: ""
17
matrix:
28
images:
39
go18:
@@ -6,23 +12,40 @@ matrix:
612
scenarios:
713
constantDetach:
814
testTarget: constantDetach
15+
memory: "1.5Gi"
916
constantDetachmentSender:
1017
testTarget: constantDetachmentSender
18+
memory: "1.5Gi"
19+
emptySessions:
20+
testTarget: emptySessions
21+
memory: "1.0Gi"
1122
finitePeeks:
1223
testTarget: finitePeeks
24+
memory: "1.5Gi"
1325
finiteSendAndReceive:
14-
testTarget: finiteSendAndReceive
26+
testTarget: finiteSendAndReceive
27+
memory: "1.5Gi"
28+
finiteSessions:
29+
testTarget: finiteSessions
30+
memory: "4Gi"
1531
idleFastReconnect:
1632
testTarget: idleFastReconnect
33+
memory: "1.5Gi"
1734
infiniteSendAndReceive:
1835
testTarget: infiniteSendAndReceive
36+
memory: "1.5Gi"
1937
longRunningRenewLock:
2038
testTarget: longRunningRenewLock
39+
memory: "1.5Gi"
2140
mostlyIdleReceiver:
2241
testTarget: mostlyIdleReceiver
42+
memory: "1.5Gi"
2343
rapidOpenClose:
2444
testTarget: rapidOpenClose
45+
memory: "1.5Gi"
2546
receiveCancellation:
2647
testTarget: receiveCancellation
48+
memory: "1.5Gi"
2749
sendAndReceiveDrain:
2850
testTarget: sendAndReceiveDrain
51+
memory: "1.5Gi"

sdk/messaging/azservicebus/internal/stress/shared/stress_context.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func MustCreateStressContext(testName string) *StressContext {
6565

6666
ctx, cancel := NewCtrlCContext()
6767

68-
azlog.SetEvents(azservicebus.EventSender, azservicebus.EventReceiver, azservicebus.EventConn, azservicebus.EventAuth)
68+
azlog.SetEvents(azservicebus.EventSender, azservicebus.EventReceiver, azservicebus.EventConn)
6969

7070
logMessages := make(chan string, 10000)
7171

@@ -158,6 +158,25 @@ func (tracker *StressContext) Failf(format string, args ...any) {
158158
panic(err)
159159
}
160160

161+
func (tracker *StressContext) NoError(err error) {
162+
if err == nil {
163+
return
164+
}
165+
166+
tracker.LogIfFailed(err.Error(), err, nil)
167+
panic(err)
168+
}
169+
170+
func (tracker *StressContext) NoErrorf(err error, format string, args ...any) {
171+
if err == nil {
172+
return
173+
}
174+
175+
msg := fmt.Sprintf(format, args...)
176+
tracker.LogIfFailed(fmt.Sprintf("%s: %s", msg, err.Error()), err, nil)
177+
panic(err)
178+
}
179+
161180
func (tracker *StressContext) Assert(condition bool, message string) {
162181
tracker.LogIfFailed(message, nil, nil)
163182

@@ -166,6 +185,18 @@ func (tracker *StressContext) Assert(condition bool, message string) {
166185
}
167186
}
168187

188+
func (tracker *StressContext) Equal(val1 any, val2 any) {
189+
if val1 != val2 {
190+
panic(fmt.Errorf("Expected %v, got %v", val1, val2))
191+
}
192+
}
193+
194+
func (tracker *StressContext) Nil(val1 any) {
195+
if val1 == nil {
196+
panic("value was not nil")
197+
}
198+
}
199+
169200
func (sc *StressContext) LogIfFailed(message string, err error, stats *Stats) {
170201
if err != nil {
171202
log.Printf("Error: %s: %#v, %T", message, err, err)

sdk/messaging/azservicebus/internal/stress/shared/utils.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,34 @@ func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.
7474
return adminClient
7575
}
7676

77-
func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string) func() {
77+
type MustCreateSubscriptionsOptions struct {
78+
Topic *admin.CreateTopicOptions
79+
Subscription *admin.CreateSubscriptionOptions
80+
}
81+
82+
func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func() {
7883
log.Printf("[BEGIN] Creating topic %s", topicName)
7984
defer log.Printf("[END] Creating topic %s", topicName)
8085

8186
ac, err := admin.NewClientFromConnectionString(sc.ConnectionString, nil)
8287
sc.PanicOnError("Failed to create a topic manager", err)
8388

84-
_, err = ac.CreateTopic(context.Background(), topicName, nil)
89+
var topicOpts *admin.CreateTopicOptions
90+
91+
if options.Topic != nil {
92+
topicOpts = options.Topic
93+
}
94+
95+
_, err = ac.CreateTopic(context.Background(), topicName, topicOpts)
8596
sc.PanicOnError("Failed to create topic", err)
8697

8798
for _, name := range subscriptionNames {
88-
_, err := ac.CreateSubscription(context.Background(), topicName, name, nil)
99+
var subOpts admin.CreateSubscriptionOptions
100+
101+
if options != nil && options.Subscription != nil {
102+
subOpts = *options.Subscription
103+
}
104+
_, err := ac.CreateSubscription(context.Background(), topicName, name, &subOpts)
89105
sc.PanicOnError("Failed to create subscription manager", err)
90106
}
91107

sdk/messaging/azservicebus/internal/stress/templates/deploy-job.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ spec:
2222
# just uses 'limits' for both.
2323
resources:
2424
limits:
25-
memory: "1.5Gi"
25+
memory: {{.Stress.memory}}
2626
cpu: "1"
2727
args:
2828
- "tests"
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package tests
5+
6+
import (
7+
"context"
8+
"errors"
9+
"flag"
10+
"fmt"
11+
"log"
12+
"strings"
13+
"time"
14+
15+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
16+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
17+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
18+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
19+
)
20+
21+
func EmptySessions(remainingArgs []string) {
22+
params := struct {
23+
numAttempts int
24+
rounds int
25+
}{}
26+
27+
fs := flag.NewFlagSet("emptysessions", flag.PanicOnError)
28+
29+
fs.IntVar(&params.numAttempts, "sessions", 2000, "Number of attempts to get a session")
30+
fs.IntVar(&params.rounds, "rounds", 100, "Number of rounds to run with these parameters. -1 means math.MaxInt64")
31+
32+
topicName := strings.ToLower(fmt.Sprintf("topic-%X", time.Now().UnixNano()))
33+
log.Printf("Creating topic %s", topicName)
34+
35+
sc := shared.MustCreateStressContext("emptysessions")
36+
defer sc.End()
37+
38+
cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"sub1"}, &shared.MustCreateSubscriptionsOptions{
39+
Subscription: &admin.CreateSubscriptionOptions{
40+
Properties: &admin.SubscriptionProperties{
41+
RequiresSession: to.Ptr(true),
42+
},
43+
},
44+
})
45+
defer cleanup()
46+
47+
client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, &azservicebus.ClientOptions{
48+
RetryOptions: azservicebus.RetryOptions{
49+
// barrel through retryable failures - we're specifically trying to see if we ever stop
50+
// receiving the standard "no session within time limit" error.
51+
MaxRetries: 10,
52+
},
53+
})
54+
sc.NoError(err)
55+
56+
defer func() {
57+
sc.NoError(client.Close(context.Background()))
58+
}()
59+
60+
for round := 0; round < params.rounds; round++ {
61+
for i := 0; i < params.numAttempts; i++ {
62+
// the default "wait for next session" timeout is basically a minute. If we exceed that then something is broken.
63+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
64+
sessionReceiver, err := client.AcceptNextSessionForSubscription(ctx, topicName, "sub1", nil)
65+
cancel()
66+
sc.Nil(sessionReceiver)
67+
68+
// the error should indicate that we timed out waiting for a new session
69+
if sbErr := (*azservicebus.Error)(nil); errors.As(err, &sbErr) {
70+
sc.Equal(azservicebus.CodeTimeout, sbErr.Code)
71+
} else if err != nil {
72+
sc.PanicOnError("A non-timeout error occurred", err)
73+
}
74+
}
75+
}
76+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package tests
5+
6+
import (
7+
"context"
8+
"flag"
9+
"fmt"
10+
"log"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
16+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
17+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
18+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
19+
)
20+
21+
type finiteSessionsArgs struct {
22+
numSessions int
23+
rounds int
24+
}
25+
26+
func FiniteSessions(remainingArgs []string) {
27+
// NOTE: these values aren't particularly special, but they do try to create a reasonable default
28+
// test just to make sure everything is working.
29+
//
30+
// Look in ../templates/deploy-job.yaml for some of the other parameter variations we use in stress/longevity
31+
// testing.
32+
fs := flag.NewFlagSet("FiniteSessions", flag.ContinueOnError)
33+
34+
params := finiteSessionsArgs{}
35+
36+
fs.IntVar(&params.numSessions, "sessions", 2000, "Number of sessions to test")
37+
fs.IntVar(&params.rounds, "rounds", 100, "Number of rounds to run with these parameters. -1 means math.MaxInt64")
38+
39+
sc := shared.MustCreateStressContext("FiniteSessions")
40+
defer sc.End()
41+
42+
topicName := strings.ToLower(fmt.Sprintf("topic-%X", time.Now().UnixNano()))
43+
44+
log.Printf("Creating topic %s", topicName)
45+
46+
cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"sub1"}, &shared.MustCreateSubscriptionsOptions{
47+
Subscription: &admin.CreateSubscriptionOptions{
48+
Properties: &admin.SubscriptionProperties{
49+
RequiresSession: to.Ptr(true),
50+
},
51+
},
52+
})
53+
defer cleanup()
54+
55+
client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil)
56+
sc.NoError(err)
57+
58+
sender, err := client.NewSender(topicName, nil)
59+
sc.NoError(err)
60+
61+
defer sender.Close(sc.Context)
62+
63+
for round := 0; round < int(params.rounds); round++ {
64+
var sessionReceivers []*azservicebus.SessionReceiver
65+
wg := sync.WaitGroup{}
66+
67+
for i := 0; i < params.numSessions; i++ {
68+
sessionID := fmt.Sprintf("%d:%d", round, i)
69+
70+
err = sender.SendMessage(sc.Context, &azservicebus.Message{
71+
SessionID: &sessionID,
72+
}, nil)
73+
sc.NoError(err)
74+
75+
sessionReceiver, err := client.AcceptNextSessionForSubscription(sc.Context, topicName, "sub1", nil)
76+
sc.NoError(err)
77+
78+
// one of the things mentioned in the customer issue - they keep the session receivers
79+
// alive for a long time.
80+
sessionReceivers = append(sessionReceivers, sessionReceiver)
81+
82+
wg.Add(1)
83+
84+
go func() {
85+
defer wg.Done()
86+
87+
ctx, cancel := context.WithTimeout(sc.Context, time.Minute)
88+
messages, err := sessionReceiver.ReceiveMessages(ctx, 2, nil)
89+
cancel()
90+
91+
sc.NoError(err)
92+
sc.Equal(1, len(messages))
93+
sc.Equal(sessionID, *messages[0].SessionID)
94+
95+
sc.NoError(sessionReceiver.CompleteMessage(sc.Context, messages[0], nil))
96+
}()
97+
}
98+
99+
wg.Wait()
100+
101+
for _, receiver := range sessionReceivers {
102+
err = receiver.Close(sc.Context)
103+
sc.NoErrorf(err, "No errors when session receiver is closed")
104+
}
105+
}
106+
}

sdk/messaging/azservicebus/internal/stress/tests/idle_fast_reconnect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func IdleFastReconnect(remainingArgs []string) {
2424
sc.Track(startEvent)
2525
defer sc.End()
2626

27-
cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"subscriptionA"})
27+
cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"subscriptionA"}, nil)
2828
defer cleanup()
2929

3030
client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, &azservicebus.ClientOptions{

sdk/messaging/azservicebus/internal/stress/tests/infinite_send_and_receive.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func InfiniteSendAndReceiveRun(remainingArgs []string) {
2929

3030
stats := sc.NewStat("infinite")
3131

32-
cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"batch"})
32+
cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"batch"}, nil)
3333
defer cleanup()
3434

3535
time.AfterFunc(5*24*time.Hour, func() {

sdk/messaging/azservicebus/internal/stress/tests/tests.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ func Run(remainingArgs []string) {
3434
allTests := map[string]func(args []string){
3535
"constantDetach": ConstantDetachment,
3636
"constantDetachmentSender": ConstantDetachmentSender,
37+
"emptySessions": EmptySessions,
3738
"finitePeeks": FinitePeeks,
3839
"finiteSendAndReceive": FiniteSendAndReceiveTest,
40+
"finiteSessions": FiniteSessions,
3941
"idleFastReconnect": IdleFastReconnect,
4042
"infiniteSendAndReceive": InfiniteSendAndReceiveRun,
4143
"longRunningRenewLock": LongRunningRenewLockTest,

0 commit comments

Comments
 (0)