Skip to content

Commit b643478

Browse files
[Service Bus] Stress Tests - followup PR (Azure#12229)
### Original issue - Azure#11108 Follow-up of the stress tests PR Azure#11546, here are the changes - Uses the latest preview.8 version of the service-bus SDK - Fixes the boolean options being passed as command-line args - Multiple sessions support in the renew session lock test - Adds (some)new options that can be passed as command-line args to the tests - And maybe minor edits
1 parent 4726c35 commit b643478

File tree

10 files changed

+119
-111
lines changed

10 files changed

+119
-111
lines changed

sdk/servicebus/service-bus/test/stress-test-track-2/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"author": "",
1111
"license": "ISC",
1212
"dependencies": {
13-
"@azure/service-bus": "file:../../azure-service-bus-7.0.0-preview.8.tgz",
13+
"@azure/service-bus": "^7.0.0-preview.8",
1414
"minimist": "^1.2.5",
1515
"uuid": "^8.3.0"
1616
},

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioBatchReceive.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
import {
2-
ReceiveMode,
3-
ServiceBusClient,
4-
ServiceBusReceivedMessage,
5-
ServiceBusReceiver
6-
} from "@azure/service-bus";
1+
import { ReceiveMode, ServiceBusClient, ServiceBusReceiver } from "@azure/service-bus";
72
import { SBStressTestsBase } from "./stressTestsBase";
83
import { delay } from "rhea-promise";
94
import parsedArgs from "minimist";
@@ -28,9 +23,11 @@ interface ScenarioReceiveBatchOptions {
2823
settleMessageOnReceive: boolean;
2924
}
3025

31-
function sanitizeOptions(
32-
options: ScenarioReceiveBatchOptions
33-
): Required<ScenarioReceiveBatchOptions> {
26+
function sanitizeOptions(args: string[]): Required<ScenarioReceiveBatchOptions> {
27+
const options = parsedArgs<ScenarioReceiveBatchOptions>(args, {
28+
boolean: ["settleMessageOnReceive"],
29+
default: { settleMessageOnReceive: false }
30+
});
3431
return {
3532
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
3633
receiveMode: (options.receiveMode as ReceiveMode) || "peekLock",
@@ -41,12 +38,12 @@ function sanitizeOptions(
4138
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
4239
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
4340
maxAutoLockRenewalDurationInMs: options.maxAutoLockRenewalDurationInMs || 0, // 0 = disabled
44-
settleMessageOnReceive: options.settleMessageOnReceive || false
41+
settleMessageOnReceive: options.settleMessageOnReceive
4542
};
4643
}
4744

4845
export async function scenarioReceiveBatch() {
49-
const testOptions = sanitizeOptions(parsedArgs<ScenarioReceiveBatchOptions>(process.argv));
46+
const testOptions = sanitizeOptions(process.argv);
5047
const {
5148
testDurationInMs,
5249
receiveMode,
@@ -72,7 +69,7 @@ export async function scenarioReceiveBatch() {
7269

7370
await stressBase.init(undefined, undefined, testOptions);
7471
const sender = sbClient.createSender(stressBase.queueName);
75-
let receiver: ServiceBusReceiver<ServiceBusReceivedMessage>;
72+
let receiver: ServiceBusReceiver;
7673

7774
if (receiveMode === "receiveAndDelete") {
7875
receiver = sbClient.createReceiver(stressBase.queueName, {

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioCloseOpen.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,23 @@ interface ScenarioCloseOptions {
1919
shouldCreateNewClientEachTime?: boolean;
2020
}
2121

22-
function sanitizeOptions(options: ScenarioCloseOptions): Required<ScenarioCloseOptions> {
22+
function sanitizeOptions(args: string[]): Required<ScenarioCloseOptions> {
23+
const options = parsedArgs<ScenarioCloseOptions>(args, {
24+
boolean: ["shouldCreateNewClientEachTime"],
25+
default: { shouldCreateNewClientEachTime: true }
26+
});
2327
return {
2428
testDurationInMs: options.testDurationInMs || 60 * 1000, // Default = 60 minutes
2529
receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10,
2630
receiveBatchMaxWaitTimeInMs: options.receiveBatchMaxWaitTimeInMs || 10000,
2731
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
2832
delayBeforeCallingCloseInMs: options.delayBeforeCallingCloseInMs || 100,
29-
shouldCreateNewClientEachTime: options.shouldCreateNewClientEachTime || true
33+
shouldCreateNewClientEachTime: options.shouldCreateNewClientEachTime
3034
};
3135
}
3236

3337
export async function scenarioClose() {
34-
const testOptions = sanitizeOptions(parsedArgs<ScenarioCloseOptions>(process.argv));
38+
const testOptions = sanitizeOptions(process.argv);
3539

3640
const {
3741
testDurationInMs,

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioPeekMessages.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
import {
2-
ReceiveMode,
3-
ServiceBusClient,
4-
ServiceBusReceivedMessage,
5-
ServiceBusReceiver
6-
} from "@azure/service-bus";
1+
import { ServiceBusClient, ServiceBusReceiver } from "@azure/service-bus";
72
import { SBStressTestsBase } from "./stressTestsBase";
83
import { delay } from "rhea-promise";
94
import parsedArgs from "minimist";
@@ -61,7 +56,7 @@ export async function scenarioPeekMessages() {
6156

6257
await stressBase.init(undefined, undefined, testOptions);
6358
const sender = sbClient.createSender(stressBase.queueName);
64-
let receiver: ServiceBusReceiver<ServiceBusReceivedMessage>;
59+
let receiver: ServiceBusReceiver;
6560

6661
receiver = sbClient.createReceiver(stressBase.queueName, {
6762
receiveMode: "receiveAndDelete"

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioRenewMessageLock.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ interface ScenarioRenewMessageLockOptions {
2020
completeMessageAfterDuration?: boolean;
2121
}
2222

23-
function sanitizeOptions(
24-
options: ScenarioRenewMessageLockOptions
25-
): Required<ScenarioRenewMessageLockOptions> {
23+
function sanitizeOptions(args: string[]): Required<ScenarioRenewMessageLockOptions> {
24+
const options = parsedArgs<ScenarioRenewMessageLockOptions>(args, {
25+
boolean: ["completeMessageAfterDuration"],
26+
default: { completeMessageAfterDuration: true }
27+
});
2628
return {
2729
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
2830
receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10,
@@ -31,15 +33,12 @@ function sanitizeOptions(
3133
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
3234
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
3335
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
34-
completeMessageAfterDuration: options.completeMessageAfterDuration || true
36+
completeMessageAfterDuration: options.completeMessageAfterDuration
3537
};
3638
}
3739

38-
// TODO: max lock renewal duration to be 70% of testDuration instead of 100%
39-
// TODO: stop sending messages after a 70% of test duration
40-
// TODO: Upon ending max lock renewal duration, pass an option to complete/ignore the message
4140
export async function main() {
42-
const testOptions = sanitizeOptions(parsedArgs<ScenarioRenewMessageLockOptions>(process.argv));
41+
const testOptions = sanitizeOptions(process.argv);
4342

4443
const {
4544
testDurationInMs,
@@ -91,6 +90,7 @@ export async function main() {
9190
messages.map((msg) =>
9291
stressBase.renewMessageLockUntil(
9392
msg,
93+
receiver,
9494
testDurationForLockRenewalInMs - elapsedTime,
9595
completeMessageAfterDuration
9696
)

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioRenewSessionLock.ts

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
import {
2-
ServiceBusClient,
3-
ServiceBusReceivedMessage,
4-
ServiceBusSessionReceiver
5-
} from "@azure/service-bus";
1+
import { ServiceBusClient, ServiceBusSessionReceiver } from "@azure/service-bus";
62
import { SBStressTestsBase } from "./stressTestsBase";
73
import { delay } from "rhea-promise";
84
import parsedArgs from "minimist";
@@ -28,42 +24,51 @@ interface ScenarioRenewSessionLockOptions {
2824
* If this flag is set to true, manual lock renewal is disabled, related logging is also gone with it
2925
*/
3026
autoLockRenewal?: boolean;
27+
settleMessageOnReceive?: boolean;
28+
numberOfSessions?: number;
3129
}
3230

33-
function sanitizeOptions(
34-
options: ScenarioRenewSessionLockOptions
35-
): Required<ScenarioRenewSessionLockOptions> {
31+
function sanitizeOptions(args: string[]): Required<ScenarioRenewSessionLockOptions> {
32+
const options = parsedArgs<ScenarioRenewSessionLockOptions>(args, {
33+
boolean: ["autoLockRenewal", "settleMessageOnReceive"],
34+
default: { autoLockRenewal: false, settleMessageOnReceive: true }
35+
});
3636
return {
3737
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
3838
receiveBatchMaxMessageCount: options.receiveBatchMaxMessageCount || 10,
3939
receiveBatchMaxWaitTimeInMs: options.receiveBatchMaxWaitTimeInMs || 10000,
40+
numberOfSessions: options.numberOfSessions || 16,
4041
delayBetweenReceivesInMs: options.delayBetweenReceivesInMs || 0,
4142
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 100,
4243
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
4344
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
44-
autoLockRenewal: options.autoLockRenewal || false
45+
autoLockRenewal: options.autoLockRenewal,
46+
settleMessageOnReceive: options.settleMessageOnReceive
4547
};
4648
}
4749

4850
// TODO: max lock renewal duration to be 70% of testDuration instead of 100%
49-
// TODO: stop sending messages after a 70% of test duration
5051
// TODO: Upon ending max lock renewal duration, pass an option to complete/ignore the message
5152
export async function scenarioRenewSessionLock() {
52-
const testOptions = sanitizeOptions(parsedArgs<ScenarioRenewSessionLockOptions>(process.argv));
53+
const testOptions = sanitizeOptions(process.argv);
5354
const {
5455
testDurationInMs,
5556
receiveBatchMaxMessageCount,
5657
receiveBatchMaxWaitTimeInMs,
58+
numberOfSessions,
5759
delayBetweenReceivesInMs,
5860
numberOfMessagesPerSend,
5961
delayBetweenSendsInMs,
6062
totalNumberOfMessagesToSend,
61-
autoLockRenewal
63+
autoLockRenewal,
64+
settleMessageOnReceive
6265
} = testOptions;
6366

6467
const testDurationForSendInMs = testDurationInMs * 0.7;
6568
// Since we are focusing on session locks in this test
6669
const receiveMode = "receiveAndDelete";
70+
const useSessions = true;
71+
const useScheduleApi = false;
6772

6873
const startedAt = new Date();
6974

@@ -73,29 +78,35 @@ export async function scenarioRenewSessionLock() {
7378

7479
const sbClient = new ServiceBusClient(connectionString);
7580

76-
await stressBase.init(undefined, { requiresSession: true }, testOptions);
81+
await stressBase.init(undefined, { requiresSession: useSessions }, testOptions);
7782
const sender = sbClient.createSender(stressBase.queueName);
7883
async function sendMessages() {
7984
let elapsedTime = new Date().valueOf() - startedAt.valueOf();
8085
while (
8186
elapsedTime < testDurationForSendInMs &&
8287
stressBase.messagesSent.length < totalNumberOfMessagesToSend
8388
) {
84-
await stressBase.sendMessages([sender], numberOfMessagesPerSend, true);
89+
await stressBase.sendMessages(
90+
[sender],
91+
numberOfMessagesPerSend,
92+
useSessions,
93+
useScheduleApi,
94+
numberOfSessions
95+
);
8596
elapsedTime = new Date().valueOf() - startedAt.valueOf();
8697
await delay(delayBetweenSendsInMs);
8798
}
8899
}
89100

90-
let receivers: ServiceBusSessionReceiver<ServiceBusReceivedMessage>[] = [];
101+
let receivers: ServiceBusSessionReceiver[] = [];
91102
async function receiveMessages() {
92103
let elapsedTime = new Date().valueOf() - startedAt.valueOf();
93104
while (elapsedTime < testDurationInMs) {
94105
let receiver;
95106
try {
96107
receiver = await sbClient.acceptNextSession(stressBase.queueName, {
97108
receiveMode,
98-
maxAutoRenewLockDurationInMs: !autoLockRenewal ? 0 : testDurationInMs - elapsedTime
109+
maxAutoLockRenewalDurationInMs: !autoLockRenewal ? 0 : testDurationInMs - elapsedTime
99110
});
100111
} catch (error) {
101112
console.log(error);
@@ -104,7 +115,8 @@ export async function scenarioRenewSessionLock() {
104115
await stressBase.receiveMessages(
105116
receiver,
106117
receiveBatchMaxMessageCount,
107-
receiveBatchMaxWaitTimeInMs
118+
receiveBatchMaxWaitTimeInMs,
119+
settleMessageOnReceive
108120
);
109121
receivers.push(receiver);
110122
if (!autoLockRenewal) {

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioSend.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@ interface ScenarioSimpleSendOptions {
1818
// Define connection string and related Service Bus entity names here
1919
const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
2020

21-
function sanitizeOptions(options: ScenarioSimpleSendOptions): Required<ScenarioSimpleSendOptions> {
21+
function sanitizeOptions(args: string[]): Required<ScenarioSimpleSendOptions> {
22+
const options = parsedArgs<ScenarioSimpleSendOptions>(args, {
23+
boolean: ["useScheduleApi"],
24+
default: { useScheduleApi: false }
25+
});
2226
return {
2327
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
2428
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
2529
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
2630
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
27-
useScheduleApi: options.useScheduleApi || false
31+
useScheduleApi: options.useScheduleApi
2832
};
2933
}
3034

3135
async function main() {
32-
const testOptions = sanitizeOptions(parsedArgs<ScenarioSimpleSendOptions>(process.argv));
36+
const testOptions = sanitizeOptions(process.argv);
3337

3438
const {
3539
testDurationInMs,

sdk/servicebus/service-bus/test/stress-test-track-2/scenarioStreamingReceive.ts

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
import {
2-
ReceiveMode,
3-
ServiceBusClient,
4-
ServiceBusReceivedMessage,
5-
ServiceBusReceiver
6-
} from "@azure/service-bus";
1+
import { ReceiveMode, ServiceBusClient, ServiceBusReceiver } from "@azure/service-bus";
72
import { SBStressTestsBase } from "./stressTestsBase";
83
import { delay } from "rhea-promise";
94
import parsedArgs from "minimist";
@@ -29,26 +24,38 @@ interface ScenarioStreamingReceiveOptions {
2924
settleMessageOnReceive?: boolean;
3025
}
3126

32-
function sanitizeOptions(
33-
options: ScenarioStreamingReceiveOptions
34-
): Required<ScenarioStreamingReceiveOptions> {
27+
function sanitizeOptions(args: string[]): Required<ScenarioStreamingReceiveOptions> {
28+
const options = parsedArgs<ScenarioStreamingReceiveOptions>(args, {
29+
boolean: [
30+
"autoComplete",
31+
"manualLockRenewal",
32+
"completeMessageAfterDuration",
33+
"settleMessageOnReceive"
34+
],
35+
default: {
36+
autoComplete: true,
37+
manualLockRenewal: true,
38+
completeMessageAfterDuration: true,
39+
settleMessageOnReceive: false
40+
}
41+
});
3542
return {
3643
testDurationInMs: options.testDurationInMs || 60 * 60 * 1000, // Default = 60 minutes
3744
receiveMode: (options.receiveMode as ReceiveMode) || "peekLock",
38-
autoComplete: options.autoComplete || true,
45+
autoComplete: options.autoComplete,
3946
maxConcurrentCalls: options.maxConcurrentCalls || 100,
4047
maxAutoRenewLockDurationInMs: options.maxAutoRenewLockDurationInMs || 0,
41-
manualLockRenewal: options.manualLockRenewal || true,
48+
manualLockRenewal: options.manualLockRenewal,
4249
numberOfMessagesPerSend: options.numberOfMessagesPerSend || 1,
4350
delayBetweenSendsInMs: options.delayBetweenSendsInMs || 0,
4451
totalNumberOfMessagesToSend: options.totalNumberOfMessagesToSend || Infinity,
45-
completeMessageAfterDuration: options.completeMessageAfterDuration || true,
46-
settleMessageOnReceive: options.settleMessageOnReceive || false
52+
completeMessageAfterDuration: options.completeMessageAfterDuration,
53+
settleMessageOnReceive: options.settleMessageOnReceive
4754
};
4855
}
4956

5057
export async function scenarioStreamingReceive() {
51-
const testOptions = sanitizeOptions(parsedArgs<ScenarioStreamingReceiveOptions>(process.argv));
58+
const testOptions = sanitizeOptions(process.argv);
5259
const {
5360
testDurationInMs,
5461
receiveMode,
@@ -73,7 +80,7 @@ export async function scenarioStreamingReceive() {
7380

7481
await stressBase.init(undefined, undefined, testOptions);
7582
const sender = sbClient.createSender(stressBase.queueName);
76-
let receiver: ServiceBusReceiver<ServiceBusReceivedMessage>;
83+
let receiver: ServiceBusReceiver;
7784

7885
if (receiveMode === "receiveAndDelete") {
7986
receiver = sbClient.createReceiver(stressBase.queueName, {

0 commit comments

Comments
 (0)