99import java .util .Objects ;
1010import java .util .Optional ;
1111
12+ import io .confluent .kafka .serializers .AbstractKafkaSchemaSerDeConfig ;
1213import org .apache .kafka .common .cache .Cache ;
1314import org .apache .kafka .common .cache .LRUCache ;
1415import org .apache .kafka .common .cache .SynchronizedCache ;
2627import io .confluent .kafka .schemaregistry .ParsedSchema ;
2728import io .confluent .kafka .schemaregistry .client .CachedSchemaRegistryClient ;
2829import io .confluent .kafka .schemaregistry .client .rest .exceptions .RestClientException ;
29- import io .confluent .kafka .serializers .AbstractKafkaAvroSerDeConfig ;
3030import io .confluent .kafka .serializers .subject .TopicNameStrategy ;
3131import io .confluent .kafka .serializers .subject .strategy .SubjectNameStrategy ;
3232
@@ -45,17 +45,17 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans
4545
4646 public static final String SRC_PREAMBLE = "For source consumer's schema registry, " ;
4747 public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. The consumer's Schema Registry." ;
48- public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
49- public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
50- public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
51- public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
48+ public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
49+ public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
50+ public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
51+ public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
5252
5353 public static final String DEST_PREAMBLE = "For target producer's schema registry, " ;
5454 public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry." ;
55- public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
56- public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
57- public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
58- public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
55+ public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DOC ;
56+ public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT ;
57+ public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DOC ;
58+ public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_USER_INFO_DEFAULT ;
5959
6060 public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries." ;
6161 public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true ;
@@ -99,17 +99,17 @@ public void configure(Map<String, ?> props) {
9999
100100 List <String > sourceUrls = config .getList (ConfigName .SRC_SCHEMA_REGISTRY_URL );
101101 final Map <String , String > sourceProps = new HashMap <>();
102- sourceProps .put (AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
102+ sourceProps .put (AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
103103 "SRC_" + config .getString (ConfigName .SRC_BASIC_AUTH_CREDENTIALS_SOURCE ));
104- sourceProps .put (AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ,
104+ sourceProps .put (AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ,
105105 config .getPassword (ConfigName .SRC_USER_INFO )
106106 .value ());
107107
108108 List <String > destUrls = config .getList (ConfigName .DEST_SCHEMA_REGISTRY_URL );
109109 final Map <String , String > destProps = new HashMap <>();
110- destProps .put (AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
110+ destProps .put (AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ,
111111 "DEST_" + config .getString (ConfigName .DEST_BASIC_AUTH_CREDENTIALS_SOURCE ));
112- destProps .put (AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ,
112+ destProps .put (AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ,
113113 config .getPassword (ConfigName .DEST_USER_INFO )
114114 .value ());
115115
@@ -207,7 +207,7 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean
207207
208208 schemaAndDestId = schemaCache .get (sourceSchemaId );
209209 if (schemaAndDestId != null ) {
210- log .trace ("Schema id {} has been seen before. Not registering with destination registry again." );
210+ log .trace ("Schema id {} has been seen before. Not registering with destination registry again." , sourceSchemaId );
211211 } else { // cache miss
212212 log .trace ("Schema id {} has not been seen before" , sourceSchemaId );
213213 schemaAndDestId = new SchemaAndId ();
@@ -245,12 +245,12 @@ public void close() {
245245 }
246246
247247 interface ConfigName {
248- String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
249- String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
250- String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ;
251- String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
252- String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
253- String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig .USER_INFO_CONFIG ;
248+ String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
249+ String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
250+ String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ;
251+ String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG ;
252+ String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig .BASIC_AUTH_CREDENTIALS_SOURCE ;
253+ String DEST_USER_INFO = "dest." + AbstractKafkaSchemaSerDeConfig .USER_INFO_CONFIG ;
254254 String SCHEMA_CAPACITY = "schema.capacity" ;
255255 String TRANSFER_KEYS = "transfer.message.keys" ;
256256 String INCLUDE_HEADERS = "include.message.headers" ;
0 commit comments