
Commit 1ee7ddb

Improve Kafka connector example (#136)
* Improving Kafka connectors example
* Added docker-compose with local Kafka cluster
* Aligned expected record schema with schema generated by Flink DataGenerator example
1 parent 6891278 commit 1ee7ddb

9 files changed, +193 −101 lines changed

java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json

Lines changed: 3 additions & 3 deletions
@@ -2,18 +2,18 @@
   {
     "PropertyGroupId": "DataGen",
     "PropertyMap": {
-      "records.per.second": "1000"
+      "records.per.second": "10"
     }
   },
   {
-    "PropertyGroupId": "KinesisSink",
+    "PropertyGroupId": "KinesisSink-DISABLE",
     "PropertyMap": {
       "stream.arn": "arn:aws:kinesis:eu-west-1:<account-id>:stream/FlinkDataGeneratorTestStream",
       "aws.region": "eu-west-1"
     }
   },
   {
-    "PropertyGroupId": "KafkaSink-DISABLE",
+    "PropertyGroupId": "KafkaSink",
     "PropertyMap": {
       "bootstrap.servers": "localhost:9092",
       "topic": "stock-prices"

java/KafkaConnectors/README.md

Lines changed: 28 additions & 29 deletions
@@ -1,4 +1,4 @@
-# Flink Kafka Source & Sink Examples
+## Flink Kafka Source & Sink Examples
 
 * Flink version: 1.20
 * Flink API: DataStream API
@@ -17,56 +17,55 @@ The JSON input follows the structure set in `Stock.java` and can be automaticall
 
 ![Flink Example](images/flink-example.png),
 
-> In this example, the Kafka Sink uses *exactly-once* delivery guarantees. This leverages Kafka transaction under the hood, improving guarantees but
-> adding some overhead and increasing the effective latency of the output to the consumers of the destination Kafka topic.
->
-> Moreover, there are failure scenarios were the Kafka Sink may still cause duplicates, even when set for exactly-once guarantees.
-> See [FLIP-319](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710) for more details.
->
-> We recommend not to consider Kafka Sink *exactly-once* guarantees as a default setting for all sinks to Kafka.
-> Make sure you understand the implications before enabling it. Refer to the [Flink Kafka sink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#fault-tolerance) for details.
-
 Note that the old
 [`FlinkKafkaConsumer`](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/#kafka-sourcefunction)
 and [`FlinkKafkaProducer`](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/#kafka-producer)
 were removed in Flink 1.17 and 1.15, respectively.
 
-## Runtime configuration
+### Runtime configuration
 
 When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.
 
 When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
 
 Runtime parameters:
 
-| Group ID  | Key                 | Description                       |
-|-----------|---------------------|-----------------------------------|
-| `Input0`  | `bootstrap.servers` | Source cluster boostrap servers.  |
-| `Input0`  | `topic`             | Source topic (default: `source`). |
-| `Input0`  | `group.id`          | Source group id (default: `my-group`) |
-| `Output0` | `bootstrap.servers` | Destination cluster bootstrap servers. |
+| Group ID  | Key                 | Description                                  |
+|-----------|---------------------|----------------------------------------------|
+| `Input0`  | `bootstrap.servers` | Source cluster bootstrap servers.            |
+| `Input0`  | `topic`             | Source topic (default: `source`).            |
+| `Input0`  | `group.id`          | Source group id (default: `my-group`).       |
+| `Output0` | `bootstrap.servers` | Destination cluster bootstrap servers.       |
 | `Output0` | `topic`             | Destination topic (default: `destination`). |
-| `Output0` | `transaction.timeout.ms` | Sink transaction timeout (default: `1000`) |
 
 If you are connecting with no-auth and no SSL, above will work. Else you need additional configuration for both source and sink.
 
-### For IAM Auth
+#### For IAM Auth
 
 When using IAM Auth, the following Runtime Properties are expected at the Group ID `AuthProperties`:
-* `sasl.mechanism` AWS_MSK_IAM
-* `sasl.client.callback.handler.class` software.amazon.msk.auth.iam.IAMClientCallbackHandler
-* `sasl.jaas.config` "software.amazon.msk.auth.iam.IAMLoginModule required;"
-* `security.protocol` SASL_SSL
-* `ssl.truststore.location` /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
-* `ssl.truststore.password` changeit
+* `sasl.mechanism` = `AWS_MSK_IAM`
+* `sasl.client.callback.handler.class` = `software.amazon.msk.auth.iam.IAMClientCallbackHandler`
+* `sasl.jaas.config` = `software.amazon.msk.auth.iam.IAMLoginModule required;`
+* `security.protocol` = `SASL_SSL`
+
+The properties in the `AuthProperties` group are passed to both Kafka Source and Kafka Sink configurations.
+
+### Data Generator
 
+To generate the JSON data expected by this example, you can use the Python [data-generator](../../python/data-generator)
+script provided with the Python examples.
 
-## Running locally in IntelliJ
+Alternatively, you can use the [Flink DataGenerator](../FlinkDataGenerator) application, which generates the same schema.
 
-> Due to MSK VPC networking, to run this example on your machine you need to set up network connectivity to the VPC where MSK is deployed, for example with a VPN.
-> Alternatively, you can use a local Kafka installation, for example in a container.
-> Setting up the connectivity or a local containerized Kafka depends on your set up and is out of scope for this example.
+### Running locally in IntelliJ
+
+To run the example locally we provide a [docker-compose](docker/docker-compose.yml) file which starts a local Kafka cluster
+with 3 brokers on `localhost:9092`, `localhost:9093`, and `localhost:9094`.
+It also runs a Kafka UI responding on `http://localhost:8080`.
 
 You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
 
+Note that you can run both the [Flink DataGenerator](../FlinkDataGenerator) and this example locally, in the IDE, to observe
+data being consumed and re-published to Kafka.
+
 See [Running examples locally](../running-examples-locally.md) for details.
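The `AuthProperties` group described in the README above is merged into both the Kafka source and the Kafka sink configuration. The sketch below shows what that merge amounts to, using the property values listed in the README; the class name, the placeholder broker address, and the `merge` helper are illustrative stand-ins for the example's own `mergeProperties` in `KafkaStreamingJob.java`.

```java
import java.util.Properties;

public class MskIamAuthSketch {

    // Overlay the shared auth properties onto a connector's own properties,
    // as the example's mergeProperties(...) helper does for both source and sink.
    static Properties merge(Properties connector, Properties auth) {
        Properties merged = new Properties();
        merged.putAll(connector);
        merged.putAll(auth);
        return merged;
    }

    public static void main(String[] args) {
        Properties auth = new Properties();
        auth.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        auth.setProperty("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        auth.setProperty("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        auth.setProperty("security.protocol", "SASL_SSL");

        Properties source = new Properties();
        // Placeholder MSK bootstrap address, for illustration only.
        source.setProperty("bootstrap.servers", "b-1.example-cluster.kafka.eu-west-1.amazonaws.com:9098");
        source.setProperty("topic", "stock-prices");

        System.out.println(merge(source, auth));
    }
}
```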
java/KafkaConnectors/docker/docker-compose.yml

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+version: '3.8'
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+
+  kafka2:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9093:9093"
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+
+  kafka3:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9094:9094"
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29094,PLAINTEXT_HOST://0.0.0.0:9094
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+
+  kafka-ui:
+    image: provectuslabs/kafka-ui:latest
+    depends_on:
+      - kafka1
+      - kafka2
+      - kafka3
+    ports:
+      - "8080:8080"
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29093,kafka3:29094
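The `PLAINTEXT_HOST` listeners above are what make the brokers reachable from the host as `localhost:9092`, `localhost:9093`, and `localhost:9094`, matching the README and the dev properties file. A quick connectivity check, sketched with the standard Kafka admin client (the `kafka-clients` dependency normally comes in through the Flink Kafka connector, though your build may need it declared explicitly; the class name is illustrative):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class LocalClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The host-facing listeners advertised by the compose file above.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            // Lists the topics (e.g. "stock-prices") once the cluster is up.
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}
```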
2 binary files changed (−34.4 KB and −77.3 KB); binary files not shown.

java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/KafkaStreamingJob.java

Lines changed: 13 additions & 7 deletions
@@ -1,9 +1,9 @@
 package com.amazonaws.services.msf;
 
 import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.domain.StockPrice;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -76,7 +76,7 @@ private static <T> KafkaSink<T> createKafkaSink(Properties outputProperties, Kaf
                 .setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
                 .setKafkaProducerConfig(outputProperties)
                 .setRecordSerializer(recordSerializationSchema)
-                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                 .build();
     }
 
@@ -89,7 +89,13 @@ private static Properties mergeProperties(Properties properties, Properties auth
     public static void main(String[] args) throws Exception {
         // Set up the streaming execution environment
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(1000);
+
+        if (isLocal(env)) {
+            env.enableCheckpointing(10_000);
+            env.setParallelism(3);
+        }
+
+
 
         // Load the application properties
         final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
@@ -104,10 +110,10 @@ public static void main(String[] args) throws Exception {
         Properties outputProperties = mergeProperties(applicationProperties.get("Output0"), authProperties);
 
         // Create and add the Source
-        KafkaSource<Stock> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(Stock.class));
-        DataStream<Stock> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
+        KafkaSource<StockPrice> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(StockPrice.class));
+        DataStream<StockPrice> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
 
-        KafkaRecordSerializationSchema<Stock> recordSerializationSchema = KafkaRecordSerializationSchema.<Stock>builder()
+        KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema = KafkaRecordSerializationSchema.<StockPrice>builder()
                 .setTopic(outputProperties.getProperty("topic", DEFAULT_SINK_TOPIC))
                 // Use a field as kafka record key
                 // Define no keySerializationSchema to publish kafka records with no key
@@ -118,7 +124,7 @@ public static void main(String[] args) throws Exception {
 
 
         // Create and add the Sink
-        KafkaSink<Stock> sink = createKafkaSink(outputProperties, recordSerializationSchema);
+        KafkaSink<StockPrice> sink = createKafkaSink(outputProperties, recordSerializationSchema);
         input.sinkTo(sink);
 
         env.execute("Flink Kafka Source and Sink examples");
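Taken together, the changes above turn the job into a `StockPrice` pass-through with at-least-once delivery. Below is a condensed, self-contained sketch of the same wiring, with the brokers and topics from the dev properties file hard-coded for readability; the real job reads them from runtime properties, and the class name here is illustrative.

```java
import com.amazonaws.services.msf.domain.StockPrice;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaPassThroughSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Kafka sink flushes on checkpoints, so checkpointing must be enabled
        // for at-least-once delivery to take effect.
        env.enableCheckpointing(10_000);

        KafkaSource<StockPrice> source = KafkaSource.<StockPrice>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("stock-prices")
                .setGroupId("flink-kafka-example")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(StockPrice.class))
                .build();

        KafkaSink<StockPrice> sink = KafkaSink.<StockPrice>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<StockPrice>builder()
                        .setTopic("stock-prices-out")
                        .setValueSerializationSchema(new JsonSerializationSchema<>())
                        .build())
                // AT_LEAST_ONCE avoids the transactional overhead that EXACTLY_ONCE adds,
                // matching the change in this commit.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        DataStream<StockPrice> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
        input.sinkTo(sink);

        env.execute("Flink Kafka Source and Sink examples");
    }
}
```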

java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/Stock.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+package com.amazonaws.services.msf.domain;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class StockPrice {
+    // This annotation as well as the associated jackson2 import is needed to correctly map the JSON input key to the
+    // appropriate POJO property name to ensure event_time isn't missed in serialization and deserialization
+    @JsonProperty("event_time")
+    private String eventTime;
+    private String ticker;
+    private float price;
+
+    public StockPrice() {}
+
+    public StockPrice(String eventTime, String ticker, float price) {
+        this.eventTime = eventTime;
+        this.ticker = ticker;
+        this.price = price;
+    }
+
+    public String getEventTime() {
+        return eventTime;
+    }
+
+    public void setEventTime(String eventTime) {
+        this.eventTime = eventTime;
+    }
+
+    public String getTicker() {
+        return ticker;
+    }
+
+    public void setTicker(String ticker) {
+        this.ticker = ticker;
+    }
+
+    public float getPrice() {
+        return price;
+    }
+
+    public void setPrice(float price) {
+        this.price = price;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StockPrice stock = (StockPrice) o;
+        return Float.compare(stock.price, price) == 0 &&
+                Objects.equals(eventTime, stock.eventTime) &&
+                Objects.equals(ticker, stock.ticker);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(eventTime, ticker, price);
+    }
+
+    @Override
+    public String toString() {
+        return "Stock{" +
+                "event_time='" + eventTime + '\'' +
+                ", ticker='" + ticker + '\'' +
+                ", price=" + price +
+                '}';
+    }
+}
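The `@JsonProperty("event_time")` annotation above is what keeps the snake_case JSON field aligned with the camelCase POJO property in both directions. Below is a small round-trip sketch with plain Jackson, which is also what the JSON (de)serialization schemas used in this example rely on; the sample values and the class name are made up for illustration.

```java
import com.amazonaws.services.msf.domain.StockPrice;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StockPriceJsonSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Shape of the records expected from the data generators (values are made up).
        String json = "{\"event_time\":\"2024-01-01T00:00:00Z\",\"ticker\":\"AMZN\",\"price\":98.76}";

        // @JsonProperty("event_time") maps the JSON field onto the eventTime property.
        StockPrice stockPrice = mapper.readValue(json, StockPrice.class);
        System.out.println(stockPrice);

        // Serializing writes the same field name back, so source and sink records share the schema.
        System.out.println(mapper.writeValueAsString(stockPrice));
    }
}
```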

java/KafkaConnectors/src/main/resources/flink-application-properties-dev.json

Lines changed: 5 additions & 17 deletions
@@ -2,28 +2,16 @@
   {
     "PropertyGroupId": "Input0",
     "PropertyMap": {
-      "bootstrap.servers": "<BootstrapServers>",
-      "topic": "<Topic>",
-      "group.id": "<ConsumerGroupID>"
+      "bootstrap.servers": "localhost:9092",
+      "topic": "stock-prices",
+      "group.id": "flink-kafka-example"
     }
   },
   {
     "PropertyGroupId": "Output0",
     "PropertyMap": {
-      "bootstrap.servers": "<BootstrapServers>",
-      "topic": "<Topic>",
-      "transaction.timeout.ms": "1000"
-    }
-  },
-  {
-    "PropertyGroupId": "AuthProperties",
-    "PropertyMap": {
-      "sasl.mechanism": "AWS_MSK_IAM",
-      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
-      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
-      "security.protocol": "SASL_SSL",
-      "ssl.truststore.location": "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts",
-      "ssl.truststore.password": "changeit"
+      "bootstrap.servers": "localhost:9092",
+      "topic": "stock-prices-out"
     }
   }
 ]
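When running in the IDE, the values above stand in for the Runtime Properties that Managed Service for Apache Flink would normally provide. Below is a sketch of the local-vs-managed loading pattern these examples follow; the helper names mirror the ones referenced in `KafkaStreamingJob.java`, but the bodies shown here are an assumption about a typical implementation, not the exact code.

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

public class LocalPropertiesSketch {

    // Local execution (e.g. running main() from the IDE) can be detected like this;
    // the example's isLocal(env) check is presumably equivalent.
    static boolean isLocal(StreamExecutionEnvironment env) {
        return env instanceof LocalStreamEnvironment;
    }

    // Read the dev JSON file above when running locally, and the real Runtime Properties
    // when running on Amazon Managed Service for Apache Flink.
    static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        if (isLocal(env)) {
            return KinesisAnalyticsRuntime.getApplicationProperties(
                    Objects.requireNonNull(
                            LocalPropertiesSketch.class.getClassLoader()
                                    .getResource("flink-application-properties-dev.json"))
                            .getPath());
        }
        return KinesisAnalyticsRuntime.getApplicationProperties();
    }
}
```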
