diff --git a/README.md b/README.md
index bbaa3b8..3ad088f 100644
--- a/README.md
+++ b/README.md
@@ -38,8 +38,9 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f
 - [**Serialization**](./java/Serialization) - Serialization of record and state
 - [**Windowing**](./java/Windowing) - Time-based window aggregation examples
 - [**Side Outputs**](./java/SideOutputs) - Using side outputs for data routing and filtering
-- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls\
+- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls
 - [**Custom Metrics**](./java/CustomMetrics) - Creating and publishing custom application metrics
+- [**Fetching credentials from Secrets Manager**](./java/FetchSecrets) - Dynamically fetching credentials from AWS Secrets Manager
 
 #### Utilities
 - [**Fink Data Generator (JSON)**](java/FlinkDataGenerator) - How to use a Flink application as data generator, for functional and load testing.
diff --git a/java/FetchSecrets/README.md b/java/FetchSecrets/README.md
new file mode 100644
index 0000000..409ca03
--- /dev/null
+++ b/java/FetchSecrets/README.md
@@ -0,0 +1,113 @@
+## Fetching Secrets from Secrets Manager
+
+This example demonstrates how to fetch secrets from AWS Secrets Manager at application start.
+
+* Flink version: 1.20
+* Flink API: DataStream
+* Language: Java (11)
+* Flink connectors: DataGen, Kafka sink
+
+This example shows how you can fetch any secret from AWS Secrets Manager, without passing it as a non-encrypted
+configuration parameter. In this case, the job fetches the username and password for MSK SASL/SCRAM authentication.
+The application generates random stock prices and writes them, as JSON, to a Kafka topic.
+
+Note that this method works for any secret represented as text that is passed directly to the constructor of an operator.
+This method does not work for fetching keystore or truststore files.
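+
+The fetch itself, condensed from [`FetchSecretsJob.java`](src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java),
+is a plain AWS SDK v2 call executed in `main()` before the job graph is built (a sketch; `secretName` comes from the
+`secret.name` runtime property):
+
+```java
+try (SecretsManagerClient client = SecretsManagerClient.create()) {
+    GetSecretValueResponse response = client.getSecretValue(
+            GetSecretValueRequest.builder().secretId(secretName).build());
+    String secretString = response.secretString();
+    // ...parse the secret text and pass the values to the operator's builder or constructor
+}
+```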
+
+### Prerequisites
+
+#### MSK
+
+To run this application on Amazon Managed Service for Apache Flink, you need an Amazon MSK cluster configured for
+SASL/SCRAM authentication. See the [MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html)
+for details on how to set it up.
+
+The cluster must contain a topic named `stock-prices` or allow automatic topic creation.
+
+If you set up any Kafka ACLs, the user must have permission to write to this topic.
+
+#### Managed Flink Application Service Role
+
+The IAM Service Role attached to the Managed Flink application must have sufficient permissions to fetch the credentials
+from AWS Secrets Manager. See the [AWS Secrets Manager documentation](https://docs.aws.amazon.com/secretsmanager/latest/userguide/determine-acccess_examine-iam-policies.html)
+for further details.
+
+MSK SASL/SCRAM credentials must be encrypted with a customer managed key (CMK). The application Service Role must also
+provide permissions to use the CMK to decrypt the secret (`kms:Decrypt`).
+
+Here is an example of an IAM Policy that allows the application to fetch and decrypt the secret:
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Sid": "AllowFetchSecret",
+      "Effect": "Allow",
+      "Action": "secretsmanager:GetSecretValue",
+      "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>-*"
+    },
+    {
+      "Sid": "AllowDecryptSecret",
+      "Effect": "Allow",
+      "Action": "kms:Decrypt",
+      "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>"
+    }
+  ]
+}
+```
+
+⚠️ Note that the KMS Key Policy may also restrict access to the CMK.
+If you are using a restrictive Key Policy, you also need to allow your Managed Flink application to decrypt.
+Add the following snippet to the KMS Key Policy, in addition to other permissions:
+
+```json
+{
+  "Sid": "AllowDecrypting",
+  "Effect": "Allow",
+  "Principal": {
+    "Service": "kinesisanalytics.amazonaws.com"
+  },
+  "Action": "kms:Decrypt",
+  "Resource": "*"
+}
+```
+
+#### Managed Flink Application VPC Networking
+
+To connect to the MSK cluster, the Managed Flink application must have VPC networking configured and must be able to
+reach the MSK cluster. For the sake of this example, the simplest setup is using the same VPC, Subnets, and Security Group
+as the MSK cluster.
+
+### Runtime Configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.
+
+When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file.
+
+Runtime parameters:
+
+| Group ID         | Key                  | Description                                                               |
+|------------------|----------------------|---------------------------------------------------------------------------|
+| `DataGen`        | `records.per.second` | Number of stock price records to generate per second (default: 10)         |
+| `Output0`        | `bootstrap.servers`  | Kafka bootstrap servers                                                     |
+| `Output0`        | `topic`              | Target Kafka topic (default: `stock-prices`)                                |
+| `AuthProperties` | `secret.name`        | AWS Secrets Manager secret name containing username/password credentials    |
+
+The `bootstrap.servers` must be the SASL/SCRAM bootstrap servers (port 9096).
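+
+The secret referenced by `secret.name` is expected to contain a JSON object with `username` and `password` keys, the
+format used by MSK SASL/SCRAM secrets (the secret name must start with the `AmazonMSK_` prefix). For example, with
+placeholder values:
+
+```json
+{
+  "username": "msk-user",
+  "password": "msk-user-password"
+}
+```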
+
+### Testing Locally
+
+The application cannot run locally unless you provide network connectivity from your machine to an MSK cluster
+supporting SASL/SCRAM authentication, for example via VPN.
+
+Fetching the secret from Secrets Manager works from your machine, as long as you have an authenticated AWS CLI profile
+that allows fetching the secret, and you let the application use that profile through your IDE's AWS plugin.
+
+
+### Known Limitations
+
+Credentials can be fetched only once, when the job starts.
+Flink does not have any easy way to dynamically update an operator, for example the Kafka sink, while the job is running.
+
+If you implement any credential rotation, the new credentials will not be used by the application unless you restart the job.
diff --git a/java/FetchSecrets/pom.xml b/java/FetchSecrets/pom.xml
new file mode 100644
index 0000000..b7e616b
--- /dev/null
+++ b/java/FetchSecrets/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>fetch-secrets</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <buildDirectory>${project.basedir}/target</buildDirectory>
+        <jar.finalName>${project.name}-${project.version}</jar.finalName>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <flink.version>1.20.0</flink.version>
+        <kafka.connector.version>3.2.0-1.19</kafka.connector.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <log4j.version>2.17.2</log4j.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.20.162</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <!-- Apache Flink dependencies, provided by the Flink runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Amazon Managed Service for Apache Flink runtime, provided -->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- AWS SDK v2 Secrets Manager client (version managed by the BOM) -->
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>secretsmanager</artifactId>
+        </dependency>
+
+        <!-- Connectors -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-datagen</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${kafka.connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${buildDirectory}</directory>
+        <finalName>${jar.finalName}</finalName>
+        <plugins>
+            <!-- Java compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+            <!-- Shade plugin to build the fat-jar including all required dependencies -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.amazonaws.services.msf.FetchSecretsJob</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
new file mode 100644
index 0000000..b63d6ff
--- /dev/null
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
@@ -0,0 +1,158 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.domain.StockPrice;
+import com.amazonaws.services.msf.domain.StockPriceGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.formats.json.JsonSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
+import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;
+import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class FetchSecretsJob {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FetchSecretsJob.class);
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+    private static final String DEFAULT_TOPIC = "stock-prices";
+    private static final int DEFAULT_RECORDS_PER_SECOND = 10;
+
+    private static boolean isLocal(StreamExecutionEnvironment env) {
+        return env instanceof LocalStreamEnvironment;
+    }
+
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+        if (isLocal(env)) {
+            LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            return KinesisAnalyticsRuntime.getApplicationProperties(
+                    Objects.requireNonNull(FetchSecretsJob.class.getClassLoader()
+                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+        } else {
+            LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+            return KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+    }
+
+    /**
+     * Fetch the secret from Secrets Manager.
+     * In the case of MSK SASL/SCRAM credentials, the secret is a JSON object with the keys `username` and `password`.
+     *
+     * @param authProperties The properties containing the `secret.name`
+     * @return A Tuple2 containing the username and password
+     */
+    private static Tuple2<String, String> fetchCredentialsFromSecretsManager(Properties authProperties) {
+        String secretName = Preconditions.checkNotNull(authProperties.getProperty("secret.name"), "Missing secret name");
+
+        try (SecretsManagerClient client = SecretsManagerClient.create()) {
+            GetSecretValueRequest request = GetSecretValueRequest.builder()
+                    .secretId(secretName)
+                    .build();
+
+            GetSecretValueResponse response = client.getSecretValue(request);
+            String secretString = response.secretString();
+
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode secretJson = mapper.readTree(secretString);
+
+            String username = secretJson.get("username").asText();
+            String password = secretJson.get("password").asText();
+
+            LOG.info("Successfully fetched secrets - username: {}, password: {}", username, "****");
+            return new Tuple2<>(username, password);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to fetch credentials from Secrets Manager", e);
+        }
+    }
+
+    private static DataGeneratorSource<StockPrice> createDataGeneratorSource(Properties dataGenProperties) {
+        int recordsPerSecond = Integer.parseInt(dataGenProperties.getProperty("records.per.second", String.valueOf(DEFAULT_RECORDS_PER_SECOND)));
+
+        return new DataGeneratorSource<>(
+                new StockPriceGenerator(),
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(recordsPerSecond),
+                TypeInformation.of(StockPrice.class)
+        );
+    }
+
+    private static KafkaSink<StockPrice> createKafkaSink(Properties outputProperties, Tuple2<String, String> saslScramCredentials) {
+        Properties kafkaProducerConfig = new Properties(outputProperties);
+
+        // Add the parameters enabling SASL/SCRAM authentication to the Kafka producer properties
+        LOG.info("Setting up Kafka SASL/SCRAM authentication");
+        kafkaProducerConfig.setProperty("security.protocol", "SASL_SSL");
+        kafkaProducerConfig.setProperty("sasl.mechanism", "SCRAM-SHA-512");
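+        // For illustration, the rendered JAAS configuration looks like the following
+        // (hypothetical username, password masked):
+        //   org.apache.kafka.common.security.scram.ScramLoginModule required username="msk-user" password="****";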
+        String jaasConfig = String.format(
+                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
+                saslScramCredentials.f0,
+                saslScramCredentials.f1);
+        kafkaProducerConfig.setProperty("sasl.jaas.config", jaasConfig);
+
+        String topic = outputProperties.getProperty("topic", DEFAULT_TOPIC);
+
+        KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema =
+                KafkaRecordSerializationSchema.<StockPrice>builder()
+                        .setTopic(topic)
+                        .setKeySerializationSchema(stock -> stock.getSymbol().getBytes())
+                        .setValueSerializationSchema(new JsonSerializationSchema<>())
+                        .build();
+
+        return KafkaSink.<StockPrice>builder()
+                .setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
+                .setKafkaProducerConfig(kafkaProducerConfig)
+                .setRecordSerializer(recordSerializationSchema)
+                .build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+        LOG.info("Application properties: {}", applicationProperties);
+
+        Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
+        Properties dataGenProperties = applicationProperties.getOrDefault("DataGen", new Properties());
+        Properties outputProperties = applicationProperties.get("Output0");
+
+        // Fetch the credentials from Secrets Manager
+        // Note that the credentials are fetched only once, when the job starts
+        Tuple2<String, String> credentials = fetchCredentialsFromSecretsManager(authProperties);
+
+        // Create the data generator
+        DataGeneratorSource<StockPrice> source = createDataGeneratorSource(dataGenProperties);
+        DataStream<StockPrice> stockPriceStream = env.fromSource(
+                source,
+                WatermarkStrategy.noWatermarks(),
+                "Stock Price Generator"
+        );
+
+        // Create the Kafka sink, passing the credentials
+        KafkaSink<StockPrice> sink = createKafkaSink(outputProperties, credentials);
+        stockPriceStream.sinkTo(sink);
+
+        if (isLocal(env)) {
+            stockPriceStream.print();
+        }
+
+        env.execute("Stock Price to Kafka Job");
+    }
+}
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
new file mode 100644
index 0000000..f662b11
--- /dev/null
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
@@ -0,0 +1,48 @@
+package com.amazonaws.services.msf.domain;
+
+public class StockPrice {
+    private String symbol;
+    private String timestamp;
+    private double price;
+
+    public StockPrice() {}
+
+    public StockPrice(String symbol, String timestamp, double price) {
+        this.symbol = symbol;
+        this.timestamp = timestamp;
+        this.price = price;
+    }
+
+    public String getSymbol() {
+        return symbol;
+    }
+
+    public void setSymbol(String symbol) {
+        this.symbol = symbol;
+    }
+
+    public String getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(String timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public double getPrice() {
+        return price;
+    }
+
+    public void setPrice(double price) {
+        this.price = price;
+    }
+
+    @Override
+    public String toString() {
+        return "StockPrice{" +
+                "symbol='" + symbol + '\'' +
+                ", timestamp='" + timestamp + '\'' +
+                ", price=" + price +
+                '}';
+    }
+}
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
new file mode 100644
index 0000000..d76291a
--- /dev/null
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
@@ -0,0 +1,21 @@
+package com.amazonaws.services.msf.domain;
+
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+
+import java.time.Instant;
+import java.util.Random;
+
+public class StockPriceGenerator implements GeneratorFunction<Long, StockPrice> {
+
+    private static final String[] SYMBOLS = {"AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "NFLX"};
+    private final Random random = new Random();
+
+    @Override
+    public StockPrice map(Long value) throws Exception {
+        String symbol = SYMBOLS[random.nextInt(SYMBOLS.length)];
+        double price = 50 + random.nextDouble() * 450; // Price between $50 and $500
+        String timestamp = Instant.now().toString();
+
+        return new StockPrice(symbol, timestamp, price);
+    }
+}
diff --git a/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json b/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 0000000..f2e5611
--- /dev/null
+++ b/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,21 @@
+[
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": {
+      "records.per.second": "5"
+    }
+  },
+  {
+    "PropertyGroupId": "Output0",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "stock-prices"
+    }
+  },
+  {
+    "PropertyGroupId": "AuthProperties",
+    "PropertyMap": {
+      "secret.name": "AmazonMSK_myCredentials"
+    }
+  }
+]
diff --git a/java/FetchSecrets/src/main/resources/log4j2.properties b/java/FetchSecrets/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..3546643
--- /dev/null
+++ b/java/FetchSecrets/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java b/java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java
new file mode 100644
index 0000000..c7a77ec
--- /dev/null
+++ b/java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java
@@ -0,0 +1,38 @@
+package com.amazonaws.services.msf.domain;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class StockPriceGeneratorTest {
+
+    private static final List<String> EXPECTED_SYMBOLS = Arrays.asList(
+            "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "NFLX"
+    );
+
+    @Test
+    void testGenerateStockPrice() throws Exception {
+        StockPriceGenerator generator = new StockPriceGenerator();
+
+        StockPrice stockPrice = generator.map(1L);
+
+        assertNotNull(stockPrice);
+        assertNotNull(stockPrice.getSymbol());
+        assertNotNull(stockPrice.getTimestamp());
+        assertTrue(stockPrice.getPrice() > 0);
+
+        // Verify the symbol is one of the expected values
+        assertTrue(EXPECTED_SYMBOLS.contains(stockPrice.getSymbol()));
+
+        // Verify the price is in the expected range (50-500)
+        assertTrue(stockPrice.getPrice() >= 50.0);
+        assertTrue(stockPrice.getPrice() <= 500.0);
+
+        // Verify the timestamp is valid ISO format
+        assertDoesNotThrow(() -> Instant.parse(stockPrice.getTimestamp()));
+    }
+}
diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
index b165832..d0935f3 100644
--- a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
@@ -139,6 +139,15 @@ Runtime parameters:
 | `JdbcSink` | `username` | Destination database user |
 | `JdbcSink` | `password` | Destination database password |
 
+
+#### Passing credentials
+
+For simplicity, this example passes the database credentials as runtime parameters.
+This is **not best practice** for production workloads.
+
+We recommend fetching credentials on application start, for example from AWS Secrets Manager, as demonstrated in the
+[Fetch Secrets](../../FetchSecrets) example.
+
 ### Known limitations
 
 Using the SQL interface of Flink CDC Sources greatly simplifies the implementation of a passthrough application.
diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md
index 8587e60..7137510 100644
--- a/java/JdbcSink/README.md
+++ b/java/JdbcSink/README.md
@@ -124,7 +124,8 @@ To run the application in Amazon Managed Service for Apache Flink ensure the app
 ### Security Considerations
 
 For production deployments:
-1. Store database credentials in AWS Secrets Manager.
+1. Store database credentials in AWS Secrets Manager. The [Fetch Secrets](../FetchSecrets) example shows how you can fetch
+   a secret on application start.
 2. Use VPC endpoints for secure database connectivity.
 3. Enable SSL/TLS for database connections.
 
diff --git a/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md b/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md
index 60eec0e..a8ea685 100644
--- a/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md
+++ b/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md
@@ -12,6 +12,9 @@ For simplicity, the application generates random data internally, using the Data
 set up a KafkaSink with SASL/SCRAM authentication. The configuration of a KafkaSource is identical to the sink, for
 what regards SASL/SCRAM authentication.
 
+> Note: SASL/SCRAM authentication with MSK can also be implemented by fetching the credentials from AWS Secrets Manager
+> on application start, using the AWS SDK, as demonstrated in the [Fetch Secrets](../../FetchSecrets) example.
+> MSK TLS uses a certificate signed by an AWS CA, which is recognized by the JVM, so passing a truststore is not required.
 
 ### High level approach
 
diff --git a/java/KafkaConfigProviders/README.md b/java/KafkaConfigProviders/README.md
index db4a2bf..febb9ee 100644
--- a/java/KafkaConfigProviders/README.md
+++ b/java/KafkaConfigProviders/README.md
@@ -13,3 +13,7 @@ without embedding sensitive information in the application JAR, leveraging [MSK
 ### SASL Authentication
 
 - [**Kafka SASL/SCRAM**](./Kafka-SASL_SSL-ConfigProviders) - Using Config Providers to fetch SASL/SCRAM credentials from AWS Secrets Manager
+
+Note: SASL/SCRAM authentication with MSK can also be implemented by fetching the credentials from AWS Secrets Manager on
+application start, using the AWS SDK, as demonstrated in the [Fetch Secrets](../FetchSecrets) example.
+MSK TLS uses a certificate signed by an AWS CA, which is recognized by the JVM, so passing a truststore is not required.
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index 26ca9bc..89e16e0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -46,5 +46,6 @@
         <module>FlinkCDC/FlinkCDCSQLServerSource</module>
         <module>FlinkDataGenerator</module>
         <module>JdbcSink</module>
+        <module>FetchSecrets</module>
     </modules>
 </project>
\ No newline at end of file