Skip to content

Commit 9f2a8de

Browse files
[Service Bus] Fix test failures (Azure#23081)
1 parent f646db8 commit 9f2a8de

File tree

2 files changed

+115
-35
lines changed

2 files changed

+115
-35
lines changed

sdk/servicebus/service-bus/karma.conf.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ module.exports = function (config) {
4646
// environment values MUST be exported or set with same console running "karma start"
4747
// https://www.npmjs.com/package/karma-env-preprocessor
4848
envPreprocessor: [
49+
"SERVICEBUS_CONNECTION_STRING_PREMIUM",
4950
"SERVICEBUS_CONNECTION_STRING",
5051
"AZURE_CLIENT_ID",
5152
"AZURE_CLIENT_SECRET",

sdk/servicebus/service-bus/test/internal/sendBatch.spec.ts

Lines changed: 114 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ const should = chai.should();
66
const assert = chai.assert;
77
import chaiAsPromised from "chai-as-promised";
88
chai.use(chaiAsPromised);
9-
import { OperationOptions, ServiceBusMessage } from "../../src";
9+
import {
10+
OperationOptions,
11+
ServiceBusAdministrationClient,
12+
ServiceBusClient,
13+
ServiceBusMessage,
14+
} from "../../src";
1015
import { TestClientType } from "../public/utils/testUtils";
1116
import {
1217
EntityName,
@@ -16,6 +21,8 @@ import {
1621
getRandomTestClientTypeWithNoSessions,
1722
} from "../public/utils/testutils2";
1823
import { ServiceBusSender, ServiceBusSenderImpl } from "../../src/sender";
24+
import { getEnvVarValue } from "../public/utils/envVarUtils";
25+
import { delay } from "@azure/core-util";
1926

2027
describe("Send Batch", () => {
2128
let sender: ServiceBusSender;
@@ -34,9 +41,7 @@ describe("Send Batch", () => {
3441
});
3542

3643
async function beforeEachTest(entityType: TestClientType): Promise<void> {
37-
entityNames = await serviceBusClient.test.createTestEntities(entityType, {
38-
maxMessageSizeInKilobytes: 102400,
39-
});
44+
entityNames = await serviceBusClient.test.createTestEntities(entityType);
4045

4146
sender = serviceBusClient.test.addToCleanup(
4247
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
@@ -260,37 +265,6 @@ describe("Send Batch", () => {
260265
});
261266
});
262267

263-
describe("Send single message - size > 1 MB", function (): void {
264-
afterEach(async () => {
265-
await afterEachTest();
266-
});
267-
268-
function prepareMessage(useSessions: boolean): ServiceBusMessage {
269-
return {
270-
body: Buffer.alloc(1024 * 1024),
271-
sessionId: useSessions ? `s` : undefined,
272-
};
273-
}
274-
275-
async function testSend(): Promise<void> {
276-
// Prepare messages to send
277-
const messageToSend = prepareMessage(entityNames.usesSessions);
278-
await sender.sendMessages(messageToSend);
279-
// receive all the messages in receive and delete mode
280-
await serviceBusClient.test.verifyAndDeleteAllSentMessages(entityNames, [messageToSend]);
281-
}
282-
283-
it(`${noSessionTestClientType}: SendBatch`, async function (): Promise<void> {
284-
await beforeEachTest(noSessionTestClientType);
285-
await testSend();
286-
});
287-
288-
it(`${withSessionTestClientType}: SendBatch`, async function (): Promise<void> {
289-
await beforeEachTest(withSessionTestClientType);
290-
await testSend();
291-
});
292-
});
293-
294268
describe("Send multiple heterogenous messages - size > max_batch_size_allowed", function (): void {
295269
afterEach(async () => {
296270
await afterEachTest();
@@ -546,3 +520,108 @@ describe("Send Batch", () => {
546520
);
547521
});
548522
});
523+
524+
describe("Premium namespaces - Sending", () => {
525+
const premiumConnectionString = getEnvVarValue("SERVICEBUS_CONNECTION_STRING_PREMIUM");
526+
let atomClient: ServiceBusAdministrationClient;
527+
528+
before(function (this: Mocha.Context) {
529+
if (!premiumConnectionString) {
530+
this.skip();
531+
}
532+
atomClient = new ServiceBusAdministrationClient(premiumConnectionString);
533+
});
534+
let sender: ServiceBusSender;
535+
let serviceBusClient: ServiceBusClient;
536+
let queueName: string | undefined;
537+
let topicName: string | undefined;
538+
let subscriptionName: string | undefined;
539+
let withSessions: boolean;
540+
541+
const noSessionTestClientType =
542+
Math.random() > 0.5 ? TestClientType.UnpartitionedQueue : TestClientType.UnpartitionedTopic;
543+
const withSessionTestClientType =
544+
Math.random() > 0.5
545+
? TestClientType.UnpartitionedQueueWithSessions
546+
: TestClientType.UnpartitionedTopicWithSessions;
547+
548+
before(() => {
549+
serviceBusClient = new ServiceBusClient(premiumConnectionString || "");
550+
});
551+
552+
after(async () => {
553+
await serviceBusClient.close();
554+
});
555+
556+
async function beforeEachTest(entityType: TestClientType): Promise<void> {
557+
atomClient = new ServiceBusAdministrationClient(premiumConnectionString || "");
558+
withSessions = !entityType.includes("WithSessions");
559+
const randomSeed = Math.ceil(Math.random() * 10000 + 1000);
560+
const isQueue = entityType.includes("Queue");
561+
if (isQueue) {
562+
queueName = "queue-" + randomSeed;
563+
await atomClient.createQueue(queueName, {
564+
requiresSession: withSessions,
565+
maxMessageSizeInKilobytes: 10240,
566+
}); // 10 MB
567+
sender = serviceBusClient.createSender(queueName);
568+
} else {
569+
topicName = "topic-" + randomSeed;
570+
subscriptionName = "subscription-" + randomSeed;
571+
await atomClient.createTopic(topicName, { maxMessageSizeInKilobytes: 10240 }); // 10 MB
572+
await atomClient.createSubscription(topicName, subscriptionName, {
573+
requiresSession: withSessions,
574+
});
575+
sender = serviceBusClient.createSender(topicName);
576+
}
577+
}
578+
579+
async function afterEachTest(): Promise<void> {
580+
await sender.close();
581+
if (queueName) {
582+
await atomClient.deleteQueue(queueName);
583+
queueName = undefined;
584+
} else if (topicName) {
585+
await atomClient.deleteTopic(topicName);
586+
topicName = undefined;
587+
}
588+
}
589+
590+
describe("Send single message - size > 1 MB (Large messages)", function (): void {
591+
afterEach(async () => {
592+
await afterEachTest();
593+
});
594+
595+
function prepareMessage(useSessions: boolean): ServiceBusMessage {
596+
return {
597+
body: Buffer.alloc(1024 * 1024),
598+
sessionId: useSessions ? `s` : undefined,
599+
};
600+
}
601+
602+
async function testSend(): Promise<void> {
603+
// Prepare messages to send
604+
const messageToSend = prepareMessage(withSessions);
605+
await sender.sendMessages(messageToSend);
606+
await delay(1000);
607+
should.equal(
608+
queueName
609+
? (await atomClient.getQueueRuntimeProperties(queueName)).totalMessageCount
610+
: (await atomClient.getSubscriptionRuntimeProperties(topicName!, subscriptionName!))
611+
.totalMessageCount,
612+
1,
613+
`Unexpected number of messages are present in the entity.`
614+
);
615+
}
616+
617+
it(`${noSessionTestClientType}: SendBatch`, async function (): Promise<void> {
618+
await beforeEachTest(noSessionTestClientType);
619+
await testSend();
620+
});
621+
622+
it(`${withSessionTestClientType}: SendBatch`, async function (): Promise<void> {
623+
await beforeEachTest(withSessionTestClientType);
624+
await testSend();
625+
});
626+
});
627+
});

0 commit comments

Comments
 (0)