Skip to content

Commit ff9d8cd

Browse files
authored
[Service Bus] Throw when partitionKey is not same as sessionId (Azure#12490)
1 parent 6831f7a commit ff9d8cd

File tree

4 files changed

+78
-27
lines changed

4 files changed

+78
-27
lines changed

sdk/servicebus/service-bus/src/sender.ts

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33

44
import Long from "long";
55
import { MessageSender } from "./core/messageSender";
6-
import { ServiceBusMessage, isServiceBusMessage } from "./serviceBusMessage";
6+
import { ServiceBusMessage } from "./serviceBusMessage";
77
import { ConnectionContext } from "./connectionContext";
88
import {
99
getSenderClosedErrorMsg,
1010
throwErrorIfConnectionClosed,
11+
throwIfNotValidServiceBusMessage,
1112
throwTypeErrorIfParameterMissing,
1213
throwTypeErrorIfParameterNotLong
1314
} from "./util/errors";
@@ -190,16 +191,18 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
190191

191192
// link message span contexts
192193
let spanContextsToLink: SpanContext[] = [];
193-
if (isServiceBusMessage(messages)) {
194-
messages = [messages];
195-
}
194+
196195
let batch: ServiceBusMessageBatch;
197-
if (Array.isArray(messages)) {
196+
if (isServiceBusMessageBatch(messages)) {
197+
spanContextsToLink = messages._messageSpanContexts;
198+
batch = messages;
199+
} else {
200+
if (!Array.isArray(messages)) {
201+
messages = [messages];
202+
}
198203
batch = await this.createMessageBatch(options);
199204
for (const message of messages) {
200-
if (!isServiceBusMessage(message)) {
201-
throw new TypeError(invalidTypeErrMsg);
202-
}
205+
throwIfNotValidServiceBusMessage(message, invalidTypeErrMsg);
203206
if (!batch.tryAddMessage(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
204207
// this is too big - throw an error
205208
const error = new MessagingError(
@@ -209,11 +212,6 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
209212
throw error;
210213
}
211214
}
212-
} else if (isServiceBusMessageBatch(messages)) {
213-
spanContextsToLink = messages._messageSpanContexts;
214-
batch = messages;
215-
} else {
216-
throw new TypeError(invalidTypeErrMsg);
217215
}
218216

219217
const sendSpan = createSendSpan(
@@ -258,11 +256,10 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
258256
const messagesToSchedule = Array.isArray(messages) ? messages : [messages];
259257

260258
for (const message of messagesToSchedule) {
261-
if (!isServiceBusMessage(message)) {
262-
throw new TypeError(
263-
"Provided value for 'messages' must be of type ServiceBusMessage or an array of type ServiceBusMessage."
264-
);
265-
}
259+
throwIfNotValidServiceBusMessage(
260+
message,
261+
"Provided value for 'messages' must be of type ServiceBusMessage or an array of type ServiceBusMessage."
262+
);
266263
}
267264

268265
const scheduleMessageOperationPromise = async () => {

sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
import {
55
ServiceBusMessage,
66
toRheaMessage,
7-
isServiceBusMessage,
87
getMessagePropertyTypeMismatchError
98
} from "./serviceBusMessage";
10-
import { throwTypeErrorIfParameterMissing } from "./util/errors";
9+
import { throwIfNotValidServiceBusMessage, throwTypeErrorIfParameterMissing } from "./util/errors";
1110
import { ConnectionContext } from "./connectionContext";
1211
import {
1312
MessageAnnotations,
@@ -246,9 +245,10 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
246245
*/
247246
public tryAddMessage(message: ServiceBusMessage, options: TryAddOptions = {}): boolean {
248247
throwTypeErrorIfParameterMissing(this._context.connectionId, "message", message);
249-
if (!isServiceBusMessage(message)) {
250-
throw new TypeError("Provided value for 'message' must be of type ServiceBusMessage.");
251-
}
248+
throwIfNotValidServiceBusMessage(
249+
message,
250+
"Provided value for 'message' must be of type ServiceBusMessage."
251+
);
252252

253253
// check if the event has already been instrumented
254254
const previouslyInstrumented = Boolean(

sdk/servicebus/service-bus/src/util/errors.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { logger, receiverLogger } from "../log";
55
import Long from "long";
66
import { ConnectionContext } from "../connectionContext";
7-
import { ServiceBusReceivedMessage } from "../serviceBusMessage";
7+
import { isServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";
88
import { ReceiveMode } from "../models";
99

1010
/**
@@ -249,3 +249,27 @@ export function throwErrorIfInvalidOperationOnMessage(
249249
throw error;
250250
}
251251
}
252+
253+
/**
254+
* Error message for when the ServiceBusMessage provided by the user has different values
255+
* for partitionKey and sessionId.
256+
* @internal
257+
* @throw
258+
*/
259+
export const PartitionKeySessionIdMismatchError =
260+
"The fields 'partitionKey' and 'sessionId' cannot have different values.";
261+
/**
262+
* Throws error if the given object is not a valid ServiceBusMessage
263+
* @internal
264+
* @ignore
265+
* @param msg The object that needs to be validated as a ServiceBusMessage
266+
* @param errorMessageForWrongType The error message to use when given object is not a ServiceBusMessage
267+
*/
268+
export function throwIfNotValidServiceBusMessage(msg: any, errorMessageForWrongType: string): void {
269+
if (!isServiceBusMessage(msg)) {
270+
throw new TypeError(errorMessageForWrongType);
271+
}
272+
if (msg.partitionKey && msg.sessionId && msg.partitionKey !== msg.sessionId) {
273+
throw new TypeError(PartitionKeySessionIdMismatchError);
274+
}
275+
}

sdk/servicebus/service-bus/test/internal/sender.spec.ts

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { ConnectionContext } from "../../src/connectionContext";
77
import { ServiceBusMessage } from "../../src";
88
import { isServiceBusMessageBatch, ServiceBusSenderImpl } from "../../src/sender";
99
import { createConnectionContextForTests } from "./unittestUtils";
10+
import { PartitionKeySessionIdMismatchError } from "../../src/util/errors";
1011

1112
const assert = chai.assert;
1213

@@ -29,58 +30,87 @@ describe("sender unit tests", () => {
2930
return new ServiceBusMessageBatchImpl(fakeContext, 100);
3031
};
3132

32-
["hello", {}, 123, null, undefined, ["hello"]].forEach((invalidValue) => {
33+
const partitionKeySessionIdMismatchMsg = {
34+
body: "boooo",
35+
sessionId: "my-sessionId",
36+
partitionKey: "my-partitionKey"
37+
};
38+
const badMessages = [
39+
"hello",
40+
{},
41+
123,
42+
null,
43+
undefined,
44+
["hello"],
45+
partitionKeySessionIdMismatchMsg
46+
];
47+
48+
badMessages.forEach((invalidValue) => {
3349
it(`don't allow Sender.sendMessages(${invalidValue})`, async () => {
3450
let expectedErrorMsg =
3551
"Provided value for 'messages' must be of type ServiceBusMessage, ServiceBusMessageBatch or an array of type ServiceBusMessage.";
3652
if (invalidValue === null || invalidValue === undefined) {
3753
expectedErrorMsg = `Missing parameter "messages"`;
3854
}
55+
if (invalidValue === partitionKeySessionIdMismatchMsg) {
56+
expectedErrorMsg = PartitionKeySessionIdMismatchError;
57+
}
58+
3959
try {
4060
await sender.sendMessages(
4161
// @ts-expect-error
4262
invalidValue
4363
);
64+
assert.fail("You should not be seeing this.");
4465
} catch (err) {
4566
assert.equal(err.name, "TypeError");
4667
assert.equal(err.message, expectedErrorMsg);
4768
}
4869
});
4970
});
5071

51-
["hello", {}, null, undefined].forEach((invalidValue) => {
72+
badMessages.forEach((invalidValue) => {
5273
it(`don't allow tryAdd(${invalidValue})`, async () => {
5374
const batch = await sender.createMessageBatch();
5475
let expectedErrorMsg = "Provided value for 'message' must be of type ServiceBusMessage.";
5576
if (invalidValue === null || invalidValue === undefined) {
5677
expectedErrorMsg = `Missing parameter "message"`;
5778
}
79+
if (invalidValue === partitionKeySessionIdMismatchMsg) {
80+
expectedErrorMsg = PartitionKeySessionIdMismatchError;
81+
}
82+
5883
try {
5984
batch.tryAddMessage(
6085
// @ts-expect-error
6186
invalidValue
6287
);
88+
assert.fail("You should not be seeing this.");
6389
} catch (err) {
6490
assert.equal(err.name, "TypeError");
6591
assert.equal(err.message, expectedErrorMsg);
6692
}
6793
});
6894
});
6995

70-
["hello", {}, null, undefined, ["hello"]].forEach((invalidValue) => {
96+
badMessages.forEach((invalidValue) => {
7197
it(`don't allow Sender.scheduleMessages(${invalidValue})`, async () => {
7298
let expectedErrorMsg =
7399
"Provided value for 'messages' must be of type ServiceBusMessage or an array of type ServiceBusMessage.";
74100
if (invalidValue === null || invalidValue === undefined) {
75101
expectedErrorMsg = `Missing parameter "messages"`;
76102
}
103+
if (invalidValue === partitionKeySessionIdMismatchMsg) {
104+
expectedErrorMsg = PartitionKeySessionIdMismatchError;
105+
}
77106

78107
try {
79108
await sender.scheduleMessages(
80109
// @ts-expect-error
81110
invalidValue,
82111
new Date()
83112
);
113+
assert.fail("You should not be seeing this.");
84114
} catch (err) {
85115
assert.equal(err.name, "TypeError");
86116
assert.equal(err.message, expectedErrorMsg);

0 commit comments

Comments
 (0)