Skip to content

Commit 1940e98

Browse files
authored
Add new ServiceBus perf test (Azure#31576)
* perf test * add test to perf1 * add prefetch parameter in options * fix old test
1 parent df7c47a commit 1940e98

File tree

17 files changed

+617
-15
lines changed

17 files changed

+617
-15
lines changed

sdk/servicebus/azure-messaging-servicebus-track1-perf/src/main/java/com/microsoft/azure/servicebus/perf/App.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ public static void main(String[] args) {
2929
ReceiveAndDeleteMessageTest.class,
3030
ReceiveAndLockMessageTest.class,
3131
SendMessageTest.class,
32-
SendMessagesTest.class
32+
SendMessagesTest.class,
33+
SendBatchTest.class,
34+
ReceiveMessagesTest.class
3335
};
3436

3537
PerfStressProgram.run(testClasses, args);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.microsoft.azure.servicebus.perf;
5+
6+
import com.microsoft.azure.servicebus.IMessage;
7+
import com.microsoft.azure.servicebus.perf.core.ServiceBatchTest;
8+
import com.microsoft.azure.servicebus.perf.core.ServiceBusStressOptions;
9+
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
10+
import reactor.core.publisher.Mono;
11+
12+
import java.util.Collection;
13+
import java.util.List;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
public class ReceiveMessagesTest extends ServiceBatchTest<ServiceBusStressOptions> {
17+
/**
18+
* Creates an instance of Batch performance test.
19+
*
20+
* @param options the options configured for the test.
21+
* @throws IllegalStateException if SSL context cannot be created.
22+
*/
23+
public ReceiveMessagesTest(ServiceBusStressOptions options) {
24+
super(options);
25+
}
26+
27+
@Override
28+
public int runBatch() {
29+
Collection<IMessage> messages;
30+
try {
31+
messages = receiver.receiveBatch(options.getMessagesToReceive());
32+
if (messages.size() <= 0) {
33+
throw new RuntimeException("Error. Should have received some messages.");
34+
}
35+
} catch (InterruptedException | ServiceBusException e) {
36+
throw new RuntimeException(e);
37+
}
38+
return messages.size();
39+
}
40+
41+
@Override
42+
public Mono<Integer> runBatchAsync() {
43+
int receiveCount = options.getMessagesToReceive();
44+
return Mono.fromFuture(receiver.receiveBatchAsync(receiveCount))
45+
.handle((messages, synchronousSink) -> {
46+
int count = messages.size();
47+
if (count <= 0) {
48+
synchronousSink.error(new RuntimeException("Error. Should have received some messages."));
49+
}
50+
synchronousSink.complete();
51+
}).then().thenReturn(receiveCount);
52+
}
53+
54+
@Override
55+
public Mono<Void> setupAsync() {
56+
return super.setupAsync()
57+
.then(sendMessage());
58+
}
59+
60+
@Override
61+
public Mono<Void> cleanupAsync() {
62+
return Mono.fromFuture(CompletableFuture.allOf(sender.closeAsync(), receiver.closeAsync()))
63+
.then(super.cleanupAsync());
64+
}
65+
66+
private Mono<Void> sendMessage() {
67+
// Since test does warm up and test many times, we are sending many messages, so we will have them available.
68+
for (int i = 0; i < TOTAL_MESSAGE_MULTIPLIER; i++) {
69+
List<IMessage> messages = ServiceBusTestUtil.getMessagesToSend(options.getMessagesSizeBytesToSend(), options.getMessagesToSend());
70+
sender.sendBatchAsync(messages);
71+
}
72+
return Mono.empty();
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.microsoft.azure.servicebus.perf;
5+
6+
import com.microsoft.azure.servicebus.IMessage;
7+
import com.microsoft.azure.servicebus.perf.core.ServiceBatchTest;
8+
import com.microsoft.azure.servicebus.perf.core.ServiceBusStressOptions;
9+
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
10+
import reactor.core.publisher.Mono;
11+
12+
import java.util.List;
13+
14+
public class SendBatchTest extends ServiceBatchTest<ServiceBusStressOptions> {
15+
private final List<IMessage> messages;
16+
17+
/**
18+
* Creates an instance of Batch performance test.
19+
*
20+
* @param options the options configured for the test.
21+
* @throws IllegalStateException if SSL context cannot be created.
22+
*/
23+
public SendBatchTest(ServiceBusStressOptions options) {
24+
super(options);
25+
messages = ServiceBusTestUtil.getMessagesToSend(options.getMessagesSizeBytesToSend(), options.getMessagesToSend());
26+
}
27+
28+
@Override
29+
public int runBatch() {
30+
try {
31+
sender.sendBatch(messages);
32+
} catch (InterruptedException | ServiceBusException e) {
33+
throw new RuntimeException(e);
34+
}
35+
return messages.size();
36+
}
37+
38+
@Override
39+
public Mono<Integer> runBatchAsync() {
40+
return Mono.fromFuture(sender.sendBatchAsync(messages)).thenReturn(messages.size());
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.microsoft.azure.servicebus.perf;
5+
6+
import com.azure.perf.test.core.TestDataCreationHelper;
7+
import com.microsoft.azure.servicebus.IMessage;
8+
import com.microsoft.azure.servicebus.Message;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.UUID;
13+
14+
public class ServiceBusTestUtil {
15+
16+
public static List<IMessage> getMessagesToSend(int messageSize, int messageToSend) {
17+
String messageContent = TestDataCreationHelper.generateRandomString(messageSize);
18+
List<IMessage> messages = new ArrayList<>();
19+
for (int i = 0; i < messageToSend; ++i) {
20+
Message message = new Message(messageContent);
21+
message.setMessageId(UUID.randomUUID().toString());
22+
messages.add(message);
23+
}
24+
25+
return messages;
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.microsoft.azure.servicebus.perf.core;
2+
3+
import com.azure.core.util.CoreUtils;
4+
import com.azure.perf.test.core.BatchPerfTest;
5+
import com.microsoft.azure.servicebus.ClientFactory;
6+
import com.microsoft.azure.servicebus.IMessageReceiver;
7+
import com.microsoft.azure.servicebus.IMessageSender;
8+
import com.microsoft.azure.servicebus.ReceiveMode;
9+
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
10+
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
11+
12+
import java.util.concurrent.ExecutionException;
13+
14+
public abstract class ServiceBatchTest<TOptions extends ServiceBusStressOptions> extends BatchPerfTest<TOptions> {
15+
16+
private static final String AZURE_SERVICE_BUS_CONNECTION_STRING = "AZURE_SERVICE_BUS_CONNECTION_STRING";
17+
private static final String AZURE_SERVICEBUS_QUEUE_NAME = "AZURE_SERVICEBUS_QUEUE_NAME";
18+
protected static final int TOTAL_MESSAGE_MULTIPLIER = 300;
19+
20+
private final MessagingFactory factory;
21+
22+
protected IMessageSender sender;
23+
protected IMessageReceiver receiver;
24+
25+
/**
26+
* Creates an instance of Batch performance test.
27+
*
28+
* @param options the options configured for the test.
29+
* @throws IllegalStateException if SSL context cannot be created.
30+
*/
31+
public ServiceBatchTest(TOptions options) {
32+
super(options);
33+
String connectionString = System.getenv(AZURE_SERVICE_BUS_CONNECTION_STRING);
34+
if (CoreUtils.isNullOrEmpty(connectionString)) {
35+
throw new IllegalArgumentException(
36+
String.format("Environment variable %s must be set", AZURE_SERVICE_BUS_CONNECTION_STRING));
37+
}
38+
39+
String queueName = System.getenv(AZURE_SERVICEBUS_QUEUE_NAME);
40+
if (CoreUtils.isNullOrEmpty(queueName)) {
41+
throw new IllegalArgumentException(
42+
String.format("Environment variable %s must be set", AZURE_SERVICEBUS_QUEUE_NAME));
43+
}
44+
45+
ReceiveMode receiveMode = options.getIsDeleteMode() ? ReceiveMode.RECEIVEANDDELETE : ReceiveMode.PEEKLOCK;
46+
// Setup the service client
47+
try {
48+
this.factory = MessagingFactory.createFromConnectionString(connectionString);
49+
this.sender = ClientFactory.createMessageSenderFromEntityPath(factory, queueName);
50+
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, queueName, receiveMode);
51+
} catch (ServiceBusException | InterruptedException | ExecutionException e) {
52+
throw new RuntimeException(e);
53+
}
54+
}
55+
}

sdk/servicebus/azure-messaging-servicebus-track1-perf/src/main/java/com/microsoft/azure/servicebus/perf/core/ServiceBusStressOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public class ServiceBusStressOptions extends PerfStressOptions {
2222
@Parameter(names = { "-msb", "--messageSizeBytes" }, description = "Size(in bytes) of one Message")
2323
private int messagesSizeBytesToSend = 10;
2424

25+
@Parameter(names = { "-idm", "--isDeleteMode" }, description = "Receiver client is receive_and_delete mode or peek_lock mode")
26+
private boolean isDeleteMode = true;
27+
2528
/**
2629
* Get the configured messagesToSend option for performance test.
2730
* @return The size.
@@ -45,4 +48,12 @@ public int getMessagesToReceive() {
4548
public int getMessagesSizeBytesToSend() {
4649
return messagesSizeBytesToSend;
4750
}
51+
52+
/**
53+
* Get the configured isDeleteMode option for performance test.
54+
* @return Receive mod is receive_and_delete mode or not.
55+
*/
56+
public boolean getIsDeleteMode() {
57+
return isDeleteMode;
58+
}
4859
}

sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/App.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ public static void main(String[] args) {
2929
ReceiveAndDeleteMessageTest.class,
3030
ReceiveAndLockMessageTest.class,
3131
SendMessageTest.class,
32-
SendMessagesTest.class
32+
SendMessagesTest.class,
33+
SendBatchTest.class,
34+
ReceiveMessagesTest.class,
35+
ServiceBusProcessorTest.class
3336
};
3437

3538
PerfStressProgram.run(testClasses, args);

sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ReceiveAndDeleteMessageTest.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.azure.core.util.logging.ClientLogger;
88
import com.azure.messaging.servicebus.ServiceBusMessage;
99
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
10+
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
1011
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
1112
import com.azure.perf.test.core.TestDataCreationHelper;
1213
import reactor.core.publisher.Mono;
@@ -68,11 +69,19 @@ public void run() {
6869

6970
@Override
7071
public Mono<Void> runAsync() {
71-
return receiverAsync
72-
.receiveMessages()
73-
.take(options.getMessagesToReceive())
74-
.map(serviceBusReceivedMessageContext -> {
75-
return serviceBusReceivedMessageContext;
76-
}).then();
72+
return Mono.using(
73+
receiverBuilder::buildAsyncClient,
74+
serviceBusReceiverAsyncClient -> {
75+
return serviceBusReceiverAsyncClient
76+
.receiveMessages()
77+
.take(options.getMessagesToReceive())
78+
.map(message -> {
79+
return message;
80+
})
81+
.then();
82+
},
83+
ServiceBusReceiverAsyncClient::close,
84+
true
85+
);
7786
}
7887
}

sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ReceiveAndLockMessageTest.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.core.util.logging.ClientLogger;
99
import com.azure.messaging.servicebus.ServiceBusMessage;
1010
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
11+
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
1112
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
1213
import com.azure.perf.test.core.TestDataCreationHelper;
1314
import reactor.core.publisher.Mono;
@@ -68,11 +69,19 @@ public void run() {
6869

6970
@Override
7071
public Mono<Void> runAsync() {
71-
return receiverAsync
72-
.receiveMessages()
73-
.take(options.getMessagesToReceive())
74-
.flatMap(message -> {
75-
return receiverAsync.complete(message).thenReturn(true);
76-
}, 1).then();
72+
return Mono.using(
73+
receiverBuilder::buildAsyncClient,
74+
receiverAsyncClient -> {
75+
return receiverAsyncClient
76+
.receiveMessages()
77+
.take(options.getMessagesToReceive())
78+
.flatMap(message -> {
79+
receiverAsyncClient.complete(message);
80+
return Mono.just(message);
81+
}, 1).then();
82+
},
83+
ServiceBusReceiverAsyncClient::close,
84+
false
85+
);
7786
}
7887
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.servicebus.perf;
5+
6+
import com.azure.core.util.logging.ClientLogger;
7+
import com.azure.messaging.servicebus.ServiceBusMessage;
8+
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
9+
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
10+
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
11+
import com.azure.perf.test.core.TestDataCreationHelper;
12+
import reactor.core.publisher.Mono;
13+
14+
import java.util.List;
15+
import java.util.UUID;
16+
17+
/**
18+
* Test ServiceBus receiver client receive messages performance. After receive messages, return a count of messages to record.
19+
*/
20+
public class ReceiveMessagesTest extends ServiceBusBatchTest<ServiceBusStressOptions> {
21+
private static final ClientLogger LOGGER = new ClientLogger(ReceiveMessagesTest.class);
22+
23+
/**
24+
* Creates an instance of Batch performance test.
25+
*
26+
* @param options the options configured for the test.
27+
* @throws IllegalStateException if SSL context cannot be created.
28+
*/
29+
public ReceiveMessagesTest(ServiceBusStressOptions options) {
30+
super(options);
31+
}
32+
33+
@Override
34+
public int runBatch() {
35+
int count = 0;
36+
for (ServiceBusReceivedMessage message : receiver.receiveMessages(options.getMessagesToReceive())) {
37+
if (!options.getIsDeleteMode()) {
38+
receiver.complete(message);
39+
}
40+
count++;
41+
}
42+
if (count <= 0) {
43+
throw LOGGER.logExceptionAsWarning(new RuntimeException("Error. Should have received some messages."));
44+
}
45+
return count;
46+
}
47+
48+
@Override
49+
public Mono<Integer> runBatchAsync() {
50+
int receiveCount = options.getMessagesToReceive();
51+
return Mono.using(
52+
receiverClientBuilder::buildAsyncClient,
53+
receiverAsync -> {
54+
return receiverAsync.receiveMessages()
55+
.take(receiveCount)
56+
.flatMap(message -> {
57+
if (!options.getIsDeleteMode()) {
58+
receiverAsync.complete(message);
59+
}
60+
return Mono.empty();
61+
}, 1)
62+
.then()
63+
.thenReturn(receiveCount);
64+
},
65+
ServiceBusReceiverAsyncClient::close,
66+
true
67+
);
68+
}
69+
70+
@Override
71+
public Mono<Void> setupAsync() {
72+
// Since test does warm up and test many times, we are sending many messages, so we will have them available.
73+
for (int i = 0; i < TOTAL_MESSAGE_MULTIPLIER; i++) {
74+
List<ServiceBusMessage> messages = ServiceBusTestUtil.geMessagesToSend(options.getMessagesSizeBytesToSend(), options.getMessagesToSend());
75+
sender.sendMessages(messages);
76+
}
77+
return Mono.empty();
78+
}
79+
80+
}

0 commit comments

Comments
 (0)