Skip to content

Commit d75f119

Browse files
[service-bus] Fix message loss issues with peekLock and receiveAndDelete (Azure#15989)
Fixing an issue where we could lose messages or provoke an alarming message from rhea (`Received transfer when credit was 0`) The message loss issue is related to how we trigger 'drain' using 'addCredit(1)'. Our 'receiver.drain; receiver.addCredit(1)' pattern actually does add a credit, which shows up in the flow frame that gets sent for our drain. This has led to occasionally receiving more messages than we intended. The second part of this was that we were masking this error because we had code that specifically threw out messages if more arrived than were requested. If the message was being auto-renewed it's possible for the message to appear to be missing, and if we were in receiveAndDelete the message is effectively lost at that point. That code is now removed (we defer to just allowing the extrra message, should a bug arise that causes that) and we log an error indicating it did happen. The rhea error message appeared to be triggered by our accidentally allowing multiple overlapping 'drain's to occur (finalAction did not check to see if we were _already_ draining and would allow it to happen multiple times). Removing the concurrent drains fixed this issue but I didn't fully investigate why. Fixes Azure#15606, Azure#15115
1 parent cec69b6 commit d75f119

File tree

17 files changed

+735
-278
lines changed

17 files changed

+735
-278
lines changed

common/config/rush/pnpm-lock.yaml

Lines changed: 16 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/core/core-amqp/CHANGELOG.md

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
# Release History
22

3-
## 3.0.1 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
3+
## 3.1.0 (Unreleased)
84

95
### Key Bugs Fixed
106

11-
### Fixed
12-
7+
- Updated to use the latest version of the `rhea` package.
8+
Part of a fix for PR#15989, where draining messages could sometimes lead to message loss with `receiver.receiveMessages()`.
9+
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)
1310

1411
## 3.0.0 (2021-06-09)
1512

sdk/core/core-amqp/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@azure/core-amqp",
33
"sdk-type": "client",
4-
"version": "3.0.1",
4+
"version": "3.1.0",
55
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
66
"author": "Microsoft Corporation",
77
"license": "MIT",
@@ -76,8 +76,8 @@
7676
"events": "^3.0.0",
7777
"jssha": "^3.1.0",
7878
"process": "^0.11.10",
79-
"rhea": "^2.0.2",
80-
"rhea-promise": "^2.0.0",
79+
"rhea": "^2.0.3",
80+
"rhea-promise": "^2.1.0",
8181
"tslib": "^2.2.0",
8282
"url": "^0.11.0",
8383
"util": "^0.12.1"

sdk/eventhub/event-hubs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
"is-buffer": "^2.0.3",
116116
"jssha": "^3.1.0",
117117
"process": "^0.11.10",
118-
"rhea-promise": "^2.0.0",
118+
"rhea-promise": "^2.1.0",
119119
"tslib": "^2.2.0",
120120
"uuid": "^8.3.0"
121121
},

sdk/eventhub/mock-hub/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
"dependencies": {
6363
"@azure/abort-controller": "^1.0.0",
6464
"@azure/core-asynciterator-polyfill": "^1.0.0",
65-
"rhea": "^2.0.2",
65+
"rhea": "^2.0.3",
6666
"tslib": "^2.2.0"
6767
},
6868
"//sampleConfiguration": {

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# Release History
22

3-
## 7.3.0 (Unreleased)
4-
3+
## 7.3.0 (2021-07-06)
54
### Features Added
6-
- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.
7-
8-
### Breaking Changes
95

6+
- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.
107
### Key Bugs Fixed
118

9+
- Fixed a bug that could lead to message loss in certain conditions when using `receiver.receiveMessages()`.
10+
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)
11+
1212
### Fixed
1313

1414
- Fixing an issue where the internal link cache would not properly remove closed links.

sdk/servicebus/service-bus/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
"long": "^4.0.0",
127127
"process": "^0.11.10",
128128
"tslib": "^2.2.0",
129-
"rhea-promise": "^2.0.0"
129+
"rhea-promise": "^2.1.0"
130130
},
131131
"devDependencies": {
132132
"@azure/dev-tool": "^1.0.0",

sdk/servicebus/service-bus/src/core/batchingReceiver.ts

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
OnAmqpEvent,
99
ReceiverEvents,
1010
SessionEvents,
11-
Receiver,
11+
Receiver as RheaPromiseReceiver,
1212
Session
1313
} from "rhea-promise";
1414
import { ServiceBusMessageImpl } from "../serviceBusMessage";
@@ -191,16 +191,22 @@ export function getRemainingWaitTimeInMsFn(
191191
*
192192
* @internal
193193
*/
194-
type EventEmitterLike<T extends Receiver | Session> = Pick<T, "once" | "removeListener" | "on">;
194+
type EventEmitterLike<T extends RheaPromiseReceiver | Session> = Pick<
195+
T,
196+
"once" | "removeListener" | "on"
197+
>;
195198

196199
/**
197200
* The bare minimum needed to receive messages for batched
198201
* message receiving.
199202
*
200203
* @internal
201204
*/
202-
export type MinimalReceiver = Pick<Receiver, "name" | "isOpen" | "credit" | "addCredit" | "drain"> &
203-
EventEmitterLike<Receiver> & {
205+
export type MinimalReceiver = Pick<
206+
RheaPromiseReceiver,
207+
"name" | "isOpen" | "credit" | "addCredit" | "drain" | "drainCredit"
208+
> &
209+
EventEmitterLike<RheaPromiseReceiver> & {
204210
session: EventEmitterLike<Session>;
205211
} & {
206212
connection: {
@@ -269,6 +275,7 @@ export class BatchingReceiverLite {
269275

270276
private _getRemainingWaitTimeInMsFn: typeof getRemainingWaitTimeInMsFn;
271277
private _closeHandler: ((connectionError?: AmqpError | Error) => void) | undefined;
278+
private _finalAction: (() => void) | undefined;
272279

273280
isReceivingMessages: boolean;
274281

@@ -389,16 +396,17 @@ export class BatchingReceiverLite {
389396
// - maxMessageCount is reached or
390397
// - maxWaitTime is passed or
391398
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
392-
const finalAction = (): void => {
399+
this._finalAction = (): void => {
400+
if (receiver.drain) {
401+
// If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will
402+
// arrive before the drain completes.
403+
return;
404+
}
405+
393406
// Drain any pending credits.
394407
if (receiver.isOpen() && receiver.credit > 0) {
395408
logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`);
396-
397-
// setting .drain and combining it with .addCredit results in (eventually) sending
398-
// a drain request to Service Bus. When the drain completes rhea will call `onReceiveDrain`
399-
// at which point we'll wrap everything up and resolve the promise.
400-
receiver.drain = true;
401-
receiver.addCredit(1);
409+
receiver.drainCredit();
402410
} else {
403411
logger.verbose(
404412
`${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`
@@ -429,15 +437,24 @@ export class BatchingReceiverLite {
429437
logger.verbose(
430438
`${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.`
431439
);
432-
finalAction();
440+
this._finalAction!();
433441
}, remainingWaitTimeInMs);
434442
}
435443
}
436444

437445
try {
438446
const data: ServiceBusMessageImpl = this._createServiceBusMessage(context);
439-
if (brokeredMessages.length < args.maxMessageCount) {
440-
brokeredMessages.push(data);
447+
brokeredMessages.push(data);
448+
449+
// NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive
450+
// extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be
451+
// silently dropped on the floor.
452+
if (brokeredMessages.length > args.maxMessageCount) {
453+
logger.warning(
454+
`More messages arrived than were expected: ${
455+
args.maxMessageCount
456+
} vs ${brokeredMessages.length + 1}`
457+
);
441458
}
442459
} catch (err) {
443460
const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
@@ -448,7 +465,7 @@ export class BatchingReceiverLite {
448465
reject(errObj);
449466
}
450467
if (brokeredMessages.length === args.maxMessageCount) {
451-
finalAction();
468+
this._finalAction!();
452469
}
453470
};
454471

@@ -515,7 +532,7 @@ export class BatchingReceiverLite {
515532
logger.verbose(
516533
`${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.`
517534
);
518-
finalAction();
535+
this._finalAction!();
519536
}, args.maxWaitTimeInMs);
520537

521538
receiver.on(ReceiverEvents.message, onReceiveMessage);

sdk/servicebus/service-bus/src/core/receiverHelper.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,7 @@ export class ReceiverHelper {
132132
resolve();
133133
});
134134

135-
receiver.drain = true;
136-
// this is not actually adding another credit - it'll just
137-
// cause the drain call to start.
138-
receiver.addCredit(1);
135+
receiver.drainCredit();
139136
});
140137

141138
return drainPromise;

sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,24 +1388,13 @@ function causeDisconnectDuringDrain(
13881388
throw new Error("No active link for batching receiver");
13891389
}
13901390

1391-
const origAddCredit = link.addCredit;
1392-
1393-
// We want to simulate a disconnect once the batching receiver is draining.
1394-
// We can detect when the receiver enters a draining state when `addCredit` is
1395-
// called while didRequestDrainResolver is called to resolve the promise.
1396-
const addCreditThatImmediatelyDetaches = function(credits: number): void {
1397-
origAddCredit.call(link, credits);
1398-
1399-
if (link.drain && credits === 1) {
1400-
// initiate the detach now (prior to any possibilty of the 'drain' call being scheduled)
1401-
batchingReceiver
1402-
.onDetached(new Error("Test: fake connection failure"))
1403-
.then(() => resolveOnDetachedCallPromise());
1404-
}
1391+
link["drainCredit"] = () => {
1392+
// don't send the drain request, we'll just detach.
1393+
batchingReceiver
1394+
.onDetached(new Error("Test: fake connection failure"))
1395+
.then(() => resolveOnDetachedCallPromise());
14051396
};
14061397

1407-
link["addCredit"] = addCreditThatImmediatelyDetaches;
1408-
14091398
return {
14101399
onDetachedCalledPromise
14111400
};

0 commit comments

Comments
 (0)