diff --git a/java/AvroGlueSchemaRegistryKafka/README.md b/java/AvroGlueSchemaRegistryKafka/README.md
index ad4875ef..af0420b1 100644
--- a/java/AvroGlueSchemaRegistryKafka/README.md
+++ b/java/AvroGlueSchemaRegistryKafka/README.md
@@ -1,82 +1,129 @@
 ## AVRO serialization in KafkaSource and KafkaSink using AWS Glue Schema Registry
 
-* Flink version: 1.15
+This example demonstrates how to serialize/deserialize AVRO messages in Kafka sources and sinks, using
+[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) (GSR).
+
+* Flink version: 1.20
 * Flink API: DataStream API
 * Language: Java (11)
+* Connectors: Kafka connector, DataGenerator
 
-This example demonstrates how to serialize/deserialize AVRO messages in Kafka sources and sinks, using
-[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html).
+The example contains two Flink applications:
+1. a [producer](./producer) job, which generates random temperature samples and publishes them to Kafka as AVRO, using GSR.
+2. a [consumer](./consumer) job, which reads temperature records from the same topic, using GSR.
 
-This example uses AVRO generated classes (more details, [below](#Using_AVRO-generated_classes))
+Both applications use AVRO-specific records based on the schema definitions provided at compile time.
+The record classes are generated during the build process based on the AVRO IDL (`.avdl`) you can find in the resources
+folder of both jobs. For simplicity, the schema definition is repeated in both jobs.
 
-The reader's schema definition, for the source, and the writer's schema definition, for the sink, are provided as
-AVRO IDL (`.avdl`) in [./src/main/resources/avro](./src/main/resources/avro).
+The two jobs are designed to run as separate Amazon Managed Service for Apache Flink applications, connecting to the
+same Amazon Managed Streaming for Apache Kafka (MSK) cluster.
+The default configuration uses an unauthenticated connection to MSK. The example can be extended to implement any supported
+MSK authentication scheme.
 
-A KafkaSource produces a stream of AVRO data objects (SpecificRecords), fetching the writer's schema from AWS Glue
-Schema Registry. The AVRO Kafka message value must have been serialized using AWS Glue Schema Registry.
+### Prerequisites
 
-A KafkaSink serializes AVRO data objects as Kafka message value, and a String, converted to bytes as UTF-8, as Kafka
-message key.
+To run the two Managed Flink applications you need to set up the following prerequisites:
 
-## Flink compatibility
+1. An MSK cluster
+   - Create the topic `temperature-samples` or enable auto topic creation
+   - Allow unauthenticated access (or modify the application to support the configured authentication scheme)
+2. Create a Registry named `temperature-schema-registry` in Glue Schema Registry, in the same region as the applications
+3. ⚠️ Create a VPC Endpoint for Glue in the VPC where the Managed Flink applications are attached.
+   Without a VPC Endpoint, an application attached to a VPC may not be able to connect to the service endpoint.
 
-**Note:** This project is compatible with Flink 1.15+ and Amazon Managed Service for Apache Flink
+### Create the Amazon Managed Service for Apache Flink applications
 
-### Flink API compatibility
+Create two Managed Flink applications, one for the producer and the other for the consumer.
+1. Build both jobs by running `mvn package` in the directory of the example. This will build two JARs in the `target` subfolders of the producer and the consumer.
+2. Upload both JARs to an S3 bucket and use them as application code, for the producer and consumer respectively.
+3. Application configuration
+   * Attach both applications to a VPC with access to the MSK cluster and ensure the Security Group allows traffic to the MSK cluster.
+     For simplicity, to run the example we suggest using, for both applications, the same VPC, subnets, and Security Group as the MSK cluster
+   * Ensure the applications have permissions to access Glue Schema Registry. For the sake of this example you can attach
+     the policy `AWSGlueSchemaRegistryFullAccess` to the producer application's IAM Role, and the policy `AWSGlueSchemaRegistryReadonlyAccess`
+     to the consumer's Role.
+   * Set up the Runtime properties of the two applications as described in the following sections.
 
-This example shows how to use AWS Glue Schema Registry with the Flink Java DataStream API.
+### Runtime configuration
 
-It uses the newer `KafkaSource` and `KafkaSink` (as opposed to `FlinkKafkaConsumer` and `FlinkKafkaProducer`, deprecated
-with Flink 1.15).
+The two applications expect different configurations.
+When running locally, the configurations are fetched from the `flink-application-properties-dev.json` files in the resources
+folder of each job.
 
-At the moment, no format provider is available for the Table API.
+When running on Managed Flink, these files are ignored and the configuration must be passed using the Runtime properties
+as part of the configuration of each application.
 
-## Notes about using AVRO with Apache Flink
+All parameters are case-sensitive.
 
-### AVRO-generated classes
+#### Producer runtime parameters
 
-This project uses classes generated at built-time as data objects.
+| Group ID         | Key                  | Description                                                    |
+|------------------|----------------------|----------------------------------------------------------------|
+| `Output0`        | `bootstrap.servers`  | Kafka bootstrap servers                                        |
+| `Output0`        | `topic`              | Kafka topic name for temperature samples                       |
+| `SchemaRegistry` | `name`               | AWS Glue Schema Registry name                                  |
+| `SchemaRegistry` | `region`             | AWS region where the Schema Registry is located                |
+| `DataGen`        | `samples.per.second` | (optional) Rate of sample generation per second (default: 100) |
 
-As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
-code.
-AVRO Maven plugin generates the Java classes (source code) at build-time, during the
+
+#### Consumer runtime parameters
+
+| Group ID         | Key                 | Description                                     |
+|------------------|---------------------|-------------------------------------------------|
+| `Input0`         | `bootstrap.servers` | Kafka bootstrap servers                         |
+| `Input0`         | `topic`             | Kafka topic name for temperature samples        |
+| `Input0`         | `group.id`          | Kafka consumer group ID                         |
+| `SchemaRegistry` | `name`              | AWS Glue Schema Registry name                   |
+| `SchemaRegistry` | `region`            | AWS region where the Schema Registry is located |
+
+
+### Running the applications locally
+
+A [docker-compose](docker/docker-compose.yml) file is provided to run a local Kafka cluster for local development.
+The default configurations use this local cluster.
+
+When running locally, the jobs still use the actual Glue Schema Registry.
+Make sure the machine where you are developing has an authenticated AWS CLI profile with permissions to use GSR. Use the
+AWS plugin of your IDE to make the application run with a specific AWS profile.
+
+See [Running examples locally](../running-examples-locally.md) for further details.
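+
+For reference, this is the shape of the configuration the jobs expect. The snippet below mirrors the producer's
+`flink-application-properties-dev.json`; the values shown are this example's local defaults, so adjust them to your setup:
+
+```json
+[
+  {
+    "PropertyGroupId": "Output0",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "temperature-samples"
+    }
+  },
+  {
+    "PropertyGroupId": "SchemaRegistry",
+    "PropertyMap": {
+      "name": "temperature-schema-registry",
+      "region": "us-east-1"
+    }
+  },
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": {
+      "samples.per.second": "10"
+    }
+  }
+]
+```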
+
+
+### Notes about using AVRO with Apache Flink
+
+#### AVRO-generated classes
+
+This project uses classes generated at build-time as data objects.
+
+As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
+code. The AVRO Maven plugin generates the Java classes (source code) at build-time, during the
 [`generate-source`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.
 
 The generated classes are written into `./target/generated-sources/avro` directory and should **not** be committed with
-the project source.
-
-This way, the only dependency is on the schema definition file(s).
+the project source. This way, the only dependency is on the schema definition file(s).
 If any change is required, the schema file is modified and the AVRO classes are re-generated automatically in the build.
 
 Code generation is supported by all common IDEs like IntelliJ.
-If your IDE does not see the AVRO classes (`TemperatureSample` and `RoomTemperature`) when you import the project for the
-first time, you may manually run `mvn generate-sources` once of force source code generation from the IDE.
+If your IDE does not see the AVRO classes (`TemperatureSample`) when you import the project for the
+first time, you may manually run `mvn generate-sources` once or force source code generation from the IDE.
 
-### AVRO-generated classes (SpecificRecord) in Apache Flink
+#### AVRO-generated classes (SpecificRecord) in Apache Flink
 
 Using AVRO-generated classes (SpecificRecord) within the flow of the Flink application (between operators) or in the
 Flink state, has an additional benefit.
 Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)
-these objects, without risking of falling back to Kryo.
-
-### AVRO and AWS Glue Schema Registry dependencies
-
-The following dependencies related to AVRO and AWS Glue Schema Registry are included (for FLink 1.15.2):
-
-1. `org.apache.flink:flink-avro-glue-schema-registry_2.12:1.15.2` - Support for AWS Glue Schema Registry SerDe
-2. `org.apache.avro:avro:1.10.2` - Overrides AVRO 1.10.0, transitively included.
-
-The project also includes `org.apache.flink:flink-avro:1.15.2`.
-This is already a transitive dependency from the Glue Schema Registry SerDe and is defined explicitly only for clarity.
+these objects, without the risk of falling back to Kryo.
 
-Note that we are overriding AVRO 1.10.0 with 1.10.2.
-This minor version upgrade does not break the internal API, and includes some bug fixes introduced with
-AVRO [1.10.1](https://github.com/apache/avro/releases/tag/release-1.10.1)
-and [1.10.2](https://github.com/apache/avro/releases/tag/release-1.10.2).
+### Common issues
 
-### Running in IntelliJ
+If the application fails to call the Glue Schema Registry API for any reason, the job gets trapped in a fail-and-restart
+loop. The exception says it cannot fetch or register a schema version.
 
-To start the Flink job in IntelliJ edit the Run/Debug configuration enabling 'Add dependencies with "provided" scope to
-the classpath'.
\ No newline at end of file
+The inability to use GSR may be caused by:
+* Lack of permissions to access GSR --> add `AWSGlueSchemaRegistryFullAccess` or `AWSGlueSchemaRegistryReadonlyAccess` policies to the application IAM Role
+* Unable to reach the Glue endpoint --> create a Glue VPC Endpoint in the application VPC
+* The Registry does not exist --> create a registry with the configured name (`temperature-schema-registry` by default)
+* Misconfiguration --> ensure the registry name and region passed to the application match your setup
\ No newline at end of file
diff --git a/java/AvroGlueSchemaRegistryKafka/consumer/pom.xml b/java/AvroGlueSchemaRegistryKafka/consumer/pom.xml
new file mode 100644
index 00000000..c9a9c1bf
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>avro-gsr-kafka</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>avro-gsr-kafka-consumer</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro-glue-schema-registry</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/AvroGlueSchemaRegistryKafka/consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
new file mode 100644
index 00000000..bd0618e8
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -0,0 +1,118 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class StreamingJob {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);
+
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+    private static boolean isLocal(StreamExecutionEnvironment env) {
+        return env instanceof LocalStreamEnvironment;
+    }
+
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+        if (isLocal(env)) {
+            LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            return KinesisAnalyticsRuntime.getApplicationProperties(
+                    Objects.requireNonNull(StreamingJob.class.getClassLoader()
+                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+        } else {
+            LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+            return KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+    }
+
+    private static KafkaSource<TemperatureSample> createKafkaSource(
+            Properties sourceProperties,
+            Properties authProperties,
+            KafkaRecordDeserializationSchema<TemperatureSample> kafkaRecordDeserializationSchema) {
+
+        Properties kafkaConsumerConfig = new Properties();
+        kafkaConsumerConfig.putAll(sourceProperties);
+        kafkaConsumerConfig.putAll(authProperties);
+
+        return KafkaSource.<TemperatureSample>builder()
+                .setBootstrapServers(sourceProperties.getProperty("bootstrap.servers"))
+                .setTopics(sourceProperties.getProperty("topic"))
+                .setGroupId(sourceProperties.getProperty("group.id"))
+                // If the job starts with no state, use the latest committed offset
+                // If the committed offset is also not available, start from latest
+                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
+                .setDeserializer(kafkaRecordDeserializationSchema)
+                .setProperties(kafkaConsumerConfig)
+                .build();
+    }
+
+    // Create a Kafka Record Deserialization Schema using GSR
+    // to extract the record value into an AVRO-specific (generated) record class
+    private static <T extends SpecificRecord> KafkaRecordDeserializationSchema<T> kafkaRecordDeserializationSchema(Properties schemaRegistryProperties, Class<T> recordClazz) {
+        Map<String, Object> deserializerConfig = Map.of(
+                AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName(),
+                AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryProperties.getProperty("region"),
+                AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryProperties.getProperty("name"));
+
+        DeserializationSchema<T> legacyDeserializationSchema = GlueSchemaRegistryAvroDeserializationSchema.forSpecific(recordClazz, deserializerConfig);
+        return KafkaRecordDeserializationSchema.valueOnly(legacyDeserializationSchema);
+    }
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // For the scope of this example we are disabling operator chaining just to allow observing records flowing in the
+        // application. Disabling chaining in production applications may seriously impact performance.
+        env.disableOperatorChaining();
+
+        if (isLocal(env)) {
+            // When running locally, enable checkpoints: the KafkaSource commits offsets on checkpoint only.
+            // When running on Managed Flink, checkpoints are controlled by the application configuration.
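+            // The 10-second interval used here is an arbitrary choice for local runs.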
+            env.enableCheckpointing(10_000);
+        }
+
+        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+        LOG.info("Application properties: {}", applicationProperties);
+
+        Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
+        Properties inputProperties = applicationProperties.get("Input0");
+        Properties schemaRegistryProperties = applicationProperties.get("SchemaRegistry");
+
+        // Set up the deserialization schema and the Kafka source
+        KafkaRecordDeserializationSchema<TemperatureSample> recordDeserializationSchema = kafkaRecordDeserializationSchema(schemaRegistryProperties, TemperatureSample.class);
+        KafkaSource<TemperatureSample> source = createKafkaSource(inputProperties, authProperties, recordDeserializationSchema);
+
+        // Attach the source
+        DataStream<TemperatureSample> temperatureSamples = env.fromSource(
+                        source,
+                        WatermarkStrategy.noWatermarks(),
+                        "Temperature Samples source")
+                .uid("temperature-samples-source");
+
+
+        // We print the records for the sake of this example.
+        // Any output to stdout is visible when running locally, but not when running on Managed Flink
+        temperatureSamples.print().uid("print-sink");
+
+        env.execute("Temperature Samples Consumer");
+    }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/input-reader-schema.avdl b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/avro/temperature-sample.avdl
similarity index 100%
rename from java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/input-reader-schema.avdl
rename to java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/avro/temperature-sample.avdl
diff --git a/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/flink-application-properties-dev.json b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..49b59855
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,17 @@
+[
+  {
+    "PropertyGroupId": "Input0",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "temperature-samples",
+      "group.id": "flink-avro-consumer"
+    }
+  },
+  {
+    "PropertyGroupId": "SchemaRegistry",
+    "PropertyMap": {
+      "name": "temperature-schema-registry",
+      "region": "us-east-1"
+    }
+  }
+]
diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/log4j2.properties b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/log4j2.properties
similarity index 50%
rename from java/AvroGlueSchemaRegistryKafka/src/main/resources/log4j2.properties
rename to java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/log4j2.properties
index c7d36aa3..41570b1d 100644
--- a/java/AvroGlueSchemaRegistryKafka/src/main/resources/log4j2.properties
+++ b/java/AvroGlueSchemaRegistryKafka/consumer/src/main/resources/log4j2.properties
@@ -4,4 +4,9 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
\ No newline at end of file
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+logger.msf.name = com.amazonaws.services.msf
+logger.msf.level = DEBUG
+logger.msf.additivity = false
+logger.msf.appenderRef.console.ref = ConsoleAppender
diff --git a/java/AvroGlueSchemaRegistryKafka/docker/docker-compose.yml b/java/AvroGlueSchemaRegistryKafka/docker/docker-compose.yml
new file mode 100644
index 00000000..2ed6ebfa
--- /dev/null
+++ 
b/java/AvroGlueSchemaRegistryKafka/docker/docker-compose.yml @@ -0,0 +1,24 @@ +version: '3.8' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.4.0 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.4.0 + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_NUM_PARTITIONS: 3 diff --git a/java/AvroGlueSchemaRegistryKafka/pom.xml b/java/AvroGlueSchemaRegistryKafka/pom.xml index 2cc5d255..88033173 100644 --- a/java/AvroGlueSchemaRegistryKafka/pom.xml +++ b/java/AvroGlueSchemaRegistryKafka/pom.xml @@ -5,227 +5,203 @@ 4.0.0 com.amazonaws - avro-gsr-kafka-flink + avro-gsr-kafka 1.0-SNAPSHOT + pom + + + producer + consumer + UTF-8 - ${project.basedir}/target - ${project.name} 11 ${target.java.version} - com.amazonaws.services.msf.StreamingJob - - 2.12 - 1.10.2 - 2.8.1 + ${target.java.version} - 1.15.4 - 2.1.0 + 1.20.0 + 3.2.0-1.19 + 5.0.0-1.20 + 1.11.3 1.2.0 - 1.1.6 - 2.19.30 - - 1.7.32 2.17.2 5.9.1 - - - - - org.apache.flink - flink-java - ${flink.version} - provided - - - org.apache.flink - flink-streaming-java - ${flink.version} - provided - - - org.apache.flink - flink-clients - ${flink.version} - provided - - - - - com.amazonaws - aws-kinesisanalytics-runtime - ${kda.runtime.version} - provided - - - - - org.apache.flink - flink-connector-base - ${flink.version} - provided - - - org.apache.flink - flink-connector-kafka - ${flink.version} - - - com.amazonaws - aws-kinesisanalytics-flink - ${kda.connectors.version} - - - - - org.apache.flink - flink-avro - ${flink.version} - - - org.apache.avro - avro - ${avro.version} - - - - - org.apache.flink - flink-avro-glue-schema-registry_${scala.binary.version} - ${flink.version} - - - - - - - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - - - - - org.junit.jupiter - junit-jupiter-api - ${junit.version} - test - - - org.junit.jupiter - junit-jupiter-engine - ${junit.version} - test - - + + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + org.apache.flink + flink-connector-kafka + ${kafka.connector.version} + + + org.apache.flink + flink-connector-datagen + ${flink.version} + + + + + org.apache.flink + flink-avro-glue-schema-registry + ${aws.connectors.version} + + + + + org.apache.flink + flink-avro + ${flink.version} + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + - ${buildDirectory} - ${jar.finalName} - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - 
${target.java.version} - ${target.java.version} - true - - - - - - org.apache.avro - avro-maven-plugin - ${avro.version} - - - generate-sources - - idl-protocol - - - ${project.basedir}/src/main/resources/avro - ${project.basedir}/src/test/resources/avro - private - String - true - - - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.1.1 - - - - package - - shade - - - - - org.apache.flink:force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - org.apache.logging.log4j:* - - - - - - *:* + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + true + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + idl-protocol + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/src/test/resources/avro + private + String + true + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + package + + shade + + + - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* - - - - - - ${main.class} - - - - - - - - + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + - - \ No newline at end of file + diff --git a/java/AvroGlueSchemaRegistryKafka/producer/pom.xml b/java/AvroGlueSchemaRegistryKafka/producer/pom.xml new file mode 100644 index 00000000..1855c1dc --- /dev/null +++ b/java/AvroGlueSchemaRegistryKafka/producer/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + + + com.amazonaws + avro-gsr-kafka + 1.0-SNAPSHOT + + + avro-gsr-kafka-producer + + + + + org.apache.flink + flink-java + + + org.apache.flink + flink-streaming-java + + + org.apache.flink + flink-clients + + + + + com.amazonaws + aws-kinesisanalytics-runtime + + + + + org.apache.flink + flink-connector-base + + + org.apache.flink + flink-connector-datagen + + + org.apache.flink + flink-connector-kafka + + + + + org.apache.flink + flink-avro-glue-schema-registry + + + + + org.apache.flink + flink-avro + + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + + + org.junit.jupiter + junit-jupiter-api + + + org.junit.jupiter + junit-jupiter-engine + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.avro + avro-maven-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/StreamingJob.java new file mode 100644 index 00000000..6b32dac3 --- /dev/null +++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -0,0 +1,159 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.avro.TemperatureSample; +import com.amazonaws.services.msf.domain.TemperatureSampleGenerator; +import com.amazonaws.services.msf.kafka.KeyHashKafkaPartitioner; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import org.apache.avro.specific.SpecificRecord; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class StreamingJob {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);
+
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+    private static boolean isLocal(StreamExecutionEnvironment env) {
+        return env instanceof LocalStreamEnvironment;
+    }
+
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+        if (isLocal(env)) {
+            LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            return KinesisAnalyticsRuntime.getApplicationProperties(
+                    Objects.requireNonNull(StreamingJob.class.getClassLoader()
+                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+        } else {
+            LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+            return KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+    }
+
+    private static <T> KafkaSink<T> createKafkaSink(
+            Properties sinkProperties,
+            Properties authProperties,
+            KafkaRecordSerializationSchema<T> recordSerializationSchema) {
+
+        Properties kafkaProducerConfig = new Properties();
+        kafkaProducerConfig.putAll(sinkProperties);
+        kafkaProducerConfig.putAll(authProperties);
+
+        return KafkaSink.<T>builder()
+                .setBootstrapServers(sinkProperties.getProperty("bootstrap.servers"))
+                .setKafkaProducerConfig(kafkaProducerConfig)
+                .setRecordSerializer(recordSerializationSchema)
+                .build();
+    }
+
+    // Create the Kafka Record Serialization schema for an AVRO-specific (generated) record class.
+    // This method also extracts the key to be used in the Kafka record (it assumes it is a String)
+    private static <T extends SpecificRecord> KafkaRecordSerializationSchema<T> kafkaRecordSerializationSchema(
+            Class<T> recordClazz,
+            SerializableFunction<T, String> keyExtractor,
+            Properties schemaRegistryProperties,
+            String topic) {
+
+        Map<String, Object> serializerConfig = Map.of(
+                // Enable auto registering the schema if it does not exist.
+                AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true,
+                // Explicitly set the compatibility mode when the schema is auto-created (default would be "backward" anyway)
+                AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.BACKWARD,
+                // GSR Region
+                AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryProperties.getProperty("region"),
+                // The Registry must exist in the region
+                AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryProperties.getProperty("name"));
+
+        // The key is extracted with the provided extractor function and turned into bytes
+        SerializationSchema<T> keySerializationSchema = record -> {
+            try {
+                String key = keyExtractor.apply(record);
+                return key.getBytes(StandardCharsets.UTF_8);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        // Serialize the value using GSR
+        SerializationSchema<T> valueSerializationSchema = GlueSchemaRegistryAvroSerializationSchema.forSpecific(
+                recordClazz, topic, serializerConfig);
+
+        return KafkaRecordSerializationSchema.<T>builder()
+                .setTopic(topic)
+                .setKeySerializationSchema(keySerializationSchema)
+                .setValueSerializationSchema(valueSerializationSchema)
+                // In this example we explicitly set a partitioner to ensure the sink partitions by key in the destination
+                // topic. Note that this is not the default behavior of the KafkaSink
+                .setPartitioner(new KeyHashKafkaPartitioner<>())
+                .build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // For the scope of this example we are disabling operator chaining just to allow observing records flowing in the
+        // application. Disabling chaining in production applications may seriously impact performance.
+        env.disableOperatorChaining();
+
+        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+        LOG.info("Application properties: {}", applicationProperties);
+
+        Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
+        Properties outputProperties = applicationProperties.get("Output0");
+        Properties schemaRegistryProperties = applicationProperties.get("SchemaRegistry");
+        Properties dataGenProperties = applicationProperties.get("DataGen");
+
+        // Set up DataGenerator
+        int samplesPerSecond = Integer.parseInt(dataGenProperties.getProperty("samples.per.second", "100"));
+        DataGeneratorSource<TemperatureSample> source = new DataGeneratorSource<>(
+                new TemperatureSampleGenerator(),
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(samplesPerSecond),
+                TypeInformation.of(TemperatureSample.class)
+        );
+
+        // Attach the DataGenerator source
+        DataStream<TemperatureSample> temperatureSamples = env.fromSource(
+                        source,
+                        org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
+                        "Temperature Generator")
+                .uid("temperature-generator");
+
+
+        // Print the records for the sake of this example.
+        // Any output to stdout is visible when running locally, but not when running on Managed Flink
+        temperatureSamples.print();
+
+        // Create the record serialization schema, which is specific to the TemperatureSample to be serialized.
+        // It also defines the topic name and how to extract the key to be used in the Kafka record
+        KafkaRecordSerializationSchema<TemperatureSample> recordSerializationSchema = kafkaRecordSerializationSchema(
+                TemperatureSample.class,
+                (SerializableFunction<TemperatureSample, String>) TemperatureSample::getRoom,
+                schemaRegistryProperties,
+                outputProperties.getProperty("topic"));
+
+        // Attach the sink
+        KafkaSink<TemperatureSample> sink = createKafkaSink(outputProperties, authProperties, recordSerializationSchema);
+        temperatureSamples.sinkTo(sink).uid("temperature-sink");
+
+        env.execute("Temperature Producer");
+    }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/domain/TemperatureSampleGenerator.java b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/domain/TemperatureSampleGenerator.java
new file mode 100644
index 00000000..24a8a0dd
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/domain/TemperatureSampleGenerator.java
@@ -0,0 +1,26 @@
+package com.amazonaws.services.msf.domain;
+
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+
+import java.time.Instant;
+import java.util.Random;
+
+public class TemperatureSampleGenerator implements GeneratorFunction<Long, TemperatureSample> {
+
+    private static final String[] ROOMS = {"room1", "room2", "room3", "room4", "room5"};
+    private final Random random = new Random();
+
+    @Override
+    public TemperatureSample map(Long index) throws Exception {
+        String room = ROOMS[random.nextInt(ROOMS.length)];
+        double temperature = 18.0 + random.nextDouble() * 12.0; // 18-30°C
+        int sensorId = random.nextInt(100); // Random sensor ID
+        return TemperatureSample.newBuilder()
+                .setSensorId(sensorId)
+                .setRoom(room)
+                .setTemperature((float) temperature)
+                .setSampleTime(Instant.now())
+                .build();
+    }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/kafka/KeyHashKafkaPartitioner.java b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/kafka/KeyHashKafkaPartitioner.java
new file mode 100644
index 00000000..62610e6f
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/java/com/amazonaws/services/msf/kafka/KeyHashKafkaPartitioner.java
@@ -0,0 +1,26 @@
+package com.amazonaws.services.msf.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+import java.util.Arrays;
+
+/**
+ * Example of FlinkKafkaPartitioner which partitions by key.
+ * <p>

+ * The KafkaSink in the DataStream API does not use the Kafka default partitioner by default.
+ *
+ * @param <T> record type
+ */
+public class KeyHashKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+    @Override
+    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+        if (key != null) {
+            int numPartitions = partitions.length;
+            int hash = Arrays.hashCode(key);
+            return partitions[Math.abs(hash) % numPartitions];
+        } else {
+            return partitions[0];
+        }
+    }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/avro/temperature-sample.avdl b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/avro/temperature-sample.avdl
new file mode 100644
index 00000000..b7802f0e
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/avro/temperature-sample.avdl
@@ -0,0 +1,9 @@
+@namespace("com.amazonaws.services.msf.avro")
+protocol In {
+    record TemperatureSample {
+        int sensorId;
+        string room;
+        float temperature;
+        timestamp_ms sampleTime;
+    }
+}
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/flink-application-properties-dev.json b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..60a904c4
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,22 @@
+[
+  {
+    "PropertyGroupId": "Output0",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "temperature-samples"
+    }
+  },
+  {
+    "PropertyGroupId": "SchemaRegistry",
+    "PropertyMap": {
+      "name": "temperature-schema-registry",
+      "region": "us-east-1"
+    }
+  },
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": {
+      "samples.per.second": "10"
+    }
+  }
+]
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/log4j2.properties b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..41570b1d
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/main/resources/log4j2.properties
@@ -0,0 +1,12 @@
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+logger.msf.name = com.amazonaws.services.msf
+logger.msf.level = DEBUG
+logger.msf.additivity = false
+logger.msf.appenderRef.console.ref = ConsoleAppender
diff --git a/java/AvroGlueSchemaRegistryKafka/producer/src/test/java/com/amazonaws/services/msf/domain/TemperatureSampleGeneratorTest.java b/java/AvroGlueSchemaRegistryKafka/producer/src/test/java/com/amazonaws/services/msf/domain/TemperatureSampleGeneratorTest.java
new file mode 100644
index 00000000..68f227d6
--- /dev/null
+++ b/java/AvroGlueSchemaRegistryKafka/producer/src/test/java/com/amazonaws/services/msf/domain/TemperatureSampleGeneratorTest.java
@@ -0,0 +1,39 @@
+package com.amazonaws.services.msf.domain;
+
+import com.amazonaws.services.msf.avro.TemperatureSample;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class TemperatureSampleGeneratorTest {
+
+    @Test
+    void testGenerateTemperatureSample() throws Exception {
+        TemperatureSampleGenerator generator = new TemperatureSampleGenerator();
+
+        TemperatureSample sample = generator.map(1L);
+
+        assertNotNull(sample);
+        assertTrue(sample.getSensorId() >= 0);
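+        // The generator draws sensor IDs with random.nextInt(100), so any generated ID satisfies the next assertion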
+ assertTrue(sample.getSensorId() < 1000); + assertNotNull(sample.getRoom()); + assertTrue(sample.getRoom().startsWith("room")); + assertTrue(sample.getTemperature() >= 18.0f); + assertTrue(sample.getTemperature() <= 30.0f); + assertNotNull(sample.getSampleTime()); + } + + @Test + void testMultipleSamplesHaveDifferentValues() throws Exception { + TemperatureSampleGenerator generator = new TemperatureSampleGenerator(); + + TemperatureSample sample1 = generator.map(1L); + TemperatureSample sample2 = generator.map(2L); + + // At least one property should be different (sensorId, room, or temperature) + boolean isDifferent = sample1.getSensorId() != sample2.getSensorId() || + !sample1.getRoom().equals(sample2.getRoom()) || + sample1.getTemperature() != sample2.getTemperature(); + assertTrue(isDifferent); + } +} diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/StreamingJob.java deleted file mode 100644 index 374ca2ee..00000000 --- a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/StreamingJob.java +++ /dev/null @@ -1,230 +0,0 @@ -package com.amazonaws.services.msf; - -import com.amazonaws.services.msf.avro.RoomTemperature; -import com.amazonaws.services.msf.avro.TemperatureSample; -import com.amazonaws.services.msf.domain.RoomAverageTemperatureCalculator; -import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; -import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import com.amazonaws.services.schemaregistry.utils.AvroRecordType; -import org.apache.avro.specific.SpecificRecord; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.connector.kafka.sink.KafkaSink; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; -import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema; -import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Map; -import java.util.Properties; - -/** - * Sample Flink application that can run in Amazon Managed Service for Apache Flink - * - * It reads from a Kafka topic temperature samples serialized as AVRO using Amazon Glue Schema Registry. - * It calculates average room temperatures every minute, and publish them to another Kafka topic, again serialized as - * AVRO using Glue Schema Registry. 
- */ -public class StreamingJob { - private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class); - - // Name of the local JSON resource with the application properties in the same format as they are received from the MSF runtime - private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; - - // Names of the configuration group containing the application properties - private static final String APPLICATION_CONFIG_GROUP = "FlinkApplicationProperties"; - - private static boolean isLocal(StreamExecutionEnvironment env) { - return env instanceof LocalStreamEnvironment; - } - - /** - * Load application properties from the service runtime or from a local resource, when the environment is local - */ - private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { - if (isLocal(env)) { - LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); - return KinesisAnalyticsRuntime.getApplicationProperties( - StreamingJob.class.getClassLoader() - .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); - } else { - LOGGER.info("Loading application configuration from "); - return KinesisAnalyticsRuntime.getApplicationProperties(); - } - } - - /** - * KafkaSource for any AVRO-generated class (SpecificRecord) using AWS Glue Schema Registry. - * - * @param record type - * @param payloadAvroClass AVRO-generated class for the message body - * @param bootstrapServers Kafka bootstrap server - * @param topic topic name - * @param consumerGroupId Kafka Consumer Group ID - * @param schemaRegistryName Glue Schema Registry name - * @param schemaRegistryRegion Glue Schema Registry region - * @param kafkaConsumerConfig configuration passed to the Kafka Consumer - * @return a KafkaSource instance - */ - private static KafkaSource kafkaSource( - Class payloadAvroClass, - String bootstrapServers, - String topic, - String consumerGroupId, - String schemaRegistryName, - String schemaRegistryRegion, - Properties kafkaConsumerConfig) { - - // DeserializationSchema for the message body: AVRO specific record, with Glue Schema Registry - Map deserializerConfig = Map.of( - AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName(), - AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryRegion, - AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryName); - DeserializationSchema legacyDeserializationSchema = GlueSchemaRegistryAvroDeserializationSchema.forSpecific(payloadAvroClass, deserializerConfig); - KafkaRecordDeserializationSchema kafkaRecordDeserializationSchema = KafkaRecordDeserializationSchema.valueOnly(legacyDeserializationSchema); - - // ... more Kafka consumer configurations (e.g. MSK IAM auth) go here... - - return KafkaSource.builder() - .setBootstrapServers(bootstrapServers) - .setTopics(topic) - .setGroupId(consumerGroupId) - .setDeserializer(kafkaRecordDeserializationSchema) - .setProperties(kafkaConsumerConfig) - .build(); - } - - /** - * KafkaSink for any AVRO-generated class (specific records) using AWS Glue Schema Registry - * using a Kafka message key (a String) extracted from the record using a KeySelector. 
- * - * @param record type - * @param payloadAvroClass AVRO-generated class for the message body - * @param messageKeyExtractor KeySelector to extract the message key from the record - * @param bootstrapServers Kafka bootstrap servers - * @param topic topic name - * @param schemaRegistryName Glue Schema Registry name - * @param schemaRegistryRegion Glue Schema Registry region - * @param kafkaProducerConfig configuration passed to the Kafka Producer - * @return a KafkaSink - */ - private static KafkaSink keyedKafkaSink( - Class payloadAvroClass, - KeySelector messageKeyExtractor, - String bootstrapServers, - String topic, - String schemaRegistryName, - String schemaRegistryRegion, - Properties kafkaProducerConfig) { - - // SerializationSchema for the message body: AVRO with Glue Schema Registry - // (GlueSchemaRegistryAvroSerializationSchema expects a Map as configuration) - Map serializerConfig = Map.of( - AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true, // Enable Schema Auto-registration (the name of the schema is based on the name of the topic) - AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryRegion, - AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryName); - SerializationSchema valueSerializationSchema = GlueSchemaRegistryAvroSerializationSchema.forSpecific(payloadAvroClass, topic, serializerConfig); - - // SerializationSchema for the message key. - // Extracts the key (a String) from the record using a KeySelector - // and covert the String to bytes as UTF-8 (same default behaviour of org.apache.flink.api.common.serialization.SimpleStringSchema) - SerializationSchema keySerializationSchema = record -> { - try { - return messageKeyExtractor.getKey(record).getBytes(StandardCharsets.UTF_8); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - - // ... more Kafka consumer configurations (e.g. MSK IAM auth) go here... 
- - return KafkaSink.builder() - .setBootstrapServers(bootstrapServers) - .setRecordSerializer( - KafkaRecordSerializationSchema.builder() - .setTopic(topic) - .setValueSerializationSchema(valueSerializationSchema) - .setKeySerializationSchema(keySerializationSchema) - .build()) - .setKafkaProducerConfig(kafkaProducerConfig) - .build(); - } - - - public static void main(String[] args) throws Exception { - // set up the streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - - // Local dev specific settings - if (isLocal(env)) { - // Checkpointing and parallelism are set by the service when running on AWS - env.enableCheckpointing(60000); - env.setParallelism(2); - } - - // Application configuration - Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP); - String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined"); - String sourceTopic = Preconditions.checkNotNull(applicationProperties.getProperty("source.topic"), "source.topic not defined"); - String sourceConsumerGroupId = applicationProperties.getProperty("source.consumer.group.id", "flink-avro-gsr-sample"); - String sinkTopic = Preconditions.checkNotNull(applicationProperties.getProperty("sink.topic"), "sink.topic not defined"); - String schemaRegistryName = Preconditions.checkNotNull(applicationProperties.getProperty("schema.registry.name"), "schema.registry.name not defined"); - String schemaRegistryRegion = Preconditions.checkNotNull(applicationProperties.getProperty("schema.registry.region"), "schema.registry.region not defined"); - - - KafkaSource source = kafkaSource( - TemperatureSample.class, - bootstrapServers, - sourceTopic, - sourceConsumerGroupId, - schemaRegistryName, - schemaRegistryRegion, - new Properties() // ...any other Kafka consumer property - ); - - KafkaSink sink = keyedKafkaSink( - RoomTemperature.class, - RoomTemperature::getRoom, - bootstrapServers, - sinkTopic, - schemaRegistryName, - schemaRegistryRegion, - new Properties() // ...any other Kafka producer property - ); - - - DataStream temperatureSamples = env.fromSource( - source, - WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) - .withIdleness(Duration.ofSeconds(60)) - .withTimestampAssigner((sample, ts) -> sample.getSampleTime().toEpochMilli()), - "Temperature Samples source") - .uid("temperature-samples-source"); - - DataStream roomTemperatures = temperatureSamples - .keyBy(TemperatureSample::getRoom) - .window(TumblingEventTimeWindows.of(Time.seconds(60))) - .aggregate(new RoomAverageTemperatureCalculator()) - .uid("room-temperatures"); - - roomTemperatures.sinkTo(sink).uid("room-temperatures-sink"); - - env.execute("Flink app"); - } -} diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculator.java b/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculator.java deleted file mode 100644 index 40e994a6..00000000 --- a/java/AvroGlueSchemaRegistryKafka/src/main/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculator.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.amazonaws.services.msf.domain; - -import com.amazonaws.services.msf.avro.RoomTemperature; -import com.amazonaws.services.msf.avro.TemperatureSample; -import 
org.apache.flink.api.common.functions.AggregateFunction; - -import java.time.Instant; - -/** - * AggregateFunction aggregating the room average temperature - * (provided only for completeness, but not relevant for the AVRO-Glue Schema Registry example) - */ -public class RoomAverageTemperatureCalculator implements AggregateFunction { - private Instant maxInstant(Instant a, Instant b) { - return Instant.ofEpochMilli( - Math.max(a.toEpochMilli(), b.toEpochMilli())); - } - - @Override - public RoomTemperature createAccumulator() { - return RoomTemperature.newBuilder() - .setRoom("").setSampleCount(0).setLastSampleTime(Instant.EPOCH).setTemperature(0).build(); - } - - @Override - public RoomTemperature add(TemperatureSample sample, RoomTemperature accumulator) { - final int sampleCount = accumulator.getSampleCount(); - final float roomAvgTemp = accumulator.getTemperature(); - final Instant lastSampleTime = maxInstant(accumulator.getLastSampleTime(), sample.getSampleTime()); - final float newRoomAvgTemp = (roomAvgTemp * sampleCount + sample.getTemperature()) / (float) (sampleCount + 1); - - accumulator.setRoom(sample.getRoom()); - accumulator.setSampleCount(sampleCount + 1); - accumulator.setTemperature(newRoomAvgTemp); - accumulator.setLastSampleTime(lastSampleTime); - - return accumulator; - } - - @Override - public RoomTemperature getResult(RoomTemperature accumulator) { - return accumulator; - } - - @Override - public RoomTemperature merge(RoomTemperature a, RoomTemperature b) { - final int totalSampleCount = a.getSampleCount() + b.getSampleCount(); - final float avgTemperature = (a.getTemperature() * a.getSampleCount() + b.getTemperature() * b.getSampleCount()) / (float) (totalSampleCount); - - return RoomTemperature.newBuilder() - .setRoom(a.getRoom()) - .setSampleCount(totalSampleCount) - .setTemperature(avgTemperature) - .setLastSampleTime(maxInstant(a.getLastSampleTime(), b.getLastSampleTime())) - .build(); - } - - -} diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/output-writer-schema.avdl b/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/output-writer-schema.avdl deleted file mode 100644 index 3024ebe8..00000000 --- a/java/AvroGlueSchemaRegistryKafka/src/main/resources/avro/output-writer-schema.avdl +++ /dev/null @@ -1,9 +0,0 @@ -@namespace("com.amazonaws.services.msf.avro") -protocol Out { - record RoomTemperature { - string room; - float temperature; - int sampleCount; - timestamp_ms lastSampleTime; - } -} \ No newline at end of file diff --git a/java/AvroGlueSchemaRegistryKafka/src/main/resources/flink-application-properties-dev.json b/java/AvroGlueSchemaRegistryKafka/src/main/resources/flink-application-properties-dev.json deleted file mode 100644 index 9cec7377..00000000 --- a/java/AvroGlueSchemaRegistryKafka/src/main/resources/flink-application-properties-dev.json +++ /dev/null @@ -1,13 +0,0 @@ -[ - { - "PropertyGroupId": "FlinkApplicationProperties", - "PropertyMap": { - "bootstrap.servers": "localhost:29092", - "source.topic": "temperature-samples", - "source.consumer.group.id": "flink-avro-gsr", - "sink.topic": "room-temperatures", - "schema.registry.name": "avro-sample", - "schema.registry.region": "eu-west-1" - } - } -] diff --git a/java/AvroGlueSchemaRegistryKafka/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java b/java/AvroGlueSchemaRegistryKafka/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java deleted file mode 100644 index 5c60673d..00000000 --- 
a/java/AvroGlueSchemaRegistryKafka/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.amazonaws.services.msf.domain; - -import com.amazonaws.services.msf.avro.RoomTemperature; -import com.amazonaws.services.msf.avro.TemperatureSample; -import org.junit.jupiter.api.Test; - -import java.time.Instant; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class RoomAverageTemperatureCalculatorTest { - - RoomAverageTemperatureCalculator sut = new RoomAverageTemperatureCalculator(); - - @Test - void createAccumulator() { - RoomTemperature roomTemperature = sut.createAccumulator(); - - assertNotNull(roomTemperature); - assertEquals(0.0f, roomTemperature.getTemperature()); - assertEquals(0, roomTemperature.getSampleCount()); - assertEquals(Instant.EPOCH, roomTemperature.getLastSampleTime()); - } - - @Test - void add() { - TemperatureSample sample = TemperatureSample.newBuilder() - .setSensorId(42) - .setRoom("a-room") - .setSampleTime(Instant.ofEpochMilli(50)) - .setTemperature(10.0f) - .build(); - RoomTemperature acc = RoomTemperature.newBuilder() - .setRoom("a-room") - .setSampleCount(10) - .setTemperature(20.0f) - .setLastSampleTime(Instant.ofEpochMilli(100)) - .build(); - - RoomTemperature accOut = sut.add(sample, acc); - - assertEquals("a-room", accOut.getRoom()); - assertEquals(10 + 1, accOut.getSampleCount()); - assertEquals(Instant.ofEpochMilli(100), accOut.getLastSampleTime()); - assertEquals((20.0f * 10 + 10.0f) / (float) (10 + 1), accOut.getTemperature()); - } - - @Test - void getResult() { - RoomTemperature acc = RoomTemperature.newBuilder() - .setRoom("a-room") - .setSampleCount(10) - .setTemperature(20.0f) - .setLastSampleTime(Instant.ofEpochMilli(100)) - .build(); - - RoomTemperature result = sut.getResult(acc); - assertEquals(acc.getRoom(), result.getRoom()); - assertEquals(acc.getTemperature(), result.getTemperature()); - assertEquals(acc.getSampleCount(), result.getSampleCount()); - assertEquals(acc.getLastSampleTime(), acc.getLastSampleTime()); - } - - @Test - void merge() { - RoomTemperature a = RoomTemperature.newBuilder() - .setRoom("a-room") - .setSampleCount(10) - .setTemperature(20.0f) - .setLastSampleTime(Instant.ofEpochMilli(100)) - .build(); - RoomTemperature b = RoomTemperature.newBuilder() - .setRoom("a-room") - .setSampleCount(20) - .setTemperature(40.0f) - .setLastSampleTime(Instant.ofEpochMilli(200)) - .build(); - - RoomTemperature c = sut.merge(a, b); - assertEquals("a-room", c.getRoom()); - assertEquals(10 + 20, c.getSampleCount()); - assertEquals((20.0f * 10 + 40.0f * 20) / (float) (10 + 20), c.getTemperature()); - assertEquals(Instant.ofEpochMilli(200), c.getLastSampleTime()); - } -} \ No newline at end of file