@@ -2,18 +2,18 @@
{
"PropertyGroupId": "DataGen",
"PropertyMap": {
"records.per.second": "1000"
"records.per.second": "10"
}
},
{
"PropertyGroupId": "KinesisSink",
"PropertyGroupId": "KinesisSink-DISABLE",
"PropertyMap": {
"stream.arn": "arn:aws:kinesis:eu-west-1:<account-id>:stream/FlinkDataGeneratorTestStream",
"aws.region": "eu-west-1"
}
},
{
"PropertyGroupId": "KafkaSink-DISABLE",
"PropertyGroupId": "KafkaSink",
"PropertyMap": {
"bootstrap.servers": "localhost:9092",
"topic": "stock-prices"
57 changes: 28 additions & 29 deletions java/KafkaConnectors/README.md
@@ -1,4 +1,4 @@
# Flink Kafka Source & Sink Examples
## Flink Kafka Source & Sink Examples

* Flink version: 1.20
* Flink API: DataStream API
@@ -17,56 +17,55 @@ The JSON input follows the structure set in `Stock.java` and can be automaticall

![Flink Example](images/flink-example.png)

> In this example, the Kafka Sink uses *exactly-once* delivery guarantees. This leverages Kafka transactions under the hood, improving guarantees but
> adding some overhead and increasing the effective latency of the output to the consumers of the destination Kafka topic.
>
> Moreover, there are failure scenarios where the Kafka Sink may still cause duplicates, even when set for exactly-once guarantees.
> See [FLIP-319](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710) for more details.
>
> We recommend not treating the Kafka Sink *exactly-once* guarantee as the default setting for all sinks to Kafka.
> Make sure you understand the implications before enabling it. Refer to the [Flink Kafka sink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#fault-tolerance) for details.
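
The delivery guarantee is set on the `KafkaSink` builder. Below is a minimal, illustrative sketch of an *exactly-once* sink configuration; the variable names, transactional ID prefix, and timeout value are assumptions for the example, not taken from this project:

```java
KafkaSink<StockPrice> sink = KafkaSink.<StockPrice>builder()
        .setBootstrapServers(bootstrapServers)
        .setRecordSerializer(recordSerializationSchema)
        // Exactly-once relies on Kafka transactions: records become visible to
        // read-committed consumers only after the Flink checkpoint completes
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Must be >= the checkpoint interval and <= the broker's transaction.max.timeout.ms
        .setProperty("transaction.timeout.ms", "60000")
        // Required for exactly-once, to avoid transactional ID clashes across applications
        .setTransactionalIdPrefix("kafka-connectors-example")
        .build();
```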

Note that the old
[`FlinkKafkaConsumer`](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/#kafka-sourcefunction)
and [`FlinkKafkaProducer`](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/#kafka-producer)
were removed in Flink 1.17 and 1.15, respectively.

## Runtime configuration
### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
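
A common way to support both modes is to branch on whether the job runs in a local environment, using `KinesisAnalyticsRuntime` from the `aws-kinesisanalytics-runtime` library. The following is a sketch of that pattern; the class name `KafkaStreamingJob` is an illustrative assumption, and the actual helper in this example may differ slightly:

```java
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
    if (env instanceof LocalStreamEnvironment) {
        // Local run (e.g. in the IDE): read the JSON file from the classpath resources
        return KinesisAnalyticsRuntime.getApplicationProperties(
                KafkaStreamingJob.class.getClassLoader()
                        .getResource("flink-application-properties-dev.json").getPath());
    } else {
        // Running on Amazon Managed Service for Apache Flink: read the Runtime Properties
        return KinesisAnalyticsRuntime.getApplicationProperties();
    }
}
```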

Runtime parameters:

| Group ID | Key | Description |
|-----------|---------------------|-----------------------------------|
| `Input0` | `bootstrap.servers` | Source cluster bootstrap servers. |
| `Input0` | `topic` | Source topic (default: `source`). |
| `Input0` | `group.id` | Source group id (default: `my-group`) |
| `Output0` | `bootstrap.servers` | Destination cluster bootstrap servers. |
| Group ID | Key | Description |
|-----------|---------------------|---------------------------------------------|
| `Input0` | `bootstrap.servers` | Source cluster bootstrap servers.            |
| `Input0` | `topic` | Source topic (default: `source`). |
| `Input0` | `group.id` | Source group id (default: `my-group`) |
| `Output0` | `bootstrap.servers` | Destination cluster bootstrap servers. |
| `Output0` | `topic` | Destination topic (default: `destination`). |
| `Output0` | `transaction.timeout.ms` | Sink transaction timeout (default: `1000`) |

If you are connecting with no authentication and no SSL, the configuration above will work. Otherwise, you need additional configuration for both the source and the sink.

### For IAM Auth
#### For IAM Auth

When using IAM Auth, the following Runtime Properties are expected at the Group ID `AuthProperties`:
* `sasl.mechanism` AWS_MSK_IAM
* `sasl.client.callback.handler.class` software.amazon.msk.auth.iam.IAMClientCallbackHandler
* `sasl.jaas.config` "software.amazon.msk.auth.iam.IAMLoginModule required;"
* `security.protocol` SASL_SSL
* `ssl.truststore.location` /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
* `ssl.truststore.password` changeit
* `sasl.mechanism` = `AWS_MSK_IAM`
* `sasl.client.callback.handler.class` = `software.amazon.msk.auth.iam.IAMClientCallbackHandler`
* `sasl.jaas.config` = `software.amazon.msk.auth.iam.IAMLoginModule required;`
* `security.protocol` = `SASL_SSL`

The properties in the `AuthProperties` group are passed to both Kafka Source and Kafka Sink configurations.
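
In practice these properties are merged into the connector-specific property groups before building the source and sink. A minimal sketch of such a merge (the actual implementation in this example may differ):

```java
private static Properties mergeProperties(Properties properties, Properties authProperties) {
    // Copy the connector-specific properties and overlay the shared AuthProperties group,
    // so the same SASL/IAM settings apply to both the Kafka source and the Kafka sink
    Properties merged = new Properties();
    merged.putAll(properties);
    merged.putAll(authProperties);
    return merged;
}
```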

### Data Generator

To generate the JSON data expected by this example, you can use the Python [data-generator](../../python/data-generator)
script included in the Python examples.

## Running locally in IntelliJ
Alternatively, you can use the [Flink DataGenerator](../FlinkDataGenerator) application, which generates the same schema.

> Due to MSK VPC networking, to run this example on your machine you need to set up network connectivity to the VPC where MSK is deployed, for example with a VPN.
> Alternatively, you can use a local Kafka installation, for example in a container.
> Setting up the connectivity or a local containerized Kafka depends on your setup and is out of scope for this example.
### Running locally in IntelliJ

To run the example locally, we provide a [docker-compose](docker/docker-compose.yml) file which starts a local Kafka cluster
with 3 brokers on `localhost:9092`, `localhost:9093`, and `localhost:9094`.
It also runs a Kafka UI available at `http://localhost:8080`.

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

Note that you can run both the [Flink DataGenerator](../FlinkDataGenerator) and this example locally, in the IDE, to observe
data being consumed and re-published to Kafka.

See [Running examples locally](../running-examples-locally.md) for details.
74 changes: 74 additions & 0 deletions java/KafkaConnectors/docker/docker-compose.yml
@@ -0,0 +1,74 @@
version: '3.8'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka1:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3

kafka2:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3

kafka3:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29094,PLAINTEXT_HOST://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3

kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka1
- kafka2
- kafka3
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29093,kafka3:29094
Binary file removed java/KafkaConnectors/images/runConfiguration.png
@@ -1,9 +1,9 @@
package com.amazonaws.services.msf;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import com.amazonaws.services.msf.domain.StockPrice;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -76,7 +76,7 @@ private static <T> KafkaSink<T> createKafkaSink(Properties outputProperties, Kaf
.setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
.setKafkaProducerConfig(outputProperties)
.setRecordSerializer(recordSerializationSchema)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
}

@@ -89,7 +89,13 @@ private static Properties mergeProperties(Properties properties, Properties auth
public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);

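// Checkpointing and parallelism are controlled by the application configuration when running
// on Amazon Managed Service for Apache Flink; set them programmatically only for local runs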
if (isLocal(env)) {
env.enableCheckpointing(10_000);
env.setParallelism(3);
}

// Load the application properties
final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
@@ -104,10 +110,10 @@ public static void main(String[] args) throws Exception {
Properties outputProperties = mergeProperties(applicationProperties.get("Output0"), authProperties);

// Create and add the Source
KafkaSource<Stock> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(Stock.class));
DataStream<Stock> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
KafkaSource<StockPrice> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(StockPrice.class));
DataStream<StockPrice> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

KafkaRecordSerializationSchema<Stock> recordSerializationSchema = KafkaRecordSerializationSchema.<Stock>builder()
KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema = KafkaRecordSerializationSchema.<StockPrice>builder()
.setTopic(outputProperties.getProperty("topic", DEFAULT_SINK_TOPIC))
// Use a field as kafka record key
// Define no keySerializationSchema to publish kafka records with no key
@@ -118,7 +124,7 @@ public static void main(String[] args) throws Exception {


// Create and add the Sink
KafkaSink<Stock> sink = createKafkaSink(outputProperties, recordSerializationSchema);
KafkaSink<StockPrice> sink = createKafkaSink(outputProperties, recordSerializationSchema);
input.sinkTo(sink);

env.execute("Flink Kafka Source and Sink examples");

This file was deleted.

@@ -0,0 +1,70 @@
package com.amazonaws.services.msf.domain;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class StockPrice {
// This annotation, together with the associated Jackson import, maps the JSON input key `event_time`
// to this POJO property name, so the field is not lost during serialization and deserialization
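// Example input record (illustrative values): {"event_time": "2024-06-06T12:01:00.000Z", "ticker": "AMZN", "price": 123.45}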
@JsonProperty("event_time")
private String eventTime;
private String ticker;
private float price;

public StockPrice() {}

public StockPrice(String eventTime, String ticker, float price) {
this.eventTime = eventTime;
this.ticker = ticker;
this.price = price;
}

public String getEventTime() {
return eventTime;
}

public void setEventTime(String eventTime) {
this.eventTime = eventTime;
}

public String getTicker() {
return ticker;
}

public void setTicker(String ticker) {
this.ticker = ticker;
}

public float getPrice() {
return price;
}

public void setPrice(float price) {
this.price = price;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StockPrice stock = (StockPrice) o;
return Float.compare(stock.price, price) == 0 &&
Objects.equals(eventTime, stock.eventTime) &&
Objects.equals(ticker, stock.ticker);
}

@Override
public int hashCode() {
return Objects.hash(eventTime, ticker, price);
}

@Override
public String toString() {
return "StockPrice{" +
"event_time='" + eventTime + '\'' +
", ticker='" + ticker + '\'' +
", price=" + price +
'}';
}
}
@@ -2,28 +2,16 @@
{
"PropertyGroupId": "Input0",
"PropertyMap": {
"bootstrap.servers": "<BootstrapServers>",
"topic": "<Topic>",
"group.id": "<ConsumerGroupID>"
"bootstrap.servers": "localhost:9092",
"topic": "stock-prices",
"group.id": "flink-kafka-example"
}
},
{
"PropertyGroupId": "Output0",
"PropertyMap": {
"bootstrap.servers": "<BootstrapServers>",
"topic": "<Topic>",
"transaction.timeout.ms": "1000"
}
},
{
"PropertyGroupId": "AuthProperties",
"PropertyMap": {
"sasl.mechanism": "AWS_MSK_IAM",
"sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
"sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
"security.protocol": "SASL_SSL",
"ssl.truststore.location": "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts",
"ssl.truststore.password": "changeit"
"bootstrap.servers": "localhost:9092",
"topic": "stock-prices-out"
}
}
]