diff --git a/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json b/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json
index 03a4ae16..0e5e34b6 100644
--- a/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json
+++ b/java/FlinkDataGenerator/src/main/resources/flink-application-properties-dev.json
@@ -2,18 +2,18 @@
   {
     "PropertyGroupId": "DataGen",
     "PropertyMap": {
-      "records.per.second": "1000"
+      "records.per.second": "10"
     }
   },
   {
-    "PropertyGroupId": "KinesisSink",
+    "PropertyGroupId": "KinesisSink-DISABLE",
     "PropertyMap": {
       "stream.arn": "arn:aws:kinesis:eu-west-1::stream/FlinkDataGeneratorTestStream",
       "aws.region": "eu-west-1"
     }
   },
   {
-    "PropertyGroupId": "KafkaSink-DISABLE",
+    "PropertyGroupId": "KafkaSink",
     "PropertyMap": {
       "bootstrap.servers": "localhost:9092",
       "topic": "stock-prices"
diff --git a/java/KafkaConnectors/README.md b/java/KafkaConnectors/README.md
index d5e99838..65593dd2 100644
--- a/java/KafkaConnectors/README.md
+++ b/java/KafkaConnectors/README.md
@@ -1,4 +1,4 @@
-# Flink Kafka Source & Sink Examples
+## Flink Kafka Source & Sink Examples
 
 * Flink version: 1.20
 * Flink API: DataStream API
@@ -17,21 +17,12 @@ The JSON input follows the structure set in `Stock.java` and can be automaticall
 ![Flink Example](images/flink-example.png),
 
-> In this example, the Kafka Sink uses *exactly-once* delivery guarantees. This leverages Kafka transaction under the hood, improving guarantees but
-> adding some overhead and increasing the effective latency of the output to the consumers of the destination Kafka topic.
->
-> Moreover, there are failure scenarios were the Kafka Sink may still cause duplicates, even when set for exactly-once guarantees.
-> See [FLIP-319](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710) for more details.
->
-> We recommend not to consider Kafka Sink *exactly-once* guarantees as a default setting for all sinks to Kafka.
-> Make sure you understand the implications before enabling it. Refer to the [Flink Kafka sink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#fault-tolerance) for details.
-
 Note that the old [`FlinkKafkaConsumer`](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/#kafka-sourcefunction) and [`FlinkKafkaProducer`](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/#kafka-producer)
 were removed in Flink 1.17 and 1.15, respectively.
 
-## Runtime configuration
+### Runtime configuration
 
 When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.
 
@@ -39,34 +30,42 @@ When running locally, the configuration is read from the [`resources/flink-appli
 
 Runtime parameters:
 
-| Group ID  | Key                 | Description                       |
-|-----------|---------------------|-----------------------------------|
-| `Input0`  | `bootstrap.servers` | Source cluster boostrap servers.  |
-| `Input0`  | `topic`             | Source topic (default: `source`). |
-| `Input0`  | `group.id`          | Source group id (default: `my-group`) |
-| `Output0` | `bootstrap.servers` | Destination cluster bootstrap servers. |
+| Group ID  | Key                 | Description                                  |
+|-----------|---------------------|----------------------------------------------|
+| `Input0`  | `bootstrap.servers` | Source cluster bootstrap servers.            |
+| `Input0`  | `topic`             | Source topic (default: `source`).            |
+| `Input0`  | `group.id`          | Source group id (default: `my-group`)        |
+| `Output0` | `bootstrap.servers` | Destination cluster bootstrap servers.       |
 | `Output0` | `topic`             | Destination topic (default: `destination`).  |
-| `Output0` | `transaction.timeout.ms` | Sink transaction timeout (default: `1000`) |
 
 If you are connecting with no-auth and no SSL, above will work. Else you need additional configuration for both source and sink.
 
-### For IAM Auth
+#### For IAM Auth
 
 When using IAM Auth, the following Runtime Properties are expected at the Group ID `AuthProperties`:
 
-* `sasl.mechanism` AWS_MSK_IAM
-* `sasl.client.callback.handler.class` software.amazon.msk.auth.iam.IAMClientCallbackHandler
-* `sasl.jaas.config` "software.amazon.msk.auth.iam.IAMLoginModule required;"
-* `security.protocol` SASL_SSL
-* `ssl.truststore.location` /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
-* `ssl.truststore.password` changeit
+* `sasl.mechanism` = `AWS_MSK_IAM`
+* `sasl.client.callback.handler.class` = `software.amazon.msk.auth.iam.IAMClientCallbackHandler`
+* `sasl.jaas.config` = `software.amazon.msk.auth.iam.IAMLoginModule required;`
+* `security.protocol` = `SASL_SSL`
+
+The properties in the `AuthProperties` group are passed to both the Kafka Source and Kafka Sink configurations.
+
+### Data Generator
+To generate the JSON data expected by this example, you can use the Python [data-generator](../../python/data-generator)
+script included in the Python examples.
 
-## Running locally in IntelliJ
+Alternatively, you can use the [Flink DataGenerator](../FlinkDataGenerator) application, which generates the same schema.
 
-> Due to MSK VPC networking, to run this example on your machine you need to set up network connectivity to the VPC where MSK is deployed, for example with a VPN.
-> Alternatively, you can use a local Kafka installation, for example in a container.
-> Setting up the connectivity or a local containerized Kafka depends on your set up and is out of scope for this example.
+### Running locally in IntelliJ
+
+To run the example locally we provide a [docker-compose](docker/docker-compose.yml) file which starts a local Kafka cluster
+with 3 brokers on `localhost:9092`, `localhost:9093`, and `localhost:9094`.
+It also runs a Kafka UI available at `http://localhost:8080`.
 
 You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+Note that you can run both the [Flink DataGenerator](../FlinkDataGenerator) and this example locally, in the IDE, to observe
+data being consumed and re-published to Kafka.
+
 See [Running examples locally](../running-examples-locally.md) for details.
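As an aside on the `AuthProperties` change above: the group is meant to be applied to both the Kafka source and the Kafka sink, on top of the `Input0`/`Output0` settings. Below is a minimal, self-contained sketch of that merge; the `withAuth` helper, the class name, and the sample values are illustrative stand-ins, not the repository's actual `mergeProperties` implementation (which appears further down in `KafkaStreamingJob.java`).

```java
import java.util.Properties;

// Minimal sketch (hypothetical helper, not repository code): fold the optional AuthProperties
// group into a connector's property group so source and sink both carry the IAM/SASL settings.
public class AuthPropertiesSketch {

    static Properties withAuth(Properties connectorProps, Properties authProps) {
        Properties merged = new Properties();
        merged.putAll(connectorProps);        // e.g. bootstrap.servers, topic, group.id
        if (authProps != null) {
            merged.putAll(authProps);         // e.g. sasl.mechanism, security.protocol
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties input0 = new Properties();
        input0.setProperty("bootstrap.servers", "localhost:9092");   // placeholder value

        Properties auth = new Properties();
        auth.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        auth.setProperty("security.protocol", "SASL_SSL");

        // The same auth properties would be applied to Input0 (source) and Output0 (sink).
        System.out.println(withAuth(input0, auth));
    }
}
```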
diff --git a/java/KafkaConnectors/docker/docker-compose.yml b/java/KafkaConnectors/docker/docker-compose.yml
new file mode 100644
index 00000000..14a518aa
--- /dev/null
+++ b/java/KafkaConnectors/docker/docker-compose.yml
@@ -0,0 +1,74 @@
+version: '3.8'
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+
+  kafka2:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9093:9093"
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:9093
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+
+  kafka3:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    ports:
+      - "9094:9094"
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29094,PLAINTEXT_HOST://0.0.0.0:9094
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+
+  kafka-ui:
+    image: provectuslabs/kafka-ui:latest
+    depends_on:
+      - kafka1
+      - kafka2
+      - kafka3
+    ports:
+      - "8080:8080"
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29093,kafka3:29094
diff --git a/java/KafkaConnectors/images/flink-msk-serverless-example.png b/java/KafkaConnectors/images/flink-msk-serverless-example.png
deleted file mode 100644
index 40b536fa..00000000
Binary files a/java/KafkaConnectors/images/flink-msk-serverless-example.png and /dev/null differ
diff --git a/java/KafkaConnectors/images/runConfiguration.png b/java/KafkaConnectors/images/runConfiguration.png
deleted file mode 100644
index beb1d7f7..00000000
Binary files a/java/KafkaConnectors/images/runConfiguration.png and /dev/null differ
diff --git a/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/KafkaStreamingJob.java b/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/KafkaStreamingJob.java
index 7fb92069..b45ff353 100644
--- a/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/KafkaStreamingJob.java
+++ b/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/KafkaStreamingJob.java
@@ -1,9 +1,9 @@
 package com.amazonaws.services.msf;
 
 import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.domain.StockPrice;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -76,7 +76,7 @@ private static KafkaSink createKafkaSink(Properties outputProperties, Kaf
                 .setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
                 .setKafkaProducerConfig(outputProperties)
                 .setRecordSerializer(recordSerializationSchema)
-                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                 .build();
     }
 
@@ -89,7 +89,13 @@ private static Properties mergeProperties(Properties properties, Properties auth
     public static void main(String[] args) throws Exception {
         // Set up the streaming execution environment
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(1000);
+
+        if (isLocal(env)) {
+            env.enableCheckpointing(10_000);
+            env.setParallelism(3);
+        }
+
+
         // Load the application properties
         final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
 
@@ -104,10 +110,10 @@ public static void main(String[] args) throws Exception {
         Properties outputProperties = mergeProperties(applicationProperties.get("Output0"), authProperties);
 
         // Create and add the Source
-        KafkaSource<Stock> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(Stock.class));
-        DataStream<Stock> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
+        KafkaSource<StockPrice> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(StockPrice.class));
+        DataStream<StockPrice> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
 
-        KafkaRecordSerializationSchema<Stock> recordSerializationSchema = KafkaRecordSerializationSchema.builder()
+        KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema = KafkaRecordSerializationSchema.builder()
                 .setTopic(outputProperties.getProperty("topic", DEFAULT_SINK_TOPIC))
                 // Use a field as kafka record key
                 // Define no keySerializationSchema to publish kafka records with no key
@@ -118,7 +124,7 @@ public static void main(String[] args) throws Exception {
 
         // Create and add the Sink
-        KafkaSink<Stock> sink = createKafkaSink(outputProperties, recordSerializationSchema);
+        KafkaSink<StockPrice> sink = createKafkaSink(outputProperties, recordSerializationSchema);
         input.sinkTo(sink);
 
         env.execute("Flink Kafka Source and Sink examples");
diff --git a/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/Stock.java b/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/Stock.java
deleted file mode 100644
index dc84f43f..00000000
--- a/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/Stock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.amazonaws.services.msf;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-public class Stock {
-    @JsonProperty("event_time")
-    private String eventTime;
-    private String ticker;
-    private float price;
-
-    public Stock() {}
-
-    public String getEventTime() {
-        return eventTime;
-    }
-
-    public void setEventTime(String eventTime) {
-        this.eventTime = eventTime;
-    }
-
-    public String getTicker() {
-        return ticker;
-    }
-
-    public void setTicker(String ticker) {
-        this.ticker = ticker;
-    }
-
-    public float getPrice() {
-        return price;
-    }
-
-    public void setPrice(float price) {
-        this.price = price;
-    }
-
-    @Override
-    public String toString() {
-        return "Stock{" +
-                "event_time='" + eventTime + '\'' +
-                ", ticker='" + ticker + '\'' +
-                ", price=" + price +
-                '}';
-    }
-}
diff --git a/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
new file mode 100644
index 00000000..f1590365
--- /dev/null
+++ b/java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
@@ -0,0 +1,70 @@
+package com.amazonaws.services.msf.domain;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class StockPrice {
+    // This annotation, together with the associated Jackson import, maps the JSON input key to the
+    // corresponding POJO property so that event_time is not lost during serialization and deserialization
+    @JsonProperty("event_time")
+    private String eventTime;
+    private String ticker;
+    private float price;
+
+    public StockPrice() {}
+
+    public StockPrice(String eventTime, String ticker, float price) {
+        this.eventTime = eventTime;
+        this.ticker = ticker;
+        this.price = price;
+    }
+
+    public String getEventTime() {
+        return eventTime;
+    }
+
+    public void setEventTime(String eventTime) {
+        this.eventTime = eventTime;
+    }
+
+    public String getTicker() {
+        return ticker;
+    }
+
+    public void setTicker(String ticker) {
+        this.ticker = ticker;
+    }
+
+    public float getPrice() {
+        return price;
+    }
+
+    public void setPrice(float price) {
+        this.price = price;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StockPrice stock = (StockPrice) o;
+        return Float.compare(stock.price, price) == 0 &&
+                Objects.equals(eventTime, stock.eventTime) &&
+                Objects.equals(ticker, stock.ticker);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(eventTime, ticker, price);
+    }
+
+    @Override
+    public String toString() {
+        return "StockPrice{" +
+                "event_time='" + eventTime + '\'' +
+                ", ticker='" + ticker + '\'' +
+                ", price=" + price +
+                '}';
+    }
+}
diff --git a/java/KafkaConnectors/src/main/resources/flink-application-properties-dev.json b/java/KafkaConnectors/src/main/resources/flink-application-properties-dev.json
index 91f2034d..54265a2f 100644
--- a/java/KafkaConnectors/src/main/resources/flink-application-properties-dev.json
+++ b/java/KafkaConnectors/src/main/resources/flink-application-properties-dev.json
@@ -2,28 +2,16 @@
   {
     "PropertyGroupId": "Input0",
     "PropertyMap": {
-      "bootstrap.servers": "",
-      "topic": "",
-      "group.id": ""
+      "bootstrap.servers": "localhost:9092",
+      "topic": "stock-prices",
+      "group.id": "flink-kafka-example"
     }
   },
   {
     "PropertyGroupId": "Output0",
     "PropertyMap": {
-      "bootstrap.servers": "",
-      "topic": "",
-      "transaction.timeout.ms": "1000"
-    }
-  },
-  {
-    "PropertyGroupId": "AuthProperties",
-    "PropertyMap": {
-      "sasl.mechanism": "AWS_MSK_IAM",
-      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
-      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
-      "security.protocol": "SASL_SSL",
-      "ssl.truststore.location": "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts",
-      "ssl.truststore.password": "changeit"
"localhost:9092", + "topic": "stock-prices-out" } } ] \ No newline at end of file