Commit 12f7055

Fixes deserialization when reader/writer schemas are different. (Azure#26592)
* Add test case for portal.
* Adding code to consider both reader and writer schemas.
* Separating fields because there is ambiguity about generated class schemas.
* Updating test cases to consider different reader/writer schemas.
* Adding test case for forwards compatibility checks.
* Adding Person2 Java and Avro files.
* Adding recording file.
* Specify Avro files.
1 parent 02c6300 commit 12f7055

9 files changed, +891 -32 lines changed


sdk/schemaregistry/azure-data-schemaregistry-apacheavro/pom.xml

Lines changed: 2 additions & 1 deletion
@@ -130,7 +130,8 @@
   <testSourceDirectory>src/samples/resources/avro/</testSourceDirectory>
   <testOutputDirectory>${project.basedir}/src/samples/java/</testOutputDirectory>
   <testIncludes>
-    <testInclude>**/*.avro</testInclude>
+    <testInclude>Person.avro</testInclude>
+    <testInclude>HandOfCards.avro</testInclude>
   </testIncludes>
   <stringType>String</stringType>
 </configuration>

sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/AvroSerializer.java

Lines changed: 56 additions & 5 deletions
@@ -23,12 +23,16 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Class containing implementation of Apache Avro serializer
@@ -156,8 +160,8 @@ <T> T decode(byte[] bytes, byte[] schemaBytes, TypeReference<T> typeReference) {
         Objects.requireNonNull(bytes, "'bytes' must not be null.");
         Objects.requireNonNull(schemaBytes, "'schemaBytes' must not be null.");
 
-        String schemaString = new String(schemaBytes, StandardCharsets.UTF_8);
-        Schema schemaObject = parseSchemaString(schemaString);
+        final String schemaString = new String(schemaBytes, StandardCharsets.UTF_8);
+        final Schema schemaObject = parseSchemaString(schemaString);
 
         if (isSingleObjectEncoded(bytes)) {
             final BinaryMessageDecoder<T> messageDecoder = new BinaryMessageDecoder<>(SpecificData.get(), schemaObject);
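
For context on the single-object path referenced above: Avro's single-object encoding prefixes the binary body with a two-byte marker (0xC3 0x01) and an 8-byte schema fingerprint, which is what isSingleObjectEncoded checks against V1_HEADER. A minimal sketch using Avro's message API (the Card record here is an illustrative assumption, not taken from this commit's .avro files):

import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;

public final class SingleObjectEncodingSketch {
    public static void main(String[] args) throws Exception {
        final Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Card\",\"fields\":[{\"name\":\"suit\",\"type\":\"string\"}]}");

        final GenericRecord card = new GenericData.Record(schema);
        card.put("suit", "SPADES");

        // Single-object encoding: 0xC3 0x01 marker, 8-byte schema fingerprint, then the binary body.
        final ByteBuffer encoded = new BinaryMessageEncoder<GenericRecord>(GenericData.get(), schema).encode(card);
        final byte[] bytes = new byte[encoded.remaining()];
        encoded.get(bytes);

        System.out.printf("header: 0x%02X 0x%02X%n", bytes[0], bytes[1]); // header: 0xC3 0x01

        // The decoder validates the marker and looks up the writer schema by its fingerprint.
        final GenericRecord decoded = new BinaryMessageDecoder<GenericRecord>(GenericData.get(), schema).decode(bytes);
        System.out.println(decoded); // {"suit": "SPADES"}
    }
}
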
@@ -229,6 +233,47 @@ static boolean isSingleObjectEncoded(byte[] schemaBytes) {
         return V1_HEADER[0] == schemaBytes[0] && V1_HEADER[1] == schemaBytes[1];
     }
 
+    /**
+     * Gets the type's schema if there is one.
+     *
+     * @param clazz Class to get schema.
+     * @param <T> The type of object.
+     *
+     * @return The {@link Schema} or {@code null} if it was not a GenericContainer, could not instantiate the type, or
+     *     there was no default constructor.
+     */
+    <T> Schema getSchemaFromTypeReference(Class<T> clazz) {
+        if (!GenericContainer.class.isAssignableFrom(clazz)) {
+            return null;
+        }
+
+        final Optional<Constructor<?>> defaultConstructor;
+        try {
+            defaultConstructor = Arrays.stream(clazz.getDeclaredConstructors())
+                .filter(constructor -> constructor.getParameterCount() == 0)
+                .findFirst();
+        } catch (SecurityException e) {
+            logger.info("Could not get declaring constructors for deserializing T ({}). Using writer schema.",
+                clazz, e);
+            return null;
+        }
+
+        if (!defaultConstructor.isPresent()) {
+            return null;
+        }
+
+        Object instance = null;
+        try {
+            instance = defaultConstructor.get().newInstance();
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+            logger.info("Could not create new instance for deserializing T ({}). Using writer schema.", clazz, e);
+        }
+
+        return instance instanceof GenericContainer
+            ? ((GenericContainer) instance).getSchema()
+            : null;
+    }
+
     /**
      * Gets a schema for the given class if it is an Avro primitive type.
      *
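
The value recovered above is simply the schema compiled into the Avro-generated class. A short sketch of that behavior, assuming Person is the SpecificRecord class the avro-maven-plugin generates from this commit's Person.avro (the generated class itself is not part of the diff):

import org.apache.avro.Schema;

public final class ReaderSchemaSketch {
    public static void main(String[] args) {
        // Generated classes have a public no-argument constructor, so a default-constructed
        // instance exposes its compiled-in schema via GenericContainer#getSchema(). That schema
        // becomes the "reader" schema when getSchemaFromTypeReference succeeds.
        final Person person = new Person();
        final Schema readerSchema = person.getSchema();

        // Generated classes expose the same schema through a static accessor as well.
        final Schema sameSchema = Person.getClassSchema();

        System.out.println(readerSchema.equals(sameSchema)); // true
    }
}
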
@@ -256,10 +301,8 @@ private static Schema getPrimitiveSchema(Class<?> clazz) {
      *
      * @return correct Avro DatumReader object given encoder configuration
      */
-    @SuppressWarnings("unchecked")
     private <T> DatumReader<T> getDatumReader(Schema writerSchema, TypeReference<T> typeReference) {
-        // Suppressing this warning because we know that the Type is a representation of the Class<T>
-        final Class<T> clazz = (Class<T>) typeReference.getJavaType();
+        final Class<T> clazz = typeReference.getJavaClass();
         final Schema primitiveSchema = getPrimitiveSchema(clazz);
 
         if (primitiveSchema != null) {
@@ -270,6 +313,14 @@ private <T> DatumReader<T> getDatumReader(Schema writerSchema, TypeReference<T>
             }
         }
 
+        final Schema readerSchema = getSchemaFromTypeReference(clazz);
+        if (readerSchema != null && !readerSchema.equals(writerSchema)) {
+            logger.verbose("The writer schema is different than reader schema. Using reader schema. "
+                + "Writer: '{}'. Reader: '{}'", writerSchema, readerSchema);
+
+            return new SpecificDatumReader<>(writerSchema, readerSchema);
+        }
+
         if (SpecificRecord.class.isAssignableFrom(clazz)) {
             return new SpecificDatumReader<>(writerSchema);
         } else {
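
Passing both schemas to SpecificDatumReader is what makes the commit's forwards-compatibility cases work: Avro resolves the writer's schema against the reader's, so fields added on the reader side are filled from their defaults and fields the reader no longer declares are skipped. A self-contained sketch of that resolution using the generic API (the record fields here are illustrative assumptions, not the actual Person schema from this commit's test resources):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public final class SchemaResolutionSketch {
    public static void main(String[] args) throws Exception {
        // Writer schema: the shape the payload was originally serialized with.
        final Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Reader schema: the consumer's generated class declares an extra field with a default.
        final Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"favouriteNumber\",\"type\":\"int\",\"default\":-1}]}");

        // Encode a record with only the writer schema.
        final GenericRecord written = new GenericData.Record(writerSchema);
        written.put("name", "Jackson");

        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(written, encoder);
        encoder.flush();

        // Decode with both schemas: Avro resolves the difference and fills in the default value.
        final Decoder decoder = DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null);
        final GenericRecord read = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
            .read(null, decoder);

        System.out.println(read); // {"name": "Jackson", "favouriteNumber": -1}
    }
}
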

sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializer.java

Lines changed: 5 additions & 2 deletions
@@ -115,7 +115,7 @@ public <T> Mono<T> deserializeAsync(InputStream inputStream, TypeReference<T> ty
                 new IllegalStateException("Illegal format: unsupported record format indicator in payload"));
         }
 
-        String schemaId = getSchemaIdFromPayload(buffer);
+        final String schemaId = getSchemaIdFromPayload(buffer);
 
         return this.schemaRegistryClient.getSchema(schemaId)
             .handle((registryObject, sink) -> {
@@ -171,7 +171,10 @@ public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
             return monoError(logger, exception);
         }
 
-        return this.maybeRegisterSchema(serializerOptions.getSchemaGroup(), schema.getFullName(), schema.toString())
+        final String schemaFullName = schema.getFullName();
+        final String schemaString = schema.toString();
+
+        return this.maybeRegisterSchema(serializerOptions.getSchemaGroup(), schemaFullName, schemaString)
             .handle((id, sink) -> {
                 ByteBuffer recordFormatIndicatorBuffer = ByteBuffer
                     .allocate(RECORD_FORMAT_INDICATOR_SIZE)
