Skip to content

Commit 4cf3566

Browse files
[service-bus] Final work to expose AMQP body type encoding publicly (Azure#15295)
Finishing work to enable AMQP body type encoding - We completely missed the schedule messages code path. Added, with tests. - Consolidated the two files of AMQP messaging tests into the `public` branch so we also get min/max testing. Removed all references to non-published interfaces.
1 parent 5fe25b1 commit 4cf3566

File tree

9 files changed

+466
-343
lines changed

9 files changed

+466
-343
lines changed

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release History
22

3-
## 7.2.0-beta.1 (Unreleased)
3+
## 7.2.0-beta.1 (2021-05-18)
44

55
### New Features
66

sdk/servicebus/service-bus/review/service-bus.api.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ export interface ServiceBusMessageBatch {
400400
// @internal
401401
readonly _messageSpanContexts: SpanContext[];
402402
readonly sizeInBytes: number;
403-
tryAddMessage(message: ServiceBusMessage, options?: TryAddOptions): boolean;
403+
tryAddMessage(message: ServiceBusMessage | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
404404
}
405405

406406
// @public
@@ -458,8 +458,8 @@ export interface ServiceBusSender {
458458
createMessageBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch>;
459459
entityPath: string;
460460
isClosed: boolean;
461-
scheduleMessages(messages: ServiceBusMessage | ServiceBusMessage[], scheduledEnqueueTimeUtc: Date, options?: OperationOptionsBase): Promise<Long[]>;
462-
sendMessages(messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, options?: OperationOptionsBase): Promise<void>;
461+
scheduleMessages(messages: ServiceBusMessage | ServiceBusMessage[] | AmqpAnnotatedMessage | AmqpAnnotatedMessage[], scheduledEnqueueTimeUtc: Date, options?: OperationOptionsBase): Promise<Long[]>;
462+
sendMessages(messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch | AmqpAnnotatedMessage | AmqpAnnotatedMessage[], options?: OperationOptionsBase): Promise<void>;
463463
}
464464

465465
// @public

sdk/servicebus/service-bus/src/core/managementClient.ts

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import {
2020
MessagingError,
2121
RequestResponseLink,
2222
SendRequestOptions,
23-
RetryOptions
23+
RetryOptions,
24+
AmqpAnnotatedMessage
2425
} from "@azure/core-amqp";
2526
import { ConnectionContext } from "../connectionContext";
2627
import {
@@ -29,7 +30,9 @@ import {
2930
ServiceBusMessage,
3031
ServiceBusMessageImpl,
3132
toRheaMessage,
32-
fromRheaMessage
33+
fromRheaMessage,
34+
updateScheduledTime,
35+
updateMessageId
3336
} from "../serviceBusMessage";
3437
import { LinkEntity, RequestResponseLinkOptions } from "./linkEntity";
3538
import { managementClientLogger, receiverLogger, senderLogger, ServiceBusLogger } from "../log";
@@ -585,7 +588,7 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
585588
*/
586589
async scheduleMessages(
587590
scheduledEnqueueTimeUtc: Date,
588-
messages: ServiceBusMessage[],
591+
messages: ServiceBusMessage[] | AmqpAnnotatedMessage[],
589592
options?: OperationOptionsBase & SendManagementRequestOptions
590593
): Promise<Long[]> {
591594
throwErrorIfConnectionClosed(this._context);
@@ -595,21 +598,28 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
595598
const messageBody: any[] = [];
596599
for (let i = 0; i < messages.length; i++) {
597600
const item = messages[i];
598-
if (!item.messageId) item.messageId = generate_uuid();
599-
item.scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc;
600601

601602
try {
602-
const amqpMessage = toRheaMessage(item, defaultDataTransformer);
603-
604-
const entry: any = {
605-
message: RheaMessageUtil.encode(amqpMessage),
606-
"message-id": item.messageId
603+
const rheaMessage = toRheaMessage(item, defaultDataTransformer);
604+
updateMessageId(rheaMessage, rheaMessage.message_id || generate_uuid());
605+
updateScheduledTime(rheaMessage, scheduledEnqueueTimeUtc);
606+
607+
const entry: {
608+
message: Buffer;
609+
["message-id"]: ServiceBusMessage["messageId"];
610+
["partition-key"]?: ServiceBusMessage["partitionKey"];
611+
[Constants.sessionIdMapKey]?: string | undefined;
612+
} = {
613+
message: RheaMessageUtil.encode(rheaMessage),
614+
"message-id": rheaMessage.message_id
607615
};
608-
if (item.sessionId) {
609-
entry[Constants.sessionIdMapKey] = item.sessionId;
616+
617+
if (rheaMessage.group_id) {
618+
entry[Constants.sessionIdMapKey] = rheaMessage.group_id;
610619
}
611-
if (item.partitionKey) {
612-
entry["partition-key"] = item.partitionKey;
620+
621+
if (rheaMessage.message_annotations?.[Constants.partitionKey]) {
622+
entry["partition-key"] = rheaMessage.message_annotations[Constants.partitionKey];
613623
}
614624

615625
// Will be required later for implementing Transactions
@@ -1319,3 +1329,41 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
13191329
}
13201330
}
13211331
}
1332+
1333+
/**
1334+
* Converts an AmqpAnnotatedMessage or ServiceBusMessage into a properly formatted
1335+
* message for sending to the mgmt link for scheduling.
1336+
*
1337+
* @internal
1338+
* @hidden
1339+
*/
1340+
export function toScheduleableMessage(
1341+
item: ServiceBusMessage | AmqpAnnotatedMessage,
1342+
scheduledEnqueueTimeUtc: Date
1343+
) {
1344+
const rheaMessage = toRheaMessage(item, defaultDataTransformer);
1345+
updateMessageId(rheaMessage, rheaMessage.message_id || generate_uuid());
1346+
updateScheduledTime(rheaMessage, scheduledEnqueueTimeUtc);
1347+
1348+
const entry: any = {
1349+
message: RheaMessageUtil.encode(rheaMessage),
1350+
"message-id": rheaMessage.message_id
1351+
};
1352+
1353+
rheaMessage.message_annotations = {
1354+
...rheaMessage.message_annotations,
1355+
[Constants.scheduledEnqueueTime]: scheduledEnqueueTimeUtc
1356+
};
1357+
1358+
if (rheaMessage.group_id) {
1359+
entry[Constants.sessionIdMapKey] = rheaMessage.group_id;
1360+
}
1361+
1362+
const partitionKey =
1363+
rheaMessage.message_annotations && rheaMessage.message_annotations[Constants.partitionKey];
1364+
1365+
if (partitionKey) {
1366+
entry["partition-key"] = partitionKey;
1367+
}
1368+
return entry;
1369+
}

sdk/servicebus/service-bus/src/sender.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ export interface ServiceBusSender {
5151
* @throws `ServiceBusError` if the service returns an error while sending messages to the service.
5252
*/
5353
sendMessages(
54-
messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch,
54+
messages:
55+
| ServiceBusMessage
56+
| ServiceBusMessage[]
57+
| ServiceBusMessageBatch
58+
| AmqpAnnotatedMessage
59+
| AmqpAnnotatedMessage[],
5560
options?: OperationOptionsBase
5661
): Promise<void>;
5762

@@ -98,7 +103,11 @@ export interface ServiceBusSender {
98103
* @throws `ServiceBusError` if the service returns an error while scheduling messages.
99104
*/
100105
scheduleMessages(
101-
messages: ServiceBusMessage | ServiceBusMessage[],
106+
messages:
107+
| ServiceBusMessage
108+
| ServiceBusMessage[]
109+
| AmqpAnnotatedMessage
110+
| AmqpAnnotatedMessage[],
102111
scheduledEnqueueTimeUtc: Date,
103112
options?: OperationOptionsBase
104113
): Promise<Long[]>;
@@ -242,7 +251,11 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
242251
}
243252

244253
async scheduleMessages(
245-
messages: ServiceBusMessage | ServiceBusMessage[],
254+
messages:
255+
| ServiceBusMessage
256+
| ServiceBusMessage[]
257+
| AmqpAnnotatedMessage
258+
| AmqpAnnotatedMessage[],
246259
scheduledEnqueueTimeUtc: Date,
247260
options: OperationOptionsBase = {}
248261
): Promise<Long[]> {
@@ -299,16 +312,14 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
299312
? sequenceNumbers
300313
: [sequenceNumbers];
301314
const cancelSchedulesMessagesOperationPromise = async (): Promise<void> => {
302-
return this._context.getManagementClient(this._entityPath).cancelScheduledMessages(
303-
sequenceNumbersToCancel,
304-
305-
{
315+
return this._context
316+
.getManagementClient(this._entityPath)
317+
.cancelScheduledMessages(sequenceNumbersToCancel, {
306318
...options,
307319
associatedLinkName: this._sender.name,
308320
requestName: "cancelScheduledMessages",
309321
timeoutInMs: this._retryOptions.timeoutInMs
310-
}
311-
);
322+
});
312323
};
313324
const config: RetryConfig<void> = {
314325
operation: cancelSchedulesMessagesOperationPromise,

0 commit comments

Comments
 (0)