Skip to content

Commit e769a3a

Browse files
Wrap Avro exceptions and other fixes (Azure#27581)
* Wrap Avro exceptions
* Remove preamble support
* Add SchemaId to exception
* Add troubleshooting docs
* Update
* Update
* Improve error handling for BinaryContent types without public parameterless constructor
* Recording files
* Fix validation
* Improve error handling
* Update snippet
* Typo
* Fix
1 parent 5358c38 commit e769a3a

13 files changed

+416
-123
lines changed

sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/README.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,30 @@ Employee deserializedEmployee = await serializer.DeserializeAsync<Employee>(cont
131131

132132
## Troubleshooting
133133

134-
Information on troubleshooting steps will be provided as potential issues are discovered.
134+
If you encounter errors when communicating with the Schema Registry service, these errors will be thrown as a [RequestFailedException][request_failed_exception]. The serializer will only communicate with the service the first time it encounters a schema (when serializing) or a schema ID (when deserializing). Any errors related to serialization to Avro, or deserialization from Avro, will be thrown as an `AvroSerializationException`. The `InnerException` property will contain the underlying exception that was thrown from the Apache Avro library. When deserializing, the `SerializedSchemaId` property will contain the schema ID corresponding to the serialized data. Using our `Employee` schema example, if we add an `Employee_V2` model that adds a new required field, this would not be compatible with `Employee`. If the data we are attempting to deserialize may have been serialized with a schema that is not compatible with our `Employee_V2` model, then we might write code like the following:
135+
136+
```C# Snippet:SchemaRegistryAvroException
137+
try
138+
{
139+
Employee_V2 employeeV2 = await serializer.DeserializeAsync<Employee_V2>(content);
140+
}
141+
catch (AvroSerializationException exception)
142+
{
143+
// When this exception occurs when deserializing, the exception message will contain the schema ID that was used to
144+
// serialize the data.
145+
Console.WriteLine(exception);
146+
147+
// We might also want to look up the specific schema from Schema Registry so that we can log the schema definition
148+
if (exception.SerializedSchemaId != null)
149+
{
150+
SchemaRegistrySchema schema = await client.GetSchemaAsync(exception.SerializedSchemaId);
151+
Console.WriteLine(schema.Definition);
152+
}
153+
}
154+
```
155+
156+
In general, any invalid Avro schemas would probably be caught during testing, but such schemas will also result in an `AvroSerializationException` being thrown when attempting to serialize using an invalid writer schema, or deserialize when using an invalid reader schema.
157+
135158

136159
## Next steps
137160

@@ -169,3 +192,4 @@ This project has adopted the [Microsoft Open Source Code of Conduct][code_of_con
169192
[specific_record]: https://avro.apache.org/docs/current/api/csharp/html/interfaceAvro_1_1Specific_1_1ISpecificRecord.html
170193
[azure_sub]: https://azure.microsoft.com/free/dotnet/
171194
[azure_schema_registry]: https://aka.ms/schemaregistry
195+
[request_failed_exception]: https://docs.microsoft.com/dotnet/api/azure.requestfailedexception?view=azure-dotnet

sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/api/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.netstandard2.0.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
22
{
3+
public partial class AvroSerializationException : System.Exception
4+
{
5+
public AvroSerializationException() { }
6+
protected AvroSerializationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
7+
public AvroSerializationException(string message) { }
8+
public AvroSerializationException(string message, System.Exception innerException) { }
9+
public AvroSerializationException(string message, string serializedSchemaId, System.Exception innerException) { }
10+
public string SerializedSchemaId { get { throw null; } set { } }
11+
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
12+
}
313
public partial class SchemaRegistryAvroSerializer
414
{
515
public SchemaRegistryAvroSerializer(Azure.Data.SchemaRegistry.SchemaRegistryClient client, string groupName, Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.SchemaRegistryAvroSerializerOptions options = null) { }
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Runtime.Serialization;
6+
using Azure.Core;
7+
8+
namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
9+
{
10+
/// <summary>
11+
/// Represents an exception that is thrown when Avro serialization or deserialization fails.
12+
/// </summary>
13+
[Serializable]
14+
public class AvroSerializationException : Exception
15+
{
16+
/// <summary>
17+
/// The Schema Registry schema Id related to the serialized data that caused the <see cref="AvroSerializationException"/>.
18+
/// </summary>
19+
public string SerializedSchemaId { get; set; }
20+
21+
/// <summary>
22+
/// Initializes a new instance of <see cref="AvroSerializationException"/>.
23+
/// </summary>
24+
public AvroSerializationException() : this(null, null)
25+
{
26+
}
27+
28+
/// <summary>
29+
/// Initializes a new instance of <see cref="AvroSerializationException"/>.
30+
/// </summary>
31+
/// <param name="message">The error message that explains the reason for the exception.</param>
32+
public AvroSerializationException(string message) : this(message, null)
33+
{
34+
}
35+
36+
/// <summary>
37+
/// Initializes a new instance of <see cref="AvroSerializationException"/>.
38+
/// </summary>
39+
/// <param name="message">The error message that explains the reason for the exception.</param>
40+
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference if no inner exception is specified.</param>
41+
public AvroSerializationException(string message, Exception innerException) : this(message, null, innerException)
42+
{
43+
}
44+
45+
/// <summary>
46+
/// Initializes a new instance of <see cref="AvroSerializationException"/>.
47+
/// </summary>
48+
/// <param name="message">The error message that explains the reason for the exception.</param>
49+
/// <param name="serializedSchemaId">The Schema Registry schema Id related to the <see cref="AvroSerializationException"/>.</param>
50+
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference if no inner exception is specified.</param>
51+
public AvroSerializationException(string message, string serializedSchemaId, Exception innerException) : base(message, innerException)
52+
{
53+
SerializedSchemaId = serializedSchemaId;
54+
}
55+
56+
/// <inheritdoc />
57+
protected AvroSerializationException(SerializationInfo info, StreamingContext context) : base(info, context)
58+
{
59+
SerializedSchemaId = info.GetString(nameof(SerializedSchemaId));
60+
}
61+
62+
/// <inheritdoc />
63+
public override void GetObjectData(SerializationInfo info, StreamingContext context)
64+
{
65+
Argument.AssertNotNull(info, nameof(info));
66+
67+
info.AddValue(nameof(SerializedSchemaId), SerializedSchemaId);
68+
base.GetObjectData(info, context);
69+
}
70+
}
71+
}

sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs

Lines changed: 82 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
using Azure.Data.SchemaRegistry;
1010
using System;
1111
using System.IO;
12-
using System.Linq;
13-
using System.Text;
1412
using System.Threading;
1513
using System.Threading.Tasks;
1614
using Azure;
@@ -29,7 +27,6 @@ public class SchemaRegistryAvroSerializer
2927
private readonly SchemaRegistryAvroSerializerOptions _options;
3028
private const string AvroMimeType = "avro/binary";
3129
private const int CacheCapacity = 128;
32-
private static readonly Encoding Utf8Encoding = new UTF8Encoding(false);
3330

3431
/// <summary>
3532
/// Initializes new instance of <see cref="SchemaRegistryAvroSerializer"/>.
@@ -41,10 +38,6 @@ public SchemaRegistryAvroSerializer(SchemaRegistryClient client, string groupNam
4138
_options = options;
4239
}
4340

44-
private static readonly byte[] EmptyRecordFormatIndicator = { 0, 0, 0, 0 };
45-
private const int RecordFormatIndicatorLength = 4;
46-
private const int SchemaIdLength = 32;
47-
private const int PayloadStartPosition = RecordFormatIndicatorLength + SchemaIdLength;
4841
private readonly LruCache<string, Schema> _idToSchemaMap = new(CacheCapacity);
4942
private readonly LruCache<Schema, string> _schemaToIdMap = new(CacheCapacity);
5043

@@ -125,12 +118,18 @@ internal async ValueTask<BinaryContent> SerializeInternalAsync(
125118
bool async,
126119
CancellationToken cancellationToken)
127120
{
121+
messageType ??= typeof(BinaryContent);
122+
if (messageType.GetConstructor(Type.EmptyTypes) == null)
123+
{
124+
throw new InvalidOperationException(
125+
$"The type {messageType} must have a public parameterless constructor in order to use it as the 'MessageContent' type to serialize to.");
126+
}
127+
var message = (BinaryContent)Activator.CreateInstance(messageType);
128+
128129
(string schemaId, BinaryData bd) = async
129130
? await SerializeInternalAsync(data, dataType, true, cancellationToken).ConfigureAwait(false)
130131
: SerializeInternalAsync(data, dataType, false, cancellationToken).EnsureCompleted();
131132

132-
messageType ??= typeof(BinaryContent);
133-
var message = (BinaryContent)Activator.CreateInstance(messageType);
134133
message.Data = bd;
135134
message.ContentType = $"{AvroMimeType}+{schemaId}";
136135
return message;
@@ -146,23 +145,31 @@ internal async ValueTask<BinaryContent> SerializeInternalAsync(
146145
dataType ??= value?.GetType() ?? typeof(object);
147146

148147
var supportedType = GetSupportedTypeOrThrow(dataType);
149-
var writer = GetWriterAndSchema(value, supportedType, out var schema);
150148

151-
using Stream stream = new MemoryStream();
152-
var binaryEncoder = new BinaryEncoder(stream);
149+
try
150+
{
151+
var writer = GetWriterAndSchema(value, supportedType, out var schema);
153152

154-
writer.Write(value, binaryEncoder);
155-
binaryEncoder.Flush();
156-
stream.Position = 0;
157-
BinaryData data = BinaryData.FromStream(stream);
153+
using Stream stream = new MemoryStream();
154+
var binaryEncoder = new BinaryEncoder(stream);
158155

159-
if (async)
160-
{
161-
return (await GetSchemaIdAsync(schema, true, cancellationToken).ConfigureAwait(false), data);
156+
writer.Write(value, binaryEncoder);
157+
binaryEncoder.Flush();
158+
stream.Position = 0;
159+
BinaryData data = BinaryData.FromStream(stream);
160+
161+
if (async)
162+
{
163+
return (await GetSchemaIdAsync(schema, true, cancellationToken).ConfigureAwait(false), data);
164+
}
165+
else
166+
{
167+
return (GetSchemaIdAsync(schema, false, cancellationToken).EnsureCompleted(), data);
168+
}
162169
}
163-
else
170+
catch (AvroException ex)
164171
{
165-
return (GetSchemaIdAsync(schema, false, cancellationToken).EnsureCompleted(), data);
172+
throw new AvroSerializationException("An error occurred while attempting to serialize to Avro.", ex);
166173
}
167174
}
168175

@@ -300,35 +307,18 @@ private async ValueTask<object> DeserializeMessageDataInternalAsync(
300307
Argument.AssertNotNull(data, nameof(data));
301308
Argument.AssertNotNull(contentType, nameof(contentType));
302309

303-
string schemaId;
304-
// Back Compat for first preview
305-
ReadOnlyMemory<byte> memory = data.ToMemory();
306-
byte[] recordFormatIdentifier = null;
307-
if (memory.Length >= RecordFormatIndicatorLength)
310+
string[] contentTypeArray = contentType.ToString().Split('+');
311+
if (contentTypeArray.Length != 2)
308312
{
309-
recordFormatIdentifier = memory.Slice(0, RecordFormatIndicatorLength).ToArray();
313+
throw new FormatException("Content type was not in the expected format of MIME type + schema ID");
310314
}
311-
if (recordFormatIdentifier != null && recordFormatIdentifier.SequenceEqual(EmptyRecordFormatIndicator))
315+
316+
if (contentTypeArray[0] != AvroMimeType)
312317
{
313-
byte[] schemaIdBytes = memory.Slice(RecordFormatIndicatorLength, SchemaIdLength).ToArray();
314-
schemaId = Utf8Encoding.GetString(schemaIdBytes);
315-
data = new BinaryData(memory.Slice(PayloadStartPosition, memory.Length - PayloadStartPosition));
318+
throw new InvalidOperationException("An avro serializer may only be used on content that is of 'avro/binary' type");
316319
}
317-
else
318-
{
319-
string[] contentTypeArray = contentType.ToString().Split('+');
320-
if (contentTypeArray.Length != 2)
321-
{
322-
throw new FormatException("Content type was not in the expected format of MIME type + schema ID");
323-
}
324320

325-
if (contentTypeArray[0] != AvroMimeType)
326-
{
327-
throw new InvalidOperationException("An avro serializer may only be used on content that is of 'avro/binary' type");
328-
}
329-
330-
schemaId = contentTypeArray[1];
331-
}
321+
string schemaId = contentTypeArray[1];
332322

333323
if (async)
334324
{
@@ -351,27 +341,63 @@ private async ValueTask<object> DeserializeInternalAsync(
351341
SupportedType supportedType = GetSupportedTypeOrThrow(dataType);
352342

353343
Schema writerSchema;
354-
if (async)
344+
try
355345
{
356-
writerSchema = await GetSchemaByIdAsync(schemaId, true, cancellationToken).ConfigureAwait(false);
346+
if (async)
347+
{
348+
writerSchema = await GetSchemaByIdAsync(schemaId, true, cancellationToken).ConfigureAwait(false);
349+
}
350+
else
351+
{
352+
writerSchema = GetSchemaByIdAsync(schemaId, false, cancellationToken).EnsureCompleted();
353+
}
357354
}
358-
else
355+
catch (SchemaParseException ex)
359356
{
360-
writerSchema = GetSchemaByIdAsync(schemaId, false, cancellationToken).EnsureCompleted();
357+
throw new AvroSerializationException(
358+
$"An error occurred while attempting to parse the schema (schema ID: {schemaId}) that was used to serialize the Avro. " +
359+
$"Make sure that the schema represents valid Avro.",
360+
schemaId,
361+
ex);
361362
}
362363

363-
var binaryDecoder = new BinaryDecoder(data.ToStream());
364+
Schema readerSchema;
365+
object returnInstance = null;
366+
try
367+
{
368+
if (supportedType == SupportedType.SpecificRecord)
369+
{
370+
returnInstance = Activator.CreateInstance(dataType);
371+
readerSchema = ((ISpecificRecord)returnInstance).Schema;
372+
}
373+
else
374+
{
375+
readerSchema = writerSchema;
376+
}
377+
}
378+
catch (SchemaParseException ex)
379+
{
380+
throw new AvroSerializationException(
381+
"An error occurred while attempting to parse the schema that you are attempting to deserialize the data with. " +
382+
"Make sure that the schema represents valid Avro.",
383+
schemaId,
384+
ex);
385+
}
364386

365-
if (supportedType == SupportedType.SpecificRecord)
387+
try
366388
{
367-
object returnInstance = Activator.CreateInstance(dataType);
368-
DatumReader<object> reader = GetReader(writerSchema, ((ISpecificRecord)returnInstance).Schema, SupportedType.SpecificRecord);
389+
var binaryDecoder = new BinaryDecoder(data.ToStream());
390+
DatumReader<object> reader = GetReader(writerSchema, readerSchema, supportedType);
369391
return reader.Read(reuse: returnInstance, binaryDecoder);
370392
}
371-
else
393+
catch (AvroException ex)
372394
{
373-
DatumReader<object> reader = GetReader(writerSchema, writerSchema, supportedType);
374-
return reader.Read(reuse: null, binaryDecoder);
395+
throw new AvroSerializationException(
396+
"An error occurred while attempting to deserialize " +
397+
$"Avro that was serialized with schemaId: {schemaId}. The schema used to deserialize the data may not be compatible with the schema that was used" +
398+
$"to serialize the data. Please ensure that the schemas are compatible.",
399+
schemaId,
400+
ex);
375401
}
376402
}
377403

0 commit comments

Comments
 (0)