Skip to content

Commit 108b295

Browse files
[Service Bus] Enable sending large messages (Azure#23014)
* new test * fix sending large messages * formatting and docs * format * changelog * more docs * Update sdk/servicebus/service-bus/CHANGELOG.md Co-authored-by: Jeremy Meng <yumeng@microsoft.com> * Update sdk/servicebus/service-bus/test/internal/sendBatch.spec.ts * Apply suggestions from code review * fix error message Co-authored-by: Jeremy Meng <yumeng@microsoft.com>
1 parent a3de182 commit 108b295

File tree

4 files changed

+72
-8
lines changed

4 files changed

+72
-8
lines changed

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Updating the `sendMessages` API to not batch the messages when it is just a single message. This allows users to send individual large message (>1MB) using the `sendMessages` API. [#23014](https://github.com/Azure/azure-sdk-for-js/pull/23014)
12+
1113
### Other Changes
1214

1315
## 7.7.0 (2022-08-09)

sdk/servicebus/service-bus/src/sender.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import {
2525
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
2626
import { TracingSpanLink } from "@azure/core-tracing";
2727
import { senderLogger as logger } from "./log";
28-
import { ServiceBusError } from "./serviceBusError";
2928
import { toSpanOptions, tracingClient } from "./diagnostics/tracing";
3029
import { ensureValidIdentifier } from "./util/utils";
30+
import { ServiceBusError } from "./serviceBusError";
3131

3232
/**
3333
* A Sender can be used to send messages, schedule messages to be sent at a later time
@@ -49,6 +49,14 @@ export interface ServiceBusSender {
4949
* - All messages passed to the same sendMessages() call should have the same `sessionId` (if using
5050
* sessions) and the same `partitionKey` (if using partitions).
5151
*
52+
* **Note:**
53+
*
54+
* __If you want to send messages of size greater than 1MB, please send individual messages instead of sending a batched message or an array of messages like below.__
55+
*
56+
* `await sender.sendMessages(message);`
57+
*
58+
* __This is because the batched messages are not capable of sending the larger messages yet. You'll hit the `force detached` error in this case otherwise. Read [service-bus-premium-messaging#large-messages-support](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-premium-messaging#large-messages-support). More info at [#23014](https://github.com/Azure/azure-sdk-for-js/pull/23014).__
59+
*
5260
* @param messages - A single message or an array of messages or a batch of messages created via the createBatch()
5361
* method to send.
5462
* @param options - Options bag to pass an abort signal or tracing options.
@@ -202,13 +210,28 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
202210
this._throwIfSenderOrConnectionClosed();
203211
throwTypeErrorIfParameterMissing(this._context.connectionId, "messages", messages);
204212

213+
if (!isServiceBusMessageBatch(messages) && !Array.isArray(messages)) {
214+
// Case 1: Single message
215+
throwIfNotValidServiceBusMessage(messages, errorInvalidMessageTypeSingleOrArray);
216+
return tracingClient.withSpan(
217+
"ServiceBusSender.send",
218+
options ?? {},
219+
(updatedOptions) => this._sender.send(messages, updatedOptions),
220+
{
221+
...toSpanOptions(
222+
{ entityPath: this.entityPath, host: this._context.config.host },
223+
"client"
224+
),
225+
}
226+
);
227+
}
228+
205229
let batch: ServiceBusMessageBatch;
206230
if (isServiceBusMessageBatch(messages)) {
231+
// Case 2: Batch message
207232
batch = messages;
208233
} else {
209-
if (!Array.isArray(messages)) {
210-
messages = [messages];
211-
}
234+
// Case 3: Array of messages
212235
batch = await this.createMessageBatch(options);
213236
for (const message of messages) {
214237
throwIfNotValidServiceBusMessage(message, errorInvalidMessageTypeSingleOrArray);

sdk/servicebus/service-bus/test/internal/sendBatch.spec.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ describe("Send Batch", () => {
3434
});
3535

3636
async function beforeEachTest(entityType: TestClientType): Promise<void> {
37-
entityNames = await serviceBusClient.test.createTestEntities(entityType);
37+
entityNames = await serviceBusClient.test.createTestEntities(entityType, {
38+
maxMessageSizeInKilobytes: 102400,
39+
});
3840

3941
sender = serviceBusClient.test.addToCleanup(
4042
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
@@ -258,6 +260,37 @@ describe("Send Batch", () => {
258260
});
259261
});
260262

263+
describe("Send single message - size > 1 MB", function (): void {
264+
afterEach(async () => {
265+
await afterEachTest();
266+
});
267+
268+
function prepareMessage(useSessions: boolean): ServiceBusMessage {
269+
return {
270+
body: Buffer.alloc(1024 * 1024),
271+
sessionId: useSessions ? `s` : undefined,
272+
};
273+
}
274+
275+
async function testSend(): Promise<void> {
276+
// Prepare messages to send
277+
const messageToSend = prepareMessage(entityNames.usesSessions);
278+
await sender.sendMessages(messageToSend);
279+
// receive all the messages in receive and delete mode
280+
await serviceBusClient.test.verifyAndDeleteAllSentMessages(entityNames, [messageToSend]);
281+
}
282+
283+
it(`${noSessionTestClientType}: SendBatch`, async function (): Promise<void> {
284+
await beforeEachTest(noSessionTestClientType);
285+
await testSend();
286+
});
287+
288+
it(`${withSessionTestClientType}: SendBatch`, async function (): Promise<void> {
289+
await beforeEachTest(withSessionTestClientType);
290+
await testSend();
291+
});
292+
});
293+
261294
describe("Send multiple heterogenous messages - size > max_batch_size_allowed", function (): void {
262295
afterEach(async () => {
263296
await afterEachTest();

sdk/servicebus/service-bus/test/public/utils/testutils2.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ function getEntityNames(testClientType: TestClientType): AutoGeneratedEntity {
9393
}
9494

9595
async function createTestEntities(
96-
testClientType: TestClientType
96+
testClientType: TestClientType,
97+
options?: { maxMessageSizeInKilobytes: number }
9798
): Promise<ReturnType<typeof getEntityNames>> {
9899
const relatedEntities = getEntityNames(testClientType);
99100

@@ -103,13 +104,15 @@ async function createTestEntities(
103104
enableBatchedOperations: true,
104105
enablePartitioning: relatedEntities.isPartitioned,
105106
requiresSession: relatedEntities.usesSessions,
107+
maxMessageSizeInKilobytes: options?.maxMessageSizeInKilobytes,
106108
});
107109
}
108110

109111
if (relatedEntities.topic) {
110112
await recreateTopic(relatedEntities.topic, {
111113
enablePartitioning: relatedEntities.isPartitioned,
112114
enableBatchedOperations: true,
115+
maxMessageSizeInKilobytes: options?.maxMessageSizeInKilobytes,
113116
});
114117
}
115118

@@ -315,14 +318,17 @@ export class ServiceBusTestHelpers {
315318
* entity from _this_ instance of the ServiceBusTestHelpers.
316319
*/
317320
async createTestEntities(
318-
testClientType: TestClientType
321+
testClientType: TestClientType,
322+
options?: {
323+
maxMessageSizeInKilobytes: number;
324+
}
319325
): Promise<ReturnType<typeof getEntityNames>> {
320326
// TODO: for now these aren't randomly named. This is prep so we can
321327
// do that soon.
322328
let entityValues = this._testClientEntities.get(testClientType);
323329

324330
if (entityValues == null) {
325-
entityValues = await createTestEntities(testClientType);
331+
entityValues = await createTestEntities(testClientType, options);
326332
this._testClientEntities.set(testClientType, entityValues);
327333
}
328334

0 commit comments

Comments
 (0)