Skip to content

Commit 7b7b6ac

Browse files
authored
[Event Hubs] Fix sending Uint8Array (Azure#21185)
### Packages impacted by this PR @azure/event-hubs ### Issues associated with this PR Fixes Azure#21184 ### Describe the problem that is addressed by this PR When sending a message with `Uint8Array` (which is not strictly a `Buffer`), the client will stringify the payload first, see the last branch in: https://github.com/Azure/azure-sdk-for-js/blob/7926782dcb48b46cafe51cf1648d9c53b5320ec7/sdk/eventhub/event-hubs/src/dataTransformer.ts#L42-L67 This behavior forces the receiver on the other end to unnecessarily unstringify the payload first. ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? The design proposed in this PR is to handle the Uint8Array case the same as Buffer but that is a breaking change. This problem mainly affects sending avro-serialized payloads so alternatively, without introducing breaking changes, the message adapter can convert Uint8Array to a Buffer before handing it to the client for sending. This could work but will likely create confusion for customers who send Uint8Array payloads coming from both the serializer and other sources. ### Are there test cases added in this PR? _(If not, why?)_ Will be added after approval. ### Provide a list of related PRs _(if any)_ N/A ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [x] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [ ] Added a changelog (if necessary)
1 parent b9ef534 commit 7b7b6ac

File tree

4 files changed

+43
-2
lines changed

4 files changed

+43
-2
lines changed

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- `MessageAdapter`'s `consumeMessage` and `produceMessage` have been renamed to `consume` and `produce`.
99

1010
### Bugs Fixed
11+
- The Uint8Array payload was being stringified first before it gets sent which caused the receiver to treat it as an object instead of a Uint8Array. This is now fixed and Uint8Array is being treated the same as a Buffer.
1112
- The hashing algorithm used to map partition keys to IDs in the buffered producer is no longer sensitive to the endianness of the local machine [Issue #21190](https://github.com/Azure/azure-sdk-for-js/issues/21190).
1213

1314
### Other Changes

sdk/eventhub/event-hubs/src/dataTransformer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export const defaultDataTransformer = {
4646
result.typecode = valueSectionTypeCode;
4747
} else if (bodyType === "sequence") {
4848
result = message.sequence_section(body);
49-
} else if (isBuffer(body)) {
49+
} else if (isBuffer(body) || body instanceof Uint8Array) {
5050
result = message.data_section(body);
5151
} else if (body === null && bodyType === "data") {
5252
result = message.data_section(null);

sdk/eventhub/event-hubs/test/public/eventHubConsumerClient.spec.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { EnvVarKeys, getEnvVars, getStartingPositionsForTests, loopUntil } from
1818
import { LogTester } from "./utils/logHelpers";
1919
import { ReceivedMessagesTester } from "./utils/receivedMessagesTester";
2020
import { TestInMemoryCheckpointStore } from "./utils/testInMemoryCheckpointStore";
21-
import chai from "chai";
21+
import chai, { expect } from "chai";
2222
import { createMockServer } from "./utils/mockService";
2323
import debugModule from "debug";
2424
import { testWithServiceTypes } from "./utils/testWithServiceTypes";
@@ -1267,5 +1267,41 @@ testWithServiceTypes((serviceVersion) => {
12671267
should.equal((caughtErr as MessagingError).code, "ArgumentOutOfRangeError");
12681268
});
12691269
});
1270+
1271+
describe("Types of received payloads", function (): void {
1272+
it(`Uint8Array is received as Buffer`, async function (): Promise<void> {
1273+
const partitionId = partitionIds[0];
1274+
const startingPositions = await getStartingPositionsForTests(consumerClient);
1275+
const data = [0, 1, 2];
1276+
await producerClient.sendBatch(
1277+
[{ body: Uint8Array.from(data), contentType: "avro/binary+1234" }],
1278+
{
1279+
partitionId,
1280+
}
1281+
);
1282+
let subscription: Subscription;
1283+
const receivedEvent = await new Promise<ReceivedEventData>((resolve, reject) => {
1284+
subscription = consumerClient.subscribe(
1285+
partitionId,
1286+
{
1287+
processEvents: async (events: ReceivedEventData[]) => {
1288+
resolve(events[0]);
1289+
},
1290+
processError: async (err) => {
1291+
reject(err);
1292+
},
1293+
},
1294+
{
1295+
startPosition: startingPositions,
1296+
}
1297+
);
1298+
});
1299+
await subscription!.close();
1300+
should.exist(receivedEvent);
1301+
const body = receivedEvent.body as Buffer;
1302+
expect(Buffer.isBuffer(body)).to.be.true;
1303+
expect(body.toJSON().data).to.be.deep.equal(data);
1304+
});
1305+
});
12701306
}).timeout(120000);
12711307
});

sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsBufferedProducerClient.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ const groupName = process.env["SCHEMA_REGISTRY_GROUP"] || "AzureSdkSampleGroup";
2424
// The connection string for Event Hubs
2525
const eventHubsConnectionString = process.env["EVENTHUB_CONNECTION_STRING"] || "";
2626

27+
// The name of Event Hub the client will connect to
28+
const eventHubName = process.env["EVENTHUB_NAME"] || "";
29+
2730
// Sample Avro Schema for user with first and last names
2831
const schemaObject = {
2932
type: "record",
@@ -81,6 +84,7 @@ export async function main() {
8184

8285
const eventHubsBufferedProducerClient = new EventHubBufferedProducerClient(
8386
eventHubsConnectionString,
87+
eventHubName,
8488
{
8589
onSendEventsErrorHandler: handleError,
8690
}

0 commit comments

Comments
 (0)