Skip to content

Commit 70d860e

Browse files
[Service Bus] Session Receiver (Both batching and streaming) during a disconnect (Azure#13956)
### Issue Azure#8875 ### What's in the PR "disconnect"/refresh logic added before did not cover the session scenarios. This PR attempts to tackle the batching receiver for sessions upon "disconnect" scenarios to have a smooth-resolution/throw-errors based on the receiveMode. Streaming receiver calls processError with SessionLockLost and closes the link ### receiveMessages - Scenarios to handle/test - [x] throws "session lock has expired" after a disconnect - [x] returns messages if drain is in progress (receiveAndDelete) - [x] throws an error if drain is in progress (peekLock) - [x] returns messages if receive in progress (receiveAndDelete) - [x] throws an error if receive is in progress (peekLock) ### Streaming receiver - [x] Test - calls processError and closes the link ### TODO - [x] Cover the scenarios above - [x] Tests - [x] Bug fix - number of receivers - Azure#13990 - [x] Changelog - [x] Streaming receiver - ~~beyond the scope of this PR~~ Azure#14212 - [ ] Bad log messages Azure#13989 - beyond the scope of this PR - [ ] Stress testing plan for disconnect - Azure#13988 - beyond the scope of this PR Fixes Azure#8875
1 parent 2efddae commit 70d860e

File tree

8 files changed

+712
-334
lines changed

8 files changed

+712
-334
lines changed

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
- Re-exports `RetryMode` for use when setting the `RetryOptions.mode` field
66
in `ServiceBusClientOptions`.
77
Resolves [#13166](https://github.com/Azure/azure-sdk-for-js/issues/13166).
8+
- When receiving messages from sessions using either the `ServiceBusSessionReceiver.receiveMessages` method or the `ServiceBusSessionReceiver.subscribe` method, errors on the AMQP link or session were being handled well, but an error on the AMQP connection like a network disconnect was not being handled at all. This results in the promise returned by the `receiveMessages` method never getting fulfilled and the `subscribe` method not calling the user provided error handler.
9+
This is now fixed in [#13956](https://github.com/Azure/azure-sdk-for-js/pull/13956) to throw `SessionLockLostError`. If using the `receiveMessages` method in `receiveAndDelete` mode, then the messages collected so far are returned to avoid data loss.
10+
811
- Allow null as a value for the properties in `ServiceBusMessage.applicationProperties`.
912
Fixes [#14329](https://github.com/Azure/azure-sdk-for-js/issues/14329)
1013

sdk/servicebus/service-bus/src/connectionContext.ts

Lines changed: 117 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import { ManagementClient } from "./core/managementClient";
2525
import { formatUserAgentPrefix } from "./util/utils";
2626
import { getRuntimeInfo } from "./util/runtimeInfo";
2727
import { SharedKeyCredential } from "./servicebusSharedKeyCredential";
28-
import { ReceiverType } from "./core/linkEntity";
28+
import { NonSessionReceiverType, ReceiverType } from "./core/linkEntity";
29+
import { ServiceBusError } from "./serviceBusError";
2930

3031
/**
3132
* @internal
@@ -134,15 +135,16 @@ type ConnectionContextMethods = Omit<
134135

135136
/**
136137
* @internal
137-
* Helper method to call onDetached on the receivers from the connection context upon seeing an error.
138+
* Helper method to call onDetached on the non-sessions batching and streaming receivers from the connection context upon seeing an error.
138139
*/
139140
async function callOnDetachedOnReceivers(
140141
connectionContext: ConnectionContext,
141142
contextOrConnectionError: Error | ConnectionError | AmqpError | undefined,
142-
receiverType: ReceiverType
143+
receiverType: NonSessionReceiverType
143144
): Promise<void[]> {
144145
const detachCalls: Promise<void>[] = [];
145146

147+
// Iterating over non-sessions batching and streaming receivers
146148
for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
147149
const receiver = connectionContext.messageReceivers[receiverName];
148150
if (receiver && receiver.receiverType === receiverType) {
@@ -165,6 +167,54 @@ async function callOnDetachedOnReceivers(
165167
);
166168
}
167169
}
170+
return Promise.all(detachCalls);
171+
}
172+
173+
/**
174+
* @internal
175+
* Helper method to call onDetached on the session receivers from the connection context upon seeing an error.
176+
*/
177+
async function callOnDetachedOnSessionReceivers(
178+
connectionContext: ConnectionContext,
179+
contextOrConnectionError: Error | ConnectionError | AmqpError | undefined
180+
): Promise<void[]> {
181+
const getSessionError = (sessionId: string, entityPath: string) => {
182+
const sessionInfo =
183+
`The receiver for session "${sessionId}" in "${entityPath}" has been closed and can no longer be used. ` +
184+
`Please create a new receiver using the "acceptSession" or "acceptNextSession" method on the ServiceBusClient.`;
185+
186+
const errorMessage =
187+
contextOrConnectionError == null
188+
? `Unknown error occurred on the AMQP connection while receiving messages. ` + sessionInfo
189+
: `Error occurred on the AMQP connection while receiving messages. ` +
190+
sessionInfo +
191+
`\nMore info - \n${contextOrConnectionError}`;
192+
193+
const error = new ServiceBusError(errorMessage, "SessionLockLost");
194+
error.retryable = false;
195+
return error;
196+
};
197+
198+
const detachCalls: Promise<void>[] = [];
199+
200+
for (const receiverName of Object.keys(connectionContext.messageSessions)) {
201+
const receiver = connectionContext.messageSessions[receiverName];
202+
logger.verbose(
203+
"[%s] calling detached on %s receiver(sessions).",
204+
connectionContext.connection.id,
205+
receiver.name
206+
);
207+
detachCalls.push(
208+
receiver.onDetached(getSessionError(receiver.sessionId, receiver.entityPath)).catch((err) => {
209+
logger.logError(
210+
err,
211+
"[%s] An error occurred while calling onDetached() on the session receiver(sessions) '%s'",
212+
connectionContext.connection.id,
213+
receiver.name
214+
);
215+
})
216+
);
217+
}
168218

169219
return Promise.all(detachCalls);
170220
}
@@ -375,63 +425,76 @@ export namespace ConnectionContext {
375425
await connectionContext.managementClients[entityPath].close();
376426
}
377427

378-
// Calling onDetached on sender
379-
if (!state.wasConnectionCloseCalled && state.numSenders) {
380-
// We don't do recovery for the sender:
381-
// Because we don't want to keep the sender active all the time
382-
// and the "next" send call would bear the burden of creating the link.
383-
// Call onDetached() on sender so that it can gracefully shutdown
384-
// by cleaning up the timers and closing the links.
385-
// We don't call onDetached for sender after `refreshConnection()`
386-
// because any new send calls that potentially initialize links would also get affected if called later.
387-
logger.verbose(
388-
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
389-
`senders. We should not reconnect.`
390-
);
391-
const detachCalls: Promise<void>[] = [];
392-
for (const senderName of Object.keys(connectionContext.senders)) {
393-
const sender = connectionContext.senders[senderName];
394-
if (sender) {
395-
logger.verbose(
396-
"[%s] calling detached on sender '%s'.",
397-
connectionContext.connection.id,
398-
sender.name
399-
);
400-
detachCalls.push(
401-
sender.onDetached().catch((err) => {
402-
logger.logError(
403-
err,
404-
"[%s] An error occurred while calling onDetached() the sender '%s'",
405-
connectionContext.connection.id,
406-
sender.name
407-
);
408-
})
409-
);
428+
if (state.wasConnectionCloseCalled) {
429+
// Do Nothing
430+
} else {
431+
// Calling onDetached on sender
432+
if (state.numSenders) {
433+
// We don't do recovery for the sender:
434+
// Because we don't want to keep the sender active all the time
435+
// and the "next" send call would bear the burden of creating the link.
436+
// Call onDetached() on sender so that it can gracefully shutdown
437+
// by cleaning up the timers and closing the links.
438+
// We don't call onDetached for sender after `refreshConnection()`
439+
// because any new send calls that potentially initialize links would also get affected if called later.
440+
logger.verbose(
441+
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
442+
`senders. We should not reconnect.`
443+
);
444+
const detachCalls: Promise<void>[] = [];
445+
for (const senderName of Object.keys(connectionContext.senders)) {
446+
const sender = connectionContext.senders[senderName];
447+
if (sender) {
448+
logger.verbose(
449+
"[%s] calling detached on sender '%s'.",
450+
connectionContext.connection.id,
451+
sender.name
452+
);
453+
detachCalls.push(
454+
sender.onDetached().catch((err) => {
455+
logger.logError(
456+
err,
457+
"[%s] An error occurred while calling onDetached() the sender '%s'",
458+
connectionContext.connection.id,
459+
sender.name
460+
);
461+
})
462+
);
463+
}
410464
}
465+
await Promise.all(detachCalls);
411466
}
412-
await Promise.all(detachCalls);
413-
}
414467

415-
// Calling onDetached on batching receivers for the same reasons as sender
416-
const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching");
417-
if (!state.wasConnectionCloseCalled && numBatchingReceivers) {
418-
logger.verbose(
419-
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` +
420-
`batching receivers. We should reconnect.`
421-
);
468+
// Calling onDetached on batching receivers for the same reasons as sender
469+
const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching");
470+
if (numBatchingReceivers) {
471+
logger.verbose(
472+
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` +
473+
`batching receivers. We should not reconnect.`
474+
);
422475

423-
// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
424-
await callOnDetachedOnReceivers(
425-
connectionContext,
426-
connectionError || contextError,
427-
"batching"
428-
);
476+
// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
477+
await callOnDetachedOnReceivers(
478+
connectionContext,
479+
connectionError || contextError,
480+
"batching"
481+
);
482+
}
429483

430-
// TODO:
431-
// `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers".
432-
// ...What to do for sessions (connectionContext.messageSessions) ??
433-
}
484+
// Calling onDetached on session receivers
485+
const numSessionReceivers = getNumberOfReceivers(connectionContext, "session");
486+
if (numSessionReceivers) {
487+
logger.verbose(
488+
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numSessionReceivers} ` +
489+
`session receivers. We should close them.`
490+
);
434491

492+
await callOnDetachedOnSessionReceivers(
493+
connectionContext,
494+
connectionError || contextError
495+
);
496+
}
497+
}
435498
await refreshConnection();
436499
waitForConnectionRefreshResolve();
437500
waitForConnectionRefreshPromise = undefined;

sdk/servicebus/service-bus/src/core/batchingReceiver.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ export class BatchingReceiver extends MessageReceiver {
112112
options: OperationOptionsBase
113113
): Promise<ServiceBusMessageImpl[]> {
114114
throwErrorIfConnectionClosed(this._context);
115-
116115
try {
117116
logger.verbose(
118117
"[%s] Receiver '%s', setting max concurrent calls to 0.",

sdk/servicebus/service-bus/src/core/linkEntity.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,14 @@ export interface RequestResponseLinkOptions {
4848
/**
4949
* @internal
5050
*/
51-
export type ReceiverType =
51+
export type NonSessionReceiverType =
5252
| "batching" // batching receiver
53-
| "streaming" // streaming receiver;
54-
| "session"; // message session
53+
| "streaming"; // streaming receiver
54+
55+
/**
56+
* @internal
57+
*/
58+
export type ReceiverType = NonSessionReceiverType | "session"; // message session
5559

5660
/**
5761
* @internal

sdk/servicebus/service-bus/src/serviceBusError.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class ServiceBusError extends MessagingError {
156156

157157
/**
158158
* Translates an error into either an Error or a ServiceBusError which provides a `reason` code that
159-
* can be used by clients to programatically react to errors.
159+
* can be used by clients to programmatically react to errors.
160160
*
161161
* If you are calling `@azure/core-amqp/translate` you should swap to using this function instead since it provides
162162
* Service Bus specific handling of the error (falling back to default translate behavior otherwise).

sdk/servicebus/service-bus/src/session/messageSession.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ export class MessageSession extends LinkEntity<Receiver> {
534534
/**
535535
* Closes the underlying AMQP receiver link.
536536
*/
537-
async close(): Promise<void> {
537+
async close(error?: Error | AmqpError): Promise<void> {
538538
try {
539539
this._isReceivingMessagesForSubscriber = false;
540540
if (this._sessionLockRenewalTimer) clearTimeout(this._sessionLockRenewalTimer);
@@ -546,7 +546,7 @@ export class MessageSession extends LinkEntity<Receiver> {
546546

547547
await super.close();
548548

549-
await this._batchingReceiverLite.terminate();
549+
this._batchingReceiverLite.terminate(error);
550550
} catch (err) {
551551
logger.logError(
552552
err,
@@ -762,6 +762,36 @@ export class MessageSession extends LinkEntity<Receiver> {
762762
}
763763
}
764764

765+
/**
766+
* To be called when connection is disconnected to gracefully close ongoing receive request.
767+
* @param connectionError - The connection error if any.
768+
*/
769+
async onDetached(connectionError: AmqpError | Error): Promise<void> {
770+
logger.error(
771+
translateServiceBusError(connectionError),
772+
`${this.logPrefix} onDetached: closing link (session receiver will not reconnect)`
773+
);
774+
try {
775+
// Notifying so that the streaming receiver knows about the error
776+
this._notifyError({
777+
entityPath: this.entityPath,
778+
fullyQualifiedNamespace: this._context.config.host,
779+
error: translateServiceBusError(connectionError),
780+
errorSource: "receive"
781+
});
782+
} catch (error) {
783+
logger.error(
784+
translateServiceBusError(error),
785+
`${
786+
this.logPrefix
787+
} onDetached: unexpected error seen when tried calling "_notifyError" with ${translateServiceBusError(
788+
connectionError
789+
)}`
790+
);
791+
}
792+
await this.close(connectionError);
793+
}
794+
765795
/**
766796
* Settles the message with the specified disposition.
767797
* @param message - The ServiceBus Message that needs to be settled.

0 commit comments

Comments
 (0)