Skip to content

Commit cf5d375

Browse files
jheinnicOneCricketeer
authored andcommitted
Cure HTTP Authentication Singleton's Password Overwrite (OneCricketeer#19)
* Enable Service Provider Relocation (OneCricketeer#17) (OneCricketeer#4) Schema Registry Client's basic HTTP Authentication support is implemented through a ServiceProvider. Without handling the fact that relocating the schema client also relocates the service implementations, no implementations are found when the client attempts to find a strategy that matches an authentication source type specified through basic.auth.credentials.source... * Improve test cases that distinguish between source and destination credentials -- Split credentials fixture value pair into two distinct value pairs, one for source registry clients, another for destinations. -- Add test case verifying source authentication properties reach source registry client -- Add test case verifying destination authentication properties reach destination registry client -- Add test cases verifying expected exception when incorrect credentials are passed to source and/or destination registry client. -- Add test case using distinct credentials for both source and destination in same execution and using same authentication source strategy (currently does not pass!) * Compensate for Basic HTTP Authentication's Singleton Implementation The fact that Basic HTTP Authentication as implemented in the Kafka Connect Client uses a singleton to hold configured credentials means that if both the source and destination schema registries require basic HTTP authentication and want to provide credentials via `basic.auth.credentials.source`, the second set of credentials will overwrite and replace the first` Connect's three singletons for Basic HTTP Authentication are selected by the same three key values used in `basic.auth.credentials.source` to designate which represented algorithm to use. This commit begins compensating for these singletons first by creating and registering two additional copies of the Basic Auth singletons in the SMT's code base. One is intended for use by the source broker's schema registry client, the other is for destinaation broker's registry. The only intentional difference between what is built here and the production Kafka Connect namespac is the addition of a short prefix to distinguish `SRC_` from `DEST_`. Now, when removing a prefix it uses to broker input to one adapter or the orther, in addition to selecting which configuration hash destrination to use, it also adds that prefix to the value it provides for `basic.auth.credentials.source`. As a result, the source and destination schema registry clients will each now use a diffent singleton to hold onto their credentials with two distinct singletons. This work relies on addition of ServicesRouterTransformer to he maven shade plugin that was recently reviewd and released.
1 parent 8b76edb commit cf5d375

11 files changed

+120
-12
lines changed

src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ public void configure(Map<String, ?> props) {
9999
List<String> sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL);
100100
final Map<String, String> sourceProps = new HashMap<>();
101101
sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
102-
config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
102+
"SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
103103
sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
104104
config.getPassword(ConfigName.SRC_USER_INFO)
105105
.value());
106106

107107
List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
108108
final Map<String, String> destProps = new HashMap<>();
109109
destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
110-
config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
110+
"DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
111111
destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
112112
config.getPassword(ConfigName.DEST_USER_INFO)
113113
.value());
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/* Licensed under Apache-2.0 */
2+
package cricket.jmoore.security.basicauth;
3+
4+
import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider;
5+
6+
public class DestSaslBasicAuthCredentialProvider extends SaslBasicAuthCredentialProvider {
7+
@Override
8+
public String alias() {
9+
return "DEST_SASL_INHERIT";
10+
}
11+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/* Licensed under Apache-2.0 */
2+
package cricket.jmoore.security.basicauth;
3+
4+
import io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider;
5+
6+
public class DestUrlBasicAuthCredentialProvider extends UrlBasicAuthCredentialProvider {
7+
@Override
8+
public String alias() {
9+
return "DEST_URL";
10+
}
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/* Licensed under Apache-2.0 */
2+
package cricket.jmoore.security.basicauth;
3+
4+
import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
5+
6+
public class DestUserInfoCredentialProvider extends UserInfoCredentialProvider
7+
{
8+
@Override
9+
public String alias() {
10+
return "DEST_USER_INFO";
11+
}
12+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/* Licensed under Apache-2.0 */
2+
package cricket.jmoore.security.basicauth;
3+
4+
import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider;
5+
6+
public class SrcSaslBasicAuthCredentialProvider extends SaslBasicAuthCredentialProvider {
7+
@Override
8+
public String alias() {
9+
return "SRC_SASL_INHERIT";
10+
}
11+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/* Licensed under Apache-2.0 */
2+
package cricket.jmoore.security.basicauth;
3+
4+
import io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider;
5+
6+
public class SrcUrlBasicAuthCredentialProvider extends UrlBasicAuthCredentialProvider {
7+
@Override
8+
public String alias() {
9+
return "SRC_URL";
10+
}
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/* Licensed under Apache-2.0 */
2+
package cricket.jmoore.security.basicauth;
3+
4+
import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
5+
6+
public class SrcUserInfoCredentialProvider extends UserInfoCredentialProvider
7+
{
8+
@Override
9+
public String alias() {
10+
return "SRC_USER_INFO";
11+
}
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
cricket.jmoore.security.basicauth.DestSaslBasicAuthCredentialProvider
2+
cricket.jmoore.security.basicauth.DestUrlBasicAuthCredentialProvider
3+
cricket.jmoore.security.basicauth.DestUserInfoCredentialProvider
4+
cricket.jmoore.security.basicauth.SrcSaslBasicAuthCredentialProvider
5+
cricket.jmoore.security.basicauth.SrcUrlBasicAuthCredentialProvider
6+
cricket.jmoore.security.basicauth.SrcUserInfoCredentialProvider

src/test/java/cricket/jmoore/kafka/connect/transforms/Constants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ public interface Constants {
1010

1111
public static final String URL_SOURCE = "URL";
1212

13-
public static final String HTTP_AUTH_CREDENTIALS_FIXTURE = "username:password";
13+
public static final String HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE = "sourceuser:sourcepass";
14+
15+
public static final String HTTP_AUTH_DEST_CREDENTIALS_FIXTURE = "destuser:destpass";
1416
}

src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public enum Role {
113113
this.getConfigHandler));
114114
private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
115115
private final String basicAuthTag;
116+
private final String basicAuthCredentials;
116117
private Function<MappingBuilder, StubMapping> stubFor;
117118

118119
private static final Logger log = LoggerFactory.getLogger(SchemaRegistryMock.class);
@@ -123,6 +124,8 @@ public SchemaRegistryMock(Role role) {
123124
}
124125

125126
this.basicAuthTag = (role == Role.SOURCE) ? Constants.USE_BASIC_AUTH_SOURCE_TAG : Constants.USE_BASIC_AUTH_DEST_TAG;
127+
this.basicAuthCredentials =
128+
(role == Role.SOURCE)? Constants.HTTP_AUTH_SOURCE_CREDENTIALS_FIXTURE : Constants.HTTP_AUTH_DEST_CREDENTIALS_FIXTURE;
126129
}
127130

128131
@Override
@@ -133,7 +136,7 @@ public void afterEach(final ExtensionContext context) {
133136
@Override
134137
public void beforeEach(final ExtensionContext context) {
135138
if (context.getTags().contains(this.basicAuthTag)) {
136-
String[] userPass = Constants.HTTP_AUTH_CREDENTIALS_FIXTURE.split(":");
139+
final String[] userPass = this.basicAuthCredentials.split(":");
137140
this.stubFor = (MappingBuilder mappingBuilder) -> this.mockSchemaRegistry.stubFor(
138141
mappingBuilder.withBasicAuth(userPass[0], userPass[1]));
139142
} else {

0 commit comments

Comments
 (0)