Skip to content

Commit 8b76edb

Browse files
jheinnicOneCricketeer
authored andcommitted
Hide user info (OneCricketeer#22)
* Enable Service Provider Relocation (OneCricketeer#17) (OneCricketeer#4) Schema Registry Client's basic HTTP Authentication support is implemented through a ServiceProvider. Without handling the fact that relocating the schema client also relocates the service implementations, no implementations are found when the client attempts to find a strategy that matches an authentication source type specified through basic.auth.credentials.source... * Modify USER_INFO fields to use type Password instead of String Kafka Connect logs connector configurations before launching them, which is a problem if some of those configuration properties happen to contain sensitive information that does not belong in a log file, such as any Basic HTTP Authentication credentials MirrorTool ahs been configured to make use of. Kakfa Connect provides a `Password` data type that is always masked on display. It was relatifely simple to change both the USER_INFO fields recently added to use PASSWORD instead of STRING as their data types. The URL field can sometimes also contain a password, when the authentiation source is set to URL instead of USER_INFO. There is no way to make these data types conditional, so it is not possible to make URL of type PASSWORD if the credential source is URL, while it is also of type STRING if the credential source is not URL. Since the credential format when using URL is the same as it is when using USER_INFO, and there is arguably a good reason to not mask the rest of the URL, the URL fields continue to have type String here.
1 parent f961af9 commit 8b76edb

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ public SchemaRegistryTransfer() {
7777
.define(ConfigName.SRC_SCHEMA_REGISTRY_URL, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, SRC_SCHEMA_REGISTRY_CONFIG_DOC)
7878
.define(ConfigName.DEST_SCHEMA_REGISTRY_URL, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, DEST_SCHEMA_REGISTRY_CONFIG_DOC)
7979
.define(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE, ConfigDef.Type.STRING, SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC)
80-
.define(ConfigName.SRC_USER_INFO, ConfigDef.Type.STRING, SRC_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, SRC_USER_INFO_CONFIG_DOC)
80+
.define(ConfigName.SRC_USER_INFO, ConfigDef.Type.PASSWORD, SRC_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, SRC_USER_INFO_CONFIG_DOC)
8181
.define(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE, ConfigDef.Type.STRING, DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC)
82-
.define(ConfigName.DEST_USER_INFO, ConfigDef.Type.STRING, DEST_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, DEST_USER_INFO_CONFIG_DOC)
82+
.define(ConfigName.DEST_USER_INFO, ConfigDef.Type.PASSWORD, DEST_USER_INFO_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, DEST_USER_INFO_CONFIG_DOC)
8383
.define(ConfigName.SCHEMA_CAPACITY, ConfigDef.Type.INT, SCHEMA_CAPACITY_CONFIG_DEFAULT, ConfigDef.Importance.LOW, SCHEMA_CAPACITY_CONFIG_DOC)
8484
.define(ConfigName.TRANSFER_KEYS, ConfigDef.Type.BOOLEAN, TRANSFER_KEYS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, TRANSFER_KEYS_CONFIG_DOC)
8585
.define(ConfigName.INCLUDE_HEADERS, ConfigDef.Type.BOOLEAN, INCLUDE_HEADERS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, INCLUDE_HEADERS_CONFIG_DOC)
@@ -101,14 +101,16 @@ public void configure(Map<String, ?> props) {
101101
sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
102102
config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
103103
sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
104-
config.getString(ConfigName.SRC_USER_INFO));
104+
config.getPassword(ConfigName.SRC_USER_INFO)
105+
.value());
105106

106107
List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
107108
final Map<String, String> destProps = new HashMap<>();
108109
destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
109110
config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
110111
destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
111-
config.getString(ConfigName.DEST_USER_INFO));
112+
config.getPassword(ConfigName.DEST_USER_INFO)
113+
.value());
112114

113115
Integer schemaCapacity = config.getInt(ConfigName.SCHEMA_CAPACITY);
114116

0 commit comments

Comments
 (0)