Skip to content

Commit 21a0957

Browse files
authored
[event-hubs] fixes issue where processEvents ignores maxWaitTime after retryable disconnect event (Azure#12280)
Fixes Azure#12278 ## Description This PR adds a check within the `PartitionPump`'s receive events loop to determine whether the receiver has been closed. If the receiver has been closed, it will create a new receiver using an event start position that matches the last event seen by the pump. The receiver is explicitly closed when a disconnected event is received on the underlying AMQP connection, which causes calls to `receiveBatch` to immediately return any events it had collected up to this point. Note that once the receiver is closed, the receiver's onAmqpMessage handler is removed so it won't receive any additional events. ## Updates to testing I manually tested the changes in this PR against the sample code in the linked issue. I also updated the existing `disconnect` test to confirm that - the `maxWaitTimeInSeconds` is honoured after a disconnected event is encountered. - new events can be received on subsequent `processEvents` invocations.
1 parent e734672 commit 21a0957

File tree

4 files changed

+123
-15
lines changed

4 files changed

+123
-15
lines changed

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 5.3.1 (Unreleased)
44

5+
- Fixes issue [#12278](https://github.com/Azure/azure-sdk-for-js/issues/12278)
6+
where the `processEvents` handler could ignore the `maxWaitTimeInSeconds`
7+
parameter after a disconnection.
58

69
## 5.3.0 (2020-09-08)
710

sdk/eventhub/event-hubs/src/eventHubReceiver.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,13 @@ export class EventHubReceiver extends LinkEntity {
176176
return this._isReceivingMessages;
177177
}
178178

179+
/**
180+
* Indicates if the receiver has been closed.
181+
*/
182+
get isClosed(): boolean {
183+
return this._isClosed;
184+
}
185+
179186
/**
180187
* @property The last enqueued event information. This property will only
181188
* be enabled when `trackLastEnqueuedEventProperties` option is set to true

sdk/eventhub/event-hubs/src/partitionPump.ts

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,38 +59,77 @@ export class PartitionPump {
5959
);
6060
}
6161

62-
private async _receiveEvents(partitionId: string): Promise<void> {
62+
/**
63+
* Creates a new `EventHubReceiver` and replaces any existing receiver.
64+
* @param partitionId The partition the receiver should read messages from.
65+
* @param lastSeenSequenceNumber The sequence number to begin receiving messages from (exclusive).
66+
* If `-1`, then the PartitionPump's startPosition will be used instead.
67+
*/
68+
private _setOrReplaceReceiver(
69+
partitionId: string,
70+
lastSeenSequenceNumber: number
71+
): EventHubReceiver {
72+
// Determine what the new EventPosition should be.
73+
// If this PartitionPump has received events, we'll start from the last
74+
// seen sequenceNumber (exclusive).
75+
// Otherwise, use the `_startPosition`.
76+
const currentEventPosition: EventPosition =
77+
lastSeenSequenceNumber >= 0
78+
? {
79+
sequenceNumber: lastSeenSequenceNumber,
80+
isInclusive: false
81+
}
82+
: this._startPosition;
83+
84+
// Set or replace the PartitionPump's receiver.
6385
this._receiver = new EventHubReceiver(
6486
this._context,
6587
this._partitionProcessor.consumerGroup,
6688
partitionId,
67-
this._startPosition,
89+
currentEventPosition,
6890
{
6991
ownerLevel: this._processorOptions.ownerLevel,
7092
trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties,
7193
retryOptions: this._processorOptions.retryOptions
7294
}
7395
);
7496

97+
return this._receiver;
98+
}
99+
100+
private async _receiveEvents(partitionId: string): Promise<void> {
101+
let lastSeenSequenceNumber = -1;
102+
let receiver = this._setOrReplaceReceiver(partitionId, lastSeenSequenceNumber);
103+
75104
while (this._isReceiving) {
76105
try {
77-
const receivedEvents = await this._receiver.receiveBatch(
106+
// Check if the receiver was closed so we can recreate it.
107+
if (receiver.isClosed) {
108+
receiver = this._setOrReplaceReceiver(partitionId, lastSeenSequenceNumber);
109+
}
110+
111+
const receivedEvents = await receiver.receiveBatch(
78112
this._processorOptions.maxBatchSize,
79113
this._processorOptions.maxWaitTimeInSeconds,
80114
this._abortController.signal
81115
);
82116

83117
if (
84118
this._processorOptions.trackLastEnqueuedEventProperties &&
85-
this._receiver.lastEnqueuedEventProperties
119+
receiver.lastEnqueuedEventProperties
86120
) {
87-
this._partitionProcessor.lastEnqueuedEventProperties = this._receiver.lastEnqueuedEventProperties;
121+
this._partitionProcessor.lastEnqueuedEventProperties =
122+
receiver.lastEnqueuedEventProperties;
88123
}
89124
// avoid calling user's processEvents handler if the pump was stopped while receiving events
90125
if (!this._isReceiving) {
91126
return;
92127
}
93128

129+
if (receivedEvents.length) {
130+
lastSeenSequenceNumber = receivedEvents[receivedEvents.length - 1].sequenceNumber;
131+
}
132+
94133
const span = createProcessingSpan(
95134
receivedEvents,
96135
{

sdk/eventhub/event-hubs/test/node/disconnects.spec.ts

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,30 +50,86 @@ describe("disconnected", function() {
5050
});
5151

5252
it("should receive after a disconnect", async () => {
53-
const client = new EventHubConsumerClient(
53+
/**
54+
* This test validates that an `EventHubConsumerClient.subscribe()` call continues
55+
* receiving events after a `disconnected` event occurs on the underlying connection.
56+
*
57+
* https://github.com/Azure/azure-sdk-for-js/pull/12280 describes an issue where `processEvents`
58+
* would be invoked with 0 events and ignoring the `maxWaitTimeInSeconds` after a `disconnected` event.
59+
*
60+
* For a single `subscribe()` call, this test does the following:
61+
* 1. Ensure events can be received normally before the `disconnected` event.
62+
* 2. Ensure that the `maxWaitTimeInSeconds` is honoured after a `disconnected` event.
63+
* 3. Ensure that events can be received normally after the `disconnected` event.
64+
*/
65+
const consumer = new EventHubConsumerClient(
5466
EventHubConsumerClient.defaultConsumerGroupName,
5567
service.connectionString,
5668
service.path
5769
);
70+
71+
const producer = new EventHubProducerClient(service.connectionString, service.path);
72+
const eventSentBeforeDisconnect = { body: "the first event" };
73+
const eventSentAfterDisconnect = { body: "the second event" };
74+
75+
const maxWaitTimeInSeconds = 10;
5876
const partitionId = "0";
59-
const partitionProperties = await client.getPartitionProperties(partitionId);
60-
const clientConnectionContext = client["_context"];
77+
const partitionProperties = await consumer.getPartitionProperties(partitionId);
78+
const clientConnectionContext = consumer["_context"];
79+
80+
// Send the first event after getting partition properties so that we can expect to receive it.
81+
await producer.sendBatch([eventSentBeforeDisconnect], { partitionId });
6182

6283
let subscription: Subscription | undefined;
6384
let originalConnectionId: string;
6485

86+
let processEventsInvocationCount = 0;
87+
let firstInvocationEndTime = 0;
6588
await new Promise((resolve, reject) => {
66-
subscription = client.subscribe(
89+
subscription = consumer.subscribe(
6790
partitionId,
6891
{
6992
processEvents: async (data) => {
93+
processEventsInvocationCount++;
7094
should.exist(data);
71-
should.equal(data.length, 0);
72-
if (!originalConnectionId) {
95+
if (processEventsInvocationCount === 1) {
96+
// 1. Ensure events can be received normally before the `disconnected` event.
97+
should.equal(
98+
data.length,
99+
1,
100+
"Expected to receive 1 event in first processEvents invocation."
101+
);
102+
should.equal(data[0].body, eventSentBeforeDisconnect.body);
73103
originalConnectionId = clientConnectionContext.connectionId;
74104
// Trigger a disconnect on the underlying connection.
75105
clientConnectionContext.connection["_connection"].idle();
76-
} else {
106+
firstInvocationEndTime = Date.now();
107+
} else if (processEventsInvocationCount === 2) {
108+
// 2. Ensure that the `maxWaitTimeInSeconds` is honoured after a `disconnected` event.
109+
// No new events should have been received at this point since we received the last event in the previous invocation.
110+
should.equal(
111+
data.length,
112+
0,
113+
"Expected to receive 0 events in second processEvents invocation."
114+
);
115+
// The elapsed time since the last processEvents invocation should be >= maxWaitTimeInSeconds
116+
should.equal(
117+
Date.now() - firstInvocationEndTime >= maxWaitTimeInSeconds,
118+
true,
119+
"Expected elapsed time between first and second processEvents invocations to be >= maxWaitTimeInSeconds."
120+
);
121+
const newConnectionId = clientConnectionContext.connectionId;
122+
should.not.equal(originalConnectionId, newConnectionId);
123+
// Send a new event that will be immediately receivable.
124+
await producer.sendBatch([eventSentAfterDisconnect], { partitionId });
125+
} else if (processEventsInvocationCount === 3) {
126+
// 3. Ensure that events can be received normally after the `disconnected` event.
127+
should.equal(
128+
data.length,
129+
1,
130+
"Expected to receive 1 event in third processEvents invocation."
131+
);
132+
should.equal(data[0].body, eventSentAfterDisconnect.body);
77133
const newConnectionId = clientConnectionContext.connectionId;
78134
should.not.equal(originalConnectionId, newConnectionId);
79135
resolve();
@@ -84,13 +140,16 @@ describe("disconnected", function() {
84140
}
85141
},
86142
{
87-
startPosition: { sequenceNumber: partitionProperties.lastEnqueuedSequenceNumber },
88-
maxWaitTimeInSeconds: 2
143+
startPosition: {
144+
sequenceNumber: partitionProperties.lastEnqueuedSequenceNumber
145+
},
146+
maxWaitTimeInSeconds
89147
}
90148
);
91149
});
92150
await subscription!.close();
93-
await client.close();
151+
await consumer.close();
152+
await producer.close();
94153
});
95154
});
96155

0 commit comments

Comments
 (0)