Skip to content

Commit e2b5a62

Browse files
[service-bus] Reliability improvements and testing updates (Azure#15098)
This PR has a few changes in it, primarily to improve our robustness and our reporting: General reliability improvements: - Migrates to a workflow that treats subscription start as a retryable entity, rather than just link creation (which is what had previously). - It checks and throws exceptions on much more granular conditions, particularly in addCredit - Error checking and handling has been migrated to be in far fewer spots and to be more unconditional, which should hopefully eliminate any areas where an exception or error could occur but it never gets forwarded or seen. SDK debugging: - Adds a new SDK only flag (forwardInternalErrors) which will make it so areas that used to eat errors now can forward them to processError. Prior to this the errors were only logged, but that meant they could be missed. Most of these would probably be considered cosmetic by customers so this is primarly for debugging purposes within the SDK itself. - The internal `processInitialize` handler has been split into two (still internal) handlers - preInitialize and postInitialize. preInitialize runs before init(), and postInitialize runs after init() but before addCredit. This lets us write more reliable tests. These are not exposed to customers. Fixes Azure#14535
1 parent 9e856ac commit e2b5a62

23 files changed

+1263
-1070
lines changed

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release History
22

3-
## 7.1.0-beta.1 (Unreleased)
3+
## 7.0.6 (Unreleased)
44

55
### New Features
66

@@ -14,7 +14,15 @@
1414
- [Bug Fix] `expiresAtUtc` is `Invalid Date` in the received message when the ttl is not defined. Has been fixed in [#13543](https://github.com/Azure/azure-sdk-for-js/pull/13543)
1515
- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug w.r.t the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).
1616
- Settling messages now use the `retryOptions` passed to `ServiceBusClient`, making it more resilient against network failures.
17-
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867/files)
17+
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867)
18+
- Fixes an issue where receiver link recovery/creation could fail, resulting in a receiver that was no longer receiving messages.
19+
[PR#15098](https://github.com/Azure/azure-sdk-for-js/pull/15098)
20+
21+
## 7.0.5 (2021-04-06)
22+
23+
### Bug fixes
24+
25+
- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug with regards to the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).
1826

1927
## 7.0.4 (2021-03-31)
2028

sdk/servicebus/service-bus/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@azure/service-bus",
33
"sdk-type": "client",
44
"author": "Microsoft Corporation",
5-
"version": "7.1.0-beta.1",
5+
"version": "7.0.6",
66
"license": "MIT",
77
"description": "Azure Service Bus SDK for JavaScript",
88
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/servicebus/service-bus/",
@@ -73,7 +73,7 @@
7373
"test:node": "npm run clean && npm run build:test:node && npm run integration-test:node",
7474
"test": "npm run test:node && npm run test:browser",
7575
"unit-test:browser": "echo skipped",
76-
"unit-test:node": "mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"dist-esm/test/internal/unit/*.spec.js\" \"dist-esm/test/internal/node/*.spec.js\"",
76+
"unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"",
7777
"unit-test": "npm run unit-test:node && npm run unit-test:browser",
7878
"docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src"
7979
},

sdk/servicebus/service-bus/src/core/messageReceiver.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,6 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
118118
number,
119119
DeferredPromiseAndTimer
120120
>();
121-
/**
122-
* The message handler provided by the user that will be wrapped
123-
* inside _onAmqpMessage.
124-
*/
125-
protected _onMessage!: OnMessage;
126-
/**
127-
* The error handler provided by the user that will be wrapped
128-
* inside _onAmqpError.
129-
*/
130-
protected _onError?: OnError;
131121

132122
/**
133123
* A lock renewer that handles message lock auto-renewal. This is undefined unless the user

sdk/servicebus/service-bus/src/core/receiverHelper.ts

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4+
import { AbortError } from "@azure/abort-controller";
45
import { Receiver, ReceiverEvents } from "rhea-promise";
56
import { receiverLogger as logger } from "../log";
7+
import { ServiceBusError } from "../serviceBusError";
68

79
/**
810
* Wraps the receiver with some higher level operations for managing state
@@ -11,37 +13,61 @@ import { receiverLogger as logger } from "../log";
1113
* @internal
1214
*/
1315
export class ReceiverHelper {
14-
private _isSuspended: boolean = false;
16+
private _isSuspended: boolean = true;
1517

1618
constructor(
1719
private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string }
1820
) {}
1921

22+
private _getCurrentReceiverOrError():
23+
| "is undefined"
24+
| "is not open"
25+
| "is suspended"
26+
| { receiver: Receiver | undefined; logPrefix: string } {
27+
const currentReceiverData = this._getCurrentReceiver();
28+
29+
if (currentReceiverData.receiver == null) {
30+
return "is undefined";
31+
}
32+
33+
if (!currentReceiverData.receiver.isOpen()) {
34+
return "is not open";
35+
}
36+
37+
if (this._isSuspended) {
38+
return "is suspended";
39+
}
40+
41+
return currentReceiverData;
42+
}
43+
2044
/**
2145
* Adds credits to the receiver, respecting any state that
2246
* indicates the receiver is closed or should not continue
2347
* to receive more messages.
2448
*
2549
* @param credits - Number of credits to add.
26-
* @returns true if credits were added, false if there is no current receiver instance
2750
* or `stopReceivingMessages` has been called.
2851
*/
29-
addCredit(credits: number): boolean {
30-
const { receiver, logPrefix } = this._getCurrentReceiver();
52+
addCredit(credits: number): void {
53+
const currentReceiverOrError = this._getCurrentReceiverOrError();
3154

32-
if (!this.canReceiveMessages()) {
33-
logger.verbose(
34-
`${logPrefix} Asked to add ${credits} credits but the receiver is not able to receive messages`
35-
);
36-
return false;
37-
}
55+
if (typeof currentReceiverOrError === "string") {
56+
const errorMessage = `Cannot request messages on the receiver since it ${currentReceiverOrError}.`;
3857

39-
if (receiver != null) {
40-
logger.verbose(`${logPrefix} Adding ${credits} credits`);
41-
receiver.addCredit(credits);
58+
if (currentReceiverOrError === "is suspended") {
59+
// if a user has suspended the receiver we should consider this a non-retryable
60+
// error since it absolutely requires user intervention.
61+
throw new AbortError(errorMessage);
62+
}
63+
64+
throw new ServiceBusError(errorMessage, "GeneralError");
4265
}
4366

44-
return true;
67+
if (currentReceiverOrError.receiver != null) {
68+
logger.verbose(`${currentReceiverOrError.logPrefix} Adding ${credits} credits`);
69+
currentReceiverOrError.receiver.addCredit(credits);
70+
}
4571
}
4672

4773
/**
@@ -60,36 +86,38 @@ export class ReceiverHelper {
6086
logger.verbose(
6187
`${logPrefix} User has requested to stop receiving new messages, attempting to drain.`
6288
);
89+
6390
return this.drain();
6491
}
6592

6693
/**
67-
* Resets tracking so `addCredit` works again.
94+
* Resets tracking so `addCredit` works again by toggling the `_isSuspended` flag.
6895
*/
6996
resume(): void {
7097
this._isSuspended = false;
7198
}
7299

73-
/**
74-
* Whether the receiver can receive messages.
75-
*
76-
* This checks if the the caller has decided to disable adding
77-
* credits via 'suspend' as well as whether the receiver itself is
78-
* still open.
79-
*/
80-
canReceiveMessages(): boolean {
81-
const { receiver } = this._getCurrentReceiver();
82-
return !this._isSuspended && this._isValidReceiver(receiver);
100+
isSuspended(): boolean {
101+
return this._isSuspended;
83102
}
84103

85104
/**
86105
* Initiates a drain for the current receiver and resolves when
87106
* the drain has completed.
107+
*
108+
* NOTE: This method returns immediately if the receiver is not valid or if there
109+
* are no pending credits on the receiver (ie: `receiver.credit === 0`).
88110
*/
89111
async drain(): Promise<void> {
90112
const { receiver, logPrefix } = this._getCurrentReceiver();
91113

92114
if (!this._isValidReceiver(receiver)) {
115+
// TODO: should we throw?
116+
return;
117+
}
118+
119+
if (receiver.credit === 0) {
120+
// nothing to drain
93121
return;
94122
}
95123

0 commit comments

Comments
 (0)