Skip to content

Commit dec897a

Browse files
authored
[core-amqp][event-hubs][service-bus] Fixes _process of undefined TypeError (Azure#15597)
This PR fixes Azure#13500. `rhea` 2.0.1 contains the fix to this specific error. We currently use `rhea` 1.x, so there's additional work in this PR to workaround the single breaking change in `rhea`, and the breaking changes in `rhea-promise`. ### rhea breaking change `rhea` contains 1 breaking change between versions 1.x and 2.x: timestamp types are now deserialized as Date objects instead of numbers. Unfortunately since this changes the way users' data might be deserialized in their service bus messages or event hubs events, we have to convert Date objects back to numbers in our client libraries until we do a major version bump. (Shorter term we can look at using rhea's default behavior behind a flag.) ### rhea-promise breaking changes Some of the `rhea-promise` APIs that accepted multiple optional positional arguments have been updated to take a single options bag parameter at the end of their method parameter list. AwaitableSender was also updated so that a timeout is no provided at instantiation. Instead, it must be provided per each `send()` call. ### core-amqp v3 Since core-amqp is being updated to depend on rhea 2.x, core-amqp dependencies will also pull in rhea 2.x transitively. To ensure that existing versions of event hubs and service bus don't break by deserializing timestamps as Date objects, core-amqp is updated to a new major version: v3. Once Azure#15349 is merged, we can also remove `AsyncLock` completely, so I'd like to merge that PR in before releasing the changes in this PR.
1 parent 8c726b8 commit dec897a

File tree

17 files changed

+1556
-1222
lines changed

17 files changed

+1556
-1222
lines changed

common/config/rush/pnpm-lock.yaml

Lines changed: 1332 additions & 1161 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/core/core-amqp/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# Release History
22

3-
## 2.3.1 (Unreleased)
3+
## 3.0.0 (Unreleased)
44

5+
### Breaking changes
6+
7+
- Updates the `rhea-promise` and `rhea` dependencies to version 2.x. `rhea` contains a breaking change that changes deserialization of timestamps from numbers to Date objects.
58

69
## 2.3.0 (2021-04-29)
710

sdk/core/core-amqp/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@azure/core-amqp",
33
"sdk-type": "client",
4-
"version": "2.3.1",
4+
"version": "3.0.0",
55
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
66
"author": "Microsoft Corporation",
77
"license": "MIT",
@@ -78,8 +78,8 @@
7878
"events": "^3.0.0",
7979
"jssha": "^3.1.0",
8080
"process": "^0.11.10",
81-
"rhea": "^1.0.24",
82-
"rhea-promise": "^1.2.1",
81+
"rhea": "^2.0.2",
82+
"rhea-promise": "^2.0.0",
8383
"tslib": "^2.0.0",
8484
"url": "^0.11.0",
8585
"util": "^0.12.1"

sdk/core/core-amqp/src/messageProperties.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ export const AmqpMessageProperties = {
7979
toRheaMessageProperties(props: AmqpMessageProperties): RheaMessageProperties {
8080
const amqpProperties: RheaMessageProperties = {};
8181
if (props.absoluteExpiryTime != undefined) {
82-
amqpProperties.absolute_expiry_time = props.absoluteExpiryTime;
82+
amqpProperties.absolute_expiry_time = new Date(props.absoluteExpiryTime);
8383
}
8484
if (props.contentEncoding != undefined) {
8585
amqpProperties.content_encoding = props.contentEncoding;
@@ -91,7 +91,7 @@ export const AmqpMessageProperties = {
9191
amqpProperties.correlation_id = props.correlationId;
9292
}
9393
if (props.creationTime != undefined) {
94-
amqpProperties.creation_time = props.creationTime;
94+
amqpProperties.creation_time = new Date(props.creationTime);
9595
}
9696
if (props.groupId != undefined) {
9797
amqpProperties.group_id = props.groupId;
@@ -130,7 +130,7 @@ export const AmqpMessageProperties = {
130130
fromRheaMessageProperties(props: RheaMessageProperties): AmqpMessageProperties {
131131
const msgProperties: AmqpMessageProperties = {};
132132
if (props.absolute_expiry_time != undefined) {
133-
msgProperties.absoluteExpiryTime = props.absolute_expiry_time;
133+
msgProperties.absoluteExpiryTime = props.absolute_expiry_time.getTime();
134134
}
135135
if (props.content_encoding != undefined) {
136136
msgProperties.contentEncoding = props.content_encoding;
@@ -142,7 +142,7 @@ export const AmqpMessageProperties = {
142142
msgProperties.correlationId = props.correlation_id;
143143
}
144144
if (props.creation_time != undefined) {
145-
msgProperties.creationTime = props.creation_time;
145+
msgProperties.creationTime = props.creation_time.getTime();
146146
}
147147
if (props.group_id != undefined) {
148148
msgProperties.groupId = props.group_id;

sdk/core/core-amqp/test/message.spec.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ describe("message", function() {
9393
// userId: ""
9494
};
9595
const amqpMsgPropertiesExpected: RheaMessageProperties = {
96-
absolute_expiry_time: 0,
96+
absolute_expiry_time: new Date(0),
9797
content_encoding: "",
9898
content_type: "",
9999
correlation_id: 0,
100-
creation_time: 0,
100+
creation_time: new Date(0),
101101
group_id: "",
102102
group_sequence: 0,
103103
message_id: "",
@@ -140,11 +140,11 @@ describe("message", function() {
140140
// userId: ""
141141
};
142142
const amqpMsgProperties: RheaMessageProperties = {
143-
absolute_expiry_time: 0,
143+
absolute_expiry_time: new Date(0),
144144
content_encoding: "",
145145
content_type: "",
146146
correlation_id: 0,
147-
creation_time: 0,
147+
creation_time: new Date(0),
148148
group_id: "",
149149
group_sequence: 0,
150150
message_id: "",

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 5.5.2 (Unreleased)
44

5+
### Bug fixes
6+
7+
- Fixes issue [#13500](https://github.com/Azure/azure-sdk-for-js/issues/13500) where a `TypeError: Cannot read property '_process' of undefined` could be thrown in rare cases.
58

69
## 5.5.1 (2021-04-29)
710

sdk/eventhub/event-hubs/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
},
9191
"dependencies": {
9292
"@azure/abort-controller": "^1.0.0",
93-
"@azure/core-amqp": "^2.3.0",
93+
"@azure/core-amqp": "^3.0.0",
9494
"@azure/core-asynciterator-polyfill": "^1.0.0",
9595
"@azure/core-auth": "^1.3.0",
9696
"@azure/core-tracing": "1.0.0-preview.11",
@@ -99,7 +99,7 @@
9999
"is-buffer": "^2.0.3",
100100
"jssha": "^3.1.0",
101101
"process": "^0.11.10",
102-
"rhea-promise": "^1.2.1",
102+
"rhea-promise": "^2.0.0",
103103
"tslib": "^2.0.0",
104104
"uuid": "^8.3.0"
105105
},

sdk/eventhub/event-hubs/src/eventData.ts

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import { DeliveryAnnotations, Message as RheaMessage, MessageAnnotations } from "rhea-promise";
55
import { Constants } from "@azure/core-amqp";
6-
import { isDefined } from "./util/typeGuards";
6+
import { isDefined, objectHasProperty } from "./util/typeGuards";
77

88
/**
99
* Describes the delivery annotations.
@@ -156,13 +156,15 @@ export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
156156
if (!data.systemProperties) {
157157
data.systemProperties = {};
158158
}
159-
data.systemProperties[annotationKey] = msg.message_annotations[annotationKey];
159+
data.systemProperties[annotationKey] = convertDatesToNumbers(
160+
msg.message_annotations[annotationKey]
161+
);
160162
break;
161163
}
162164
}
163165
}
164166
if (msg.application_properties) {
165-
data.properties = msg.application_properties;
167+
data.properties = convertDatesToNumbers(msg.application_properties);
166168
}
167169
if (msg.delivery_annotations) {
168170
data.lastEnqueuedOffset = msg.delivery_annotations.last_enqueued_offset;
@@ -181,7 +183,9 @@ export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
181183
data.systemProperties = {};
182184
}
183185
if (msg[messageProperty] != null) {
184-
data.systemProperties[messagePropertiesMap[messageProperty]] = msg[messageProperty];
186+
data.systemProperties[messagePropertiesMap[messageProperty]] = convertDatesToNumbers(
187+
msg[messageProperty]
188+
);
185189
}
186190
}
187191

@@ -284,3 +288,43 @@ export interface ReceivedEventData {
284288
[key: string]: any;
285289
};
286290
}
291+
292+
/**
293+
* Converts any Date objects into a number representing date.getTime().
294+
* Recursively checks for any Date objects in arrays and objects.
295+
* @internal
296+
*/
297+
function convertDatesToNumbers<T = unknown>(thing: T): T {
298+
// fast exit
299+
if (!isDefined(thing)) return thing;
300+
301+
// When 'thing' is a Date, return the number representation
302+
if (
303+
typeof thing === "object" &&
304+
objectHasProperty(thing, "getTime") &&
305+
typeof thing.getTime === "function"
306+
) {
307+
return thing.getTime();
308+
}
309+
310+
/*
311+
Examples:
312+
[0, 'foo', new Date(), { nested: new Date()}]
313+
*/
314+
if (Array.isArray(thing)) {
315+
return (thing.map(convertDatesToNumbers) as unknown) as T;
316+
}
317+
318+
/*
319+
Examples:
320+
{ foo: new Date(), children: { nested: new Date() }}
321+
*/
322+
if (typeof thing === "object" && isDefined(thing)) {
323+
thing = { ...thing };
324+
for (const key of Object.keys(thing)) {
325+
(thing as any)[key] = convertDatesToNumbers((thing as any)[key]);
326+
}
327+
}
328+
329+
return thing;
330+
}

sdk/eventhub/event-hubs/src/eventHubSender.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ export class EventHubSender extends LinkEntity {
303303
);
304304
}
305305

306-
private _createSenderOptions(timeoutInMs: number, newName?: boolean): AwaitableSenderOptions {
306+
private _createSenderOptions(newName?: boolean): AwaitableSenderOptions {
307307
if (newName) this.name = `${uuid()}`;
308308
const srOptions: AwaitableSenderOptions = {
309309
name: this.name,
@@ -313,8 +313,7 @@ export class EventHubSender extends LinkEntity {
313313
onError: this._onAmqpError,
314314
onClose: this._onAmqpClose,
315315
onSessionError: this._onSessionError,
316-
onSessionClose: this._onSessionClose,
317-
sendTimeoutInSeconds: timeoutInMs / 1000
316+
onSessionClose: this._onSessionClose
318317
};
319318
logger.verbose("Creating sender with options: %O", srOptions);
320319
return srOptions;
@@ -403,9 +402,10 @@ export class EventHubSender extends LinkEntity {
403402
throw translate(e);
404403
}
405404

406-
sender.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
407405
try {
408-
const delivery = await sender.send(rheaMessage, undefined, 0x80013700, {
406+
const delivery = await sender.send(rheaMessage, {
407+
format: 0x80013700,
408+
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000,
409409
abortSignal
410410
});
411411
logger.info(
@@ -454,7 +454,7 @@ export class EventHubSender extends LinkEntity {
454454
const retryOptions = options.retryOptions || {};
455455
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
456456
retryOptions.timeoutInMs = timeoutInMs;
457-
const senderOptions = this._createSenderOptions(timeoutInMs);
457+
const senderOptions = this._createSenderOptions();
458458

459459
const startTime = Date.now();
460460
const createLinkPromise = async (): Promise<AwaitableSender> => {

sdk/eventhub/event-hubs/test/internal/eventdata.spec.ts

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,12 @@ describe("EventData", function(): void {
8686
testEventData.offset!.should.equal(testAnnotations["x-opt-offset"]);
8787
testEventData.sequenceNumber!.should.equal(testAnnotations["x-opt-sequence-number"]);
8888
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
89-
testEventData.systemProperties!["x-iot-foo-prop"] = extraAnnotations["x-iot-foo-prop"];
90-
testEventData.systemProperties!["x-iot-bar-prop"] = extraAnnotations["x-iot-bar-prop"];
89+
testEventData.systemProperties!["x-iot-foo-prop"].should.eql(
90+
extraAnnotations["x-iot-foo-prop"]
91+
);
92+
testEventData.systemProperties!["x-iot-bar-prop"].should.eql(
93+
extraAnnotations["x-iot-bar-prop"]
94+
);
9195
});
9296

9397
it("returns systemProperties for special known properties", function(): void {
@@ -104,8 +108,8 @@ describe("EventData", function(): void {
104108
content_encoding: "utf-8",
105109
content_type: "application/json",
106110
correlation_id: "id2",
107-
absolute_expiry_time: 0,
108-
creation_time: 0,
111+
absolute_expiry_time: new Date(0),
112+
creation_time: new Date(0),
109113
group_id: "groupId",
110114
group_sequence: 1
111115
});
@@ -131,6 +135,41 @@ describe("EventData", function(): void {
131135
testEventData.systemProperties!["groupSequence"].should.equal(1);
132136
});
133137
});
138+
139+
it("deserializes Dates to numbers in properties and annotations", () => {
140+
const timestamp = new Date();
141+
const extraAnnotations = {
142+
"x-date": timestamp,
143+
"x-number": timestamp.getTime()
144+
};
145+
const testEventData = fromRheaMessage({
146+
body: testBody,
147+
application_properties: {
148+
topLevelDate: timestamp,
149+
child: {
150+
nestedDate: timestamp,
151+
children: [timestamp, { deepDate: timestamp }]
152+
}
153+
},
154+
message_annotations: {
155+
...testAnnotations,
156+
...extraAnnotations
157+
}
158+
});
159+
testEventData.enqueuedTimeUtc!.getTime().should.equal(testAnnotations["x-opt-enqueued-time"]);
160+
testEventData.offset!.should.equal(testAnnotations["x-opt-offset"]);
161+
testEventData.sequenceNumber!.should.equal(testAnnotations["x-opt-sequence-number"]);
162+
testEventData.partitionKey!.should.equal(testAnnotations["x-opt-partition-key"]);
163+
testEventData.systemProperties!["x-date"].should.eql(extraAnnotations["x-date"].getTime());
164+
testEventData.systemProperties!["x-number"].should.eql(extraAnnotations["x-number"]);
165+
testEventData.properties!.should.eql({
166+
topLevelDate: timestamp.getTime(),
167+
child: {
168+
nestedDate: timestamp.getTime(),
169+
children: [timestamp.getTime(), { deepDate: timestamp.getTime() }]
170+
}
171+
});
172+
});
134173
});
135174

136175
describe("toAmqpMessage", function(): void {

0 commit comments

Comments
 (0)