Commit db6bcaa

alignment with kafka v.2.5.0 and latest Schema Registry - ParsedSchema, AvroSchema

1 parent e0d45b3 commit db6bcaa
3 files changed: +51 -45 lines changed

src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java (5 additions, 4 deletions)
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
@@ -63,7 +64,7 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

     private CachedSchemaRegistryClient sourceSchemaRegistryClient;
     private CachedSchemaRegistryClient destSchemaRegistryClient;
-    private SubjectNameStrategy<org.apache.avro.Schema> subjectNameStrategy;
+    private SubjectNameStrategy subjectNameStrategy;
     private boolean transferKeys, includeHeaders;

     // caches from the source registry to the destination registry
@@ -213,7 +214,7 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean
         try {
             log.trace("Looking up schema id {} in source registry", sourceSchemaId);
             // Can't do getBySubjectAndId because that requires a Schema object for the strategy
-            schemaAndDestId.schema = sourceSchemaRegistryClient.getById(sourceSchemaId);
+            schemaAndDestId.schema = sourceSchemaRegistryClient.getSchemaById(sourceSchemaId);
         } catch (IOException | RestClientException e) {
             log.error(String.format("Unable to fetch source schema for id %d.", sourceSchemaId), e);
             throw new ConnectException(e);
@@ -257,12 +258,12 @@ interface ConfigName {

     private static class SchemaAndId {
         private Integer id;
-        private org.apache.avro.Schema schema;
+        private ParsedSchema schema;

         SchemaAndId() {
         }

-        SchemaAndId(int id, org.apache.avro.Schema schema) {
+        SchemaAndId(int id, ParsedSchema schema) {
             this.id = id;
             this.schema = schema;
         }
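
Note: the heart of this change is the move from the Avro-specific getById(int), which returned an org.apache.avro.Schema, to getSchemaById(int), which returns the format-agnostic ParsedSchema introduced with the 5.5.x Schema Registry client. A minimal sketch of that usage is below, assuming the 5.5.x client API; the class name, registry URLs, cache capacity, and subject handling are illustrative placeholders, not the transform's actual configuration.

import java.io.IOException;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class SchemaCopySketch {
    // Placeholder registry URLs and cache size; not taken from the transform's config.
    private final SchemaRegistryClient source = new CachedSchemaRegistryClient("http://source-registry:8081", 100);
    private final SchemaRegistryClient dest = new CachedSchemaRegistryClient("http://dest-registry:8081", 100);

    /** Fetch a schema by id from the source registry and register it under a subject in the destination. */
    public int copy(int sourceId, String destSubject) throws IOException, RestClientException {
        // 5.5.x replaces getById(int) -> org.apache.avro.Schema with
        // getSchemaById(int) -> ParsedSchema, which also covers JSON and Protobuf schemas.
        ParsedSchema schema = source.getSchemaById(sourceId);
        // register(...) likewise accepts a ParsedSchema in the newer client.
        return dest.register(destSubject, schema);
    }
}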

src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java (11 additions, 8 deletions)
@@ -16,6 +16,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
@@ -152,22 +154,22 @@ public void beforeEach(final ExtensionContext context) {
                 .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName())));
         this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(CONFIG_PATTERN))
                 .willReturn(WireMock.aResponse().withTransformers(this.getConfigHandler.getName())));
-        this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+"))
+        this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+/(?:fetchMaxId=false)"))
                 .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
     }

-    public int registerSchema(final String topic, boolean isKey, final Schema schema) {
+    public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema) {
         return this.registerSchema(topic, isKey, schema, new TopicNameStrategy());
     }

-    public int registerSchema(final String topic, boolean isKey, final Schema schema, SubjectNameStrategy<Schema> strategy) {
+    public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema, SubjectNameStrategy strategy) {
         return this.register(strategy.subjectName(topic, isKey, schema), schema);
     }

-    private int register(final String subject, final Schema schema) {
+    private int register(final String subject, final ParsedSchema schema) {
         try {
             final int id = this.schemaRegistryClient.register(subject, schema);
-            this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id))
+            this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id + "?fetchMaxId=false"))
                     .willReturn(ResponseDefinitionBuilder.okForJson(new SchemaString(schema.toString()))));
             log.debug("Registered schema {}", id);
             return id;
@@ -242,8 +244,8 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit
                                             final FileSource files, final Parameters parameters) {
             try {
                 final int id = SchemaRegistryMock.this.register(getSubject(request),
-                        new Schema.Parser()
-                                .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()));
+                        new AvroSchema(new Schema.Parser()
+                                .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())));
                 final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse();
                 registerSchemaResponse.setId(id);
                 return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse);
@@ -279,7 +281,8 @@ private class GetVersionHandler extends SubjectsVersioHandler {
         @Override
         public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
                                             final FileSource files, final Parameters parameters) {
-            String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
+            String versionStrFull = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
+            String versionStr = versionStrFull.substring(0, versionStrFull.indexOf("?"));
             SchemaMetadata metadata;
             if (versionStr.equals("latest")) {
                 metadata = SchemaRegistryMock.this.getSubjectVersion(getSubject(request), versionStr);
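
Note: the mock changes because the 5.5.x client appends ?fetchMaxId=false when it fetches a schema by id, so stubs that match the bare /schemas/ids/{id} URL no longer hit. As an aside, a stand-alone WireMock stub could also match the path and pin the query parameter separately instead of baking the query string into the matched URL. The sketch below uses the plain WireMockServer API (not the instance-scoped stubFor wrapper this class uses); the port and response body are assumptions for illustration.

import com.github.tomakehurst.wiremock.WireMockServer;

import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;

public class SchemaByIdStubSketch {
    public static void main(String[] args) {
        WireMockServer server = new WireMockServer(8081); // hypothetical fixed port
        server.start();

        // Match GET /schemas/ids/1 and require the fetchMaxId=false query parameter
        // added by the 5.5.x client, rather than hard-coding "?fetchMaxId=false"
        // into the matched URL string.
        server.stubFor(get(urlPathEqualTo("/schemas/ids/1"))
                .withQueryParam("fetchMaxId", equalTo("false"))
                .willReturn(okJson("{\"schema\": \"\\\"string\\\"\"}")));

        server.stop();
    }
}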

src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java (35 additions, 33 deletions)
@@ -36,6 +36,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
@@ -59,19 +61,19 @@ private enum ExplicitAuthType {
     private static final byte MAGIC_BYTE = (byte) 0x0;
     public static final int ID_SIZE = Integer.SIZE / Byte.SIZE;
     private static final int AVRO_CONTENT_OFFSET = 1 + ID_SIZE;
-    public static final org.apache.avro.Schema INT_SCHEMA = org.apache.avro.Schema.create(INT);
-    public static final org.apache.avro.Schema STRING_SCHEMA = org.apache.avro.Schema.create(STRING);
-    public static final org.apache.avro.Schema BOOLEAN_SCHEMA = org.apache.avro.Schema.create(BOOLEAN);
-    public static final org.apache.avro.Schema NAME_SCHEMA = SchemaBuilder.record("FullName")
+    public static final AvroSchema INT_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(INT));
+    public static final AvroSchema STRING_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(STRING));
+    public static final AvroSchema BOOLEAN_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(BOOLEAN));
+    public static final AvroSchema NAME_SCHEMA = new AvroSchema(SchemaBuilder.record("FullName")
             .namespace("cricket.jmoore.kafka.connect.transforms").fields()
             .requiredString("first")
             .requiredString("last")
-            .endRecord();
-    public static final org.apache.avro.Schema NAME_SCHEMA_ALIASED = SchemaBuilder.record("FullName")
+            .endRecord());
+    public static final AvroSchema NAME_SCHEMA_ALIASED = new AvroSchema(SchemaBuilder.record("FullName")
             .namespace("cricket.jmoore.kafka.connect.transforms").fields()
             .requiredString("first")
             .name("surname").aliases("last").type().stringType().noDefault()
-            .endRecord();
+            .endRecord());

     @RegisterExtension
     final SchemaRegistryMock sourceSchemaRegistry =
@@ -188,9 +190,9 @@ private void passSimpleMessage() throws IOException {
         final int sourceValId = sourceSchemaRegistry.registerSchema(TOPIC, false, STRING_SCHEMA);

         final ByteArrayOutputStream keyOut =
-                encodeAvroObject(STRING_SCHEMA, sourceKeyId, HELLO_WORLD_VALUE);
+                encodeAvroObject(STRING_SCHEMA.rawSchema(), sourceKeyId, HELLO_WORLD_VALUE);
         final ByteArrayOutputStream valOut =
-                encodeAvroObject(STRING_SCHEMA, sourceValId, HELLO_WORLD_VALUE);
+                encodeAvroObject(STRING_SCHEMA.rawSchema(), sourceValId, HELLO_WORLD_VALUE);
         final ConnectRecord record =
                 createRecord(keyOut.toByteArray(), valOut.toByteArray());

@@ -387,7 +389,7 @@ public void testKeySchemaTransfer() {
         destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), true, INT_SCHEMA);

         // Create new schema for source registry
-        org.apache.avro.Schema schema = STRING_SCHEMA;
+        AvroSchema schema = STRING_SCHEMA;
         log.info("Registering schema in source registry");
         int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, true, schema);
         final String subject = TOPIC + "-key";
@@ -403,7 +405,7 @@ public void testKeySchemaTransfer() {
         }

         try {
-            ByteArrayOutputStream out = encodeAvroObject(schema, sourceId, "hello, world");
+            ByteArrayOutputStream out = encodeAvroObject(schema.rawSchema(), sourceId, "hello, world");

             ConnectRecord record = createRecord(Schema.OPTIONAL_BYTES_SCHEMA, out.toByteArray(), null, null);

@@ -431,8 +433,8 @@ public void testKeySchemaTransfer() {
                 "destination id should be different and higher since that registry already had schemas");

         // Verify the schema is the same
-        org.apache.avro.Schema sourceSchema = sourceClient.getById(sourceId);
-        org.apache.avro.Schema destSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema());
+        ParsedSchema sourceSchema = sourceClient.getSchemaById(sourceId);
+        ParsedSchema destSchema = new AvroSchema(metadata.getSchema());
         assertEquals(schema, sourceSchema, "source server returned same schema");
         assertEquals(schema, destSchema, "destination server returned same schema");
         assertEquals(sourceSchema, destSchema, "both servers' schemas match");
@@ -450,7 +452,7 @@ public void testValueSchemaTransfer() {
         destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, INT_SCHEMA);

         // Create new schema for source registry
-        org.apache.avro.Schema schema = STRING_SCHEMA;
+        AvroSchema schema = STRING_SCHEMA;
         log.info("Registering schema in source registry");
         int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, schema);
         final String subject = TOPIC + "-value";
@@ -469,7 +471,7 @@ public void testValueSchemaTransfer() {
         ConnectRecord appliedRecord = null;
         int destinationId = -1;
         try {
-            ByteArrayOutputStream out = encodeAvroObject(schema, sourceId, "hello, world");
+            ByteArrayOutputStream out = encodeAvroObject(schema.rawSchema(), sourceId, "hello, world");

             value = out.toByteArray();
             ConnectRecord record = createRecord(null, value);
@@ -500,8 +502,8 @@ public void testValueSchemaTransfer() {
                 "destination id should be different and higher since that registry already had schemas");

         // Verify the schema is the same
-        org.apache.avro.Schema sourceSchema = sourceClient.getById(sourceId);
-        org.apache.avro.Schema destSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema());
+        ParsedSchema sourceSchema = sourceClient.getSchemaById(sourceId);
+        ParsedSchema destSchema = new AvroSchema(metadata.getSchema());
         assertEquals(schema, sourceSchema, "source server returned same schema");
         assertEquals(schema, destSchema, "destination server returned same schema");
         assertEquals(sourceSchema, destSchema, "both servers' schemas match");
@@ -531,8 +533,8 @@ public void testKeyValueSchemaTransfer() {
         destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, BOOLEAN_SCHEMA);

         // Create new schemas for source registry
-        org.apache.avro.Schema keySchema = INT_SCHEMA;
-        org.apache.avro.Schema valueSchema = STRING_SCHEMA;
+        AvroSchema keySchema = INT_SCHEMA;
+        AvroSchema valueSchema = STRING_SCHEMA;
         log.info("Registering schemas in source registry");
         int sourceKeyId = sourceSchemaRegistry.registerSchema(TOPIC, true, keySchema);
         final String keySubject = TOPIC + "-key";
@@ -559,8 +561,8 @@ public void testKeyValueSchemaTransfer() {
         int destinationKeyId = -1;
         int destinationValueId = -1;
         try {
-            ByteArrayOutputStream keyStream = encodeAvroObject(keySchema, sourceKeyId, AVRO_CONTENT_OFFSET);
-            ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema, sourceValueId, "hello, world");
+            ByteArrayOutputStream keyStream = encodeAvroObject(keySchema.rawSchema(), sourceKeyId, AVRO_CONTENT_OFFSET);
+            ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema.rawSchema(), sourceValueId, "hello, world");

             key = keyStream.toByteArray();
             value = valueStream.toByteArray();
@@ -601,13 +603,13 @@ public void testKeyValueSchemaTransfer() {
                 "destination id should be different and higher since that registry already had schemas");

         // Verify the schemas are the same
-        org.apache.avro.Schema sourceKeySchema = sourceClient.getById(sourceKeyId);
-        org.apache.avro.Schema destKeySchema = new org.apache.avro.Schema.Parser().parse(keyMetadata.getSchema());
+        ParsedSchema sourceKeySchema = sourceClient.getSchemaById(sourceKeyId);
+        ParsedSchema destKeySchema = new AvroSchema(keyMetadata.getSchema());
         assertEquals(destKeySchema, sourceKeySchema, "source server returned same key schema");
         assertEquals(keySchema, destKeySchema, "destination server returned same key schema");
         assertEquals(sourceKeySchema, destKeySchema, "both servers' key schemas match");
-        org.apache.avro.Schema sourceValueSchema = sourceClient.getById(sourceValueId);
-        org.apache.avro.Schema destValueSchema = new org.apache.avro.Schema.Parser().parse(valueMetadata.getSchema());
+        ParsedSchema sourceValueSchema = sourceClient.getSchemaById(sourceValueId);
+        ParsedSchema destValueSchema = new AvroSchema(valueMetadata.getSchema());
         assertEquals(destValueSchema, sourceValueSchema, "source server returned same value schema");
         assertEquals(valueSchema, destValueSchema, "destination server returned same value schema");
         assertEquals(sourceValueSchema, destValueSchema, "both servers' value schemas match");
@@ -679,20 +681,20 @@ public void testEvolvingValueSchemaTransfer() {
         }

         try {
-            GenericData.Record record1 = new GenericRecordBuilder(NAME_SCHEMA)
+            GenericData.Record record1 = new GenericRecordBuilder(NAME_SCHEMA.rawSchema())
                     .set("first", "fname")
                     .set("last", "lname")
                     .build();
-            ByteArrayOutputStream out = encodeAvroObject(NAME_SCHEMA, sourceId, record1);
+            ByteArrayOutputStream out = encodeAvroObject(NAME_SCHEMA.rawSchema(), sourceId, record1);

             byte[] value = out.toByteArray();
             ConnectRecord record = createRecord(null, value);

-            GenericData.Record record2 = new GenericRecordBuilder(NAME_SCHEMA_ALIASED)
+            GenericData.Record record2 = new GenericRecordBuilder(NAME_SCHEMA_ALIASED.rawSchema())
                     .set("first", "fname")
                     .set("surname", "lname")
                     .build();
-            out = encodeAvroObject(NAME_SCHEMA_ALIASED, nextSourceId, record2);
+            out = encodeAvroObject(NAME_SCHEMA_ALIASED.rawSchema(), nextSourceId, record2);

             byte[] nextValue = out.toByteArray();
             ConnectRecord nextRecord = createRecord(null, nextValue);
@@ -736,8 +738,8 @@ public void testIncompatibleEvolvingValueSchemaTransfer() {
         log.info("Registering schema in source registry");

         // TODO: Figure out what these should be, where if order is flipped, destination will not accept
-        org.apache.avro.Schema schema = null;
-        org.apache.avro.Schema nextSchema = null;
+        AvroSchema schema = null;
+        AvroSchema nextSchema = null;

         int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, schema);
         int nextSourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, nextSchema);
@@ -757,12 +759,12 @@ public void testIncompatibleEvolvingValueSchemaTransfer() {
         try {
             // TODO: Depending on schemas above, then build Avro records for them
             // ensure second id is encoded first
-            ByteArrayOutputStream out = encodeAvroObject(nextSchema, nextSourceId, null);
+            ByteArrayOutputStream out = encodeAvroObject(nextSchema.rawSchema(), nextSourceId, null);

             byte[] value = out.toByteArray();
             ConnectRecord record = createRecord(null, value);

-            out = encodeAvroObject(schema, sourceId, null);
+            out = encodeAvroObject(schema.rawSchema(), sourceId, null);

             byte[] nextValue = out.toByteArray();
             ConnectRecord nextRecord = createRecord(null, nextValue);
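
Note: the test fixtures now hold AvroSchema (a ParsedSchema implementation) for everything that talks to the registry, and unwrap it with rawSchema() wherever plain Avro tooling (GenericRecordBuilder, the encoders) still needs an org.apache.avro.Schema. A small illustrative sketch of that split, reusing the FullName shape from the test constants; the class name and printing are placeholders.

import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;

public class AvroSchemaWrapSketch {
    public static void main(String[] args) {
        // Registry-facing type: AvroSchema wraps the builder-produced Avro schema.
        AvroSchema nameSchema = new AvroSchema(SchemaBuilder.record("FullName")
                .namespace("cricket.jmoore.kafka.connect.transforms").fields()
                .requiredString("first")
                .requiredString("last")
                .endRecord());

        // Avro-facing code still needs the underlying org.apache.avro.Schema,
        // which AvroSchema exposes via rawSchema().
        GenericData.Record record = new GenericRecordBuilder(nameSchema.rawSchema())
                .set("first", "fname")
                .set("last", "lname")
                .build();
        System.out.println(record);
    }
}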
