Skip to content

Commit acaccac

Browse files
authored
[event-hubs] fix race condition preventing connection recovery during connection disconnect (Azure#15042)
* [event-hubs] add tests for disconnect and entity close * add changelog * ensure waitForConnectionRefresh is always cleared
1 parent ce9b975 commit acaccac

File tree

4 files changed

+228
-73
lines changed

4 files changed

+228
-73
lines changed

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## 5.5.1 (Unreleased)
44

5+
- Fixes a race condition that would cause connection recovery to sometimes fail if a consumer or producer was closed at the same time a connection was disconnected.
6+
57
- Fixes issue [#14606](https://github.com/Azure/azure-sdk-for-js/issues/14606) where the `EventHubConsumerClient` could call subscribe's `processError` callback with a "Too much pending tasks" error. This could occur if the consumer was unable to connect to the service for an extended period of time.
68

79
- Fixes issue [#15002](https://github.com/Azure/azure-sdk-for-js/issues/15002) where in rare cases an unexpected `TypeError` could be thrown from `EventHubProducerClient.sendBatch` when the connection was disconnected while sending events was in progress.

sdk/eventhub/event-hubs/src/connectionContext.ts

Lines changed: 88 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -236,17 +236,25 @@ export namespace ConnectionContext {
236236
try {
237237
if (this.connection.isOpen()) {
238238
// Close all the senders.
239-
for (const senderName of Object.keys(this.senders)) {
240-
await this.senders[senderName].close();
241-
}
239+
await Promise.all(
240+
Object.keys(connectionContext.senders).map((name) =>
241+
connectionContext.senders[name]?.close().catch(() => {
242+
/* error already logged, swallow it here */
243+
})
244+
)
245+
);
242246
// Close all the receivers.
243-
for (const receiverName of Object.keys(this.receivers)) {
244-
await this.receivers[receiverName].close();
245-
}
247+
await Promise.all(
248+
Object.keys(connectionContext.receivers).map((name) =>
249+
connectionContext.receivers[name]?.close().catch(() => {
250+
/* error already logged, swallow it here */
251+
})
252+
)
253+
);
246254
// Close the cbs session;
247255
await this.cbsSession.close();
248256
// Close the management session
249-
await this.managementSession!.close();
257+
await this.managementSession?.close();
250258
await this.connection.close();
251259
this.wasConnectionCloseCalled = true;
252260
logger.info("Closed the amqp connection '%s' on the client.", this.connectionId);
@@ -281,73 +289,90 @@ export namespace ConnectionContext {
281289
waitForConnectionRefreshPromise = new Promise((resolve) => {
282290
waitForConnectionRefreshResolve = resolve;
283291
});
284-
285-
logger.verbose(
286-
"[%s] 'disconnected' event occurred on the amqp connection.",
287-
connectionContext.connection.id
288-
);
289-
290-
if (context.connection && context.connection.error) {
292+
try {
291293
logger.verbose(
292-
"[%s] Accompanying error on the context.connection: %O",
293-
connectionContext.connection.id,
294-
context.connection && context.connection.error
294+
"[%s] 'disconnected' event occurred on the amqp connection.",
295+
connectionContext.connection.id
295296
);
296-
}
297-
if (context.error) {
297+
298+
if (context.connection && context.connection.error) {
299+
logger.verbose(
300+
"[%s] Accompanying error on the context.connection: %O",
301+
connectionContext.connection.id,
302+
context.connection && context.connection.error
303+
);
304+
}
305+
if (context.error) {
306+
logger.verbose(
307+
"[%s] Accompanying error on the context: %O",
308+
connectionContext.connection.id,
309+
context.error
310+
);
311+
}
312+
const state: Readonly<{
313+
wasConnectionCloseCalled: boolean;
314+
numSenders: number;
315+
numReceivers: number;
316+
}> = {
317+
wasConnectionCloseCalled: connectionContext.wasConnectionCloseCalled,
318+
numSenders: Object.keys(connectionContext.senders).length,
319+
numReceivers: Object.keys(connectionContext.receivers).length
320+
};
298321
logger.verbose(
299-
"[%s] Accompanying error on the context: %O",
322+
"[%s] Closing all open senders and receivers in the state: %O",
300323
connectionContext.connection.id,
301-
context.error
324+
state
302325
);
303-
}
304-
const state: Readonly<{
305-
wasConnectionCloseCalled: boolean;
306-
numSenders: number;
307-
numReceivers: number;
308-
}> = {
309-
wasConnectionCloseCalled: connectionContext.wasConnectionCloseCalled,
310-
numSenders: Object.keys(connectionContext.senders).length,
311-
numReceivers: Object.keys(connectionContext.receivers).length
312-
};
313-
logger.verbose(
314-
"[%s] Closing all open senders and receivers in the state: %O",
315-
connectionContext.connection.id,
316-
state
317-
);
318326

319-
// Clear internal map maintained by rhea to avoid reconnecting of old links once the
320-
// connection is back up.
321-
connectionContext.connection.removeAllSessions();
327+
// Clear internal map maintained by rhea to avoid reconnecting of old links once the
328+
// connection is back up.
329+
connectionContext.connection.removeAllSessions();
322330

323-
// Close the cbs session to ensure all the event handlers are released.
324-
await connectionContext.cbsSession.close().catch(() => {
325-
/* error already logged, swallow it here */
326-
});
327-
// Close the management session to ensure all the event handlers are released.
328-
await connectionContext.managementSession!.close().catch(() => {
329-
/* error already logged, swallow it here */
330-
});
331+
// Close the cbs session to ensure all the event handlers are released.
332+
await connectionContext.cbsSession?.close().catch(() => {
333+
/* error already logged, swallow it here */
334+
});
335+
// Close the management session to ensure all the event handlers are released.
336+
await connectionContext.managementSession?.close().catch(() => {
337+
/* error already logged, swallow it here */
338+
});
331339

332-
// Close all senders and receivers to ensure clean up of timers & other resources.
333-
if (state.numSenders || state.numReceivers) {
334-
for (const senderName of Object.keys(connectionContext.senders)) {
335-
const sender = connectionContext.senders[senderName];
336-
await sender.close().catch(() => {
337-
/* error already logged, swallow it here */
338-
});
339-
}
340-
for (const receiverName of Object.keys(connectionContext.receivers)) {
341-
const receiver = connectionContext.receivers[receiverName];
342-
await receiver.close().catch(() => {
343-
/* error already logged, swallow it here */
344-
});
340+
// Close all senders and receivers to ensure clean up of timers & other resources.
341+
if (state.numSenders || state.numReceivers) {
342+
await Promise.all(
343+
Object.keys(connectionContext.senders).map((name) =>
344+
connectionContext.senders[name]?.close().catch(() => {
345+
/* error already logged, swallow it here */
346+
})
347+
)
348+
);
349+
350+
await Promise.all(
351+
Object.keys(connectionContext.receivers).map((name) =>
352+
connectionContext.receivers[name]?.close().catch(() => {
353+
/* error already logged, swallow it here */
354+
})
355+
)
356+
);
345357
}
358+
} catch (err) {
359+
logger.verbose(
360+
`[${connectionContext.connectionId}] An error occurred while closing the connection in 'disconnected'. %O`,
361+
err
362+
);
346363
}
347364

348-
await refreshConnection(connectionContext);
349-
waitForConnectionRefreshResolve();
350-
waitForConnectionRefreshPromise = undefined;
365+
try {
366+
await refreshConnection(connectionContext);
367+
} catch (err) {
368+
logger.verbose(
369+
`[${connectionContext.connectionId}] An error occurred while refreshing the connection in 'disconnected'. %O`,
370+
err
371+
);
372+
} finally {
373+
waitForConnectionRefreshResolve();
374+
waitForConnectionRefreshPromise = undefined;
375+
}
351376
};
352377

353378
const protocolError: OnAmqpEvent = async (context: EventContext) => {

sdk/eventhub/event-hubs/src/eventHubReceiver.ts

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import { v4 as uuid } from "uuid";
54
import { logErrorStackTrace, logger } from "./log";
65
import {
76
EventContext,
@@ -38,7 +37,6 @@ interface CreateReceiverOptions {
3837
onClose: OnAmqpEvent;
3938
onSessionError: OnAmqpEvent;
4039
onSessionClose: OnAmqpEvent;
41-
newName?: boolean;
4240
eventPosition?: EventPosition;
4341
}
4442

@@ -617,21 +615,18 @@ export class EventHubReceiver extends LinkEntity {
617615
* @hidden
618616
*/
619617
private _createReceiverOptions(options: CreateReceiverOptions): RheaReceiverOptions {
620-
if (options.newName) this.name = uuid();
621618
const rcvrOptions: RheaReceiverOptions = {
622619
name: this.name,
623620
autoaccept: true,
624621
source: {
625622
address: this.address
626623
},
627624
credit_window: 0,
628-
onMessage: options.onMessage || ((context: EventContext) => this._onAmqpMessage(context)),
629-
onError: options.onError || ((context: EventContext) => this._onAmqpError(context)),
630-
onClose: options.onClose || ((context: EventContext) => this._onAmqpClose(context)),
631-
onSessionError:
632-
options.onSessionError || ((context: EventContext) => this._onAmqpSessionError(context)),
633-
onSessionClose:
634-
options.onSessionClose || ((context: EventContext) => this._onAmqpSessionClose(context))
625+
onMessage: options.onMessage,
626+
onError: options.onError,
627+
onClose: options.onClose,
628+
onSessionError: options.onSessionError,
629+
onSessionClose: options.onSessionClose
635630
};
636631

637632
if (typeof this.ownerLevel === "number") {

sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import { EventHubSender } from "../../../src/eventHubSender";
1010
import { createConnectionContext } from "../../../src/connectionContext";
1111
import { stub } from "sinon";
1212
import { MessagingError } from "@azure/core-amqp";
13+
import { EventHubReceiver } from "../../../src/eventHubReceiver";
14+
import { EventHubConsumerClient, latestEventPosition } from "../../../src";
1315
const env = getEnvVars();
1416

1517
describe("disconnected", function() {
18+
let partitionIds: string[] = [];
1619
const service = {
1720
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
1821
path: env[EnvVarKeys.EVENTHUB_NAME]
@@ -28,6 +31,16 @@ describe("disconnected", function() {
2831
);
2932
});
3033

34+
before("get partition ids", async function() {
35+
const client = new EventHubConsumerClient(
36+
EventHubConsumerClient.defaultConsumerGroupName,
37+
service.connectionString,
38+
service.path
39+
);
40+
partitionIds = await client.getPartitionIds();
41+
return client.close();
42+
});
43+
3144
describe("EventHubSender", function() {
3245
/**
3346
* Test added for issue https://github.com/Azure/azure-sdk-for-js/issues/15002
@@ -62,4 +75,124 @@ describe("disconnected", function() {
6275
await context.close();
6376
});
6477
});
78+
79+
describe("ConnectionContext", function() {
80+
describe("onDisconnected", function() {
81+
it("does not fail when entities are closed concurrently", async () => {
82+
const context = createConnectionContext(service.connectionString, service.path);
83+
84+
// Add 2 receivers.
85+
const receiver1 = new EventHubReceiver(
86+
context,
87+
EventHubConsumerClient.defaultConsumerGroupName,
88+
partitionIds[0],
89+
latestEventPosition
90+
);
91+
const receiver2 = new EventHubReceiver(
92+
context,
93+
EventHubConsumerClient.defaultConsumerGroupName,
94+
partitionIds[1],
95+
latestEventPosition
96+
);
97+
98+
// Add 2 senders.
99+
const sender1 = new EventHubSender(context);
100+
const sender2 = new EventHubSender(context);
101+
102+
// Initialize sender links
103+
await sender1["_getLink"]();
104+
await sender2["_getLink"]();
105+
106+
// Initialize receiver links
107+
await receiver1.initialize({
108+
abortSignal: undefined,
109+
timeoutInMs: 60000
110+
});
111+
await receiver2.initialize({
112+
abortSignal: undefined,
113+
timeoutInMs: 60000
114+
});
115+
116+
// We are going to override sender1's close method so that it also invokes receiver2's close method.
117+
const sender1Close = sender1.close.bind(sender1);
118+
sender1.close = async function() {
119+
sender2.close().catch(() => {
120+
/* no-op */
121+
});
122+
return sender1Close();
123+
};
124+
125+
// We are going to override receiver1's close method so that it also invokes receiver2's close method.
126+
const receiver1Close = receiver1.close.bind(receiver1);
127+
receiver1.close = async function() {
128+
receiver2.close().catch(() => {
129+
/* no-op */
130+
});
131+
return receiver1Close();
132+
};
133+
134+
context.connection["_connection"].idle();
135+
await context.readyToOpenLink();
136+
console.log("open", context.connection.isOpen());
137+
await context.close();
138+
});
139+
});
140+
141+
describe("close", function() {
142+
it("does not fail when entities are closed concurrently", async () => {
143+
const context = createConnectionContext(service.connectionString, service.path);
144+
145+
// Add 2 receivers.
146+
const receiver1 = new EventHubReceiver(
147+
context,
148+
EventHubConsumerClient.defaultConsumerGroupName,
149+
partitionIds[0],
150+
latestEventPosition
151+
);
152+
const receiver2 = new EventHubReceiver(
153+
context,
154+
EventHubConsumerClient.defaultConsumerGroupName,
155+
partitionIds[1],
156+
latestEventPosition
157+
);
158+
159+
// Add 2 senders.
160+
const sender1 = new EventHubSender(context);
161+
const sender2 = new EventHubSender(context);
162+
163+
// Initialize sender links
164+
await sender1["_getLink"]();
165+
await sender2["_getLink"]();
166+
167+
// Initialize receiver links
168+
await receiver1.initialize({
169+
abortSignal: undefined,
170+
timeoutInMs: 60000
171+
});
172+
await receiver2.initialize({
173+
abortSignal: undefined,
174+
timeoutInMs: 60000
175+
});
176+
177+
// We are going to override sender1's close method so that it also invokes receiver2's close method.
178+
const sender1Close = sender1.close.bind(sender1);
179+
sender1.close = async function() {
180+
sender2.close().catch(() => {
181+
/* no-op */
182+
});
183+
return sender1Close();
184+
};
185+
186+
// We are going to override receiver1's close method so that it also invokes receiver2's close method.
187+
const originalClose = receiver1.close.bind(receiver1);
188+
receiver1.close = async function() {
189+
receiver2.close().catch(() => {
190+
/* no-op */
191+
});
192+
return originalClose();
193+
};
194+
await context.close();
195+
});
196+
});
197+
});
65198
});

0 commit comments

Comments
 (0)