Skip to content

Commit 4bd3842

Browse files
authored
[event-hubs] add tests for cancellation (Azure#15094)
1 parent 39591c2 commit 4bd3842

File tree

10 files changed

+335
-711
lines changed

10 files changed

+335
-711
lines changed

sdk/eventhub/event-hubs/src/util/timeoutAbortSignalUtils.ts

Lines changed: 0 additions & 88 deletions
This file was deleted.
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
import { AbortController } from "@azure/abort-controller";
5+
import chai from "chai";
6+
const should = chai.should();
7+
import chaiAsPromised from "chai-as-promised";
8+
import { createConnectionContext } from "../../src/connectionContext";
9+
import { EventHubReceiver } from "../../src/eventHubReceiver";
10+
import { EventHubSender } from "../../src/eventHubSender";
11+
chai.use(chaiAsPromised);
12+
13+
import { EnvVarKeys, getEnvVars } from "../public/utils/testUtils";
14+
const env = getEnvVars();
15+
16+
describe("Cancellation via AbortSignal", () => {
17+
const service = {
18+
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
19+
path: env[EnvVarKeys.EVENTHUB_NAME]
20+
};
21+
before("validate environment", () => {
22+
should.exist(
23+
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
24+
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests."
25+
);
26+
should.exist(
27+
env[EnvVarKeys.EVENTHUB_NAME],
28+
"define EVENTHUB_NAME in your environment before running integration tests."
29+
);
30+
});
31+
32+
let context: ReturnType<typeof createConnectionContext>;
33+
beforeEach("create connection context", function() {
34+
context = createConnectionContext(service.connectionString, service.path);
35+
});
36+
37+
afterEach("close connection context", function() {
38+
return context.close();
39+
});
40+
41+
const TEST_FAILURE = "Test failure";
42+
43+
const cancellationCases = [
44+
{
45+
type: "pre-aborted",
46+
getSignal() {
47+
const controller = new AbortController();
48+
controller.abort();
49+
return controller.signal;
50+
}
51+
},
52+
{
53+
type: "aborted after timeout",
54+
getSignal() {
55+
const controller = new AbortController();
56+
setTimeout(() => {
57+
controller.abort();
58+
}, 0);
59+
return controller.signal;
60+
}
61+
}
62+
];
63+
64+
describe("EventHubReceiver", () => {
65+
let client: EventHubReceiver;
66+
beforeEach("instantiate EventHubReceiver", () => {
67+
client = new EventHubReceiver(
68+
context,
69+
"$default", // consumer group
70+
"0", // partition id
71+
{
72+
enqueuedOn: Date.now()
73+
}
74+
);
75+
});
76+
77+
afterEach("close EventHubReceiver", () => {
78+
return client.close();
79+
});
80+
81+
for (const { type: caseType, getSignal } of cancellationCases) {
82+
it(`initialize supports cancellation (${caseType})`, async () => {
83+
const abortSignal = getSignal();
84+
try {
85+
await client.initialize({ abortSignal, timeoutInMs: 60000 });
86+
throw new Error(TEST_FAILURE);
87+
} catch (err) {
88+
should.equal(err.name, "AbortError");
89+
should.equal(err.message, "The operation was aborted.");
90+
}
91+
});
92+
93+
it(`receiveBatch supports cancellation (${caseType})`, async () => {
94+
const abortSignal = getSignal();
95+
try {
96+
await client.receiveBatch(10, undefined, abortSignal);
97+
throw new Error(TEST_FAILURE);
98+
} catch (err) {
99+
should.equal(err.name, "AbortError");
100+
should.equal(err.message, "The operation was aborted.");
101+
}
102+
});
103+
104+
it(`receiveBatch supports cancellation when connection already exists (${caseType})`, async () => {
105+
// Open the connection.
106+
await client.initialize({ abortSignal: undefined, timeoutInMs: 60000 });
107+
try {
108+
const abortSignal = getSignal();
109+
await client.receiveBatch(10, undefined, abortSignal);
110+
throw new Error(TEST_FAILURE);
111+
} catch (err) {
112+
should.equal(err.name, "AbortError");
113+
should.equal(err.message, "The operation was aborted.");
114+
}
115+
});
116+
}
117+
});
118+
119+
describe("EventHubSender", () => {
120+
let client: EventHubSender;
121+
beforeEach("instantiate EventHubSender", () => {
122+
client = new EventHubSender(context);
123+
});
124+
125+
afterEach("close EventHubSender", () => {
126+
return client.close();
127+
});
128+
129+
for (const { type: caseType, getSignal } of cancellationCases) {
130+
it(`_getLink supports cancellation (${caseType})`, async () => {
131+
const abortSignal = getSignal();
132+
try {
133+
await client["_getLink"]({ abortSignal });
134+
throw new Error(TEST_FAILURE);
135+
} catch (err) {
136+
should.equal(err.name, "AbortError");
137+
should.equal(err.message, "The operation was aborted.");
138+
}
139+
});
140+
141+
it(`getMaxMessageSize supports cancellation (${caseType})`, async () => {
142+
const abortSignal = getSignal();
143+
try {
144+
await client.getMaxMessageSize({ abortSignal });
145+
throw new Error(TEST_FAILURE);
146+
} catch (err) {
147+
should.equal(err.name, "AbortError");
148+
should.equal(err.message, "The operation was aborted.");
149+
}
150+
});
151+
152+
it(`send supports cancellation (${caseType})`, async () => {
153+
const abortSignal = getSignal();
154+
try {
155+
await client.send([{ body: "unsung hero" }], { abortSignal });
156+
throw new Error(TEST_FAILURE);
157+
} catch (err) {
158+
should.equal(err.name, "AbortError");
159+
should.equal(err.message, "The operation was aborted.");
160+
}
161+
});
162+
}
163+
});
164+
});

sdk/eventhub/event-hubs/test/internal/eventProcessor.spec.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -618,28 +618,20 @@ describe("Event Processor", function(): void {
618618
// errors that occur within the user's own event handlers will get
619619
// routed to their processError() handler
620620
eventProcessor.start();
621-
console.log("event processor started");
622621
try {
623622
await loopUntil({
624623
name: "waiting for errors thrown from user's handlers",
625624
timeBetweenRunsMs: 1000,
626625
maxTimes: 30,
627626
until: async () => {
628-
console.log(partitionIds.length);
629-
console.dir(errors);
630627
return errors.size >= partitionIds.length * 3;
631628
}
632629
});
633-
console.log("event processor loop completed");
634630
const messages = [...errors].map((e) => e.message);
635631
messages.sort();
636-
console.dir(messages);
637-
console.dir(expectedErrorMessages);
638632
messages.should.deep.equal(expectedErrorMessages);
639633
} finally {
640-
console.log("attempting to stop");
641634
await eventProcessor.stop();
642-
console.log("stopped");
643635
}
644636
});
645637

sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ describe("disconnected", function() {
133133

134134
context.connection["_connection"].idle();
135135
await context.readyToOpenLink();
136-
console.log("open", context.connection.isOpen());
137136
await context.close();
138137
});
139138
});

0 commit comments

Comments
 (0)