Skip to content

Commit 714c5a1

Browse files
authored
[Service Bus] Move settlement & renewLock methods from message to receiver (Azure#11962)
1 parent f58d925 commit 714c5a1

32 files changed

+696
-682
lines changed

sdk/servicebus/service-bus/review/service-bus.api.md

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,9 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {
192192
}
193193

194194
// @public
195-
export interface MessageHandlers<ReceivedMessageT> {
195+
export interface MessageHandlers {
196196
processError(args: ProcessErrorArgs): Promise<void>;
197-
processMessage(message: ReceivedMessageT): Promise<void>;
197+
processMessage(message: ServiceBusReceivedMessage): Promise<void>;
198198
}
199199

200200
export { MessagingError }
@@ -348,19 +348,19 @@ export class ServiceBusAdministrationClient extends ServiceClient {
348348
export class ServiceBusClient {
349349
constructor(connectionString: string, options?: ServiceBusClientOptions);
350350
constructor(fullyQualifiedNamespace: string, credential: TokenCredential, options?: ServiceBusClientOptions);
351-
acceptNextSession(queueName: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessageWithLock>>;
352-
acceptNextSession(queueName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessage>>;
353-
acceptNextSession(topicName: string, subscriptionName: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessageWithLock>>;
354-
acceptNextSession(topicName: string, subscriptionName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessage>>;
355-
acceptSession(queueName: string, sessionId: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessageWithLock>>;
356-
acceptSession(queueName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessage>>;
357-
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessageWithLock>>;
358-
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver<ServiceBusReceivedMessage>>;
351+
acceptNextSession(queueName: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
352+
acceptNextSession(queueName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
353+
acceptNextSession(topicName: string, subscriptionName: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
354+
acceptNextSession(topicName: string, subscriptionName: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
355+
acceptSession(queueName: string, sessionId: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
356+
acceptSession(queueName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
357+
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options?: AcceptSessionOptions<"peekLock">): Promise<ServiceBusSessionReceiver>;
358+
acceptSession(topicName: string, subscriptionName: string, sessionId: string, options: AcceptSessionOptions<"receiveAndDelete">): Promise<ServiceBusSessionReceiver>;
359359
close(): Promise<void>;
360-
createReceiver(queueName: string, options?: CreateReceiverOptions<"peekLock">): ServiceBusReceiver<ServiceBusReceivedMessageWithLock>;
361-
createReceiver(queueName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiver<ServiceBusReceivedMessage>;
362-
createReceiver(topicName: string, subscriptionName: string, options?: CreateReceiverOptions<"peekLock">): ServiceBusReceiver<ServiceBusReceivedMessageWithLock>;
363-
createReceiver(topicName: string, subscriptionName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiver<ServiceBusReceivedMessage>;
360+
createReceiver(queueName: string, options?: CreateReceiverOptions<"peekLock">): ServiceBusReceiver;
361+
createReceiver(queueName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiver;
362+
createReceiver(topicName: string, subscriptionName: string, options?: CreateReceiverOptions<"peekLock">): ServiceBusReceiver;
363+
createReceiver(topicName: string, subscriptionName: string, options: CreateReceiverOptions<"receiveAndDelete">): ServiceBusReceiver;
364364
createSender(queueOrTopicName: string): ServiceBusSender;
365365
fullyQualifiedNamespace: string;
366366
}
@@ -431,31 +431,27 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
431431
}
432432

433433
// @public
434-
export interface ServiceBusReceivedMessageWithLock extends ServiceBusReceivedMessage {
435-
abandon(propertiesToModify?: {
434+
export interface ServiceBusReceiver {
435+
abandonMessage(message: ServiceBusReceivedMessage, propertiesToModify?: {
436436
[key: string]: any;
437437
}): Promise<void>;
438-
complete(): Promise<void>;
439-
deadLetter(options?: DeadLetterOptions & {
438+
close(): Promise<void>;
439+
completeMessage(message: ServiceBusReceivedMessage): Promise<void>;
440+
deadLetterMessage(message: ServiceBusReceivedMessage, options?: DeadLetterOptions & {
440441
[key: string]: any;
441442
}): Promise<void>;
442-
defer(propertiesToModify?: {
443+
deferMessage(message: ServiceBusReceivedMessage, propertiesToModify?: {
443444
[key: string]: any;
444445
}): Promise<void>;
445-
renewLock(): Promise<Date>;
446-
}
447-
448-
// @public
449-
export interface ServiceBusReceiver<ReceivedMessageT> {
450-
close(): Promise<void>;
451446
entityPath: string;
452-
getMessageIterator(options?: GetMessageIteratorOptions): AsyncIterableIterator<ReceivedMessageT>;
447+
getMessageIterator(options?: GetMessageIteratorOptions): AsyncIterableIterator<ServiceBusReceivedMessage>;
453448
isClosed: boolean;
454449
peekMessages(maxMessageCount: number, options?: PeekMessagesOptions): Promise<ServiceBusReceivedMessage[]>;
455-
receiveDeferredMessages(sequenceNumbers: Long | Long[], options?: OperationOptionsBase): Promise<ReceivedMessageT[]>;
456-
receiveMessages(maxMessageCount: number, options?: ReceiveMessagesOptions): Promise<ReceivedMessageT[]>;
450+
receiveDeferredMessages(sequenceNumbers: Long | Long[], options?: OperationOptionsBase): Promise<ServiceBusReceivedMessage[]>;
451+
receiveMessages(maxMessageCount: number, options?: ReceiveMessagesOptions): Promise<ServiceBusReceivedMessage[]>;
457452
receiveMode: "peekLock" | "receiveAndDelete";
458-
subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): {
453+
renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date>;
454+
subscribe(handlers: MessageHandlers, options?: SubscribeOptions): {
459455
close(): Promise<void>;
460456
};
461457
}
@@ -473,13 +469,13 @@ export interface ServiceBusSender {
473469
}
474470

475471
// @public
476-
export interface ServiceBusSessionReceiver<ReceivedMessageT extends ServiceBusReceivedMessage | ServiceBusReceivedMessageWithLock> extends ServiceBusReceiver<ReceivedMessageT> {
472+
export interface ServiceBusSessionReceiver extends ServiceBusReceiver {
477473
getSessionState(options?: OperationOptionsBase): Promise<any>;
478474
renewSessionLock(options?: OperationOptionsBase): Promise<Date>;
479475
readonly sessionId: string;
480476
readonly sessionLockedUntilUtc: Date;
481477
setSessionState(state: any, options?: OperationOptionsBase): Promise<void>;
482-
subscribe(handlers: MessageHandlers<ReceivedMessageT>, options?: SubscribeOptions): {
478+
subscribe(handlers: MessageHandlers, options?: SubscribeOptions): {
483479
close(): Promise<void>;
484480
};
485481
}

sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ export function createProcessingSpan(
9999
// NOTE: the connectionConfig also has an entityPath property but that only
100100
// represents the optional entityPath in their connection string which is NOT
101101
// what we want for tracing.
102-
receiver: Pick<ServiceBusReceiver<any>, "entityPath">,
102+
receiver: Pick<ServiceBusReceiver, "entityPath">,
103103
connectionConfig: Pick<ConnectionContext["config"], "host">,
104104
options?: OperationOptionsBase
105105
): Span {
@@ -145,7 +145,7 @@ export function createProcessingSpan(
145145
*/
146146
export function createAndEndProcessingSpan(
147147
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
148-
receiver: Pick<ServiceBusReceiver<any>, "entityPath">,
148+
receiver: Pick<ServiceBusReceiver, "entityPath">,
149149
connectionConfig: Pick<ConnectionContext["config"], "host">,
150150
options?: OperationOptionsBase
151151
): void {

sdk/servicebus/service-bus/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ export {
7070
AmqpMessageProperties,
7171
DeadLetterOptions,
7272
ServiceBusReceivedMessage,
73-
ServiceBusReceivedMessageWithLock,
7473
ServiceBusMessage
7574
} from "./serviceBusMessage";
7675
export { ServiceBusMessageBatch } from "./serviceBusMessageBatch";

sdk/servicebus/service-bus/src/models.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
55
import Long from "long";
6+
import { ServiceBusReceivedMessage } from "./serviceBusMessage";
67
import { MessagingError } from "@azure/core-amqp";
78

89
/**
@@ -36,13 +37,13 @@ export interface ProcessErrorArgs {
3637
/**
3738
* The general message handler interface (used for streamMessages).
3839
*/
39-
export interface MessageHandlers<ReceivedMessageT> {
40+
export interface MessageHandlers {
4041
/**
4142
* Handler that processes messages from service bus.
4243
*
4344
* @param message A message received from Service Bus.
4445
*/
45-
processMessage(message: ReceivedMessageT): Promise<void>;
46+
processMessage(message: ServiceBusReceivedMessage): Promise<void>;
4647
/**
4748
* Handler that processes errors that occur during receiving.
4849
* @param args The error and additional context to indicate where
@@ -55,8 +56,7 @@ export interface MessageHandlers<ReceivedMessageT> {
5556
* @internal
5657
* @ignore
5758
*/
58-
export interface InternalMessageHandlers<ReceivedMessageT>
59-
extends MessageHandlers<ReceivedMessageT> {
59+
export interface InternalMessageHandlers extends MessageHandlers {
6060
/**
6161
* Called when the connection is initialized but before we subscribe to messages or add credits.
6262
*

0 commit comments

Comments
 (0)