Commit 090f85c

[Event Hubs] Rewrite partition receiver (Azure#24731)
# Re-implementing the Event Receiver

This PR re-implements the event receiver using promises and a single queue to fix an ordering issue and to correct waiting behavior.

## Problem Statement [Issue Azure#23993]

A customer reported that the list of events passed into the `processEvents` callback is not always ordered by `sequenceNumber`, which leads to events being processed in the wrong order. The customer provided a sample that prints an out-of-order message whenever the `sequenceNumber` values of received messages are not in order, and I can confirm that the message is sometimes printed.

## Analysis

The customer-provided callback, `processEvents`, is called every time a batch of events is received from the service. Each batch comes from a single partition. Events are ordered within a partition by their `sequenceNumber`, and events received by `processEvents` should be in the same order. Currently, however, the list of events that `processEvents` is called with is not always in order.

Upon further investigation, it was found that the library implements complex logic to read events from the service. It maintains two queues for reading events: one for building a batch of events that will be sent to the next call of the `processEvents` callback, and another for when errors occur or there are no active listeners. The coordination needed to read events from the two queues is subtle and is the source of the ordering bug.

## Re-design

The most straightforward way to simplify this design and to ensure ordering is to use a single queue and add incoming events to it in the order they're received.
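
As a minimal sketch of why a single queue preserves ordering: events appended in arrival order and removed from the front come out in that same order, and a check similar in spirit to the customer's sample can confirm it. The `ReceivedEvent` shape and all helper names below are hypothetical and are not the library's code.

```typescript
// Hypothetical event shape: only the field needed for the ordering check.
interface ReceivedEvent {
  sequenceNumber: number;
  body: unknown;
}

// Returns the index of the first event whose sequenceNumber is not greater
// than its predecessor's, or -1 if the whole list is in order.
function firstOutOfOrderIndex(events: ReceivedEvent[]): number {
  for (let i = 1; i < events.length; i++) {
    if (events[i].sequenceNumber <= events[i - 1].sequenceNumber) {
      return i;
    }
  }
  return -1;
}

// A single FIFO queue: events are appended in the order they arrive...
const eventQueue: ReceivedEvent[] = [];

function onEventArrived(event: ReceivedEvent): void {
  eventQueue.push(event);
}

// ...and removed from the front, so every batch keeps the arrival order.
function takeBatch(maxMessageCount: number): ReceivedEvent[] {
  return eventQueue.splice(0, maxMessageCount);
}
```

Because there is only one queue, there is no second source of events whose reads would have to be interleaved with the first.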

Reading from this queue is as simple as the following:

- If the queue contains any events, check whether their count is already `maxMessageCount` or more:
  - If yes, remove `maxMessageCount` events and return them immediately.
  - If no, wait for a few milliseconds and then remove up to `maxMessageCount` events and return them.
- If the queue doesn't contain any events, wait until `maxWaitTimeInSeconds` has elapsed and then return an empty list, or until one or more events arrive and then return those.

### Abstraction

The idea is concisely captured by `waitForEvents`, a newly introduced function that races a list of promises, one for each of the scenarios listed above:

https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/src/eventHubReceiver.ts#L733-L739

The first promise resolves right away and is returned if the queue already has `maxMessageCount` events or more. It corresponds to the first scenario listed above.

The second promise is created by the `checkOnInterval` function. It resolves only once the queue has events in it; until then, it keeps checking at an interval of some milliseconds. Note that chained to it is a timer promise that waits another number of milliseconds to give the service a chance to send more events. This corresponds to the second scenario listed above.

The third promise is a simple timer promise that resolves after `maxWaitTime` has elapsed. This promise corresponds to the third scenario.

### Rewrite

In addition to some other minor improvements, the `receiveBatch` method is concisely rewritten using that abstraction as follows:

https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/src/eventHubReceiver.ts#L578-L628

Notice that the chain of promises makes the algorithm simple to read: a link is established first, credits are added to it as needed, and then the waiting starts.
Also, notice that at this point no events have actually been read from the queue yet; all this does is wait until one of the promises resolves. The actual reading from the queue is chained with `then` onto that chain so that it happens only after everything else is said and done. For example, if an error occurred, it should be handled first, and we don't want to prematurely mutate the queue. The reading from the queue is as simple as the following:

https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/src/eventHubReceiver.ts#L630

## Other changes

### Exporting `core-util`'s `createAbortablePromise`

This function was added in Azure#24821 and proved useful in this rewrite, so I am exporting it. I am planning on using it in core-lro too.

### Updating tests

Two tests were updated: one for authentication and one for returning events in the presence of retryable and non-retryable errors.

In the former, the receiver is expected to receive events after the auth token has been invalidated but not yet refreshed. However, I am observing that a disconnected event is received at that moment and the receiver is deleted. The old receiver continued receiving despite the deletion, whereas the new one correctly cleans up the receiver. I deleted this expectation for now.

In the latter, the test forces an error on the receiver after 50 milliseconds, but the receiver already finishes at around 40 milliseconds, so I updated the forced error to happen sooner, at 10 milliseconds:

https://github.com/Azure/azure-sdk-for-js/blob/10826927554e7254dce0a4849f1e0c8219373522/sdk/eventhub/event-hubs/test/internal/receiveBatch.spec.ts#L107

Finally, a couple of test suites were added for the `waitForEvents` and `checkOnInterval` functions.

## Updates in action

Live tests succeed [[here](https://dev.azure.com/azure-sdk/internal/_build/results?buildId=2201768&view=results)].
Please ignore the timeout in the deployed resources script in canary; it is an unrelated service issue, see [[here](https://dev.azure.com/azure-sdk/internal/_build/results?buildId=2198994&view=results)].

A log of how the updated receiver behaves when used by the customer sample can be found in [log2.txt](https://github.com/Azure/azure-sdk-for-js/files/10775378/log2.txt). Notice that the out-of-order message was never printed.

## Reviewing tips

The changes in `eventHubReceiver.ts` are extensive and the diff is not easily readable. I highly suggest reviewing Azure@1082692 instead because it sits on top of a deleting commit, so there is no diff to wrestle with. The main changes are in `receiveBatch`, but please feel free to review the rest of the module too.
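
For readers who want a feel for the racing idea behind `waitForEvents` without opening the linked source, here is a rough, self-contained sketch of the three scenarios from the Re-design section. It is not the SDK's implementation: the signatures, the `pollIntervalMs` and `graceMs` options, their default values, and the `readBatch` helper are all assumptions made for illustration.

```typescript
// Illustrative sketch only; names, signatures, and timings are assumptions.

// Timer promise that resolves after `ms` milliseconds.
function delay(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Polls `predicate` every `intervalMs` until it holds or `isCancelled` is true.
async function checkOnInterval(
  intervalMs: number,
  predicate: () => boolean,
  isCancelled: () => boolean
): Promise<void> {
  while (!predicate() && !isCancelled()) {
    await delay(intervalMs);
  }
}

// Waits until it makes sense to read from `queue`; it never mutates the queue.
async function waitForEvents<T>(
  queue: T[],
  maxMessageCount: number,
  maxWaitTimeInMs: number,
  options: { pollIntervalMs?: number; graceMs?: number } = {}
): Promise<void> {
  const { pollIntervalMs = 5, graceMs = 10 } = options;
  // Scenario 1: a full batch is already queued, so there is nothing to wait for.
  if (queue.length >= maxMessageCount) {
    return;
  }
  let finished = false;
  try {
    await Promise.race([
      // Scenario 2: once some events show up, wait a little longer to give
      // the service a chance to send more.
      checkOnInterval(pollIntervalMs, () => queue.length > 0, () => finished).then(() =>
        finished ? undefined : delay(graceMs)
      ),
      // Scenario 3: give up once the maximum wait time has elapsed.
      delay(maxWaitTimeInMs),
    ]);
  } finally {
    // Stop the polling loop regardless of which promise won the race.
    finished = true;
  }
}

// Reading happens only after the wait is over: take up to maxMessageCount
// events from the front of the queue.
function readBatch<T>(queue: T[], maxMessageCount: number): T[] {
  return queue.splice(0, maxMessageCount);
}
```

A caller would await `waitForEvents(queue, maxMessageCount, maxWaitTimeInMs)` and only then call `readBatch`, which mirrors the point about not mutating the queue prematurely. The sketch leaves the losing timer running until it expires; production code would want to cancel it.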
1 parent 29773b2 commit 090f85c

18 files changed, +576 -464 lines changed

sdk/core/core-util/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
```diff
@@ -1,9 +1,11 @@
 # Release History
 
-## 1.1.2 (Unreleased)
+## 1.2.0 (Unreleased)
 
 ### Features Added
 
+- Add `createAbortablePromise` which creates promises that can be aborted.
+
 ### Breaking Changes
 
 ### Bugs Fixed
```

sdk/core/core-util/package.json

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 {
   "name": "@azure/core-util",
-  "version": "1.1.2",
+  "version": "1.2.0",
   "description": "Core library for shared utility methods",
   "sdk-type": "client",
   "main": "dist/index.js",
```

sdk/core/core-util/review/core-util.api.md

Lines changed: 10 additions & 0 deletions
```diff
@@ -12,6 +12,16 @@ export function computeSha256Hash(content: string, encoding: "base64" | "hex"):
 // @public
 export function computeSha256Hmac(key: string, stringToSign: string, encoding: "base64" | "hex"): Promise<string>;
 
+// @public
+export function createAbortablePromise<T>(buildPromise: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void, options?: CreateAbortablePromiseOptions): Promise<T>;
+
+// @public
+export interface CreateAbortablePromiseOptions {
+    abortErrorMsg?: string;
+    abortSignal?: AbortSignalLike;
+    cleanupBeforeAbort?: () => void;
+}
+
 // @public
 export function delay(timeInMs: number, options?: DelayOptions): Promise<void>;
 
```

sdk/core/core-util/src/createAbortablePromise.ts

Lines changed: 63 additions & 0 deletions
```diff
@@ -0,0 +1,63 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+import { AbortError, AbortSignalLike } from "@azure/abort-controller";
+
+/**
+ * Options for the createAbortablePromise function.
+ */
+export interface CreateAbortablePromiseOptions {
+  /** A function to be called if the promise was aborted */
+  cleanupBeforeAbort?: () => void;
+  /** An abort signal */
+  abortSignal?: AbortSignalLike;
+  /** An abort error message */
+  abortErrorMsg?: string;
+}
+
+/**
+ * Creates an abortable promise.
+ * @param buildPromise - A function that takes the resolve and reject functions as parameters.
+ * @param options - The options for the abortable promise.
+ * @returns A promise that can be aborted.
+ */
+export function createAbortablePromise<T>(
+  buildPromise: (
+    resolve: (value: T | PromiseLike<T>) => void,
+    reject: (reason?: any) => void
+  ) => void,
+  options?: CreateAbortablePromiseOptions
+): Promise<T> {
+  const { cleanupBeforeAbort, abortSignal, abortErrorMsg } = options ?? {};
+  return new Promise((resolve, reject) => {
+    function rejectOnAbort(): void {
+      reject(new AbortError(abortErrorMsg ?? "The operation was aborted."));
+    }
+    function removeListeners(): void {
+      abortSignal?.removeEventListener("abort", onAbort);
+    }
+    function onAbort(): void {
+      cleanupBeforeAbort?.();
+      removeListeners();
+      rejectOnAbort();
+    }
+    if (abortSignal?.aborted) {
+      return rejectOnAbort();
+    }
+    try {
+      buildPromise(
+        (x) => {
+          removeListeners();
+          resolve(x);
+        },
+        (x) => {
+          removeListeners();
+          reject(x);
+        }
+      );
+    } catch (err) {
+      reject(err);
+    }
+    abortSignal?.addEventListener("abort", onAbort);
+  });
+}
```

sdk/core/core-util/src/delay.ts

Lines changed: 2 additions & 53 deletions
```diff
@@ -1,7 +1,8 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.
 
-import { AbortError, AbortSignalLike } from "@azure/abort-controller";
+import { AbortSignalLike } from "@azure/abort-controller";
+import { createAbortablePromise } from "./createAbortablePromise";
 
 const StandardAbortMessage = "The delay was aborted.";
 
@@ -19,58 +20,6 @@ export interface DelayOptions {
   abortErrorMsg?: string;
 }
 
-/**
- * Creates an abortable promise.
- * @param buildPromise - A function that takes the resolve and reject functions as parameters.
- * @param options - The options for the abortable promise.
- * @returns A promise that can be aborted.
- * @internal
- */
-export function createAbortablePromise<T>(
-  buildPromise: (
-    resolve: (value: T | PromiseLike<T>) => void,
-    reject: (reason?: any) => void
-  ) => void,
-  options?: {
-    cleanupBeforeAbort?: () => void;
-    abortSignal?: AbortSignalLike;
-    abortErrorMsg?: string;
-  }
-): Promise<T> {
-  const { cleanupBeforeAbort, abortSignal, abortErrorMsg } = options ?? {};
-  return new Promise((resolve, reject) => {
-    function rejectOnAbort(): void {
-      reject(new AbortError(abortErrorMsg ?? "The operation was aborted."));
-    }
-    function removeListeners(): void {
-      abortSignal?.removeEventListener("abort", onAbort);
-    }
-    function onAbort(): void {
-      cleanupBeforeAbort?.();
-      removeListeners();
-      rejectOnAbort();
-    }
-    if (abortSignal?.aborted) {
-      return rejectOnAbort();
-    }
-    try {
-      buildPromise(
-        (x) => {
-          removeListeners();
-          resolve(x);
-        },
-        (x) => {
-          removeListeners();
-          reject(x);
-        }
-      );
-    } catch (err) {
-      reject(err);
-    }
-    abortSignal?.addEventListener("abort", onAbort);
-  });
-}
-
 /**
  * A wrapper for setTimeout that resolves a promise after timeInMs milliseconds.
  * @param timeInMs - The number of milliseconds to be delayed.
```

sdk/core/core-util/src/index.ts

Lines changed: 1 addition & 0 deletions
```diff
@@ -3,6 +3,7 @@
 
 export { isNode } from "./isNode";
 export { delay, DelayOptions } from "./delay";
+export { createAbortablePromise, CreateAbortablePromiseOptions } from "./createAbortablePromise";
 export { getRandomIntegerInclusive } from "./random";
 export { isObject, UnknownObject } from "./object";
 export { isError, getErrorMessage } from "./error";
```

sdk/core/core-util/test/internal/createAbortablePromise.spec.ts renamed to sdk/core/core-util/test/public/createAbortablePromise.spec.ts

Lines changed: 1 addition & 1 deletion
```diff
@@ -5,7 +5,7 @@ import * as sinon from "sinon";
 import { AbortController, AbortSignalLike } from "@azure/abort-controller";
 import chai from "chai";
 import chaiAsPromised from "chai-as-promised";
-import { createAbortablePromise } from "../../src/delay";
+import { createAbortablePromise } from "../../src/createAbortablePromise";
 
 chai.use(chaiAsPromised);
 const { assert } = chai;
```

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
```diff
@@ -1,15 +1,19 @@
 # Release History
 
-## 5.8.1 (Unreleased)
+## 5.9.0 (Unreleased)
 
 ### Features Added
 
 ### Breaking Changes
 
 ### Bugs Fixed
 
+- Fixing a bug where events were not always received in order [#23993](https://github.com/Azure/azure-sdk-for-js/issues/23993).
+
 ### Other Changes
 
+- The receiver no longer attempts to build batches of `maxMessageCount` size, instead, it returns batches as soon as they are received from the service, up to `maxMessageCount`.
+
 ## 5.8.0 (2022-05-10)
 
 ### Breaking Changes
```

sdk/eventhub/event-hubs/package.json

Lines changed: 3 additions & 3 deletions
```diff
@@ -1,7 +1,7 @@
 {
   "name": "@azure/event-hubs",
   "sdk-type": "client",
-  "version": "5.8.1",
+  "version": "5.9.0",
   "description": "Azure Event Hubs SDK for JS.",
   "author": "Microsoft Corporation",
   "license": "MIT",
@@ -49,7 +49,7 @@
     "build:node": "tsc -p . && dev-tool run bundle",
     "build:samples": "echo Obsolete.",
     "build:test:browser": "tsc -p . && rollup -c rollup.test.config.js 2>&1",
-    "build:test:node": "tsc -p . && dev-tool run bundle",
+    "build:test:node": "tsc -p .",
     "build:test": "tsc -p . && rollup -c rollup.test.config.js 2>&1 && npm run generate-certs && copyfiles -f ./test/internal/node/partitionKeyHashMap.json ./dist-esm/test/internal/node",
     "build:types": "downlevel-dts types/latest types/3.1",
     "build": "npm run clean && tsc -p . && dev-tool run bundle && api-extractor run --local && npm run build:types",
@@ -70,7 +70,7 @@
     "test:node": "npm run build:test && npm run unit-test:node && npm run integration-test:node",
     "test": "npm run build:test && npm run unit-test && npm run integration-test",
     "unit-test:browser": "echo skipped",
-    "unit-test:node": "cross-env NODE_EXTRA_CA_CERTS=\"./certs/my-private-root-ca.crt.pem\" TEST_TARGET=mock DISABLE_MULTI_VERSION_TESTING=true nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"dist-esm/test/internal/*.spec.js\" \"dist-esm/test/public/*.spec.js\" \"dist-esm/test/public/**/*.spec.js\" \"dist-esm/test/internal/**/*.spec.js\"",
+    "unit-test:node": "cross-env NODE_EXTRA_CA_CERTS=\"./certs/my-private-root-ca.crt.pem\" TEST_TARGET=mock DISABLE_MULTI_VERSION_TESTING=true mocha -r esm -r ts-node/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 1200000 --full-trace \"test/internal/*.spec.ts\" \"test/public/*.spec.ts\" \"test/public/**/*.spec.ts\" \"test/internal/**/*.spec.ts\"",
     "unit-test": "npm run unit-test:node && npm run unit-test:browser"
   },
   "//metadata": {
```
