
Commit dce06af

Add kafka transformations

File tree

13 files changed: +991, -0 lines


.github/workflows/main.yml

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
name: Build and Publish artifacts

on: [push]

jobs:
  build-and-publish:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK 8
        uses: actions/setup-java@v2
        with:
          java-version: '8'
          distribution: 'adopt'
      - name: Test with mvn
        run: mvn test
      - name: Build Fat jar for manual installation
        if: startsWith(github.ref, 'refs/tags/v')
        run: mvn org.apache.maven.plugins:maven-assembly-plugin:single
      - name: Build Confluent zip file
        if: startsWith(github.ref, 'refs/tags/v')
        run: mvn io.confluent:kafka-connect-maven-plugin:kafka-connect
      - name: Check if built zip file installable in confluent
        if: startsWith(github.ref, 'refs/tags/v')
        run: |
          docker run --rm \
            -e CONNECT_PLUGIN_PATH='/usr/share/java,/usr/share/confluent-hub-components/' \
            --entrypoint "" \
            -v $PWD:/app \
            confluentinc/cp-kafka-connect-base bash -c 'confluent-hub install --no-prompt /app/target/components/packages/tilakpatidar-kafka-connect-*.zip && test -f /usr/share/confluent-hub-components/tilakpatidar-kafka-connect-transform/lib/kafka-connect-transform-*.jar'
      - name: Release
        uses: softprops/action-gh-release@v1
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          files: |
            target/kafka-connect-transform*.jar
            target/components/packages/tilakpatidar-kafka-connect-transform-*.zip

.gitignore

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
### Java template
# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

### Maven template
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar

LICENSE

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
The MIT License (MIT)

Copyright (c) 2021 Tilak Patidar

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

README.md

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
## kafka-connect-transform

This project contains custom transformations for Kafka Connect.

### Available components

| Component Type | Class Name | Description | Configuration |
|----------------|------------|-------------|---------------|
| Partitioner | com.github.tilakpatidar.kafka.connect.partitioner.HeaderValuePartitioner | Partitions records based on key/value pairs present in the record header. | `partitioner.header.name`: Comma-separated list of header keys. `directory.delim`: Delimiter to use when encoding the partition value (default `/`). |
| Single Message Transform | com.github.tilakpatidar.kafka.connect.smt.ArchiveMessage | Copies the message key, topic name and message timestamp into the message value so that the entire Kafka key-value message can be archived as one record. | `archive.schema_name`: Name of the schema object. `archive.msg_key`: Column in which the Kafka message key is stored. `archive.msg_topic`: Column in which the Kafka topic name is stored. `archive.msg_timestamp`: Column in which the Kafka message timestamp is stored. |
| Single Message Transform | com.github.tilakpatidar.kafka.connect.smt.CloneField | Creates a clone of a field under a different name. | `clonefield.from`: Name of an existing field to clone the value from. `clonefield.to`: Name of the new field into which the value from `clonefield.from` is copied. |
| Single Message Transform | com.github.tilakpatidar.kafka.connect.smt.CloneFieldsToHeader | Clones record fields into the headers of a Kafka record. | `clonefieldstoheader.from`: Field name, or a comma-separated string for multiple fields. |

### Installation

#### For Confluent Kafka Connect

Download the zip file available in the releases.

```shell
confluent-hub install <zip file>
```

#### For Apache Kafka Connect

Download the jar file available in the releases and place it in your `plugin.path`.

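The table above only lists configuration keys; the sketch below (not part of this commit) shows how one of the SMTs could be exercised through the standard Kafka Connect `Transformation` API. It assumes `CloneField` is generic over the record type and that `apply` copies the configured field into the new one; the example class, topic, and field names are hypothetical.

```java
import com.github.tilakpatidar.kafka.connect.smt.CloneField;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.HashMap;
import java.util.Map;

public class CloneFieldExample {
    public static void main(String[] args) {
        // Configure the SMT with the keys documented in the components table.
        Map<String, String> props = new HashMap<>();
        props.put("clonefield.from", "user_id");
        props.put("clonefield.to", "user_id_copy");

        // Assumes CloneField implements Transformation<R> and is parameterized by the record type.
        CloneField<SinkRecord> transform = new CloneField<>();
        transform.configure(props);

        // A record whose value contains the field to clone.
        Schema valueSchema = SchemaBuilder.struct()
                .field("user_id", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema).put("user_id", "42");
        SinkRecord record = new SinkRecord("users", 0, null, null, valueSchema, value, 0L);

        // The transformed value is expected to carry both "user_id" and "user_id_copy".
        SinkRecord transformed = transform.apply(record);
        System.out.println(transformed.value());

        transform.close();
    }
}
```

In an actual connector configuration, the same keys would be set through the standard SMT mechanism, e.g. `transforms=clone`, `transforms.clone.type=com.github.tilakpatidar.kafka.connect.smt.CloneField`, `transforms.clone.clonefield.from=...`, `transforms.clone.clonefield.to=...`.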
pom.xml

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>kafka-connect-transform</artifactId>
    <version>1.0.0</version>
    <name>kafka-connect-transform</name>
    <url>https://github.com/tilakpatidar/kafka-connect-transform</url>
    <inceptionYear>2021</inceptionYear>
    <groupId>com.github.tilakpatidar.kafka.connect</groupId>
    <description>Common transformations for Kafka Connect.</description>
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
        <repository>
            <id>maven-central</id>
            <url>https://repo1.maven.org/maven2/</url>
        </repository>
    </repositories>
    <licenses>
        <license>
            <name>MIT License</name>
            <url>https://github.com/tilakpatidar/kafka-connect-transform/LICENSE</url>
            <distribution>repo</distribution>
        </license>
    </licenses>
    <developers>
        <developer>
            <id>tilakpatidar</id>
            <name>Tilak Patidar</name>
            <url>https://github.com/tilakpatidar</url>
            <roles>
                <role>Committer</role>
            </roles>
        </developer>
    </developers>
    <scm>
        <connection>scm:git:https://github.com/tilakpatidar/kafka-connect-transform.git</connection>
        <developerConnection>scm:git:git@github.com:tilakpatidar/kafka-connect-transform.git</developerConnection>
        <url>https://github.com/tilakpatidar/kafka-connect-transform</url>
    </scm>
    <issueManagement>
        <system>github</system>
        <url>https://github.com/tilakpatidar/kafka-connect-transform/issues</url>
    </issueManagement>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-storage-partitioner -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-storage-partitioner</artifactId>
            <version>10.2.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-connect-maven-plugin</artifactId>
                <version>0.12.0</version>
                <configuration>
                    <confluentControlCenterIntegration>true</confluentControlCenterIntegration>
                    <documentationUrl>https://github.com/tilakpatidar/kafka-connect-transform</documentationUrl>
                    <componentTypes>
                        <componentType>transform</componentType>
                    </componentTypes>
                    <title>Kafka Connect Transformations</title>
                    <supportUrl>${project.issueManagement.url}</supportUrl>
                    <supportSummary>Support provided through community involvement.</supportSummary>
                    <ownerName>Tilak Patidar</ownerName>
                    <ownerUsername>tilakpatidar</ownerUsername>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
src/main/java/com/github/tilakpatidar/kafka/connect/partitioner/HeaderValuePartitioner.java

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
package com.github.tilakpatidar.kafka.connect.partitioner;

import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Partitions records by the values of the configured record headers.
 *
 * @param <T> The type representing the field schemas.
 */
public class HeaderValuePartitioner<T> extends DefaultPartitioner<T> {
    public static final String HEADER_FIELD_NAME_CONFIG = "partitioner.header.name";
    private static final Logger log = LoggerFactory.getLogger(HeaderValuePartitioner.class);
    private List<String> headerNames;
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HEADER_FIELD_NAME_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Header key names used for partitioning the record.");

    public HeaderValuePartitioner() {
    }

    public void configure(Map<String, Object> configs) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
        this.headerNames = config.getList(HEADER_FIELD_NAME_CONFIG);
        // Fall back to "/" when no directory delimiter is configured.
        this.delim = (String) configs.getOrDefault(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, "/");
        assert !this.headerNames.isEmpty() : "Need at least one header field name for configuring HeaderValuePartitioner";
    }

    public String encodePartition(SinkRecord sinkRecord) {
        if (sinkRecord == null) {
            String errMsg = "Error encoding partition. SinkRecord is null.";
            log.error(errMsg);
            throw new PartitionException(errMsg);
        }

        // Build one "header=value" segment per configured header and join them with the delimiter.
        List<String> partitions = new ArrayList<>(this.headerNames.size());

        for (String partitionHeader : this.headerNames) {
            boolean partitionKeyPresent = sinkRecord.headers().allWithName(partitionHeader).hasNext();
            if (!partitionKeyPresent) {
                String errMsg = String.format("Error encoding partition. No header key named %s present.", partitionHeader);
                log.error(errMsg);
                throw new PartitionException(errMsg);
            }
            Object partitionKey = sinkRecord.headers().allWithName(partitionHeader).next().value();
            partitions.add(String.format("%s=%s", partitionHeader, partitionKey));
        }
        return String.join(this.delim, partitions);
    }

    public List<T> partitionFields() {
        if (this.partitionFields == null) {
            // Lazily derive partition fields from the configured header names.
            this.partitionFields = this.newSchemaGenerator(this.config).newPartitionFields(Utils.join(this.headerNames, ","));
        }

        return this.partitionFields;
    }
}
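
For reference, the following is a minimal usage sketch (not part of this commit) of how `encodePartition` turns record headers into a partition path. It is based directly on the class above; the `String` type argument, the topic name, and the header values are illustrative only.

```java
import com.github.tilakpatidar.kafka.connect.partitioner.HeaderValuePartitioner;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.HashMap;
import java.util.Map;

public class HeaderValuePartitionerExample {
    public static void main(String[] args) {
        // Configure the partitioner with two header keys; directory.delim is not set, so it defaults to "/".
        Map<String, Object> configs = new HashMap<>();
        configs.put(HeaderValuePartitioner.HEADER_FIELD_NAME_CONFIG, "region,tenant");

        HeaderValuePartitioner<String> partitioner = new HeaderValuePartitioner<>();
        partitioner.configure(configs);

        // A record carrying the headers that drive partitioning.
        SinkRecord record = new SinkRecord("orders", 0, null, null, Schema.STRING_SCHEMA, "payload", 0L);
        record.headers().addString("region", "eu");
        record.headers().addString("tenant", "acme");

        // Prints "region=eu/tenant=acme": one "header=value" segment per configured header.
        System.out.println(partitioner.encodePartition(record));
    }
}
```

A record missing any of the configured headers would instead raise a `PartitionException`, as `encodePartition` above shows.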
