Skip to content

Commit 93444da

Browse files
authored
[core-amqp][event-hubs][service-bus] move DataTransformer from core-amqp to client packages (Azure#12415)
Part of the list of breaking changes to core-amqp v2 in Azure#12116 Replaces Azure#12320 (precipitated by Azure#12320 (comment)) This change moves the `DataTransformer` interface and `DefaultDataTransformer` class to service-bus and event-hub packages. When we establish what our data serde strategy is, we can revisit using a shared common serde solution.
1 parent 3dac5ee commit 93444da

23 files changed

+328
-97
lines changed

sdk/core/core-amqp/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
## 2.0.0 (Unreleased)
44

5-
### Breaking Changes
5+
### Breaking changes
66

7+
- Continuing our work to clean the public API surface that we started in 2.0.0-beta.1, `DataTransformer` and `DefaultDataTransformer` are no longer exported.
8+
`dataTransformer` has been removed from `ConnectionContextBase` and `ConnectionContextBaseParameters`.
9+
This allows us to consider other forms of implementing serializers in the future.
710
- Previously, `ConnectionConfig.validate()` overridden entityPath if `undefined` with `String(undefined) = "undefined"`. This has been updated to retain `undefined` in the validation.
811
[PR 12321](https://github.com/Azure/azure-sdk-for-js/pull/12321)
912

sdk/core/core-amqp/review/core-amqp.api.md

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ export interface ConnectionContextBase {
232232
connection: Connection;
233233
connectionId: string;
234234
connectionLock: string;
235-
dataTransformer: DataTransformer;
236235
negotiateClaimLock: string;
237236
refreshConnection: () => void;
238237
wasConnectionCloseCalled: boolean;
@@ -373,23 +372,10 @@ export const Constants: {
373372
export interface CreateConnectionContextBaseParameters {
374373
config: ConnectionConfig;
375374
connectionProperties: ConnectionProperties;
376-
dataTransformer?: DataTransformer;
377375
isEntityPathRequired?: boolean;
378376
operationTimeoutInMs?: number;
379377
}
380378

381-
// @public
382-
export interface DataTransformer {
383-
decode: (body: any) => any;
384-
encode: (body: any) => any;
385-
}
386-
387-
// @public
388-
export class DefaultDataTransformer implements DataTransformer {
389-
decode(body: any): any;
390-
encode(body: any): any;
391-
}
392-
393379
// @public
394380
export const defaultLock: AsyncLock;
395381

sdk/core/core-amqp/src/ConnectionContextBase.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
import { Connection, ConnectionOptions, generate_uuid } from "rhea-promise";
55
import { CbsClient } from "./cbs";
6-
import { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
76
import { ConnectionConfig } from "./connectionConfig/connectionConfig";
87

98
import { Constants } from "./util/constants";
@@ -44,12 +43,6 @@ export interface ConnectionContextBase {
4443
* called on the connection object.
4544
*/
4645
wasConnectionCloseCalled: boolean;
47-
/**
48-
* @property {DataTransformer} dataTransformer A DataTransformer object that has methods named
49-
* - encode Responsible for encoding the AMQP message before sending it on the wire.
50-
* - decode Responsible for decoding the received AMQP message before passing it to the customer.
51-
*/
52-
dataTransformer: DataTransformer;
5346
/**
5447
* @property {CbsClient} cbsSession A reference to the cbs session ($cbs endpoint) on the
5548
* underlying AMQP connection for the EventHub Client.
@@ -95,11 +88,6 @@ export interface CreateConnectionContextBaseParameters {
9588
* the AMQP connection.
9689
*/
9790
connectionProperties: ConnectionProperties;
98-
/**
99-
* @property {DataTransformer} [dataTransformer] The datatransformer to be used for encoding and
100-
* decoding messages. Default value: DefaultDataTransformer
101-
*/
102-
dataTransformer?: DataTransformer;
10391
/**
10492
* @property {boolean} [isEntityPathRequired] Determines whether entity path should be a part of
10593
* the connection config. If `true` it must be present, `false` otherwise. Default value false.
@@ -178,7 +166,6 @@ export const ConnectionContextBase = {
178166
connectionId: connection.id,
179167
cbsSession: new CbsClient(connection, connectionLock),
180168
config: parameters.config,
181-
dataTransformer: parameters.dataTransformer || new DefaultDataTransformer(),
182169
refreshConnection() {
183170
const connection = new Connection(connectionOptions);
184171
const connectionLock = `${Constants.establishConnection}-${generate_uuid()}`;

sdk/core/core-amqp/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
export { RequestResponseLink, SendRequestOptions } from "./requestResponseLink";
77
export { retry, RetryOptions, RetryConfig, RetryOperationType, RetryMode } from "./retry";
8-
export { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
98
export { TokenType } from "./auth/token";
109

1110
export { ConnectionConfig, ConnectionConfigOptions } from "./connectionConfig/connectionConfig";

sdk/core/core-amqp/test/context.spec.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import * as chai from "chai";
55
const should = chai.should();
6-
import { CbsClient, ConnectionConfig, ConnectionContextBase, DefaultDataTransformer } from "../src";
6+
import { CbsClient, ConnectionConfig, ConnectionContextBase } from "../src";
77
import { Connection } from "rhea-promise";
88
import { isNode } from "../src/util/utils";
99

@@ -26,14 +26,12 @@ describe("ConnectionContextBase", function() {
2626
should.exist(context.connectionId);
2727
should.exist(context.connectionLock);
2828
should.exist(context.negotiateClaimLock);
29-
should.exist(context.dataTransformer);
3029
context.wasConnectionCloseCalled.should.equal(false);
3130
context.connection.should.instanceOf(Connection);
3231
context.connection.options.properties!.product.should.equal("MSJSClient");
3332
context.connection.options.properties!["user-agent"].should.equal("/js-amqp-client");
3433
context.connection.options.properties!.version.should.equal("1.0.0");
3534
context.cbsSession.should.instanceOf(CbsClient);
36-
context.dataTransformer.should.instanceOf(DefaultDataTransformer);
3735
done();
3836
});
3937

@@ -133,7 +131,6 @@ describe("ConnectionContextBase", function() {
133131
should.exist(context.connectionId);
134132
should.exist(context.connectionLock);
135133
should.exist(context.negotiateClaimLock);
136-
should.exist(context.dataTransformer);
137134
context.wasConnectionCloseCalled.should.equal(false);
138135
context.cbsSession.should.instanceOf(CbsClient);
139136

sdk/eventhub/event-hubs/rollup.base.config.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ export function browserConfig(test = false) {
122122

123123
cjs({
124124
namedExports: {
125-
"@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"]
125+
"@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"],
126+
chai: ["should", "assert"],
127+
assert: ["equal", "deepEqual", "notEqual"]
126128
}
127129
}),
128130

sdk/core/core-amqp/src/dataTransformer.ts renamed to sdk/eventhub/event-hubs/src/dataTransformer.ts

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,16 @@
22
// Licensed under the MIT license.
33

44
import { message } from "rhea-promise";
5-
import { logErrorStackTrace, logger } from "./log";
65
import isBuffer from "is-buffer";
76
import { Buffer } from "buffer";
8-
9-
/**
10-
* Describes the transformations that can be performed to encode/decode the data before sending it
11-
* on (or receiving it from) the wire.
12-
*/
13-
export interface DataTransformer {
14-
/**
15-
* @property {Function} encode A function that takes the body property from an EventData object
16-
* and returns an encoded body (some form of AMQP type).
17-
*/
18-
encode: (body: any) => any;
19-
/**
20-
* @property {Function} decode A function that takes the body property from an AMQP message
21-
* and returns the decoded message body. If it cannot decode the body then it returns the body
22-
* as-is.
23-
*/
24-
decode: (body: any) => any;
25-
}
7+
import { logErrorStackTrace, logger } from "./log";
268

279
/**
2810
* The default data transformer that will be used by the Azure SDK.
11+
* @internal
12+
* @ingore
2913
*/
30-
export class DefaultDataTransformer implements DataTransformer {
14+
export const defaultDataTransformer = {
3115
/**
3216
* A function that takes the body property from an EventData object
3317
* and returns an encoded body (some form of AMQP type).
@@ -62,7 +46,7 @@ export class DefaultDataTransformer implements DataTransformer {
6246
}
6347
}
6448
return result;
65-
}
49+
},
6650

6751
/**
6852
* @property {Function} [decode] A function that takes the body property from an AMQP message
@@ -99,4 +83,4 @@ export class DefaultDataTransformer implements DataTransformer {
9983
}
10084
return processedBody;
10185
}
102-
}
86+
};

sdk/eventhub/event-hubs/src/eventDataBatch.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { throwTypeErrorIfParameterMissing } from "./util/error";
88
import { Span, SpanContext } from "@opentelemetry/api";
99
import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData";
1010
import { createMessageSpan } from "./diagnostics/messageSpan";
11+
import { defaultDataTransformer } from "./dataTransformer";
1112

1213
/**
1314
* The amount of bytes to reserve as overhead for a small message.
@@ -303,7 +304,7 @@ export class EventDataBatchImpl implements EventDataBatch {
303304

304305
// Convert EventData to RheaMessage.
305306
const amqpMessage = toRheaMessage(eventData, this._partitionKey);
306-
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);
307+
amqpMessage.body = defaultDataTransformer.encode(eventData.body);
307308
const encodedMessage = message.encode(amqpMessage);
308309

309310
let currentSize = this._sizeInBytes;

sdk/eventhub/event-hubs/src/eventHubReceiver.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { ConnectionContext } from "./connectionContext";
2525
import { LinkEntity } from "./linkEntity";
2626
import { EventPosition, getEventPositionFilter } from "./eventPosition";
2727
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
28+
import { defaultDataTransformer } from "./dataTransformer";
2829

2930
/**
3031
* @ignore
@@ -230,7 +231,7 @@ export class EventHubReceiver extends LinkEntity {
230231

231232
const data: EventDataInternal = fromRheaMessage(context.message);
232233
const receivedEventData: ReceivedEventData = {
233-
body: this._context.dataTransformer.decode(context.message.body),
234+
body: defaultDataTransformer.decode(context.message.body),
234235
properties: data.properties,
235236
offset: data.offset!,
236237
sequenceNumber: data.sequenceNumber!,

sdk/eventhub/event-hubs/src/eventHubSender.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { SendOptions } from "./models/public";
3131
import { getRetryAttemptTimeoutInMs } from "./util/retries";
3232
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
3333
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
34+
import { defaultDataTransformer } from "./dataTransformer";
3435

3536
/**
3637
* Describes the EventHubSender that will send event data to EventHub.
@@ -322,7 +323,7 @@ export class EventHubSender extends LinkEntity {
322323
// Convert EventData to RheaMessage.
323324
for (let i = 0; i < events.length; i++) {
324325
const message = toRheaMessage(events[i], partitionKey);
325-
message.body = this._context.dataTransformer.encode(events[i].body);
326+
message.body = defaultDataTransformer.encode(events[i].body);
326327
messages[i] = message;
327328
}
328329
// Encode every amqp message and then convert every encoded message to amqp data section

0 commit comments

Comments
 (0)