Skip to content

Commit b3db04d

Browse files
authored
[event-hubs] fixes sendBatch race condition causing TypeError to be thrown (Azure#15021)
* [event-hubs] fixes sendBatch race condition causing TypeError to be thrown * [event-hubs] add changelog entry for 14606 bug fix * [event-hubs] rename EventHubSender _createLinkIfNotOpen -> _getLink * [event-hubs] cleanup EventHubSender _init if/else statements * [event-hubs] centralize setting of this._sender link
1 parent 958f2fa commit b3db04d

File tree

5 files changed

+102
-38
lines changed

5 files changed

+102
-38
lines changed

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 5.5.1 (Unreleased)
44

5+
- Fixes issue [#14606](https://github.com/Azure/azure-sdk-for-js/issues/14606) where the `EventHubConsumerClient` could call subscribe's `processError` callback with a "Too much pending tasks" error. This could occur if the consumer was unable to connect to the service for an extended period of time.
6+
7+
- Fixes issue [#15002](https://github.com/Azure/azure-sdk-for-js/issues/15002) where in rare cases an unexpected `TypeError` could be thrown from `EventHubProducerClient.sendBatch` when the connection was disconnected while sending events was in progress.
58

69
## 5.5.0 (2021-04-06)
710

sdk/eventhub/event-hubs/src/eventHubReceiver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ export class EventHubReceiver extends LinkEntity {
586586
// store the underlying link in a cache
587587
this._context.receivers[this.name] = this;
588588

589-
await this._ensureTokenRenewal();
589+
this._ensureTokenRenewal();
590590
} else {
591591
logger.verbose(
592592
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +

sdk/eventhub/event-hubs/src/eventHubSender.ts

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ export class EventHubSender extends LinkEntity {
193193
* @returns boolean
194194
*/
195195
isOpen(): boolean {
196-
const result: boolean = this._sender! && this._sender!.isOpen();
196+
const result = Boolean(this._sender && this._sender.isOpen());
197197
logger.verbose(
198198
"[%s] Sender '%s' with address '%s' is open? -> %s",
199199
this._context.connectionId,
@@ -216,9 +216,9 @@ export class EventHubSender extends LinkEntity {
216216
abortSignal?: AbortSignalLike;
217217
} = {}
218218
): Promise<number> {
219-
await this._createLinkIfNotOpen(options);
219+
const sender = await this._getLink(options);
220220

221-
return this._sender!.maxMessageSize;
221+
return sender.maxMessageSize;
222222
}
223223

224224
/**
@@ -339,21 +339,20 @@ export class EventHubSender extends LinkEntity {
339339
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
340340
retryOptions.timeoutInMs = timeoutInMs;
341341

342-
const initStartTime = Date.now();
343-
await this._createLinkIfNotOpen(options);
344-
const timeTakenByInit = Date.now() - initStartTime;
345-
346342
const sendEventPromise = async (): Promise<void> => {
343+
const initStartTime = Date.now();
344+
const sender = await this._getLink(options);
345+
const timeTakenByInit = Date.now() - initStartTime;
347346
logger.verbose(
348347
"[%s] Sender '%s', credit: %d available: %d",
349348
this._context.connectionId,
350349
this.name,
351-
this._sender!.credit,
352-
this._sender!.session.outgoing.available()
350+
sender.credit,
351+
sender.session.outgoing.available()
353352
);
354353

355354
let waitTimeForSendable = 1000;
356-
if (!this._sender!.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
355+
if (!sender.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
357356
logger.verbose(
358357
"%s Sender '%s', waiting for 1 second for sender to become sendable",
359358
this._context.connectionId,
@@ -366,14 +365,14 @@ export class EventHubSender extends LinkEntity {
366365
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
367366
this._context.connectionId,
368367
this.name,
369-
this._sender!.credit,
370-
this._sender!.session?.outgoing?.available()
368+
sender.credit,
369+
sender.session?.outgoing?.available()
371370
);
372371
} else {
373372
waitTimeForSendable = 0;
374373
}
375374

376-
if (!this._sender!.sendable()) {
375+
if (!sender.sendable()) {
377376
// let us retry to send the message after some time.
378377
const msg =
379378
`[${this._context.connectionId}] Sender "${this.name}", ` +
@@ -404,10 +403,9 @@ export class EventHubSender extends LinkEntity {
404403
throw translate(e);
405404
}
406405

407-
this._sender!.sendTimeoutInSeconds =
408-
(timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
406+
sender.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
409407
try {
410-
const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700, {
408+
const delivery = await sender.send(rheaMessage, undefined, 0x80013700, {
411409
abortSignal
412410
});
413411
logger.info(
@@ -444,22 +442,22 @@ export class EventHubSender extends LinkEntity {
444442
}
445443
}
446444

447-
private async _createLinkIfNotOpen(
445+
private async _getLink(
448446
options: {
449447
retryOptions?: RetryOptions;
450448
abortSignal?: AbortSignalLike;
451449
} = {}
452-
): Promise<void> {
453-
if (this.isOpen()) {
454-
return;
450+
): Promise<AwaitableSender> {
451+
if (this.isOpen() && this._sender) {
452+
return this._sender;
455453
}
456454
const retryOptions = options.retryOptions || {};
457455
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
458456
retryOptions.timeoutInMs = timeoutInMs;
459457
const senderOptions = this._createSenderOptions(timeoutInMs);
460458

461459
const startTime = Date.now();
462-
const createLinkPromise = async (): Promise<void> => {
460+
const createLinkPromise = async (): Promise<AwaitableSender> => {
463461
return defaultCancellableLock.acquire(
464462
this.senderLock,
465463
() => {
@@ -475,7 +473,7 @@ export class EventHubSender extends LinkEntity {
475473
);
476474
};
477475

478-
const config: RetryConfig<void> = {
476+
const config: RetryConfig<AwaitableSender> = {
479477
operation: createLinkPromise,
480478
connectionId: this._context.connectionId,
481479
operationType: RetryOperationType.senderLink,
@@ -484,7 +482,7 @@ export class EventHubSender extends LinkEntity {
484482
};
485483

486484
try {
487-
await retry<void>(config);
485+
return await retry<AwaitableSender>(config);
488486
} catch (err) {
489487
const translatedError = translate(err);
490488
logger.warning(
@@ -500,18 +498,17 @@ export class EventHubSender extends LinkEntity {
500498

501499
/**
502500
* Initializes the sender session on the connection.
501+
* Should only be called from _createLinkIfNotOpen
503502
* @hidden
504503
*/
505504
private async _init(
506505
options: AwaitableSenderOptions & {
507506
abortSignal: AbortSignalLike | undefined;
508507
timeoutInMs: number;
509508
}
510-
): Promise<void> {
509+
): Promise<AwaitableSender> {
511510
try {
512-
if (!this.isOpen() && !this.isConnecting) {
513-
this.isConnecting = true;
514-
511+
if (!this.isOpen() || !this._sender) {
515512
// Wait for the connectionContext to be ready to open the link.
516513
await this._context.readyToOpenLink();
517514
await this._negotiateClaim({
@@ -526,33 +523,32 @@ export class EventHubSender extends LinkEntity {
526523
this.name
527524
);
528525

529-
this._sender = await this._context.connection.createAwaitableSender(options);
530-
this.isConnecting = false;
526+
const sender = await this._context.connection.createAwaitableSender(options);
527+
this._sender = sender;
531528
logger.verbose(
532529
"[%s] Sender '%s' created with sender options: %O",
533530
this._context.connectionId,
534531
this.name,
535532
options
536533
);
537-
this._sender.setMaxListeners(1000);
534+
sender.setMaxListeners(1000);
538535

539536
// It is possible for someone to close the sender and then start it again.
540537
// Thus make sure that the sender is present in the client cache.
541538
if (!this._context.senders[this.name]) this._context.senders[this.name] = this;
542-
await this._ensureTokenRenewal();
539+
this._ensureTokenRenewal();
540+
return sender;
543541
} else {
544542
logger.verbose(
545-
"[%s] The sender '%s' with address '%s' is open -> %s and is connecting " +
546-
"-> %s. Hence not reconnecting.",
543+
"[%s] The sender '%s' with address '%s' is open -> %s. Hence not reconnecting.",
547544
this._context.connectionId,
548545
this.name,
549546
this.address,
550-
this.isOpen(),
551-
this.isConnecting
547+
this.isOpen()
552548
);
549+
return this._sender;
553550
}
554551
} catch (err) {
555-
this.isConnecting = false;
556552
const translatedError = translate(err);
557553
logger.warning(
558554
"[%s] An error occurred while creating the sender %s: %s",

sdk/eventhub/event-hubs/src/linkEntity.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ export class LinkEntity {
215215
* Ensures that the token is renewed within the predefined renewal margin.
216216
* @hidden
217217
*/
218-
protected async _ensureTokenRenewal(): Promise<void> {
218+
protected _ensureTokenRenewal(): void {
219219
if (!this._tokenTimeoutInMs) {
220220
return;
221221
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
import chai from "chai";
5+
const should = chai.should();
6+
import chaiAsPromised from "chai-as-promised";
7+
chai.use(chaiAsPromised);
8+
import { EnvVarKeys, getEnvVars } from "../../public/utils/testUtils";
9+
import { EventHubSender } from "../../../src/eventHubSender";
10+
import { createConnectionContext } from "../../../src/connectionContext";
11+
import { stub } from "sinon";
12+
import { MessagingError } from "@azure/core-amqp";
13+
const env = getEnvVars();
14+
15+
describe("disconnected", function() {
16+
const service = {
17+
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
18+
path: env[EnvVarKeys.EVENTHUB_NAME]
19+
};
20+
before("validate environment", function(): void {
21+
should.exist(
22+
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
23+
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests."
24+
);
25+
should.exist(
26+
env[EnvVarKeys.EVENTHUB_NAME],
27+
"define EVENTHUB_NAME in your environment before running integration tests."
28+
);
29+
});
30+
31+
describe("EventHubSender", function() {
32+
/**
33+
* Test added for issue https://github.com/Azure/azure-sdk-for-js/issues/15002
34+
* Prior to fixing this issue, a TypeError would be thrown when this test was ran.
35+
*/
36+
it("send works after disconnect", async () => {
37+
const context = createConnectionContext(service.connectionString, service.path);
38+
const sender = EventHubSender.create(context);
39+
40+
// Create the sender link via getMaxMessageSize() so we can check when 'send' is about to be called on it.
41+
await sender.getMaxMessageSize();
42+
should.equal(sender.isOpen(), true, "Expected sender to be open.");
43+
44+
// Here we stub out the 'send' call on the AwaitableSender.
45+
// We do 2 things:
46+
// 1. Call `idle()` on the underlying rhea connection so that a disconnect is triggered.
47+
// 2. Reject with a MessagingError.
48+
// The MessagingError is thrown so that the send operation will be retried.
49+
// The disconnect that's triggered will cause the existing AwaitableSender to be closed.
50+
51+
// If everything works as expected, then a new AwaitableSender should be created on the next
52+
// retry attempt and the event should be successfully sent.
53+
const senderLink = sender["_sender"]!;
54+
const sendStub = stub(senderLink, "send");
55+
sendStub.callsFake(async () => {
56+
context.connection["_connection"].idle();
57+
throw new MessagingError("Fake rejection!");
58+
});
59+
60+
await sender.send([{ body: "foo" }]);
61+
62+
await context.close();
63+
});
64+
});
65+
});

0 commit comments

Comments
 (0)