Skip to content

Commit 94fba96

Browse files
connieysrnagar
andauthored
Rename to SchemaRegistryApacheAvroSerializer. Group name is optional. (Azure#24622)
* Rename parameter from schemaFormat -> format. * Updating schemaId -> Id. * Updating schemaId -> Id. * Update sample to mention getting properties. * Rename to SchemaRegistryApacheAvroSerializer. * Move optional parameters to options bag. * Renaming to ApacheAvroSerializerBuilder * Using Context.NONE if context is not passed in. Co-authored-by: Srikanta <51379715+srnagar@users.noreply.github.com>
1 parent 8eeccb2 commit 94fba96

File tree

16 files changed

+284
-168
lines changed

16 files changed

+284
-168
lines changed
Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,33 @@
2424
import static com.azure.core.util.FluxUtil.monoError;
2525

2626
/**
27-
* Schema Registry-based serializer implementation for Avro data format.
27+
* Schema Registry-based serializer implementation for Avro data format using Apache Avro.
2828
*/
29-
public final class SchemaRegistryAvroSerializer implements ObjectSerializer {
29+
public final class SchemaRegistryApacheAvroSerializer implements ObjectSerializer {
3030

3131
static final byte[] RECORD_FORMAT_INDICATOR = new byte[]{0x00, 0x00, 0x00, 0x00};
3232
static final int SCHEMA_ID_SIZE = 32;
3333
static final int RECORD_FORMAT_INDICATOR_SIZE = 4;
3434

35-
private final ClientLogger logger = new ClientLogger(SchemaRegistryAvroSerializer.class);
35+
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializer.class);
3636
private final SchemaRegistryAsyncClient schemaRegistryClient;
3737
private final AvroSerializer avroSerializer;
38-
private final String schemaGroup;
39-
private final boolean autoRegisterSchemas;
38+
private final SerializerOptions serializerOptions;
4039

41-
SchemaRegistryAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
42-
AvroSerializer avroSerializer, String schemaGroup, boolean autoRegisterSchemas) {
40+
/**
41+
* Creates a new instance.
42+
*
43+
* @param schemaRegistryClient Client that interacts with Schema Registry.
44+
* @param avroSerializer Serializer implemented using Apache Avro.
45+
* @param serializerOptions Options to configure the serializer with.
46+
*/
47+
SchemaRegistryApacheAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
48+
AvroSerializer avroSerializer, SerializerOptions serializerOptions) {
4349
this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryClient,
4450
"'schemaRegistryClient' cannot be null.");
4551
this.avroSerializer = Objects.requireNonNull(avroSerializer,
4652
"'avroSchemaRegistryUtils' cannot be null.");
47-
this.schemaGroup = Objects.requireNonNull(schemaGroup, "'schemaGroup' cannot be null.");
48-
this.autoRegisterSchemas = autoRegisterSchemas;
53+
this.serializerOptions = Objects.requireNonNull(serializerOptions, "'serializerOptions' cannot be null.");
4954
}
5055

5156
/**
@@ -166,7 +171,7 @@ public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
166171
return monoError(logger, exception);
167172
}
168173

169-
return this.maybeRegisterSchema(this.schemaGroup, schema.getFullName(), schema.toString())
174+
return this.maybeRegisterSchema(serializerOptions.getSchemaGroup(), schema.getFullName(), schema.toString())
170175
.handle((id, sink) -> {
171176
ByteBuffer recordFormatIndicatorBuffer = ByteBuffer
172177
.allocate(RECORD_FORMAT_INDICATOR_SIZE)
@@ -196,13 +201,13 @@ public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
196201
* @return string representation of schema ID
197202
*/
198203
private Mono<String> maybeRegisterSchema(String schemaGroup, String schemaName, String schemaString) {
199-
if (this.autoRegisterSchemas) {
204+
if (serializerOptions.autoRegisterSchemas()) {
200205
return this.schemaRegistryClient
201206
.registerSchema(schemaGroup, schemaName, schemaString, SchemaFormat.AVRO)
202-
.map(SchemaProperties::getSchemaId);
207+
.map(SchemaProperties::getId);
203208
} else {
204209
return this.schemaRegistryClient.getSchemaProperties(
205-
schemaGroup, schemaName, schemaString, SchemaFormat.AVRO).map(properties -> properties.getSchemaId());
210+
schemaGroup, schemaName, schemaString, SchemaFormat.AVRO).map(properties -> properties.getId());
206211
}
207212
}
208213

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
import org.apache.avro.specific.SpecificRecord;
1212

1313
/**
14-
* The builder implementation for building {@link SchemaRegistryAvroSerializer}.
14+
* The builder implementation for building {@link SchemaRegistryApacheAvroSerializer}.
1515
*
16-
* @see SchemaRegistryAvroSerializer
16+
* @see SchemaRegistryApacheAvroSerializer
1717
*/
18-
public final class SchemaRegistryAvroSerializerBuilder {
18+
public final class SchemaRegistryApacheAvroSerializerBuilder {
1919
private static final boolean AVRO_SPECIFIC_READER_DEFAULT = false;
20+
private static final int MAX_CACHE_SIZE = 128;
2021

2122
private Boolean autoRegisterSchemas;
2223
private Boolean avroSpecificReader;
@@ -26,22 +27,23 @@ public final class SchemaRegistryAvroSerializerBuilder {
2627
/**
2728
* Instantiates instance of Builder class. Supplies client defaults.
2829
*/
29-
public SchemaRegistryAvroSerializerBuilder() {
30+
public SchemaRegistryApacheAvroSerializerBuilder() {
3031
this.autoRegisterSchemas = false;
3132
this.avroSpecificReader = false;
3233
}
3334

3435
/**
35-
* Specifies schema group for interacting with Azure Schema Registry service.
36+
* Specifies schema group for interacting with Azure Schema Registry service. This is optional unless
37+
* {@link #autoRegisterSchema(boolean) autoRegisterSchema} is set to {@code true}.
3638
*
3739
* If auto-registering schemas, schema will be stored under this group. If not auto-registering, serializer will
3840
* request schema ID for matching data schema under specified group.
3941
*
4042
* @param schemaGroup Azure Schema Registry schema group
4143
*
42-
* @return updated {@link SchemaRegistryAvroSerializerBuilder} instance
44+
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
4345
*/
44-
public SchemaRegistryAvroSerializerBuilder schemaGroup(String schemaGroup) {
46+
public SchemaRegistryApacheAvroSerializerBuilder schemaGroup(String schemaGroup) {
4547
this.schemaGroup = schemaGroup;
4648
return this;
4749
}
@@ -57,9 +59,9 @@ public SchemaRegistryAvroSerializerBuilder schemaGroup(String schemaGroup) {
5759
*
5860
* @param autoRegisterSchemas flag for schema auto-registration
5961
*
60-
* @return updated {@link SchemaRegistryAvroSerializerBuilder} instance
62+
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
6163
*/
62-
public SchemaRegistryAvroSerializerBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
64+
public SchemaRegistryApacheAvroSerializerBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
6365
this.autoRegisterSchemas = autoRegisterSchemas;
6466
return this;
6567
}
@@ -71,9 +73,9 @@ public SchemaRegistryAvroSerializerBuilder autoRegisterSchema(boolean autoRegist
7173
* @param avroSpecificReader {@code true} to deserialize into {@link SpecificRecord} via {@link
7274
* SpecificDatumReader}; {@code false} otherwise.
7375
*
74-
* @return updated {@link SchemaRegistryAvroSerializerBuilder} instance.
76+
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
7577
*/
76-
public SchemaRegistryAvroSerializerBuilder avroSpecificReader(boolean avroSpecificReader) {
78+
public SchemaRegistryApacheAvroSerializerBuilder avroSpecificReader(boolean avroSpecificReader) {
7779
this.avroSpecificReader = avroSpecificReader;
7880
return this;
7981
}
@@ -83,31 +85,32 @@ public SchemaRegistryAvroSerializerBuilder avroSpecificReader(boolean avroSpecif
8385
*
8486
* @param schemaRegistryAsyncClient The {@link SchemaRegistryAsyncClient}.
8587
*
86-
* @return updated {@link SchemaRegistryAvroSerializerBuilder} instance.
88+
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
8789
*/
88-
public SchemaRegistryAvroSerializerBuilder schemaRegistryAsyncClient(
90+
public SchemaRegistryApacheAvroSerializerBuilder schemaRegistryAsyncClient(
8991
SchemaRegistryAsyncClient schemaRegistryAsyncClient) {
9092
this.schemaRegistryAsyncClient = schemaRegistryAsyncClient;
9193
return this;
9294
}
9395

9496
/**
95-
* Instantiates SchemaRegistry Avro serializer.
97+
* Creates a new instance of Schema Registry serializer.
9698
*
97-
* @return {@link SchemaRegistryAvroSerializer} instance
99+
* @return A new instance of {@link SchemaRegistryApacheAvroSerializer}.
98100
*
99101
* @throws NullPointerException if {@link #schemaRegistryAsyncClient(SchemaRegistryAsyncClient)} is {@code null}
100102
* or {@link #schemaGroup(String) schemaGroup} is {@code null}.
101103
* @throws IllegalArgumentException if credential is not set.
102104
*/
103-
public SchemaRegistryAvroSerializer buildSerializer() {
105+
public SchemaRegistryApacheAvroSerializer buildSerializer() {
104106
final boolean isAutoRegister = autoRegisterSchemas != null && autoRegisterSchemas;
105107
final boolean useAvroSpecificReader = avroSpecificReader == null
106108
? AVRO_SPECIFIC_READER_DEFAULT : avroSpecificReader;
107109
final Schema.Parser parser = new Schema.Parser();
108110
final AvroSerializer codec = new AvroSerializer(useAvroSpecificReader, parser,
109111
EncoderFactory.get(), DecoderFactory.get());
112+
final SerializerOptions options = new SerializerOptions(schemaGroup, isAutoRegister, MAX_CACHE_SIZE);
110113

111-
return new SchemaRegistryAvroSerializer(schemaRegistryAsyncClient, codec, schemaGroup, isAutoRegister);
114+
return new SchemaRegistryApacheAvroSerializer(schemaRegistryAsyncClient, codec, options);
112115
}
113116
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.data.schemaregistry.avro;
5+
6+
import com.azure.core.annotation.Immutable;
7+
8+
/**
9+
* Package-private class that holds additional options when creating serializer.
10+
*/
11+
@Immutable
12+
class SerializerOptions {
13+
private final boolean autoRegisterSchemas;
14+
private final int maxCacheSize;
15+
private final String schemaGroup;
16+
17+
/**
18+
* Creates a new instance.
19+
*
20+
* @param schemaGroup Optional schema group when registering a schema is required.
21+
* @param autoRegisterSchemas {@code true} to register schema if it does not exist, {@code false} otherwise.
22+
* @param maxCacheSize The maximum cache size for the serializer.
23+
*/
24+
SerializerOptions(String schemaGroup, boolean autoRegisterSchemas, int maxCacheSize) {
25+
this.schemaGroup = schemaGroup;
26+
this.autoRegisterSchemas = autoRegisterSchemas;
27+
this.maxCacheSize = maxCacheSize;
28+
}
29+
30+
/**
31+
* Gets whether or not to auto-register schemas.
32+
*
33+
* @return {@code true} to register schema if it does not exist; {@code false} otherwise.
34+
*/
35+
boolean autoRegisterSchemas() {
36+
return autoRegisterSchemas;
37+
}
38+
39+
/**
40+
* Gets the maximum cache size.
41+
*
42+
* @return The maximum cache size.
43+
*/
44+
int getMaxCacheSize() {
45+
return maxCacheSize;
46+
}
47+
48+
/**
49+
* Gets the schema group to register schemas against.
50+
*
51+
* @return The schema group.
52+
*/
53+
String getSchemaGroup() {
54+
return schemaGroup;
55+
}
56+
}

sdk/schemaregistry/azure-data-schemaregistry-avro/src/samples/java/com/azure/data/schemaregistry/avro/ReadmeSamples.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@
2525
public class ReadmeSamples {
2626

2727
/**
28-
* Sample to demonstrate creation of {@link SchemaRegistryAvroSerializer}.
29-
* @return The {@link SchemaRegistryAvroSerializer}.
28+
* Sample to demonstrate creation of {@link SchemaRegistryApacheAvroSerializer}.
29+
* @return The {@link SchemaRegistryApacheAvroSerializer}.
3030
*/
31-
public SchemaRegistryAvroSerializer createAvroSchemaRegistrySerializer() {
31+
public SchemaRegistryApacheAvroSerializer createAvroSchemaRegistrySerializer() {
3232
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
3333

3434
SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
3535
.fullyQualifiedNamespace("{schema-registry-endpoint")
3636
.credential(tokenCredential)
3737
.buildAsyncClient();
3838

39-
SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
39+
SchemaRegistryApacheAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryApacheAvroSerializerBuilder()
4040
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
4141
.schemaGroup("{schema-group}")
4242
.buildSerializer();
@@ -48,7 +48,7 @@ public SchemaRegistryAvroSerializer createAvroSchemaRegistrySerializer() {
4848
* Serialize a strongly-typed object into avro payload compatible with schema registry.
4949
*/
5050
public void serializeSample() {
51-
SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = createAvroSchemaRegistrySerializer();
51+
SchemaRegistryApacheAvroSerializer schemaRegistryAvroSerializer = createAvroSchemaRegistrySerializer();
5252

5353
PlayingCard playingCard = new PlayingCard();
5454
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
@@ -65,7 +65,7 @@ public void serializeSample() {
6565
* Deserialize avro payload compatible with schema registry into a strongly-type object.
6666
*/
6767
public void deserializeSample() {
68-
SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = createAvroSchemaRegistrySerializer();
68+
SchemaRegistryApacheAvroSerializer schemaRegistryAvroSerializer = createAvroSchemaRegistrySerializer();
6969
InputStream inputStream = getSchemaRegistryAvroData();
7070
PlayingCard playingCard = schemaRegistryAvroSerializer.deserialize(inputStream,
7171
TypeReference.createInstance(PlayingCard.class));

sdk/schemaregistry/azure-data-schemaregistry-avro/src/samples/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserilizationSample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static void main(String[] args) throws IOException {
3636

3737
// Create the serializer instance by configuring the serializer with the schema registry client and
3838
// enabling auto registering of new schemas
39-
SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
39+
SchemaRegistryApacheAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryApacheAvroSerializerBuilder()
4040
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
4141
.schemaGroup("{schema-group}")
4242
.avroSpecificReader(true)

sdk/schemaregistry/azure-data-schemaregistry-avro/src/samples/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializationSample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.io.OutputStream;
1515

1616
/**
17-
* Sample to demonstrate using {@link SchemaRegistryAvroSerializer} for serialization of data.
17+
* Sample to demonstrate using {@link SchemaRegistryApacheAvroSerializer} for serialization of data.
1818
*/
1919
public class SchemaRegistryAvroSerializationSample {
2020
/**
@@ -34,7 +34,7 @@ public static void main(String[] args) {
3434

3535
// Create the serializer instance by configuring the serializer with the schema registry client and
3636
// enabling auto registering of new schemas
37-
SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()
37+
SchemaRegistryApacheAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryApacheAvroSerializerBuilder()
3838
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
3939
.schemaGroup("{schema-group}")
4040
.avroSpecificReader(true)

0 commit comments

Comments
 (0)