Skip to content
This repository was archived by the owner on Sep 11, 2024. It is now read-only.

Commit b21eaa7

Browse files
authored
Merge pull request #297 from stephen-harris/feat/add-support-for-KeyAndTopicPartitionRecordGrouper
Bump commons-for-apache-kafka-connect to 0.11.0
2 parents 2befdc7 + d8ecd94 commit b21eaa7

File tree

3 files changed

+25
-11
lines changed

3 files changed

+25
-11
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ ext {
8080
confluentPlatformVersion = "4.1.4"
8181
// Align with version used by commons
8282
avroConverterVersion = "7.2.2"
83-
aivenConnectCommonsVersion = "0.10.1"
83+
aivenConnectCommonsVersion = "0.11.0"
8484

8585
amazonS3Version = "1.12.520"
8686
amazonSTSVersion = "1.12.519"

src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator;
4646
import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator;
4747
import io.aiven.kafka.connect.common.config.validators.UrlValidator;
48-
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
4948
import io.aiven.kafka.connect.common.templating.Template;
5049
import io.aiven.kafka.connect.s3.S3OutputStream;
5150

@@ -726,15 +725,6 @@ private void validate() {
726725
);
727726
}
728727

729-
// Special checks for {{key}} filename template.
730-
final Template filenameTemplate = getFilenameTemplate();
731-
if (RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate))) {
732-
if (getMaxRecordsPerFile() > 1) {
733-
final String msg = String.format("When %s is %s, %s must be either 1 or not set",
734-
FILE_NAME_TEMPLATE_CONFIG, filenameTemplate, FILE_MAX_RECORDS);
735-
throw new ConfigException(msg);
736-
}
737-
}
738728
}
739729

740730
public AwsAccessSecret getOldAwsCredentials() {

src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,4 +934,28 @@ void stsEndpointShouldNotBeSetWithoutRegion() {
934934
);
935935

936936
}
937+
938+
@ParameterizedTest
939+
@ValueSource(strings = {"{{key}}", "{{topic}}/{{partition}}/{{key}}"})
940+
void notSupportedFileMaxRecords(final String fileNameTemplate) {
941+
final Map<String, String> properties =
942+
Map.of(
943+
S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, fileNameTemplate,
944+
S3SinkConfig.FILE_MAX_RECORDS, "2",
945+
S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "any_access_key_id",
946+
S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "any_secret_key",
947+
S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any_bucket"
948+
);
949+
final Throwable t = assertThrows(
950+
ConfigException.class,
951+
() -> new S3SinkConfig(properties)
952+
);
953+
assertEquals(
954+
String.format(
955+
"When file.name.template is %s, file.max.records must be either 1 or not set",
956+
fileNameTemplate
957+
),
958+
t.getMessage());
959+
}
960+
937961
}

0 commit comments

Comments
 (0)