3 changes: 2 additions & 1 deletion README.md
@@ -38,8 +38,9 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f
- [**Serialization**](./java/Serialization) - Serialization of record and state
- [**Windowing**](./java/Windowing) - Time-based window aggregation examples
- [**Side Outputs**](./java/SideOutputs) - Using side outputs for data routing and filtering
- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls\
- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls
- [**Custom Metrics**](./java/CustomMetrics) - Creating and publishing custom application metrics
- [**Fetching credentials from Secrets Manager**](./java/FetchSecrets) - Dynamically fetching credentials from AWS Secrets Manager

#### Utilities
- [**Flink Data Generator (JSON)**](java/FlinkDataGenerator) - How to use a Flink application as a data generator, for functional and load testing.
113 changes: 113 additions & 0 deletions java/FetchSecrets/README.md
@@ -0,0 +1,113 @@
## Fetching Secrets from Secrets Manager

This example demonstrates how to fetch secrets from AWS Secrets Manager at application start.

* Flink version: 1.20
* Flink API: DataStream
* Language: Java (11)
* Flink connectors: DataGen, Kafka sink

This example shows how you can fetch secrets from AWS Secrets Manager instead of passing them as unencrypted configuration parameters.
In this case, the job fetches the username and password for MSK SASL/SCRAM authentication.
The application generates random stock prices and writes them, as JSON, to a Kafka topic.

Note that this method works for any secret represented as text that is passed directly to the constructor of an operator.
It does not work for fetching keystore or truststore files.
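
The sketch below illustrates the pattern: the secret is fetched once, before the job graph is built, and turned into Kafka SASL/SCRAM client properties. It assumes the credentials have already been extracted as plain strings; for MSK, the secret is actually a JSON document with `username` and `password` fields, and the parsing is omitted here for brevity. Class and method names are illustrative, not the actual code of this example.

```java
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

import java.util.Properties;

public class SecretFetchSketch {

    // Fetch the secret string once, at application start.
    // On Managed Flink, the default credentials chain resolves the
    // application's Service Role.
    static String fetchSecretString(String secretName) {
        try (SecretsManagerClient client = SecretsManagerClient.create()) {
            return client.getSecretValue(
                    GetSecretValueRequest.builder().secretId(secretName).build()
            ).secretString();
        }
    }

    // Build SASL/SCRAM client properties from the extracted credentials.
    // The plain-text credentials are embedded in the operator configuration
    // only; they never appear in the application's Runtime Properties.
    static Properties saslScramProperties(String username, String password) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }
}
```

The resulting properties can then be passed to the sink builder, for example with `KafkaSink.builder().setKafkaProducerConfig(props)`.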

### Prerequisites

#### MSK

To run this application on Amazon Managed Service for Apache Flink, you need an Amazon MSK cluster configured for
SASL/SCRAM authentication. See [MSK Documentation](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html)
for details on how to set it up.

The cluster must contain a topic named `stock-prices` or allow auto topic creation.

If you have set up Kafka ACLs, the user must have permission to write to this topic.

#### Managed Flink Application Service Role

The IAM Service Role attached to the Managed Flink application must have sufficient permissions to fetch the credentials
from AWS Secrets Manager. See the [AWS Secrets Manager documentation](https://docs.aws.amazon.com/secretsmanager/latest/userguide/determine-acccess_examine-iam-policies.html)
for further details.

MSK SASL/SCRAM credentials must be encrypted with a customer managed key (CMK). The application Service Role must also
have permission to use the CMK to decrypt the secret (`kms:Decrypt`).

Here is an example of an IAM Policy to allow the application to fetch and decrypt the secret:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowFetchSecret",
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "arn:aws:secretsmanager:<region>:<account>:secret:<secretName>-*"
    },
    {
      "Sid": "AllowDecryptSecret",
      "Effect": "Allow",
      "Action": "kms:Decrypt",
      "Resource": "arn:aws:kms:<region>:<account>:key/<key-id>"
    }
  ]
}
```

⚠️ Note that the KMS Key Policy may also restrict access to the CMK.
If you are using a restrictive Key Policy, you also need to explicitly allow your Managed Flink application to decrypt the secret.
Add the following snippet to the KMS Key Policy, in addition to other permissions:

```json
{
  "Sid": "AllowDecrypting",
  "Effect": "Allow",
  "Principal": {
    "Service": "kinesisanalytics.amazonaws.com"
  },
  "Action": "kms:Decrypt",
  "Resource": "*"
}
```

#### Managed Flink Application VPC Networking

To connect to the MSK cluster, the Managed Flink application must have VPC networking configured and must be able to
reach the cluster. For this example, the simplest setup is to use the same VPC, Subnets, and Security Group
as the MSK cluster.

### Runtime Configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file.

Runtime parameters:

| Group ID | Key | Description |
|------------------|----------------------|--------------------------------------------------------------------------|
| `DataGen` | `records.per.second` | Number of stock price records to generate per second (default: 10) |
| `Output0` | `bootstrap.servers` | Kafka bootstrap servers |
| `Output0` | `topic` | Target Kafka topic (default: "stock-prices") |
| `AuthProperties` | `secret.name` | AWS Secrets Manager secret name containing username/password credentials |

The `bootstrap.servers` must point to the cluster's SASL/SCRAM endpoints (port 9096).
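
For local development, the properties file would contain something along these lines. The broker addresses and the secret name below are placeholders; note that MSK SASL/SCRAM secret names must start with `AmazonMSK_`:

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": { "records.per.second": "10" }
  },
  {
    "PropertyGroupId": "Output0",
    "PropertyMap": {
      "bootstrap.servers": "b-1.mycluster.abcdef.c2.kafka.eu-west-1.amazonaws.com:9096,b-2.mycluster.abcdef.c2.kafka.eu-west-1.amazonaws.com:9096",
      "topic": "stock-prices"
    }
  },
  {
    "PropertyGroupId": "AuthProperties",
    "PropertyMap": { "secret.name": "AmazonMSK_FetchSecretsExample" }
  }
]
```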

### Testing Locally

The application cannot run locally unless your machine has network connectivity to an MSK cluster that supports
SASL/SCRAM authentication, for example via VPN.

Fetching the secret from Secrets Manager does work from your machine, as long as you have an authenticated AWS profile
with permission to fetch the secret, and your application uses that profile, for example via your IDE's AWS plugin.
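
If you prefer to select the profile in code rather than through the IDE plugin, the SDK client can be built with an explicit named profile. This is a hypothetical sketch for local runs only; the profile name is a placeholder:

```java
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

public class LocalClientFactory {
    // For local runs only: resolve credentials from a named AWS profile
    // ("my-dev-profile" is a placeholder). On Managed Flink, the default
    // credentials chain picks up the application's Service Role instead.
    static SecretsManagerClient localClient() {
        return SecretsManagerClient.builder()
                .credentialsProvider(ProfileCredentialsProvider.create("my-dev-profile"))
                .build();
    }
}
```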


### Known Limitations

Credentials are fetched only once, when the job starts.
Flink has no easy way of dynamically updating an operator, such as the Kafka sink, while the job is running.

If you implement credential rotation, the new credentials will not be used by the application unless you restart the job.
180 changes: 180 additions & 0 deletions java/FetchSecrets/pom.xml
@@ -0,0 +1,180 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>fetch-secrets</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.version>1.20.0</flink.version>
        <kafka.connector.version>3.2.0-1.19</kafka.connector.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.17.2</log4j.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.162</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Flink Core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- AWS SDK for Secrets Manager -->
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>secretsmanager</artifactId>
        </dependency>

        <!-- DataGen connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${kafka.connector.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Logging framework -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- JUnit 5 -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <directory>${buildDirectory}</directory>
        <finalName>${jar.finalName}</finalName>

        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar including all required dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.FetchSecretsJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>