Skip to content

Commit bc9c56f

Browse files
[Event Hubs] Add a sample for buffered producer (Azure#18674)
* [Event Hubs] Add a sample for buffered producer * edit * Apply suggestions from code review Ramya's feedback Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com> * address feedback * unpublish the sample for now * address feedback * fix format * address feedback * document the sending behavior * update comment for maxWaitTimeInMs * update formatting * address feedback * remove unused imports Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>
1 parent fa452d9 commit bc9c56f

File tree

1 file changed

+72
-0
lines changed

1 file changed

+72
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT Licence.
3+
4+
/**
5+
@summary Demonstrates how to send events to an Event Hub using the `EventHubBufferedProducerClient`.
6+
* This sample is different from the one in `sendEvent.ts` in that the client manages batching of events and sending
7+
* after a given amount of time or after a given amount of events are in a batch instead of you managing the same explicitly.
8+
*
9+
* @azsdk-weight 60
10+
*/
11+
12+
import { EventHubBufferedProducerClient, OnSendEventsErrorContext } from "@azure/event-hubs";
13+
14+
// Load the .env file if it exists
15+
import * as dotenv from "dotenv";
16+
dotenv.config();
17+
18+
const connectionString = process.env["EVENTHUB_CONNECTION_STRING"] || "";
19+
20+
async function handleError(ctx: OnSendEventsErrorContext): Promise<void> {
21+
console.log(`The following error occurred:`);
22+
console.log(JSON.stringify(ctx.error, undefined, 2));
23+
console.log(
24+
`The following events were not sent as a result to the partition with ID ${ctx.partitionId}:`
25+
);
26+
for (const event of ctx.events) {
27+
console.log(JSON.stringify(event, undefined, 2));
28+
console.log("\n");
29+
}
30+
}
31+
32+
export async function main(): Promise<void> {
33+
console.log(`Running sendBufferedEvents sample`);
34+
35+
/**
36+
* Create a buffered client that batches the enqueued events and sends it either
37+
* after 750ms or after batching 1000 events, whichever occurs first.
38+
*/
39+
const client = new EventHubBufferedProducerClient(connectionString, {
40+
/** An error handler must be provided */
41+
onSendEventsErrorHandler: handleError,
42+
43+
/** wait for up to 750 milliseconds before sending a batch */
44+
maxWaitTimeInMs: 750,
45+
46+
/** buffer up to 1000 events per partition before sending */
47+
maxEventBufferLengthPerPartition: 1000,
48+
});
49+
50+
function createData(count: number): number[] {
51+
return [...Array(count).keys()];
52+
}
53+
54+
console.log("Enqueuing events...");
55+
56+
for (const item of createData(2000)) {
57+
client.enqueueEvent({ body: item });
58+
}
59+
60+
/**
61+
* Flushing ensures buffered events that were not sent yet will be sent before
62+
* closing the connection. Flushing can also be invoked directly using
63+
* client.flush().
64+
*/
65+
await client.close({ flush: true });
66+
console.log(`Exiting sendBufferedEvents sample`);
67+
}
68+
69+
main().catch((error) => {
70+
console.error("Error running sample:", error);
71+
process.exit(1);
72+
});

0 commit comments

Comments
 (0)