2323import org .slf4j .Logger ;
2424import org .slf4j .LoggerFactory ;
2525
26+ import io .confluent .kafka .schemaregistry .ParsedSchema ;
2627import io .confluent .kafka .schemaregistry .client .CachedSchemaRegistryClient ;
2728import io .confluent .kafka .schemaregistry .client .rest .exceptions .RestClientException ;
28- import io .confluent .kafka .serializers .AbstractKafkaAvroSerDeConfig ;
29+ import io .confluent .kafka .serializers .AbstractKafkaSchemaSerDeConfig ;
2930import io .confluent .kafka .serializers .subject .TopicNameStrategy ;
3031import io .confluent .kafka .serializers .subject .strategy .SubjectNameStrategy ;
3132
@@ -44,17 +45,17 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans
4445
4546 public static final String SRC_PREAMBLE = "For source consumer's schema registry, " ;
4647 public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. The consumer's Schema Registry." ;
47- public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
48- public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
49- public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
50- public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
48+ public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
49+ public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
50+ public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
51+ public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
5152
5253 public static final String DEST_PREAMBLE = "For target producer's schema registry, " ;
5354 public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry." ;
54- public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
55- public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
56- public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
57- public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
55+ public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
56+ public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
57+ public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
58+ public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
5859
5960 public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries." ;
6061 public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true ;
@@ -63,7 +64,7 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans
6364
6465 private CachedSchemaRegistryClient sourceSchemaRegistryClient ;
6566 private CachedSchemaRegistryClient destSchemaRegistryClient ;
66- private SubjectNameStrategy < org . apache . avro . Schema > subjectNameStrategy ;
67+ private SubjectNameStrategy subjectNameStrategy ;
6768 private boolean transferKeys , includeHeaders ;
6869
6970 // caches from the source registry to the destination registry
@@ -98,17 +99,17 @@ public void configure(Map<String, ?> props) {
9899
99100 List <String > sourceUrls = config .getList (ConfigName .SRC_SCHEMA_REGISTRY_URL );
100101 final Map <String , String > sourceProps = new HashMap <>();
101- sourceProps .put (AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
102+ sourceProps .put (AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
102103 "SRC_" + config .getString (ConfigName .SRC_BASIC_AUTH_CREDENTIALS_SOURCE ));
103- sourceProps .put (AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ,
104+ sourceProps .put (AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ,
104105 config .getPassword (ConfigName .SRC_USER_INFO )
105106 .value ());
106107
107108 List <String > destUrls = config .getList (ConfigName .DEST_SCHEMA_REGISTRY_URL );
108109 final Map <String , String > destProps = new HashMap <>();
109- destProps .put (AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
110+ destProps .put (AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
110111 "DEST_" + config .getString (ConfigName .DEST_BASIC_AUTH_CREDENTIALS_SOURCE ));
111- destProps .put (AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ,
112+ destProps .put (AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ,
112113 config .getPassword (ConfigName .DEST_USER_INFO )
113114 .value ());
114115
@@ -206,14 +207,14 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean
206207
207208 schemaAndDestId = schemaCache .get (sourceSchemaId );
208209 if (schemaAndDestId != null ) {
209- log .trace ("Schema id {} has been seen before. Not registering with destination registry again." );
210+ log .trace ("Schema id {} has been seen before. Not registering with destination registry again." , sourceSchemaId );
210211 } else { // cache miss
211212 log .trace ("Schema id {} has not been seen before" , sourceSchemaId );
212213 schemaAndDestId = new SchemaAndId ();
213214 try {
214215 log .trace ("Looking up schema id {} in source registry" , sourceSchemaId );
215216 // Can't do getBySubjectAndId because that requires a Schema object for the strategy
216- schemaAndDestId .schema = sourceSchemaRegistryClient .getById (sourceSchemaId );
217+ schemaAndDestId .schema = sourceSchemaRegistryClient .getSchemaById (sourceSchemaId );
217218 } catch (IOException | RestClientException e ) {
218219 log .error (String .format ("Unable to fetch source schema for id %d." , sourceSchemaId ), e );
219220 throw new ConnectException (e );
@@ -244,25 +245,25 @@ public void close() {
244245 }
245246
246247 interface ConfigName {
247- String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
248- String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
249- String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ;
250- String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
251- String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
252- String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ;
248+ String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
249+ String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
250+ String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ;
251+ String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
252+ String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
253+ String DEST_USER_INFO = "dest." + AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ;
253254 String SCHEMA_CAPACITY = "schema.capacity" ;
254255 String TRANSFER_KEYS = "transfer.message.keys" ;
255256 String INCLUDE_HEADERS = "include.message.headers" ;
256257 }
257258
258259 private static class SchemaAndId {
259260 private Integer id ;
260- private org . apache . avro . Schema schema ;
261+ private ParsedSchema schema ;
261262
262263 SchemaAndId () {
263264 }
264265
265- SchemaAndId (int id , org . apache . avro . Schema schema ) {
266+ SchemaAndId (int id , ParsedSchema schema ) {
266267 this .id = id ;
267268 this .schema = schema ;
268269 }
0 commit comments