Skip to content

Commit dee392e

Browse files
authored
[ServiceBus] add drainCredit timeout when suspending receiver (Azure#25369)
There are cases where the service doesn't respond to our drain-credit request, or the response comes back after a long and unexpected time. Since we are blocking on the `receiver_drained` event before resolving, application code could be blocked indefinitely when suspending/closing. This PR adds a timeout around credit draining, and if the time limit expires we close the receiver then returns.
1 parent 5847ad4 commit dee392e

File tree

4 files changed

+46
-2
lines changed

4 files changed

+46
-2
lines changed

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
- Fix an issue of over-adding credits when receiving messages in a batch [PR #25185](https://github.com/Azure/azure-sdk-for-js/pull/25185)
1212
- Fix a race condition in initializing management links [PR #25279](https://github.com/Azure/azure-sdk-for-js/pull/25279)
1313
- `Uint8Array` payload is converted into JSON before being sent. This PR fixes it so that `Uint8Array` is being treated the same as a Buffer.
14+
- Fix an issue where closing receiver could be blocked indefinitely when we don't receive a drain credit response.
1415

1516
### Other Changes
1617

sdk/servicebus/service-bus/src/core/receiverHelper.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { AbortError } from "@azure/abort-controller";
55
import { Receiver, ReceiverEvents } from "rhea-promise";
66
import { receiverLogger as logger } from "../log";
77
import { ServiceBusError } from "../serviceBusError";
8+
import { receiveDrainTimeoutInMs } from "../util/constants";
89

910
/**
1011
* Wraps the receiver with some higher level operations for managing state
@@ -126,12 +127,19 @@ export class ReceiverHelper {
126127
);
127128

128129
const drainPromise = new Promise<void>((resolve) => {
130+
const timer = setTimeout(async () => {
131+
logger.warning(`${logPrefix} Time out when draining credits in suspend().`);
132+
// Close the receiver link since we have not received the receiver_drained event
133+
// to prevent out-of-sync link state between local and remote
134+
await receiver?.close();
135+
resolve();
136+
}, receiveDrainTimeoutInMs);
129137
receiver.once(ReceiverEvents.receiverDrained, () => {
130138
logger.verbose(`${logPrefix} Receiver has been drained.`);
131139
receiver.drain = false;
140+
clearTimeout(timer);
132141
resolve();
133142
});
134-
135143
receiver.drainCredit();
136144
});
137145

sdk/servicebus/service-bus/src/util/constants.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ export const packageJsonInfo = {
1414
*/
1515
export const messageDispositionTimeout = 20000;
1616

17+
/**
18+
* The amount of time in milliseconds that a receiver
19+
* will wait while draining credits before returning
20+
* @internal
21+
*/
22+
export const receiveDrainTimeoutInMs = 200;
23+
1724
/**
1825
* @internal
1926
*/

sdk/servicebus/service-bus/test/internal/unit/receiverHelper.spec.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import chai from "chai";
55
import chaiAsPromised from "chai-as-promised";
6-
import { Receiver, ReceiverEvents } from "rhea-promise";
6+
import { Receiver, ReceiverEvents, delay } from "rhea-promise";
77
import { ReceiverHelper } from "../../../src/core/receiverHelper";
88
import { assertThrows } from "../../public/utils/testUtils";
99
import { createRheaReceiverForTests } from "./unittestUtils";
@@ -127,4 +127,32 @@ describe("ReceiverHelper unit tests", () => {
127127
helper.addCredit(101);
128128
assert.equal(receiver.credit, 101);
129129
});
130+
131+
it("resolves from suspend() when drain is blocking ", async () => {
132+
const receiver = createRheaReceiverForTests();
133+
const helper = new ReceiverHelper(() => ({ receiver, logPrefix: "hello" }));
134+
135+
(receiver as any)["_link"]["drain_credit"] = () => {
136+
(receiver as any).credit = 0;
137+
// not emitting the `receiverDrained` event
138+
};
139+
let drainWasCalled = false;
140+
141+
receiver.on(ReceiverEvents.receiverDrained, () => {
142+
drainWasCalled = true;
143+
});
144+
145+
// we can explicitly drain
146+
helper.resume();
147+
helper.addCredit(101);
148+
149+
await Promise.race([
150+
helper.drain(),
151+
delay(2000).then(() => {
152+
throw new Error("Test failed. helper.drain() should have already resolved.");
153+
}),
154+
]);
155+
156+
assert.isFalse(drainWasCalled);
157+
});
130158
});

0 commit comments

Comments
 (0)