java/AvroGlueSchemaRegistryKafka/README.md (141 changes: 94 additions & 47 deletions)
## AVRO serialization in KafkaSource and KafkaSink using AWS Glue Schema Registry

This example demonstrates how to serialize/deserialize AVRO messages in Kafka sources and sinks, using
[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) (GSR).

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Connectors: Kafka connector, DataGenerator

The example contains two Flink applications:
1. a [producer](./producer) job, which generates random temperature samples and publishes to Kafka as AVRO using GSR.
2. a [consumer](./consumer) job, which reads temperature records from the same topic, using GSR.

Both applications use AVRO specific records (`SpecificRecord`) based on the schema definitions provided at compile time.
The record classes are generated during the build process from the AVRO IDL (`.avdl`) files you can find in the resources
folder of both jobs. For simplicity, the schema definition is repeated in both jobs.

The two jobs are designed to run as separate Amazon Managed Service for Apache Flink applications, connecting to the
same Amazon Managed Streaming for Apache Kafka (MSK) cluster.

The default configuration uses an unauthenticated connection to MSK. The example can be extended to implement any
authentication scheme supported by MSK, as sketched below.
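
For example, MSK IAM authentication could be enabled by setting the standard MSK IAM client properties on the Kafka
connector builders. This is a minimal sketch, assuming the `software.amazon.msk:aws-msk-iam-auth` library is added to
the pom (the helper class is hypothetical, not part of this example):

```java
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;

public class MskIamAuth {
    // Applies the standard MSK IAM authentication settings to a KafkaSource builder.
    // Requires aws-msk-iam-auth on the classpath at runtime.
    public static <T> KafkaSourceBuilder<T> withIamAuth(KafkaSourceBuilder<T> builder) {
        return builder
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "AWS_MSK_IAM")
                .setProperty("sasl.jaas.config",
                        "software.amazon.msk.auth.iam.IAMLoginModule required;")
                .setProperty("sasl.client.callback.handler.class",
                        "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    }
}
```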

### Prerequisites

To run the two Managed Flink applications you need to set up the following prerequisites:

1. An MSK cluster
- Create the topic `temperature-samples` or enable auto topic creation
- Allow unauthenticated access (or modify the application to support the configured authentication scheme)
2. Create a Registry named `temperature-schema-registry` in Glue Schema Registry, in the same region (a programmatic sketch follows this list)
3. ⚠️ Create a VPC Endpoint for Glue in the VPC where the Managed Flink applications are attached.
   Without a VPC Endpoint, an application attached to a VPC may not be able to connect to the service endpoint.
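
The registry from step 2 can also be created programmatically. The following is a purely illustrative sketch using the
AWS SDK for Java v2 (it assumes a `software.amazon.awssdk:glue` dependency, which is not part of this example; the
console or AWS CLI works just as well):

```java
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.CreateRegistryRequest;
import software.amazon.awssdk.services.glue.model.CreateRegistryResponse;

public class CreateRegistryExample {
    public static void main(String[] args) {
        // Create the Glue registry the applications expect (default name used in this example)
        try (GlueClient glue = GlueClient.create()) {
            CreateRegistryResponse response = glue.createRegistry(CreateRegistryRequest.builder()
                    .registryName("temperature-schema-registry")
                    .build());
            System.out.println("Created registry: " + response.registryArn());
        }
    }
}
```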

### Create the Amazon Managed Service for Apache Flink applications

Create two Managed Flink applications, one for the producer and the other for the consumer.
1. Build both jobs by running `mvn package` in the directory of the example. This builds two JARs, in the `target` subfolders of the producer and the consumer.
2. Upload both JARs to an S3 bucket and use them as application code, for the producer and consumer respectively.
3. Application configuration
* Attach both applications to a VPC with access to the MSK cluster and ensure the Security Group allows access to the MSK cluster.
For simplicity, we suggest running both applications with the same VPC, subnets, and Security Group as the MSK cluster
* Ensure the applications have permissions to access Glue Schema Registry. For the sake of this example you can attach
the policy `AWSGlueSchemaRegistryFullAccess` to the producer application's IAM Role, and the policy `AWSGlueSchemaRegistryReadonlyAccess`
to the consumer's Role.
* Set up the Runtime properties of the two applications as described in the following sections.

### Runtime configuration

The two applications expect different configurations.
When running locally, the configuration is fetched from the `flink-application-properties-dev.json` file in the resources
folder of each job.

When running on Managed Flink, these files are ignored and the configuration must be passed as Runtime properties in
the configuration of each application.

All parameters are case-sensitive.
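
On Managed Flink, the jobs read these grouped properties through the `aws-kinesisanalytics-runtime` library declared in
the pom. A minimal sketch of reading one group (the helper class name is illustrative):

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class RuntimeConfig {
    // Returns the "SchemaRegistry" property group, containing the registry name and region.
    public static Properties schemaRegistryProperties() throws IOException {
        Map<String, Properties> groups = KinesisAnalyticsRuntime.getApplicationProperties();
        return groups.get("SchemaRegistry");
    }
}
```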

#### Producer runtime parameters

| Group ID | Key | Description |
|-------------------|-----------------------|----------------------------------------------------------------|
| `Output0` | `bootstrap.servers` | Kafka bootstrap servers |
| `Output0` | `topic` | Kafka topic name for temperature samples |
| `SchemaRegistry` | `name` | AWS Glue Schema Registry name |
| `SchemaRegistry` | `region` | AWS region where the Schema Registry is located |
| `DataGen` | `samples.per.second` | (optional) Rate of sample generation per second (default: 100) |
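
Internally, the producer wires these parameters into a `KafkaSink` that serializes the generated `TemperatureSample`
records through GSR. A minimal sketch of how this can be done with `GlueSchemaRegistryAvroSerializationSchema` (the
factory class is illustrative; the actual job code may differ):

```java
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;

import java.util.HashMap;
import java.util.Map;

public class SinkFactory {
    // Builds a KafkaSink that writes TemperatureSample records as AVRO,
    // registering the writer's schema in Glue Schema Registry.
    // TemperatureSample is the AVRO-generated class (see "AVRO-generated classes" below).
    public static KafkaSink<TemperatureSample> createSink(
            String bootstrapServers, String topic, String registryName, String region) {
        Map<String, Object> gsrConfig = new HashMap<>();
        gsrConfig.put(AWSSchemaRegistryConstants.AWS_REGION, region);
        gsrConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
        gsrConfig.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

        return KafkaSink.<TemperatureSample>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.<TemperatureSample>builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(
                                GlueSchemaRegistryAvroSerializationSchema.forSpecific(
                                        TemperatureSample.class, topic, gsrConfig))
                        .build())
                .build();
    }
}
```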

#### Consumer runtime parameters

| Group ID | Key | Description |
|-------------------|-----------------------|-------------------------------------------------|
| `Input0` | `bootstrap.servers` | Kafka bootstrap servers |
| `Input0` | `topic` | Kafka topic name for temperature samples |
| `Input0` | `group.id` | Kafka consumer group ID |
| `SchemaRegistry` | `name` | AWS Glue Schema Registry name |
| `SchemaRegistry` | `region` | AWS region where the Schema Registry is located |
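
On the consumer side, the matching `KafkaSource` fetches the writer's schema from GSR and deserializes into the same
generated class. A minimal sketch with `GlueSchemaRegistryAvroDeserializationSchema` (illustrative; the actual job code
may differ):

```java
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;

import java.util.HashMap;
import java.util.Map;

public class SourceFactory {
    // Builds a KafkaSource that deserializes AVRO message values into the
    // generated TemperatureSample class, fetching the schema from GSR.
    public static KafkaSource<TemperatureSample> createSource(
            String bootstrapServers, String topic, String groupId,
            String registryName, String region) {
        Map<String, Object> gsrConfig = new HashMap<>();
        gsrConfig.put(AWSSchemaRegistryConstants.AWS_REGION, region);
        gsrConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);

        return KafkaSource.<TemperatureSample>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(
                        GlueSchemaRegistryAvroDeserializationSchema.forSpecific(
                                TemperatureSample.class, gsrConfig))
                .build();
    }
}
```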


### Running the applications locally

A [docker-compose](docker/docker-compose.yml) file is provided to run a local Kafka cluster for development.
The default configuration uses this local cluster.

When running locally, the jobs still use the actual Glue Schema Registry.
Make sure the machine where you are developing has an authenticated AWS CLI profile with permissions to use GSR. Use the
AWS plugin of your IDE to run the application with a specific AWS profile.

See [Running examples locally](../running-examples-locally.md) for further details.


### Notes about using AVRO with Apache Flink

#### AVRO-generated classes

This project uses classes generated at build-time as data objects.

As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
code. The AVRO Maven plugin generates the Java classes (source code) at build-time, during the
[`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.

The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with
the project source. This way, the only dependency is on the schema definition file(s).
If any change is required, the schema file is modified and the AVRO classes are re-generated automatically in the build.

Code generation is supported by all common IDEs like IntelliJ.
If your IDE does not see the AVRO classes (`TemperatureSample`) when you import the project for the
first time, you may manually run `mvn generate-sources` once or force source code generation from the IDE.

#### AVRO-generated classes (SpecificRecord) in Apache Flink

Using AVRO-generated classes (SpecificRecord) within the flow of the Flink application (between operators) or in the
Flink state has an additional benefit.
Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)
these objects, without the risk of falling back to Kryo.
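
If you want Flink to fail fast rather than silently fall back to Kryo, generic types can be disabled. An optional
sketch (not part of this example's code):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoGuard {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throws at runtime if any record type would be serialized with Kryo,
        // instead of silently degrading serialization performance.
        env.getConfig().disableGenericTypes();
    }
}
```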

### Common issues

If the application fails to call the Glue Schema Registry API for any reason, the job gets trapped in a fail-and-restart
loop. The exception says it cannot fetch or register a schema version.

The inability to use GSR may be caused by:
* Lack of permissions to access GSR: add the `AWSGlueSchemaRegistryFullAccess` or `AWSGlueSchemaRegistryReadonlyAccess` policy to the application IAM Role
* Inability to reach the Glue endpoint: create a Glue VPC Endpoint in the application VPC
* The Registry does not exist: create a registry with the configured name (`temperature-schema-registry` by default)
* Misconfiguration: ensure the registry name and region passed to the application match your setup
java/AvroGlueSchemaRegistryKafka/consumer/pom.xml (99 changes: 99 additions & 0 deletions)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.amazonaws</groupId>
        <artifactId>avro-gsr-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>avro-gsr-kafka-consumer</artifactId>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
        </dependency>

        <!-- Library used to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
        </dependency>

        <!-- Connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
        </dependency>

        <!-- GSR Format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-glue-schema-registry</artifactId>
        </dependency>

        <!-- AVRO -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>