Skip to content
This repository was archived by the owner on Sep 11, 2024. It is now read-only.

Commit cba0008

Browse files
committed
feat: add validator to bucket name config
1 parent db043f9 commit cba0008

File tree

5 files changed

+60
-22
lines changed

5 files changed

+60
-22
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ List of deprecated configuration parameters:
484484
List of new configuration parameters:
485485
- `aws.access.key.id` - AWS Access Key ID for accessing S3 bucket.
486486
- `aws.secret.access.key` - AWS S3 Secret Access Key.
487-
- `aws.s3.bucket.name` - - Name of an existing bucket for storing the records. Mandatory.
487+
- `aws.s3.bucket.name` - - Name of an existing bucket for storing the records. Mandatory. See bucket name rules: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html>
488488
- `aws.s3.endpoint` - The endpoint configuration (service endpoint & signing region) to be used for requests.
489489
- `aws.s3.prefix` - [Deprecated] Use `file.name.prefix` and `file.name.template` instead. The prefix that will be added to the file name in the bucket. Can be used for putting output files into a subdirectory.
490490
- `aws.s3.region` - Name of the region for the bucket used for storing the records. Defaults to `us-east-1`.

src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.aiven.kafka.connect.s3.S3OutputStream;
5050

5151
import com.amazonaws.regions.Regions;
52+
import com.amazonaws.services.s3.internal.BucketNameUtils;
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455

@@ -218,7 +219,7 @@ private static void addAwsConfigGroup(final ConfigDef configDef) {
218219
AWS_S3_BUCKET_NAME_CONFIG,
219220
Type.STRING,
220221
null,
221-
new ConfigDef.NonEmptyString(),
222+
new BucketNameValidator(),
222223
Importance.MEDIUM,
223224
"AWS S3 Bucket name",
224225
GROUP_AWS,
@@ -295,6 +296,19 @@ public void ensureValid(final String name, final Object value) {
295296

296297
}
297298

299+
private static class BucketNameValidator implements ConfigDef.Validator {
300+
@Override
301+
public void ensureValid(final String name, final Object value) {
302+
try {
303+
if (value != null) {
304+
BucketNameUtils.validateBucketName((String) value);
305+
}
306+
} catch (final IllegalArgumentException e) {
307+
throw new ConfigException("Illegal bucket name: " + e.getMessage());
308+
}
309+
}
310+
}
311+
298312
private static void addS3RetryPolicies(final ConfigDef configDef) {
299313
var retryPolicyGroupCounter = 0;
300314
configDef.define(
@@ -583,7 +597,7 @@ public void ensureValid(final String name, final Object value) {
583597
AWS_S3_BUCKET,
584598
Type.STRING,
585599
null,
586-
new ConfigDef.NonEmptyString() {
600+
new BucketNameValidator() {
587601
@Override
588602
public void ensureValid(final String name, final Object o) {
589603
LOGGER.info(AWS_S3_BUCKET

src/test/java/io/aiven/kafka/connect/s3/AwsCredentialProviderFactoryTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public void setUp() {
3939
factory = new AwsCredentialProviderFactory();
4040
props = new HashMap<>();
4141
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "anyBucket");
42+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket");
4243
}
4344

4445
@Test

src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ void setupCustomS3Policy() {
372372
S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "jsonl",
373373
S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "AWS_ACCESS_KEY_ID_CONFIG",
374374
S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "AWS_SECRET_ACCESS_KEY_CONFIG",
375-
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "AWS_S3_BUCKET_NAME_CONFIG",
375+
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket",
376376
"aws.s3.backoff.delay.ms", "1",
377377
"aws.s3.backoff.max.delay.ms", "2",
378378
"aws.s3.backoff.max.retries", "3");

src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ void correctFullConfig() {
6868

6969
props.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "AWS_ACCESS_KEY_ID");
7070
props.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "AWS_SECRET_ACCESS_KEY");
71-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
71+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
7272
props.put(S3SinkConfig.AWS_S3_ENDPOINT_CONFIG, "AWS_S3_ENDPOINT");
7373
props.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "AWS_S3_PREFIX");
7474
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
@@ -88,7 +88,7 @@ void correctFullConfig() {
8888

8989
assertThat(awsCredentials.getAccessKeyId().value()).isEqualTo("AWS_ACCESS_KEY_ID");
9090
assertThat(awsCredentials.getSecretAccessKey().value()).isEqualTo("AWS_SECRET_ACCESS_KEY");
91-
assertThat(conf.getAwsS3BucketName()).isEqualTo("THE_BUCKET");
91+
assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket");
9292
assertThat(conf.getAwsS3Prefix()).isEqualTo("AWS_S3_PREFIX");
9393
assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT");
9494
assertThat(conf.getAwsS3Region()).isEqualTo(Regions.US_EAST_1);
@@ -117,7 +117,7 @@ void correctFullConfigForOldStyleConfigParameters() {
117117

118118
props.put(AWS_ACCESS_KEY_ID, "AWS_ACCESS_KEY_ID");
119119
props.put(AWS_SECRET_ACCESS_KEY, "AWS_SECRET_ACCESS_KEY");
120-
props.put(AWS_S3_BUCKET, "THE_BUCKET");
120+
props.put(AWS_S3_BUCKET, "the-bucket");
121121
props.put(AWS_S3_ENDPOINT, "AWS_S3_ENDPOINT");
122122
props.put(AWS_S3_PREFIX, "AWS_S3_PREFIX");
123123
props.put(AWS_S3_REGION, Regions.US_EAST_1.getName());
@@ -136,7 +136,7 @@ void correctFullConfigForOldStyleConfigParameters() {
136136

137137
assertThat(awsCredentials.getAccessKeyId().value()).isEqualTo("AWS_ACCESS_KEY_ID");
138138
assertThat(awsCredentials.getSecretAccessKey().value()).isEqualTo("AWS_SECRET_ACCESS_KEY");
139-
assertThat(conf.getAwsS3BucketName()).isEqualTo("THE_BUCKET");
139+
assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket");
140140
assertThat(conf.getAwsS3Prefix()).isEqualTo("AWS_S3_PREFIX");
141141
assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT");
142142
assertThat(conf.getAwsS3Region()).isEqualTo(Regions.US_EAST_1);
@@ -159,7 +159,7 @@ void newConfigurationPropertiesHaveHigherPriorityOverOldOne() {
159159

160160
props.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "AWS_ACCESS_KEY_ID");
161161
props.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "AWS_SECRET_ACCESS_KEY");
162-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
162+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
163163
props.put(S3SinkConfig.AWS_S3_ENDPOINT_CONFIG, "AWS_S3_ENDPOINT");
164164
props.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "AWS_S3_PREFIX");
165165
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
@@ -176,7 +176,7 @@ void newConfigurationPropertiesHaveHigherPriorityOverOldOne() {
176176

177177
props.put(AWS_ACCESS_KEY_ID, "AWS_ACCESS_KEY_ID_#1");
178178
props.put(AWS_SECRET_ACCESS_KEY, "AWS_SECRET_ACCESS_KEY_#1");
179-
props.put(AWS_S3_BUCKET, "THE_BUCKET_#1");
179+
props.put(AWS_S3_BUCKET, "the-bucket1");
180180
props.put(AWS_S3_ENDPOINT, "AWS_S3_ENDPOINT_#1");
181181
props.put(AWS_S3_PREFIX, "AWS_S3_PREFIX_#1");
182182
props.put(AWS_S3_REGION, Regions.US_WEST_1.getName());
@@ -189,7 +189,7 @@ void newConfigurationPropertiesHaveHigherPriorityOverOldOne() {
189189

190190
assertThat(awsCredentials.getAccessKeyId().value()).isEqualTo("AWS_ACCESS_KEY_ID");
191191
assertThat(awsCredentials.getSecretAccessKey().value()).isEqualTo("AWS_SECRET_ACCESS_KEY");
192-
assertThat(conf.getAwsS3BucketName()).isEqualTo("THE_BUCKET");
192+
assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket");
193193
assertThat(conf.getAwsS3Prefix()).isEqualTo("AWS_S3_PREFIX");
194194
assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT");
195195
assertThat(conf.getAwsS3Region()).isEqualTo(Regions.US_EAST_1);
@@ -269,14 +269,37 @@ final void emptyAwsS3Bucket() {
269269
props.put(AWS_S3_BUCKET, "");
270270
assertThatThrownBy(() -> new S3SinkConfig(props))
271271
.isInstanceOf(ConfigException.class)
272-
.hasMessage("Invalid value for configuration aws_s3_bucket: String must be non-empty");
272+
.hasMessage("Illegal bucket name: Bucket name should be between 3 and 63 characters long");
273273

274274
props.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah");
275275
props.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah");
276276
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "");
277277
assertThatThrownBy(() -> new S3SinkConfig(props))
278278
.isInstanceOf(ConfigException.class)
279-
.hasMessage("Invalid value for configuration aws.s3.bucket.name: String must be non-empty");
279+
.hasMessage("Illegal bucket name: Bucket name should be between 3 and 63 characters long");
280+
}
281+
282+
@Test
283+
final void invalidAwsS3Bucket() {
284+
final Map<String, String> props = new HashMap<>();
285+
props.put(AWS_ACCESS_KEY_ID, "blah-blah-blah");
286+
props.put(AWS_SECRET_ACCESS_KEY, "blah-blah-blah");
287+
props.put(AWS_S3_BUCKET, "BUCKET-NAME");
288+
assertThatThrownBy(() -> new S3SinkConfig(props))
289+
.isInstanceOf(ConfigException.class)
290+
.hasMessage("Illegal bucket name: Bucket name should not contain uppercase characters");
291+
292+
props.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah");
293+
props.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah");
294+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "BUCKET-NAME");
295+
assertThatThrownBy(() -> new S3SinkConfig(props))
296+
.isInstanceOf(ConfigException.class)
297+
.hasMessage("Illegal bucket name: Bucket name should not contain uppercase characters");
298+
299+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "bucket_name");
300+
assertThatThrownBy(() -> new S3SinkConfig(props))
301+
.isInstanceOf(ConfigException.class)
302+
.hasMessage("Illegal bucket name: Bucket name should not contain '_'");
280303
}
281304

282305
@Test
@@ -647,7 +670,7 @@ void supportedFormatTypeConfig(final String formatType) {
647670
final Map<String, String> properties = new HashMap<>();
648671
properties.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "any_access_key_id");
649672
properties.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "any_secret_key");
650-
properties.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any_bucket");
673+
properties.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket");
651674
properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "any_prefix");
652675
properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, formatType);
653676

@@ -680,7 +703,7 @@ void notSupportYyyyUppercaseInFilenameTemplate() {
680703
+ "-{{partition}}-{{start_offset:padding=true}}.gz",
681704
S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "any_access_key_id",
682705
S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "any_secret_key",
683-
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any_bucket"
706+
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"
684707
);
685708
assertThatThrownBy(() -> new S3SinkConfig(properties))
686709
.isInstanceOf(ConfigException.class)
@@ -698,7 +721,7 @@ void stsRoleCorrectConfig() {
698721
props.put(S3SinkConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
699722
props.put(S3SinkConfig.AWS_STS_ROLE_EXTERNAL_ID, "EXTERNAL_ID");
700723
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
701-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
724+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
702725
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
703726

704727
final var conf = new S3SinkConfig(props);
@@ -715,7 +738,7 @@ void stsRoleEmptyArn() {
715738

716739
props.put(S3SinkConfig.AWS_STS_ROLE_EXTERNAL_ID, "EXTERNAL_ID");
717740
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
718-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
741+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
719742
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
720743

721744
assertThatThrownBy(() -> new S3SinkConfig(props))
@@ -731,7 +754,7 @@ void stsRoleEmptySessionName() {
731754

732755
props.put(S3SinkConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
733756
props.put(S3SinkConfig.AWS_STS_ROLE_EXTERNAL_ID, "EXTERNAL_ID");
734-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
757+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
735758
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
736759

737760
assertThatThrownBy(() -> new S3SinkConfig(props))
@@ -748,7 +771,7 @@ void stsWrongSessionDuration() {
748771

749772
props.put(S3SinkConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
750773
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
751-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
774+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
752775
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
753776

754777
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_DURATION, "30");
@@ -772,7 +795,7 @@ void stsCorrectSessionDuration() {
772795

773796
props.put(S3SinkConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
774797
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
775-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
798+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
776799
props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
777800

778801
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_DURATION, "900");
@@ -789,7 +812,7 @@ void stsEndpointShouldNotBeSetWithoutRegion() {
789812
props.put(S3SinkConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
790813
props.put(S3SinkConfig.AWS_STS_ROLE_EXTERNAL_ID, "EXTERNAL_ID");
791814
props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
792-
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "THE_BUCKET");
815+
props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
793816
props.put(S3SinkConfig.AWS_STS_CONFIG_ENDPOINT, "https://sts.eu-north-1.amazonaws.com");
794817

795818
assertThatThrownBy(() -> new S3SinkConfig(props))
@@ -806,7 +829,7 @@ void notSupportedFileMaxRecords(final String fileNameTemplate) {
806829
S3SinkConfig.FILE_MAX_RECORDS, "2",
807830
S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "any_access_key_id",
808831
S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "any_secret_key",
809-
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any_bucket"
832+
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"
810833
);
811834
assertThatThrownBy(() -> new S3SinkConfig(properties))
812835
.isInstanceOf(ConfigException.class)

0 commit comments

Comments
 (0)