
Commit 6891278

Updated and refactored Kafka GSR AVRO example (#134)
1 parent d0aaa0a commit 6891278

21 files changed (+937 -652 lines)

Lines changed: 94 additions & 47 deletions (the example's README)

## AVRO serialization in KafkaSource and KafkaSink using AWS Glue Schema Registry

This example demonstrates how to serialize/deserialize AVRO messages in Kafka sources and sinks, using
[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) (GSR).

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Connectors: Kafka connector, DataGenerator

The example contains two Flink applications:
1. a [producer](./producer) job, which generates random temperature samples and publishes them to Kafka as AVRO, using GSR.
2. a [consumer](./consumer) job, which reads temperature records from the same topic, using GSR.

Both applications use AVRO-specific records based on the schema definitions provided at compile time.
The record classes are generated during the build process from the AVRO IDL (`.avdl`) files you can find in the resources
folder of both jobs. For simplicity, the schema definition is repeated in both jobs.

The two jobs are designed to run as separate Amazon Managed Service for Apache Flink applications, connecting to the
same Amazon Managed Streaming for Apache Kafka (MSK) cluster.

The default configuration uses an unauthenticated connection to MSK. The example can be extended to implement any
supported MSK authentication scheme.
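
As an illustration of such an extension (not part of this example), a minimal sketch of the Kafka client properties for MSK IAM authentication, assuming the `software.amazon.msk:aws-msk-iam-auth` library is added to the project:

```java
import java.util.Properties;

// Hypothetical extension, not included in the example: Kafka client properties
// for MSK IAM authentication (requires the aws-msk-iam-auth library on the classpath).
static Properties mskIamAuthProperties() {
    Properties props = new Properties();
    props.setProperty("security.protocol", "SASL_SSL");
    props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
    props.setProperty("sasl.jaas.config",
            "software.amazon.msk.auth.iam.IAMLoginModule required;");
    props.setProperty("sasl.client.callback.handler.class",
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    return props; // pass to the KafkaSource/KafkaSink builder via setProperties(...)
}
```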

### Prerequisites

To run the two Managed Flink applications you need to set up the following prerequisites:

1. An MSK cluster
   - Create the topic `temperature-samples` or enable auto topic creation
   - Allow unauthenticated access (or modify the application to support the configured authentication scheme)
2. Create a Registry named `temperature-schema-registry` in Glue Schema Registry, in the same region
3. ⚠️ Create a VPC Endpoint for Glue in the VPC where the Managed Flink applications are attached.
   Without a VPC Endpoint, an application connected to a VPC may not be able to connect to a service endpoint.

### Create the Amazon Managed Service for Apache Flink applications

Create two Managed Flink applications, one for the producer and the other for the consumer.
1. Build both jobs by running `mvn package` in the directory of the example. This will build two JARs in the `target` subfolder of the producer and consumer.
2. Upload both JARs to an S3 bucket and use them as application code, for the producer and consumer respectively.
3. Application configuration
   * Attach both applications to a VPC with access to the MSK cluster and ensure the Security Group allows access to the MSK cluster.
     For simplicity, to run the example we suggest using, for both applications, the same VPC, subnets, and Security Group as the MSK cluster.
   * Ensure the applications have permissions to access Glue Schema Registry. For the sake of this example you can attach
     the policy `AWSGlueSchemaRegistryFullAccess` to the producer application's IAM Role, and the policy `AWSGlueSchemaRegistryReadonlyAccess`
     to the consumer's Role.
   * Set up the Runtime properties of the two applications as described in the following sections.

### Runtime configuration

The two applications expect different configurations.
When running locally, the configurations are fetched from the `flink-application-properties-dev.json` files in the resources
folder of each job.

When running on Managed Flink, these files are ignored and the configuration must be passed using the Runtime properties
as part of the configuration of each application.

All parameters are case-sensitive.
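
As a sketch, the jobs can load these properties with the `aws-kinesisanalytics-runtime` library declared in the `pom.xml` (the `PropertiesLoader` class name and the `isLocal` flag below are illustrative, not names from the example):

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class PropertiesLoader {  // hypothetical helper, for illustration only
    static Map<String, Properties> loadApplicationProperties(boolean isLocal) throws IOException {
        if (isLocal) {
            // Local run: read the JSON file bundled in the job's resources folder
            String localConfig = PropertiesLoader.class.getClassLoader()
                    .getResource("flink-application-properties-dev.json").getPath();
            return KinesisAnalyticsRuntime.getApplicationProperties(localConfig);
        }
        // On Managed Flink: fetch the Runtime properties configured on the application
        return KinesisAnalyticsRuntime.getApplicationProperties();
    }
}
```

Each property group is then read with, for example, `properties.get("Output0").getProperty("bootstrap.servers")`.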

#### Producer runtime parameters

| Group ID         | Key                  | Description                                                    |
|------------------|----------------------|----------------------------------------------------------------|
| `Output0`        | `bootstrap.servers`  | Kafka bootstrap servers                                        |
| `Output0`        | `topic`              | Kafka topic name for temperature samples                       |
| `SchemaRegistry` | `name`               | AWS Glue Schema Registry name                                  |
| `SchemaRegistry` | `region`             | AWS region where the Schema Registry is located                |
| `DataGen`        | `samples.per.second` | (optional) Rate of sample generation per second (default: 100) |
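
To give an idea of how these parameters are wired in, here is a minimal sketch of a `KafkaSink` serializing the AVRO-generated `TemperatureSample` records through GSR (variable and method names are illustrative; the configuration keys come from `AWSSchemaRegistryConstants`):

```java
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;

import java.util.HashMap;
import java.util.Map;

// Sketch: KafkaSink writing TemperatureSample records as AVRO, registering
// the writer's schema in Glue Schema Registry.
static KafkaSink<TemperatureSample> createSink(
        String bootstrapServers, String topic, String registryName, String region) {
    Map<String, Object> gsrConfig = new HashMap<>();
    gsrConfig.put(AWSSchemaRegistryConstants.AWS_REGION, region);
    gsrConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
    gsrConfig.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

    return KafkaSink.<TemperatureSample>builder()
            .setBootstrapServers(bootstrapServers)
            .setRecordSerializer(KafkaRecordSerializationSchema.<TemperatureSample>builder()
                    .setTopic(topic)
                    .setValueSerializationSchema(
                            GlueSchemaRegistryAvroSerializationSchema.forSpecific(
                                    TemperatureSample.class, topic, gsrConfig))
                    .build())
            .build();
}
```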

#### Consumer runtime parameters

| Group ID         | Key                 | Description                                     |
|------------------|---------------------|-------------------------------------------------|
| `Input0`         | `bootstrap.servers` | Kafka bootstrap servers                         |
| `Input0`         | `topic`             | Kafka topic name for temperature samples        |
| `Input0`         | `group.id`          | Kafka consumer group ID                         |
| `SchemaRegistry` | `name`              | AWS Glue Schema Registry name                   |
| `SchemaRegistry` | `region`            | AWS region where the Schema Registry is located |
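
On the consumer side, a corresponding sketch of a `KafkaSource` that fetches the writer's schema from GSR at runtime (again, names are illustrative):

```java
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;

import java.util.HashMap;
import java.util.Map;

// Sketch: KafkaSource deserializing TemperatureSample records written through GSR.
static KafkaSource<TemperatureSample> createSource(
        String bootstrapServers, String topic, String groupId, String region) {
    Map<String, Object> gsrConfig = new HashMap<>();
    gsrConfig.put(AWSSchemaRegistryConstants.AWS_REGION, region);

    return KafkaSource.<TemperatureSample>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId(groupId)
            .setStartingOffsets(OffsetsInitializer.latest())
            // The writer's schema is fetched from Glue Schema Registry at runtime
            .setValueOnlyDeserializer(
                    GlueSchemaRegistryAvroDeserializationSchema.forSpecific(
                            TemperatureSample.class, gsrConfig))
            .build();
}
```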

### Running the applications locally

A [docker-compose](docker/docker-compose.yml) file is provided to run a local Kafka cluster for local development.
The default configurations use this local cluster.

When running locally, the jobs will use the actual Glue Schema Registry.
Make sure the machine where you are developing has an authenticated AWS CLI profile with permissions to use GSR. Use the
AWS plugin of your IDE to make the application run with a specific AWS profile.

See [Running examples locally](../running-examples-locally.md) for further details.

### Notes about using AVRO with Apache Flink

#### AVRO-generated classes

This project uses classes generated at build-time as data objects.

As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
code. The AVRO Maven plugin generates the Java classes (source code) at build-time, during the
[`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.

The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with
the project source. This way, the only dependency is on the schema definition file(s).
If any change is required, the schema file is modified and the AVRO classes are re-generated automatically in the build.

Code generation is supported by all common IDEs like IntelliJ.
If your IDE does not see the AVRO classes (`TemperatureSample`) when you import the project for the
first time, you may manually run `mvn generate-sources` once or force source code generation from the IDE.
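
For illustration, using the generated class looks like ordinary Java. The builder and setters below are the standard shape of AVRO-generated `SpecificRecord` classes; the actual field names depend on the `.avdl` definition:

```java
// Hypothetical usage of the generated class; field names are assumptions,
// the real ones are defined by the .avdl schema in the resources folder.
TemperatureSample sample = TemperatureSample.newBuilder()
        .setRoom("room-1")
        .setTemperature(21.5f)
        .setTimestamp(System.currentTimeMillis())
        .build();
```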

#### AVRO-generated classes (SpecificRecord) in Apache Flink

Using AVRO-generated classes (SpecificRecord) within the flow of the Flink application (between operators) or in the
Flink state has an additional benefit.
Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)
these objects, without the risk of falling back to Kryo.
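
If you want to verify this while developing, one option (not shown in this example) is to disable the Kryo fallback entirely, so the job fails fast if any type cannot be natively serialized:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Optional check during development: throw instead of silently falling back to Kryo
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes();
```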

### Common issues

If the application fails to call the Glue Schema Registry API for any reason, the job gets trapped in a fail-and-restart
loop. The exception says it cannot fetch or register a schema version.

The inability to use GSR may be caused by:
* Lack of permissions to access GSR --> add `AWSGlueSchemaRegistryFullAccess` or `AWSGlueSchemaRegistryReadonlyAccess` policies to the application IAM Role
* Unable to reach the Glue endpoint --> create a Glue VPC Endpoint in the application VPC
* The Registry does not exist --> create a registry with the configured name (`temperature-schema-registry` by default)
* Misconfiguration --> ensure the registry name and region passed to the application match your setup
Lines changed: 99 additions & 0 deletions (new file: the consumer module's `pom.xml`)
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.amazonaws</groupId>
        <artifactId>avro-gsr-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>avro-gsr-kafka-consumer</artifactId>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
        </dependency>

        <!-- Library used to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
        </dependency>

        <!-- Connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
        </dependency>

        <!-- GSR Format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-glue-schema-registry</artifactId>
        </dependency>

        <!-- AVRO -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
