diff --git a/java/AvroGlueSchemaRegistryKafka/README.md b/java/AvroGlueSchemaRegistryKafka/README.md
index ad4875ef..af0420b1 100644
--- a/java/AvroGlueSchemaRegistryKafka/README.md
+++ b/java/AvroGlueSchemaRegistryKafka/README.md
@@ -1,82 +1,129 @@
## AVRO serialization in KafkaSource and KafkaSink using AWS Glue Schema Registry
-* Flink version: 1.15
+This example demonstrates how to serialize/deserialize AVRO messages in Kafka sources and sinks, using
+[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) (GSR).
+
+* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
+* Connectors: Kafka connector, DataGenerator
-This example demonstrates how to serialize/deserialize AVRO messages in Kafka sources and sinks, using
-[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html).
+The example contains two Flink applications:
+1. a [producer](./producer) job, which generates random temperature samples and publishes them to Kafka as AVRO, using GSR.
+2. a [consumer](./consumer) job, which reads the temperature records from the same topic, also using GSR.
-This example uses AVRO generated classes (more details, [below](#Using_AVRO-generated_classes))
+Both applications use AVRO specific records generated from schema definitions provided at compile time.
+The record classes are generated during the build, from the AVRO IDL (`.avdl`) files you can find in the resources
+folder of each job. For simplicity, the schema definition is duplicated in both jobs.
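+
+For reference, this is the schema definition used by both jobs
+(from [temperature-sample.avdl](./producer/src/main/resources/avro/temperature-sample.avdl)):
+
+```
+@namespace("com.amazonaws.services.msf.avro")
+protocol In {
+    record TemperatureSample {
+        int sensorId;
+        string room;
+        float temperature;
+        timestamp_ms sampleTime;
+    }
+}
+```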
-The reader's schema definition, for the source, and the writer's schema definition, for the sink, are provided as
-AVRO IDL (`.avdl`) in [./src/main/resources/avro](./src/main/resources/avro).
+The two jobs are designed to run as separate Amazon Managed Service for Apache Flink applications, connecting to the
+same Amazon Managed Streaming for Apache Kafka (MSK) cluster.
+The default configuration uses an unauthenticated connection to MSK. The example can be extended to implement any
+authentication scheme supported by MSK.
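+
+For example, to use IAM authentication you could pass the Kafka client properties in an `AuthProperties` group, which
+both jobs merge into the Kafka client configuration. This is only a sketch: it assumes the `aws-msk-iam-auth` library
+is added as a dependency of both applications:
+
+```json
+{
+  "PropertyGroupId": "AuthProperties",
+  "PropertyMap": {
+    "security.protocol": "SASL_SSL",
+    "sasl.mechanism": "AWS_MSK_IAM",
+    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
+    "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
+  }
+}
+```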
-A KafkaSource produces a stream of AVRO data objects (SpecificRecords), fetching the writer's schema from AWS Glue
-Schema Registry. The AVRO Kafka message value must have been serialized using AWS Glue Schema Registry.
+### Prerequisites
-A KafkaSink serializes AVRO data objects as Kafka message value, and a String, converted to bytes as UTF-8, as Kafka
-message key.
+To run the two Managed Flink applications you need to set up the following prerequisites:
-## Flink compatibility
+1. An MSK cluster
+ - Create the topic `temperature-samples` or enable auto topic creation
+ - Allow unauthenticated access (or modify the application to support the configured authentication scheme)
+2. Create a Registry named `temperature-schema-registry` in Glue Schema Registry, in the same region (see the CLI example below)
+3. ⚠️ Create a VPC Endpoint for Glue in the VPC where the Managed Flink applications will be attached.
+   Without a VPC Endpoint, an application attached to a VPC may not be able to connect to the Glue service endpoint.
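+
+For example, you can create the Registry using the AWS CLI:
+
+```
+aws glue create-registry --registry-name temperature-schema-registry
+```
+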
-**Note:** This project is compatible with Flink 1.15+ and Amazon Managed Service for Apache Flink
+### Create the Amazon Managed Service for Apache Flink applications
-### Flink API compatibility
+Create two Managed Flink applications, one for the producer and the other for the consumer.
+1. Build both jobs by running `mvn package` in the root directory of this example. This builds the two JARs in the `target` subfolders of the producer and the consumer.
+2. Upload both JARs to an S3 bucket and use them as application code, for the producer and the consumer respectively.
+3. Application configuration
+ * Attach both applications to a VPC with access to the MSK cluster and ensure the Security Group allows access to the MSK cluster.
+ For simplicity, we suggest running both applications with the same VPC, subnets, and Security Group as the MSK cluster
+ * Ensure the applications have permissions to access Glue Schema Registry. For the sake of this example you can attach
+ the policy `AWSGlueSchemaRegistryFullAccess` to the producer application's IAM Role, and the policy `AWSGlueSchemaRegistryReadonlyAccess`
+ to the consumer's Role.
+ * Set up the Runtime properties of the two applications as described in the following sections.
-This example shows how to use AWS Glue Schema Registry with the Flink Java DataStream API.
+### Runtime configuration
-It uses the newer `KafkaSource` and `KafkaSink` (as opposed to `FlinkKafkaConsumer` and `FlinkKafkaProducer`, deprecated
-with Flink 1.15).
+The two applications expect different configurations.
+When running locally, the configuration is fetched from the `flink-application-properties-dev.json` file in the
+resources folder of each job.
-At the moment, no format provider is available for the Table API.
+When running on Managed Flink, these files are ignored and the configuration must be passed through the Runtime
+properties of each application.
-## Notes about using AVRO with Apache Flink
+All parameters are case-sensitive.
-### AVRO-generated classes
+#### Producer runtime parameters
-This project uses classes generated at built-time as data objects.
+| Group ID | Key | Description |
+|-------------------|-----------------------|----------------------------------------------------------------|
+| `Output0` | `bootstrap.servers` | Kafka bootstrap servers |
+| `Output0` | `topic` | Kafka topic name for temperature samples |
+| `SchemaRegistry` | `name` | AWS Glue Schema Registry name |
+| `SchemaRegistry` | `region` | AWS region where the Schema Registry is located |
+| `DataGen` | `samples.per.second` | (optional) Rate of sample generation per second (default: 100) |
-As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
-code.
-AVRO Maven plugin generates the Java classes (source code) at build-time, during the
+
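+For example, this is the default local configuration of the producer
+([flink-application-properties-dev.json](./producer/src/main/resources/flink-application-properties-dev.json)).
+When deploying on Managed Flink, define the same groups and keys as Runtime properties:
+
+```json
+[
+  {
+    "PropertyGroupId": "Output0",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "temperature-samples"
+    }
+  },
+  {
+    "PropertyGroupId": "SchemaRegistry",
+    "PropertyMap": {
+      "name": "temperature-schema-registry",
+      "region": "us-east-1"
+    }
+  },
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": {
+      "samples.per.second": "10"
+    }
+  }
+]
+```
+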
+#### Consumer runtime parameters
+
+| Group ID | Key | Description |
+|-------------------|-----------------------|-------------------------------------------------|
+| `Input0` | `bootstrap.servers` | Kafka bootstrap servers |
+| `Input0` | `topic` | Kafka topic name for temperature samples |
+| `Input0` | `group.id` | Kafka consumer group ID |
+| `SchemaRegistry` | `name` | AWS Glue Schema Registry name |
+| `SchemaRegistry` | `region` | AWS region where the Schema Registry is located |
+
+
+### Running the applications locally
+
+A [docker-compose](docker/docker-compose.yml) file is provided to run a local Kafka cluster for development.
+The default configuration of both jobs points to this local cluster.
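+
+For example, you can start the local cluster from the [docker](./docker) folder with:
+
+```
+docker compose up -d
+```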
+
+When running locally, the jobs still use the actual Glue Schema Registry.
+Make sure the machine you develop on has an authenticated AWS CLI profile with permissions to use GSR, and use the
+AWS plugin of your IDE to run the application with a specific AWS profile.
+
+See [Running examples locally](../running-examples-locally.md) for further details.
+
+
+### Notes about using AVRO with Apache Flink
+
+#### AVRO-generated classes
+
+This project uses classes generated at build-time as data objects.
+
+As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
+code. The AVRO Maven plugin generates the Java classes (source code) at build-time, during the
[`generate-source`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.
The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with
-the project source.
-
-This way, the only dependency is on the schema definition file(s).
+the project source. This way, the only dependency is on the schema definition file(s).
If any change is required, the schema file is modified and the AVRO classes are re-generated automatically in the build.
Code generation is supported by all common IDEs like IntelliJ.
-If your IDE does not see the AVRO classes (`TemperatureSample` and `RoomTemperature`) when you import the project for the
-first time, you may manually run `mvn generate-sources` once of force source code generation from the IDE.
+If your IDE does not see the AVRO classes (`TemperatureSample`) when you import the project for the
+first time, you may manually run `mvn generate-sources` once or force source code generation from the IDE.
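+
+Code generation is configured in the parent [pom.xml](./pom.xml). This is a condensed view of the relevant
+`avro-maven-plugin` configuration:
+
+```xml
+<plugin>
+    <groupId>org.apache.avro</groupId>
+    <artifactId>avro-maven-plugin</artifactId>
+    <version>${avro.version}</version>
+    <executions>
+        <execution>
+            <phase>generate-sources</phase>
+            <goals>
+                <goal>idl-protocol</goal>
+            </goals>
+            <configuration>
+                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
+                <fieldVisibility>private</fieldVisibility>
+                <stringType>String</stringType>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+```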
-### AVRO-generated classes (SpecificRecord) in Apache Flink
+#### AVRO-generated classes (SpecificRecord) in Apache Flink
Using AVRO-generated classes (SpecificRecord) within the flow of the Flink application (between operators) or in the
Flink state has an additional benefit.
Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)
-these objects, without risking of falling back to Kryo.
-
-### AVRO and AWS Glue Schema Registry dependencies
-
-The following dependencies related to AVRO and AWS Glue Schema Registry are included (for FLink 1.15.2):
-
-1. `org.apache.flink:flink-avro-glue-schema-registry_2.12:1.15.2` - Support for AWS Glue Schema Registry SerDe
-2. `org.apache.avro:avro:1.10.2` - Overrides AVRO 1.10.0, transitively included.
-
-The project also includes `org.apache.flink:flink-avro:1.15.2`.
-This is already a transitive dependency from the Glue Schema Registry SerDe and is defined explicitly only for clarity.
+these objects, without the risk of falling back to Kryo.
-Note that we are overriding AVRO 1.10.0 with 1.10.2.
-This minor version upgrade does not break the internal API, and includes some bug fixes introduced with
-AVRO [1.10.1](https://github.com/apache/avro/releases/tag/release-1.10.1)
-and [1.10.2](https://github.com/apache/avro/releases/tag/release-1.10.2).
+### Common issues
-### Running in IntelliJ
+If the application fails to call the Glue Schema Registry API for any reason, the job gets trapped in a fail-and-restart
+loop, with an exception saying it cannot fetch or register a schema version.
-To start the Flink job in IntelliJ edit the Run/Debug configuration enabling 'Add dependencies with "provided" scope to
-the classpath'.
\ No newline at end of file
+The inability to use GSR may be caused by:
+* Lack of permissions to access GSR --> add the `AWSGlueSchemaRegistryFullAccess` (producer) or `AWSGlueSchemaRegistryReadonlyAccess` (consumer) policy to the application IAM Role
+* Inability to reach the Glue endpoint --> create a Glue VPC Endpoint in the VPC the application is attached to
+* The Registry does not exist --> create a Registry with the configured name (`temperature-schema-registry` by default)
+* Misconfiguration --> ensure the registry name and region passed to the application match your setup
\ No newline at end of file
diff --git a/java/AvroGlueSchemaRegistryKafka/consumer/pom.xml b/java/AvroGlueSchemaRegistryKafka/consumer/pom.xml
new file mode 100644
index 00000000..c9a9c1bf
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>avro-gsr-kafka</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>avro-gsr-kafka-consumer</artifactId>
+
+    <dependencies>
+        <!-- Flink core dependencies (provided by the runtime) -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+        </dependency>
+
+        <!-- Amazon Managed Service for Apache Flink runtime -->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+        </dependency>
+
+        <!-- Kafka connector -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+        </dependency>
+
+        <!-- AWS Glue Schema Registry SerDe -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro-glue-schema-registry</artifactId>
+        </dependency>
+
+        <!-- AVRO -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/AvroGlueSchemaRegistryKafka/consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
new file mode 100644
index 00000000..bd0618e8
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -0,0 +1,118 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class StreamingJob {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);
+
+ private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+ private static boolean isLocal(StreamExecutionEnvironment env) {
+ return env instanceof LocalStreamEnvironment;
+ }
+
+ private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+ if (isLocal(env)) {
+ LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+ return KinesisAnalyticsRuntime.getApplicationProperties(
+ Objects.requireNonNull(StreamingJob.class.getClassLoader()
+ .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+ } else {
+ LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+ return KinesisAnalyticsRuntime.getApplicationProperties();
+ }
+ }
+
+ private static <T extends SpecificRecord> KafkaSource<T> createKafkaSource(
+ Properties sourceProperties,
+ Properties authProperties,
+ KafkaRecordDeserializationSchema<T> kafkaRecordDeserializationSchema) {
+
+ Properties kafkaConsumerConfig = new Properties();
+ kafkaConsumerConfig.putAll(sourceProperties);
+ kafkaConsumerConfig.putAll(authProperties);
+
+ return KafkaSource.<T>builder()
+ .setBootstrapServers(sourceProperties.getProperty("bootstrap.servers"))
+ .setTopics(sourceProperties.getProperty("topic"))
+ .setGroupId(sourceProperties.getProperty("group.id"))
+ // If the job starts with no state, use the latest committed offset
+ // If the committed offset is also not available, start from latest
+ .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
+ .setDeserializer(kafkaRecordDeserializationSchema)
+ .setProperties(kafkaConsumerConfig)
+ .build();
+ }
+
+ // Create a Kafka Record Deserialization Schema using GSR
+ // to extract the record value into an AVRO-specific (generated) record class
+ private static <T extends SpecificRecord> KafkaRecordDeserializationSchema<T> kafkaRecordDeserializationSchema(Properties schemaRegistryProperties, Class<T> recordClazz) {
+ Map<String, Object> deserializerConfig = Map.of(
+ AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName(),
+ AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryProperties.getProperty("region"),
+ AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryProperties.getProperty("name"));
+
+ DeserializationSchema<T> legacyDeserializationSchema = GlueSchemaRegistryAvroDeserializationSchema.forSpecific(recordClazz, deserializerConfig);
+ return KafkaRecordDeserializationSchema.valueOnly(legacyDeserializationSchema);
+ }
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // For the scope of this example we disable operator chaining, just to allow observing records flowing through the
+ // application. Disabling chaining in a production application may seriously impact performance.
+ env.disableOperatorChaining();
+
+ if (isLocal(env)) {
+ // When running locally, enable checkpoints: the KafkaSource commits offsets on checkpoint only.
+ // When running on Managed Flink checkpoints are controlled by the application configuration.
+ env.enableCheckpointing(10_000);
+ }
+
+ final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+ LOG.info("Application properties: {}", applicationProperties);
+
+ Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
+ Properties inputProperties = applicationProperties.get("Input0");
+ Properties schemaRegistryProperties = applicationProperties.get("SchemaRegistry");
+
+ // Set up the deserialization schema and the Kafka source
+ KafkaRecordDeserializationSchema<TemperatureSample> recordDeserializationSchema = kafkaRecordDeserializationSchema(schemaRegistryProperties, TemperatureSample.class);
+ KafkaSource<TemperatureSample> source = createKafkaSource(inputProperties, authProperties, recordDeserializationSchema);
+
+ // Attach the source
+ DataStream<TemperatureSample> temperatureSamples = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "Temperature Samples source")
+ .uid("temperature-samples-source");
+
+
+ // We print the records for the sake of this example.
+ // Any output to stdout is visible when running locally, but not when running on Managed Flink
+ temperatureSamples.print().uid("print-sink");
+
+ env.execute("Temperature Samples Consumer");
+ }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/input-reader-schema.avdl b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/avro/temperature-sample.avdl
similarity index 100%
rename from java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/input-reader-schema.avdl
rename to java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/avro/temperature-sample.avdl
diff --git a/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/flink-application-properties-dev.json b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..49b59855
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,17 @@
+[
+ {
+ "PropertyGroupId": "Input0",
+ "PropertyMap": {
+ "bootstrap.servers": "localhost:9092",
+ "topic": "temperature-samples",
+ "group.id": "flink-avro-consumer"
+ }
+ },
+ {
+ "PropertyGroupId": "SchemaRegistry",
+ "PropertyMap": {
+ "name": "temperature-schema-registry",
+ "region": "us-east-1"
+ }
+ }
+]
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/log4j2.properties b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/log4j2.properties
similarity index 50%
rename from java/AvroGlueSchemaRegistryKafka/src/main/resources/log4j2.properties
rename to java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/log4j2.properties
index c7d36aa3..41570b1d 100644
--- a/java/AvroGlueSchemaRegistryKafka/src/main/resources/log4j2.properties
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/log4j2.properties
@@ -4,4 +4,9 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
\ No newline at end of file
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+logger.msf.name = com.amazonaws.services.msf
+logger.msf.level = DEBUG
+logger.msf.additivity = false
+logger.msf.appenderRef.console.ref = ConsoleAppender
diff --git a/java/AvroGlueSchemaRegistryKafka/docker/docker-compose.yml b/java/AvroGlueSchemaRegistryKafka/docker/docker-compose.yml
new file mode 100644
index 00000000..2ed6ebfa
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/docker/docker-compose.yml
@@ -0,0 +1,24 @@
+version: '3.8'
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.4.0
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ kafka:
+ image: confluentinc/cp-kafka:7.4.0
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_DEFAULT_REPLICATION_FACTOR: 1
+ KAFKA_NUM_PARTITIONS: 3
diff --git a/java/AvroGlueSchemaRegistryKafka/pom.xml b/java/AvroGlueSchemaRegistryKafka/pom.xml
index 2cc5d255..88033173 100644
--- a/java/AvroGlueSchemaRegistryKafka/pom.xml
+++ b/java/AvroGlueSchemaRegistryKafka/pom.xml
@@ -5,227 +5,203 @@
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.amazonaws</groupId>
-    <artifactId>avro-gsr-kafka-flink</artifactId>
+    <artifactId>avro-gsr-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>producer</module>
+        <module>consumer</module>
+    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <buildDirectory>${project.basedir}/target</buildDirectory>
-        <jar.finalName>${project.name}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
-        <main.class>com.amazonaws.services.msf.StreamingJob</main.class>
-        <scala.binary.version>2.12</scala.binary.version>
-        <avro.version>1.10.2</avro.version>
-        <kafka.version>2.8.1</kafka.version>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
-        <flink.version>1.15.4</flink.version>
-        <kda.connectors.version>2.1.0</kda.connectors.version>
+        <flink.version>1.20.0</flink.version>
+        <kafka.connector.version>3.2.0-1.19</kafka.connector.version>
+        <aws.connectors.version>5.0.0-1.20</aws.connectors.version>
+        <avro.version>1.11.3</avro.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
-        <msk.iam.auth.version>1.1.6</msk.iam.auth.version>
-        <aws.sdk.version>2.19.30</aws.sdk.version>
-        <slf4j.version>1.7.32</slf4j.version>
        <log4j.version>2.17.2</log4j.version>
        <junit.version>5.9.1</junit.version>
    </properties>

-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-kinesisanalytics-runtime</artifactId>
-            <version>${kda.runtime.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-base</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-kinesisanalytics-flink</artifactId>
-            <version>${kda.connectors.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>${avro.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro-glue-schema-registry_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-slf4j-impl</artifactId>
-            <version>${log4j.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>${log4j.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>${log4j.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-java</artifactId>
+                <version>${flink.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-streaming-java</artifactId>
+                <version>${flink.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-clients</artifactId>
+                <version>${flink.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-kinesisanalytics-runtime</artifactId>
+                <version>${kda.runtime.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-base</artifactId>
+                <version>${flink.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-kafka</artifactId>
+                <version>${kafka.connector.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-connector-datagen</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-avro-glue-schema-registry</artifactId>
+                <version>${aws.connectors.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-avro</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-slf4j-impl</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-api</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-core</artifactId>
+                <version>${log4j.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.junit.jupiter</groupId>
+                <artifactId>junit-jupiter-api</artifactId>
+                <version>${junit.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.junit.jupiter</groupId>
+                <artifactId>junit-jupiter-engine</artifactId>
+                <version>${junit.version}</version>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>

    <build>
-        <directory>${buildDirectory}</directory>
-        <finalName>${jar.finalName}</finalName>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.8.1</version>
-                <configuration>
-                    <source>${target.java.version}</source>
-                    <target>${target.java.version}</target>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.avro</groupId>
-                <artifactId>avro-maven-plugin</artifactId>
-                <version>${avro.version}</version>
-                <executions>
-                    <execution>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>idl-protocol</goal>
-                        </goals>
-                        <configuration>
-                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
-                            <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
-                            <fieldVisibility>private</fieldVisibility>
-                            <stringType>String</stringType>
-                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.1.1</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <artifactSet>
-                                <excludes>
-                                    <exclude>org.apache.flink:force-shading</exclude>
-                                    <exclude>com.google.code.findbugs:jsr305</exclude>
-                                    <exclude>org.slf4j:*</exclude>
-                                    <exclude>org.apache.logging.log4j:*</exclude>
-                                </excludes>
-                            </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>${main.class}</mainClass>
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>3.8.1</version>
+                    <configuration>
+                        <source>${target.java.version}</source>
+                        <target>${target.java.version}</target>
+                    </configuration>
+                </plugin>
+
+                <plugin>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro-maven-plugin</artifactId>
+                    <version>${avro.version}</version>
+                    <executions>
+                        <execution>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>idl-protocol</goal>
+                            </goals>
+                            <configuration>
+                                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
+                                <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                <fieldVisibility>private</fieldVisibility>
+                                <stringType>String</stringType>
+                                <enableDecimalLogicalType>true</enableDecimalLogicalType>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-shade-plugin</artifactId>
+                    <version>3.1.1</version>
+                    <executions>
+                        <execution>
+                            <phase>package</phase>
+                            <goals>
+                                <goal>shade</goal>
+                            </goals>
+                            <configuration>
+                                <artifactSet>
+                                    <excludes>
+                                        <exclude>org.apache.flink:force-shading</exclude>
+                                        <exclude>com.google.code.findbugs:jsr305</exclude>
+                                        <exclude>org.slf4j:*</exclude>
+                                        <exclude>org.apache.logging.log4j:*</exclude>
+                                    </excludes>
+                                </artifactSet>
+                                <filters>
+                                    <filter>
+                                        <artifact>*:*</artifact>
+                                        <excludes>
+                                            <exclude>META-INF/*.SF</exclude>
+                                            <exclude>META-INF/*.DSA</exclude>
+                                            <exclude>META-INF/*.RSA</exclude>
+                                        </excludes>
+                                    </filter>
+                                </filters>
+                                <transformers>
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                        <mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
+                                    </transformer>
+                                </transformers>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </pluginManagement>
    </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/pom.xml b/java/AvroGlueSchemaRegistryKafka/producer/pom.xml
new file mode 100644
index 00000000..1855c1dc
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>avro-gsr-kafka</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>avro-gsr-kafka-producer</artifactId>
+
+    <dependencies>
+        <!-- Flink core dependencies (provided by the runtime) -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+        </dependency>
+
+        <!-- Amazon Managed Service for Apache Flink runtime -->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+        </dependency>
+
+        <!-- Connectors -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-datagen</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+        </dependency>
+
+        <!-- AWS Glue Schema Registry SerDe -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro-glue-schema-registry</artifactId>
+        </dependency>
+
+        <!-- AVRO -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
new file mode 100644
index 00000000..6b32dac3
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -0,0 +1,159 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import com.amazonaws.services.msf.domain.TemperatureSampleGenerator;
+import com.amazonaws.services.msf.kafka.KeyHashKafkaPartitioner;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class StreamingJob {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);
+
+ private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+ private static boolean isLocal(StreamExecutionEnvironment env) {
+ return env instanceof LocalStreamEnvironment;
+ }
+
+ private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+ if (isLocal(env)) {
+ LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+ return KinesisAnalyticsRuntime.getApplicationProperties(
+ Objects.requireNonNull(StreamingJob.class.getClassLoader()
+ .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+ } else {
+ LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+ return KinesisAnalyticsRuntime.getApplicationProperties();
+ }
+ }
+
+ private static <T extends SpecificRecord> KafkaSink<T> createKafkaSink(
+ Properties sinkProperties,
+ Properties authProperties,
+ KafkaRecordSerializationSchema<T> recordSerializationSchema) {
+
+ Properties kafkaProducerConfig = new Properties();
+ kafkaProducerConfig.putAll(sinkProperties);
+ kafkaProducerConfig.putAll(authProperties);
+
+ return KafkaSink.<T>builder()
+ .setBootstrapServers(sinkProperties.getProperty("bootstrap.servers"))
+ .setKafkaProducerConfig(kafkaProducerConfig)
+ .setRecordSerializer(recordSerializationSchema)
+ .build();
+ }
+
+ // Create the Kafka Record Serialization schema for an AVRO-specific (generated) record class.
+ // This method also extracts the key to be used in the kafka record (it assumes it is a String)
+ private static <T extends SpecificRecord> KafkaRecordSerializationSchema<T> kafkaRecordSerializationSchema(
+ Class<T> recordClazz,
+ SerializableFunction<T, String> keyExtractor,
+ Properties schemaRegistryProperties,
+ String topic) {
+
+ Map<String, Object> serializerConfig = Map.of(
+ // Enable auto registering the schema if it does not exist.
+ AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true,
+ // Explicitly set the compatibility mode when the schema is auto-created (default would be "backward" anyway)
+ AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.BACKWARD,
+ // GSR Region
+ AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryProperties.getProperty("region"),
+ // The Registry must exist in the region
+ AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryProperties.getProperty("name"));
+
+ // The key is extracted with the provided extractor function and turned into bytes
+ SerializationSchema<T> keySerializationSchema = record -> {
+ try {
+ String key = keyExtractor.apply(record);
+ return key.getBytes(StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // Serialize the value using GSR
+ SerializationSchema<T> valueSerializationSchema = GlueSchemaRegistryAvroSerializationSchema.forSpecific(
+ recordClazz, topic, serializerConfig);
+
+ return KafkaRecordSerializationSchema.<T>builder()
+ .setTopic(topic)
+ .setKeySerializationSchema(keySerializationSchema)
+ .setValueSerializationSchema(valueSerializationSchema)
+ // In this example we explicitly set a partitioner to ensure the sink partitions by key in the destination
+ // topic. Note that this is not the default behavior of the KafkaSink
+ .setPartitioner(new KeyHashKafkaPartitioner())
+ .build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // For the scope of this example we disable operator chaining, just to allow observing records flowing through the
+ // application. Disabling chaining in a production application may seriously impact performance.
+ env.disableOperatorChaining();
+
+ final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+ LOG.info("Application properties: {}", applicationProperties);
+
+ Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
+ Properties outputProperties = applicationProperties.get("Output0");
+ Properties schemaRegistryProperties = applicationProperties.get("SchemaRegistry");
+ Properties dataGenProperties = applicationProperties.get("DataGen");
+
+ // Set up DataGenerator
+ int samplesPerSecond = Integer.parseInt(dataGenProperties.getProperty("samples.per.second", "100"));
+ DataGeneratorSource<TemperatureSample> source = new DataGeneratorSource<>(
+ new TemperatureSampleGenerator(),
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perSecond(samplesPerSecond),
+ TypeInformation.of(TemperatureSample.class)
+ );
+
+ // Attach the DataGenerator source
+ DataStream<TemperatureSample> temperatureSamples = env.fromSource(
+ source,
+ org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
+ "Temperature Generator")
+ .uid("temperature-generator");
+
+
+ // Print the records for the sake of this example.
+ // Any output to stdout is visible when running locally, but not when running on Managed Flink
+ temperatureSamples.print();
+
+ // Create the record serialization schema, which is specific to the TemperatureSample to be serialized
+ // It also defines the topic name and how to extract the key to be used in the kafka record
+ KafkaRecordSerializationSchema<TemperatureSample> recordSerializationSchema = kafkaRecordSerializationSchema(
+ TemperatureSample.class,
+ (SerializableFunction<TemperatureSample, String>) TemperatureSample::getRoom,
+ schemaRegistryProperties,
+ outputProperties.getProperty("topic"));
+
+ // Attach the sink
+ KafkaSink<TemperatureSample> sink = createKafkaSink(outputProperties, authProperties, recordSerializationSchema);
+ temperatureSamples.sinkTo(sink).uid("temperature-sink");
+
+ env.execute("Temperature Producer");
+ }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/domain/TemperatureSampleGenerator.java b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/domain/TemperatureSampleGenerator.java
new file mode 100644
index 00000000..24a8a0dd
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/domain/TemperatureSampleGenerator.java
@@ -0,0 +1,26 @@
+package com.amazonaws.services.msf.domain;
+
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+
+import java.time.Instant;
+import java.util.Random;
+
+public class TemperatureSampleGenerator implements GeneratorFunction<Long, TemperatureSample> {
+
+ private static final String[] ROOMS = {"room1", "room2", "room3", "room4", "room5"};
+ private final Random random = new Random();
+
+ @Override
+ public TemperatureSample map(Long index) throws Exception {
+ String room = ROOMS[random.nextInt(ROOMS.length)];
+ double temperature = 18.0 + random.nextDouble() * 12.0; // 18-30°C
+ int sensorId = random.nextInt(100); // Random sensor ID
+ return TemperatureSample.newBuilder()
+ .setSensorId(sensorId)
+ .setRoom(room)
+ .setTemperature((float) temperature)
+ .setSampleTime(Instant.now())
+ .build();
+ }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/kafka/KeyHashKafkaPartitioner.java b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/kafka/KeyHashKafkaPartitioner.java
new file mode 100644
index 00000000..62610e6f
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/kafka/KeyHashKafkaPartitioner.java
@@ -0,0 +1,26 @@
+package com.amazonaws.services.msf.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+import java.util.Arrays;
+
+/**
+ * Example of a FlinkKafkaPartitioner which partitions records by the hash of the message key.
+ *
+ * The KafkaSink in the DataStream API does not use the Kafka default partitioner, by default.
+ *
+ * @param <T> record type
+ */
+public class KeyHashKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+ @Override
+ public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ if (key != null) {
+ int numPartitions = partitions.length;
+ int hash = Arrays.hashCode(key);
+ return partitions[Math.abs(hash) % numPartitions];
+ } else {
+ return partitions[0];
+ }
+ }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/avro/temperature-sample.avdl b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/avro/temperature-sample.avdl
new file mode 100644
index 00000000..b7802f0e
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/avro/temperature-sample.avdl
@@ -0,0 +1,9 @@
+@namespace("com.amazonaws.services.msf.avro")
+protocol In {
+ record TemperatureSample {
+ int sensorId;
+ string room;
+ float temperature;
+ timestamp_ms sampleTime;
+ }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/flink-application-properties-dev.json b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..60a904c4
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,22 @@
+[
+ {
+ "PropertyGroupId": "Output0",
+ "PropertyMap": {
+ "bootstrap.servers": "localhost:9092",
+ "topic": "temperature-samples"
+ }
+ },
+ {
+ "PropertyGroupId": "SchemaRegistry",
+ "PropertyMap": {
+ "name": "temperature-schema-registry",
+ "region": "us-east-1"
+ }
+ },
+ {
+ "PropertyGroupId": "DataGen",
+ "PropertyMap": {
+ "samples.per.second": "10"
+ }
+ }
+]
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/log4j2.properties b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..41570b1d
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/log4j2.properties
@@ -0,0 +1,12 @@
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+logger.msf.name = com.amazonaws.services.msf
+logger.msf.level = DEBUG
+logger.msf.additivity = false
+logger.msf.appenderRef.console.ref = ConsoleAppender
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/test/java/com/amazonaws/services/msf/domain/TemperatureSampleGeneratorTest.java b/java/AvroGlueSchemaRegistryKafka/producer/src/test/java/com/amazonaws/services/msf/domain/TemperatureSampleGeneratorTest.java
new file mode 100644
index 00000000..68f227d6
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/test/java/com/amazonaws/services/msf/domain/TemperatureSampleGeneratorTest.java
@@ -0,0 +1,39 @@
+package com.amazonaws.services.msf.domain;
+
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class TemperatureSampleGeneratorTest {
+
+ @Test
+ void testGenerateTemperatureSample() throws Exception {
+ TemperatureSampleGenerator generator = new TemperatureSampleGenerator();
+
+ TemperatureSample sample = generator.map(1L);
+
+ assertNotNull(sample);
+ assertTrue(sample.getSensorId() >= 0);
+ assertTrue(sample.getSensorId() < 1000);
+ assertNotNull(sample.getRoom());
+ assertTrue(sample.getRoom().startsWith("room"));
+ assertTrue(sample.getTemperature() >= 18.0f);
+ assertTrue(sample.getTemperature() <= 30.0f);
+ assertNotNull(sample.getSampleTime());
+ }
+
+ @Test
+ void testMultipleSamplesHaveDifferentValues() throws Exception {
+ TemperatureSampleGenerator generator = new TemperatureSampleGenerator();
+
+ TemperatureSample sample1 = generator.map(1L);
+ TemperatureSample sample2 = generator.map(2L);
+
+ // At least one property should be different (sensorId, room, or temperature)
+ boolean isDifferent = sample1.getSensorId() != sample2.getSensorId() ||
+ !sample1.getRoom().equals(sample2.getRoom()) ||
+ sample1.getTemperature() != sample2.getTemperature();
+ assertTrue(isDifferent);
+ }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/StreamingJob.java
deleted file mode 100644
index 374ca2ee..00000000
--- a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/StreamingJob.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package com.amazonaws.services.msf;
-
-import com.amazonaws.services.msf.avro.RoomTemperature;
-import com.amazonaws.services.msf.avro.TemperatureSample;
-import com.amazonaws.services.msf.domain.RoomAverageTemperatureCalculator;
-import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
-import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
-import org.apache.flink.connector.kafka.sink.KafkaSink;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
-import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Sample Flink application that can run in Amazon Managed Service for Apache Flink
- *
- * It reads from a Kafka topic temperature samples serialized as AVRO using Amazon Glue Schema Registry.
- * It calculates average room temperatures every minute, and publish them to another Kafka topic, again serialized as
- * AVRO using Glue Schema Registry.
- */
-public class StreamingJob {
- private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class);
-
- // Name of the local JSON resource with the application properties in the same format as they are received from the MSF runtime
- private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
-
- // Names of the configuration group containing the application properties
- private static final String APPLICATION_CONFIG_GROUP = "FlinkApplicationProperties";
-
- private static boolean isLocal(StreamExecutionEnvironment env) {
- return env instanceof LocalStreamEnvironment;
- }
-
- /**
- * Load application properties from the service runtime or from a local resource, when the environment is local
- */
- private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
- if (isLocal(env)) {
- LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
- return KinesisAnalyticsRuntime.getApplicationProperties(
- StreamingJob.class.getClassLoader()
- .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
- } else {
- LOGGER.info("Loading application configuration from ");
- return KinesisAnalyticsRuntime.getApplicationProperties();
- }
- }
-
- /**
- * KafkaSource for any AVRO-generated class (SpecificRecord) using AWS Glue Schema Registry.
- *
- * @param <T> record type
- * @param payloadAvroClass AVRO-generated class for the message body
- * @param bootstrapServers Kafka bootstrap server
- * @param topic topic name
- * @param consumerGroupId Kafka Consumer Group ID
- * @param schemaRegistryName Glue Schema Registry name
- * @param schemaRegistryRegion Glue Schema Registry region
- * @param kafkaConsumerConfig configuration passed to the Kafka Consumer
- * @return a KafkaSource instance
- */
- private static <T extends SpecificRecord> KafkaSource<T> kafkaSource(
- Class<T> payloadAvroClass,
- String bootstrapServers,
- String topic,
- String consumerGroupId,
- String schemaRegistryName,
- String schemaRegistryRegion,
- Properties kafkaConsumerConfig) {
-
- // DeserializationSchema for the message body: AVRO specific record, with Glue Schema Registry
- Map<String, Object> deserializerConfig = Map.of(
- AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName(),
- AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryRegion,
- AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryName);
- DeserializationSchema<T> legacyDeserializationSchema = GlueSchemaRegistryAvroDeserializationSchema.forSpecific(payloadAvroClass, deserializerConfig);
- KafkaRecordDeserializationSchema<T> kafkaRecordDeserializationSchema = KafkaRecordDeserializationSchema.valueOnly(legacyDeserializationSchema);
-
- // ... more Kafka consumer configurations (e.g. MSK IAM auth) go here...
-
- return KafkaSource.<T>builder()
- .setBootstrapServers(bootstrapServers)
- .setTopics(topic)
- .setGroupId(consumerGroupId)
- .setDeserializer(kafkaRecordDeserializationSchema)
- .setProperties(kafkaConsumerConfig)
- .build();
- }
-
- /**
- * KafkaSink for any AVRO-generated class (specific records) using AWS Glue Schema Registry
- * using a Kafka message key (a String) extracted from the record using a KeySelector.
- *
- * @param <T> record type
- * @param payloadAvroClass AVRO-generated class for the message body
- * @param messageKeyExtractor KeySelector to extract the message key from the record
- * @param bootstrapServers Kafka bootstrap servers
- * @param topic topic name
- * @param schemaRegistryName Glue Schema Registry name
- * @param schemaRegistryRegion Glue Schema Registry region
- * @param kafkaProducerConfig configuration passed to the Kafka Producer
- * @return a KafkaSink
- */
- private static <T extends SpecificRecord> KafkaSink<T> keyedKafkaSink(
- Class<T> payloadAvroClass,
- KeySelector<T, String> messageKeyExtractor,
- String bootstrapServers,
- String topic,
- String schemaRegistryName,
- String schemaRegistryRegion,
- Properties kafkaProducerConfig) {
-
- // SerializationSchema for the message body: AVRO with Glue Schema Registry
- // (GlueSchemaRegistryAvroSerializationSchema expects a Map as configuration)
- Map<String, Object> serializerConfig = Map.of(
- AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true, // Enable Schema Auto-registration (the name of the schema is based on the name of the topic)
- AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryRegion,
- AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryName);
- SerializationSchema<T> valueSerializationSchema = GlueSchemaRegistryAvroSerializationSchema.forSpecific(payloadAvroClass, topic, serializerConfig);
-
- // SerializationSchema for the message key.
- // Extracts the key (a String) from the record using a KeySelector
- // and covert the String to bytes as UTF-8 (same default behaviour of org.apache.flink.api.common.serialization.SimpleStringSchema)
- SerializationSchema<T> keySerializationSchema = record -> {
- try {
- return messageKeyExtractor.getKey(record).getBytes(StandardCharsets.UTF_8);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
-
- // ... more Kafka consumer configurations (e.g. MSK IAM auth) go here...
-
- return KafkaSink.<T>builder()
- .setBootstrapServers(bootstrapServers)
- .setRecordSerializer(
- KafkaRecordSerializationSchema.builder()
- .setTopic(topic)
- .setValueSerializationSchema(valueSerializationSchema)
- .setKeySerializationSchema(keySerializationSchema)
- .build())
- .setKafkaProducerConfig(kafkaProducerConfig)
- .build();
- }
-
-
- public static void main(String[] args) throws Exception {
- // set up the streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-
- // Local dev specific settings
- if (isLocal(env)) {
- // Checkpointing and parallelism are set by the service when running on AWS
- env.enableCheckpointing(60000);
- env.setParallelism(2);
- }
-
- // Application configuration
- Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP);
- String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
- String sourceTopic = Preconditions.checkNotNull(applicationProperties.getProperty("source.topic"), "source.topic not defined");
- String sourceConsumerGroupId = applicationProperties.getProperty("source.consumer.group.id", "flink-avro-gsr-sample");
- String sinkTopic = Preconditions.checkNotNull(applicationProperties.getProperty("sink.topic"), "sink.topic not defined");
- String schemaRegistryName = Preconditions.checkNotNull(applicationProperties.getProperty("schema.registry.name"), "schema.registry.name not defined");
- String schemaRegistryRegion = Preconditions.checkNotNull(applicationProperties.getProperty("schema.registry.region"), "schema.registry.region not defined");
-
-
- KafkaSource<TemperatureSample> source = kafkaSource(
- TemperatureSample.class,
- bootstrapServers,
- sourceTopic,
- sourceConsumerGroupId,
- schemaRegistryName,
- schemaRegistryRegion,
- new Properties() // ...any other Kafka consumer property
- );
-
- KafkaSink<RoomTemperature> sink = keyedKafkaSink(
- RoomTemperature.class,
- RoomTemperature::getRoom,
- bootstrapServers,
- sinkTopic,
- schemaRegistryName,
- schemaRegistryRegion,
- new Properties() // ...any other Kafka producer property
- );
-
-
- DataStream<TemperatureSample> temperatureSamples = env.fromSource(
- source,
- WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
- .withIdleness(Duration.ofSeconds(60))
- .withTimestampAssigner((sample, ts) -> sample.getSampleTime().toEpochMilli()),
- "Temperature Samples source")
- .uid("temperature-samples-source");
-
- DataStream<RoomTemperature> roomTemperatures = temperatureSamples
- .keyBy(TemperatureSample::getRoom)
- .window(TumblingEventTimeWindows.of(Time.seconds(60)))
- .aggregate(new RoomAverageTemperatureCalculator())
- .uid("room-temperatures");
-
- roomTemperatures.sinkTo(sink).uid("room-temperatures-sink");
-
- env.execute("Flink app");
- }
-}
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculator.java b/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculator.java
deleted file mode 100644
index 40e994a6..00000000
--- a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.amazonaws.services.msf.domain;
-
-import com.amazonaws.services.msf.avro.RoomTemperature;
-import com.amazonaws.services.msf.avro.TemperatureSample;
-import org.apache.flink.api.common.functions.AggregateFunction;
-
-import java.time.Instant;
-
-/**
- * AggregateFunction aggregating the room average temperature
- * (provided only for completeness, but not relevant for the AVRO-Glue Schema Registry example)
- */
-public class RoomAverageTemperatureCalculator implements AggregateFunction<TemperatureSample, RoomTemperature, RoomTemperature> {
- private Instant maxInstant(Instant a, Instant b) {
- return Instant.ofEpochMilli(
- Math.max(a.toEpochMilli(), b.toEpochMilli()));
- }
-
- @Override
- public RoomTemperature createAccumulator() {
- return RoomTemperature.newBuilder()
- .setRoom("").setSampleCount(0).setLastSampleTime(Instant.EPOCH).setTemperature(0).build();
- }
-
- @Override
- public RoomTemperature add(TemperatureSample sample, RoomTemperature accumulator) {
- final int sampleCount = accumulator.getSampleCount();
- final float roomAvgTemp = accumulator.getTemperature();
- final Instant lastSampleTime = maxInstant(accumulator.getLastSampleTime(), sample.getSampleTime());
- final float newRoomAvgTemp = (roomAvgTemp * sampleCount + sample.getTemperature()) / (float) (sampleCount + 1);
-
- accumulator.setRoom(sample.getRoom());
- accumulator.setSampleCount(sampleCount + 1);
- accumulator.setTemperature(newRoomAvgTemp);
- accumulator.setLastSampleTime(lastSampleTime);
-
- return accumulator;
- }
-
- @Override
- public RoomTemperature getResult(RoomTemperature accumulator) {
- return accumulator;
- }
-
- @Override
- public RoomTemperature merge(RoomTemperature a, RoomTemperature b) {
- final int totalSampleCount = a.getSampleCount() + b.getSampleCount();
- final float avgTemperature = (a.getTemperature() * a.getSampleCount() + b.getTemperature() * b.getSampleCount()) / (float) (totalSampleCount);
-
- return RoomTemperature.newBuilder()
- .setRoom(a.getRoom())
- .setSampleCount(totalSampleCount)
- .setTemperature(avgTemperature)
- .setLastSampleTime(maxInstant(a.getLastSampleTime(), b.getLastSampleTime()))
- .build();
- }
-
-
-}
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/output-writer-schema.avdl b/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/output-writer-schema.avdl
deleted file mode 100644
index 3024ebe8..00000000
--- a/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/output-writer-schema.avdl
+++ /dev/null
@@ -1,9 +0,0 @@
-@namespace("com.amazonaws.services.msf.avro")
-protocol Out {
- record RoomTemperature {
- string room;
- float temperature;
- int sampleCount;
- timestamp_ms lastSampleTime;
- }
-}
\ No newline at end of file
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/flink-application-properties-dev.json b/java/AvroGlueSchemaRegistryKafka/src/main/resources/flink-application-properties-dev.json
deleted file mode 100644
index 9cec7377..00000000
--- a/java/AvroGlueSchemaRegistryKafka/src/main/resources/flink-application-properties-dev.json
+++ /dev/null
@@ -1,13 +0,0 @@
-[
- {
- "PropertyGroupId": "FlinkApplicationProperties",
- "PropertyMap": {
- "bootstrap.servers": "localhost:29092",
- "source.topic": "temperature-samples",
- "source.consumer.group.id": "flink-avro-gsr",
- "sink.topic": "room-temperatures",
- "schema.registry.name": "avro-sample",
- "schema.registry.region": "eu-west-1"
- }
- }
-]
diff --git a/java/AvroGlueSchemaRegistryKafka/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java b/java/AvroGlueSchemaRegistryKafka/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java
deleted file mode 100644
index 5c60673d..00000000
--- a/java/AvroGlueSchemaRegistryKafka/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.amazonaws.services.msf.domain;
-
-import com.amazonaws.services.msf.avro.RoomTemperature;
-import com.amazonaws.services.msf.avro.TemperatureSample;
-import org.junit.jupiter.api.Test;
-
-import java.time.Instant;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-class RoomAverageTemperatureCalculatorTest {
-
- RoomAverageTemperatureCalculator sut = new RoomAverageTemperatureCalculator();
-
- @Test
- void createAccumulator() {
- RoomTemperature roomTemperature = sut.createAccumulator();
-
- assertNotNull(roomTemperature);
- assertEquals(0.0f, roomTemperature.getTemperature());
- assertEquals(0, roomTemperature.getSampleCount());
- assertEquals(Instant.EPOCH, roomTemperature.getLastSampleTime());
- }
-
- @Test
- void add() {
- TemperatureSample sample = TemperatureSample.newBuilder()
- .setSensorId(42)
- .setRoom("a-room")
- .setSampleTime(Instant.ofEpochMilli(50))
- .setTemperature(10.0f)
- .build();
- RoomTemperature acc = RoomTemperature.newBuilder()
- .setRoom("a-room")
- .setSampleCount(10)
- .setTemperature(20.0f)
- .setLastSampleTime(Instant.ofEpochMilli(100))
- .build();
-
- RoomTemperature accOut = sut.add(sample, acc);
-
- assertEquals("a-room", accOut.getRoom());
- assertEquals(10 + 1, accOut.getSampleCount());
- assertEquals(Instant.ofEpochMilli(100), accOut.getLastSampleTime());
- assertEquals((20.0f * 10 + 10.0f) / (float) (10 + 1), accOut.getTemperature());
- }
-
- @Test
- void getResult() {
- RoomTemperature acc = RoomTemperature.newBuilder()
- .setRoom("a-room")
- .setSampleCount(10)
- .setTemperature(20.0f)
- .setLastSampleTime(Instant.ofEpochMilli(100))
- .build();
-
- RoomTemperature result = sut.getResult(acc);
- assertEquals(acc.getRoom(), result.getRoom());
- assertEquals(acc.getTemperature(), result.getTemperature());
- assertEquals(acc.getSampleCount(), result.getSampleCount());
- assertEquals(acc.getLastSampleTime(), acc.getLastSampleTime());
- }
-
- @Test
- void merge() {
- RoomTemperature a = RoomTemperature.newBuilder()
- .setRoom("a-room")
- .setSampleCount(10)
- .setTemperature(20.0f)
- .setLastSampleTime(Instant.ofEpochMilli(100))
- .build();
- RoomTemperature b = RoomTemperature.newBuilder()
- .setRoom("a-room")
- .setSampleCount(20)
- .setTemperature(40.0f)
- .setLastSampleTime(Instant.ofEpochMilli(200))
- .build();
-
- RoomTemperature c = sut.merge(a, b);
- assertEquals("a-room", c.getRoom());
- assertEquals(10 + 20, c.getSampleCount());
- assertEquals((20.0f * 10 + 40.0f * 20) / (float) (10 + 20), c.getTemperature());
- assertEquals(Instant.ofEpochMilli(200), c.getLastSampleTime());
- }
-}
\ No newline at end of file