Skip to content

Commit 7cc7621

Browse files
authored
[SchemaRegistry] add auto register schemas (Azure#20479)
fixes: Azure#20306
1 parent 04b0f97 commit 7cc7621

File tree

6 files changed

+159
-13
lines changed

6 files changed

+159
-13
lines changed

sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- `auto_register_schemas` keyword argument has been added to `SchemaRegistryAvroSerializer`, which will allow for automatically registering schemas passed in to the `serialize`.
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,23 @@ class SchemaRegistryAvroSerializer(object):
4040
which is used to register schema and retrieve schema from the service.
4141
:type schema_registry: ~azure.schemaregistry.SchemaRegistryClient
4242
:param str schema_group: Schema group under which schema should be registered.
43+
:keyword bool auto_register_schemas: When true, register new schemas passed to serialize.
44+
Otherwise, and by default, fail if it has not been pre-registered in the registry.
4345
:keyword str codec: The writer codec. If None, let the avro library decides.
4446
4547
"""
48+
4649
def __init__(self, schema_registry, schema_group, **kwargs):
4750
# type: ("SchemaRegistryClient", str, Any) -> None
4851
self._schema_group = schema_group
4952
self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
5053
self._schema_registry_client = schema_registry # type: "SchemaRegistryClient"
54+
self._auto_register_schemas = kwargs.get("auto_register_schemas", False)
55+
self._auto_register_schema_func = (
56+
self._schema_registry_client.register_schema
57+
if self._auto_register_schemas
58+
else self._schema_registry_client.get_schema_id
59+
)
5160
self._id_to_schema = {}
5261
self._schema_to_id = {}
5362
self._user_input_schema_cache = {}
@@ -63,7 +72,7 @@ def __exit__(self, *exc_details):
6372

6473
def close(self):
6574
# type: () -> None
66-
""" This method is to close the sockets opened by the client.
75+
"""This method is to close the sockets opened by the client.
6776
It need not be used when using with a context manager.
6877
"""
6978
self._schema_registry_client.close()
@@ -85,12 +94,8 @@ def _get_schema_id(self, schema_name, schema, **kwargs):
8594
try:
8695
return self._schema_to_id[schema_str]
8796
except KeyError:
88-
schema_id = self._schema_registry_client.register_schema(
89-
self._schema_group,
90-
schema_name,
91-
"Avro",
92-
schema_str,
93-
**kwargs
97+
schema_id = self._auto_register_schema_func(
98+
self._schema_group, schema_name, "Avro", schema_str, **kwargs
9499
).schema_id
95100
self._schema_to_id[schema_str] = schema_id
96101
self._id_to_schema[schema_id] = schema_str
@@ -108,7 +113,9 @@ def _get_schema(self, schema_id, **kwargs):
108113
try:
109114
return self._id_to_schema[schema_id]
110115
except KeyError:
111-
schema_str = self._schema_registry_client.get_schema(schema_id, **kwargs).schema_content
116+
schema_str = self._schema_registry_client.get_schema(
117+
schema_id, **kwargs
118+
).schema_content
112119
self._id_to_schema[schema_id] = schema_str
113120
self._schema_to_id[schema_str] = schema_id
114121
return schema_str
@@ -133,14 +140,14 @@ def serialize(self, data, schema, **kwargs):
133140
self._user_input_schema_cache[raw_input_schema] = parsed_schema
134141
cached_schema = parsed_schema
135142

136-
record_format_identifier = b'\0\0\0\0'
143+
record_format_identifier = b"\0\0\0\0"
137144
schema_id = self._get_schema_id(cached_schema.fullname, cached_schema, **kwargs)
138145
data_bytes = self._avro_serializer.serialize(data, cached_schema)
139146

140147
stream = BytesIO()
141148

142149
stream.write(record_format_identifier)
143-
stream.write(schema_id.encode('utf-8'))
150+
stream.write(schema_id.encode("utf-8"))
144151
stream.write(data_bytes)
145152
stream.flush()
146153

@@ -157,8 +164,12 @@ def deserialize(self, data, **kwargs):
157164
:rtype: Dict[str, Any]
158165
"""
159166
# record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier.
160-
schema_id = data[SCHEMA_ID_START_INDEX:(SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH)].decode('utf-8')
167+
schema_id = data[
168+
SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH)
169+
].decode("utf-8")
161170
schema_content = self._get_schema(schema_id, **kwargs)
162171

163-
dict_data = self._avro_serializer.deserialize(data[DATA_START_INDEX:], schema_content)
172+
dict_data = self._avro_serializer.deserialize(
173+
data[DATA_START_INDEX:], schema_content
174+
)
164175
return dict_data

sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ interactions:
2828
content-type:
2929
- application/json
3030
date:
31-
- Wed, 18 Aug 2021 15:11:19 GMT
31+
- Wed, 01 Sep 2021 17:08:23 GMT
3232
location:
3333
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
3434
server:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
interactions:
2+
- request:
3+
body: '"{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\",
4+
\"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\",
5+
\"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"],
6+
\"name\": \"favorite_color\"}]}"'
7+
headers:
8+
Accept:
9+
- application/json
10+
Accept-Encoding:
11+
- gzip, deflate
12+
Connection:
13+
- keep-alive
14+
Content-Length:
15+
- '265'
16+
Content-Type:
17+
- application/json
18+
User-Agent:
19+
- azsdk-python-azureschemaregistry/1.0.0b1 Python/3.9.0 (Windows-10-10.0.19041-SP0)
20+
X-Schema-Type:
21+
- Avro
22+
method: PUT
23+
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
24+
response:
25+
body:
26+
string: '{"id":"576838e0558c43f8b85cdaadbd4561f5"}'
27+
headers:
28+
content-type:
29+
- application/json
30+
date:
31+
- Wed, 01 Sep 2021 17:06:46 GMT
32+
location:
33+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
34+
server:
35+
- Microsoft-HTTPAPI/2.0
36+
strict-transport-security:
37+
- max-age=31536000
38+
transfer-encoding:
39+
- chunked
40+
x-schema-id:
41+
- 576838e0558c43f8b85cdaadbd4561f5
42+
x-schema-id-location:
43+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/576838e0558c43f8b85cdaadbd4561f5?api-version=2017-04
44+
x-schema-type:
45+
- Avro
46+
x-schema-version:
47+
- '1'
48+
x-schema-versions-location:
49+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2017-04
50+
status:
51+
code: 200
52+
message: OK
53+
version: 1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
interactions:
2+
- request:
3+
body: '"{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\",
4+
\"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\",
5+
\"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"],
6+
\"name\": \"favorite_color\"}]}"'
7+
headers:
8+
Accept:
9+
- application/json
10+
Accept-Encoding:
11+
- gzip, deflate
12+
Connection:
13+
- keep-alive
14+
Content-Length:
15+
- '265'
16+
Content-Type:
17+
- application/json
18+
User-Agent:
19+
- azsdk-python-azureschemaregistry/1.0.0b1 Python/3.9.0 (Windows-10-10.0.19041-SP0)
20+
X-Schema-Type:
21+
- Avro
22+
method: POST
23+
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
24+
response:
25+
body:
26+
string: '{"id":"576838e0558c43f8b85cdaadbd4561f5"}'
27+
headers:
28+
content-type:
29+
- application/json
30+
date:
31+
- Wed, 01 Sep 2021 17:08:24 GMT
32+
location:
33+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
34+
server:
35+
- Microsoft-HTTPAPI/2.0
36+
strict-transport-security:
37+
- max-age=31536000
38+
transfer-encoding:
39+
- chunked
40+
x-schema-id:
41+
- 576838e0558c43f8b85cdaadbd4561f5
42+
x-schema-id-location:
43+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/576838e0558c43f8b85cdaadbd4561f5?api-version=2017-04
44+
x-schema-type:
45+
- Avro
46+
x-schema-version:
47+
- '1'
48+
x-schema-versions-location:
49+
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2017-04
50+
status:
51+
code: 200
52+
message: OK
53+
version: 1

sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,33 @@ def test_raw_avro_serializer_negative(self):
7676

7777
@SchemaRegistryPowerShellPreparer()
7878
def test_basic_sr_avro_serializer(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
79+
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
80+
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group, auto_register_schemas=True)
81+
82+
schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
83+
schema = avro.schema.parse(schema_str)
84+
85+
dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
86+
encoded_data = sr_avro_serializer.serialize(dict_data, schema_str)
87+
88+
assert schema_str in sr_avro_serializer._user_input_schema_cache
89+
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id
90+
91+
assert encoded_data[0:4] == b'\0\0\0\0'
92+
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
93+
assert encoded_data[4:36] == schema_id.encode("utf-8")
94+
95+
assert schema_id in sr_avro_serializer._id_to_schema
96+
97+
decoded_data = sr_avro_serializer.deserialize(encoded_data)
98+
assert decoded_data["name"] == u"Ben"
99+
assert decoded_data["favorite_number"] == 7
100+
assert decoded_data["favorite_color"] == u"red"
101+
102+
sr_avro_serializer.close()
103+
104+
@SchemaRegistryPowerShellPreparer()
105+
def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
79106
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
80107
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group)
81108

0 commit comments

Comments
 (0)