From e275d477686ca2758383e86f7568cdd4d186af01 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Wed, 20 Aug 2025 12:38:20 +0100 Subject: [PATCH 1/5] New example skeleton --- java/FetchSecrets/README.md | 52 ++++++ java/FetchSecrets/pom.xml | 153 ++++++++++++++++++ .../services/msf/FetchSecretsJob.java | 109 +++++++++++++ .../services/msf/domain/StockPrice.java | 67 ++++++++ .../msf/domain/StockPriceGenerator.java | 24 +++ .../flink-application-properties-dev.json | 15 ++ .../src/main/resources/log4j2.properties | 7 + java/pom.xml | 1 + 8 files changed, 428 insertions(+) create mode 100644 java/FetchSecrets/README.md create mode 100644 java/FetchSecrets/pom.xml create mode 100644 java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java create mode 100644 java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java create mode 100644 java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java create mode 100644 java/FetchSecrets/src/main/resources/flink-application-properties-dev.json create mode 100644 java/FetchSecrets/src/main/resources/log4j2.properties diff --git a/java/FetchSecrets/README.md b/java/FetchSecrets/README.md new file mode 100644 index 00000000..202b1456 --- /dev/null +++ b/java/FetchSecrets/README.md @@ -0,0 +1,52 @@ +# Stock Price Generator to Kafka + +This Flink application generates random stock price data and writes it to a Kafka topic as JSON messages. + +## Features + +- Generates realistic stock price data for popular symbols (AAPL, GOOGL, MSFT, etc.) +- Configurable data generation rate +- Writes JSON-formatted messages to Kafka +- Uses stock symbol as Kafka message key for partitioning +- Supports both local development and Amazon Managed Service for Apache Flink + +## Building + +```bash +mvn clean package +``` + +## Running Locally + +1. Start a local Kafka cluster +2. Create the target topic: + ```bash + kafka-topics --create --topic stock-prices --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 + ``` +3. 
Run the application:
+   ```bash
+   mvn exec:java -Dexec.mainClass="com.amazonaws.services.msf.FetchSecretsJob"
+   ```
+
+## Configuration
+
+The application uses the following property groups:
+
+- **DataGen**: Controls data generation rate
+  - `records.per.second`: Number of records to generate per second (default: 10)
+
+- **Output0**: Kafka sink configuration
+  - `bootstrap.servers`: Kafka bootstrap servers
+  - `topic`: Target Kafka topic (default: "stock-prices")
+
+- **AuthProperties**: Authentication properties for MSK (when using IAM auth)
+
+## Sample Output
+
+```json
+{
+  "symbol": "AAPL",
+  "timestamp": "2025-08-20T11:30:00.123Z",
+  "price": 175.42
+}
+```
diff --git a/java/FetchSecrets/pom.xml b/java/FetchSecrets/pom.xml
new file mode 100644
index 00000000..afd74b20
--- /dev/null
+++ b/java/FetchSecrets/pom.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>fetch-secrets</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <buildDirectory>${project.basedir}/target</buildDirectory>
+        <jar.finalName>${project.name}-${project.version}</jar.finalName>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <flink.version>1.20.0</flink.version>
+        <kafka.connector.version>3.2.0-1.19</kafka.connector.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <log4j.version>2.17.2</log4j.version>
+    </properties>
+
+    <dependencies>
+        <!-- Apache Flink core dependencies, provided by the runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Managed Flink (Kinesis Analytics) runtime -->
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- DataGen connector -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-datagen</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Kafka connector and JSON format -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${kafka.connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${buildDirectory}</directory>
+        <finalName>${jar.finalName}</finalName>
+        <plugins>
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Shade plugin to build the uber-jar, excluding dependencies provided by the runtime -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.amazonaws.services.msf.FetchSecretsJob</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
new file mode 100644
index 00000000..d8dd8cf6
--- /dev/null
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
@@ -0,0 +1,109 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.domain.StockPrice;
+import com.amazonaws.services.msf.domain.StockPriceGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.formats.json.JsonSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class FetchSecretsJob {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FetchSecretsJob.class);
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+    private static final String DEFAULT_TOPIC = "stock-prices";
+    private static final int DEFAULT_RECORDS_PER_SECOND = 10;
+
+    private static boolean isLocal(StreamExecutionEnvironment env) {
+        return env instanceof LocalStreamEnvironment;
+    }
+
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+        if (isLocal(env)) {
+            LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            return KinesisAnalyticsRuntime.getApplicationProperties(
+                    Objects.requireNonNull(FetchSecretsJob.class.getClassLoader()
+                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath());
+        } else {
+            LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
+            return KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+    }
+
+    private static DataGeneratorSource<StockPrice> createDataGeneratorSource(Properties dataGenProperties) {
+        int recordsPerSecond = Integer.parseInt(dataGenProperties.getProperty("records.per.second", String.valueOf(DEFAULT_RECORDS_PER_SECOND)));
+
+        return new DataGeneratorSource<>(
+                new StockPriceGenerator(),
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(recordsPerSecond),
+                TypeInformation.of(StockPrice.class)
+        );
+    }
+
+    private static KafkaSink<StockPrice> createKafkaSink(Properties outputProperties) {
+        String topic = outputProperties.getProperty("topic", DEFAULT_TOPIC);
+
+        KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema =
+                KafkaRecordSerializationSchema.<StockPrice>builder()
+                        .setTopic(topic)
+                        .setKeySerializationSchema(stock -> stock.getSymbol().getBytes())
+                        .setValueSerializationSchema(new JsonSerializationSchema<>())
+                        .build();
+
+        return KafkaSink.<StockPrice>builder()
+                .setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
+                .setKafkaProducerConfig(outputProperties)
+                .setRecordSerializer(recordSerializationSchema)
+                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+                .build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        if (isLocal(env)) {
+            env.enableCheckpointing(10_000);
+            env.setParallelism(1);
+        }
+
+        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
+        LOG.info("Application properties: {}", applicationProperties);
+
+        Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
+        Properties dataGenProperties = applicationProperties.getOrDefault("DataGen", new Properties());
+        Properties outputProperties = applicationProperties.get("Output0");
+        outputProperties.putAll(authProperties);
+
+        DataGeneratorSource<StockPrice> source = createDataGeneratorSource(dataGenProperties);
+        DataStream<StockPrice> stockPriceStream = env.fromSource(
+                source,
+                WatermarkStrategy.noWatermarks(),
+                "Stock Price Generator"
+        );
+
+        KafkaSink<StockPrice> sink = createKafkaSink(outputProperties);
+        stockPriceStream.sinkTo(sink);
+
+        if (isLocal(env)) {
+            stockPriceStream.print();
+        }
+
+        env.execute("Stock Price to Kafka Job");
+    }
+}
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
new file mode 100644
index 00000000..3e754f92
--- /dev/null
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
@@ -0,0 +1,67 @@
+package com.amazonaws.services.msf.domain;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Objects;
+
+public class StockPrice {
+    private String symbol;
+    private Instant timestamp;
+    private BigDecimal price;
+
+    public StockPrice() {}
+
+    public StockPrice(String symbol, Instant timestamp, BigDecimal price) {
+        this.symbol = symbol;
+        this.timestamp = timestamp;
+        this.price = price;
+    }
+
+    public String getSymbol() {
+        return symbol;
+    }
+
+    public void setSymbol(String symbol) {
+        this.symbol = symbol;
+    }
+
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Instant timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public BigDecimal getPrice() {
+        return price;
+    }
+
+    public void setPrice(BigDecimal price) {
+        this.price = price;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StockPrice that = (StockPrice) o;
+        return Objects.equals(symbol, that.symbol) &&
+                Objects.equals(timestamp, that.timestamp) &&
+                Objects.equals(price, that.price);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(symbol, timestamp, price);
+    }
+
+    @Override
+    public String toString() {
+        return "StockPrice{" +
+                "symbol='" + symbol + '\'' +
+                ", timestamp='" + timestamp.toString() + '\'' +
+                ", price=" + price +
+                '}';
+    }
+}
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
new file mode 100644
index 00000000..b9dae09c
--- /dev/null
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
@@ -0,0 +1,24 @@
+package com.amazonaws.services.msf.domain;
+
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Instant;
+import java.util.Random;
+
+public class StockPriceGenerator implements GeneratorFunction<Long, StockPrice> {
+
+    private static final String[] SYMBOLS = {"AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "NFLX"};
+    private final Random random = new Random();
+
+    @Override
+    public StockPrice map(Long value) throws Exception {
+        String symbol = SYMBOLS[random.nextInt(SYMBOLS.length)];
+        double price = 50 + random.nextDouble() * 450; // Price between $50-$500
+        BigDecimal priceDecimal = BigDecimal.valueOf(price).setScale(2, RoundingMode.HALF_UP);
+        Instant timestamp = Instant.now();
+
+        return new StockPrice(symbol, timestamp, priceDecimal);
+    }
+}
diff --git a/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json b/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..26884542
--- /dev/null
+++ b/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,15 @@
+[
+  {
+    "PropertyGroupId": "DataGen",
+    "PropertyMap": {
+      "records.per.second": "5"
+    }
+  },
+  {
+    "PropertyGroupId": "Output0",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:9092",
+      "topic": "stock-prices"
+    }
+  }
+]
diff --git a/java/FetchSecrets/src/main/resources/log4j2.properties b/java/FetchSecrets/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..35466433
--- /dev/null
+++ b/java/FetchSecrets/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/java/pom.xml b/java/pom.xml
index 26ca9bcc..89e16e04 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -46,5 +46,6 @@
         <module>FlinkCDC/FlinkCDCSQLServerSource</module>
         <module>FlinkDataGenerator</module>
         <module>JdbcSink</module>
+        <module>FetchSecrets</module>
     </modules>
 </project>
\ No newline at end of file

From 6ae15cd62277d36866c8472dc713f480d8572921 Mon Sep 17 00:00:00 2001
From: Lorenzo Nicora
Date: Wed, 20 Aug 2025 15:13:37 +0100
Subject: [PATCH 2/5] Working example

---
 java/FetchSecrets/README.md                   | 129 +++++++++++++-----
 java/FetchSecrets/pom.xml                     |  27 ++++
 .../services/msf/FetchSecretsJob.java         |  77 +++++++++--
 .../services/msf/domain/StockPrice.java       |  35 ++---
 .../msf/domain/StockPriceGenerator.java       |   7 +-
 .../flink-application-properties-dev.json     |   6 +
 .../msf/domain/StockPriceGeneratorTest.java   |  38 ++++++
 7 files changed, 239 insertions(+), 80 deletions(-)
 create mode 100644 java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java

diff --git a/java/FetchSecrets/README.md b/java/FetchSecrets/README.md
index 202b1456..409ca03e 100644
--- a/java/FetchSecrets/README.md
+++ b/java/FetchSecrets/README.md
@@ -1,52 +1,113 @@
-# Stock Price Generator to Kafka
+## Fetching Secrets from Secrets Manager
 
-This Flink application generates random stock price data and writes it to a Kafka topic as JSON messages.
+This example demonstrates how to fetch secrets from AWS Secrets Manager at application start.
 
-## Features
+* Flink version: 1.20
+* Flink API: DataStream
+* Language: Java (11)
+* Flink connectors: DataGen, Kafka sink
 
-- Generates realistic stock price data for popular symbols (AAPL, GOOGL, MSFT, etc.)
-- Configurable data generation rate
-- Writes JSON-formatted messages to Kafka
-- Uses stock symbol as Kafka message key for partitioning
-- Supports both local development and Amazon Managed Service for Apache Flink
+This example shows how you can fetch secrets from AWS Secrets Manager without passing them around as unencrypted configuration parameters.
+In this case, the job fetches the username and password for MSK SASL/SCRAM authentication.
+The application generates random stock prices and writes them, as JSON, to a Kafka topic.
 
-## Building
+Note that this method works for any secret represented as text, which is passed directly to the constructor of an operator.
+It does not work for fetching keystore or truststore files.
 
-```bash
-mvn clean package
-```
+### Prerequisites
 
-## Running Locally
+#### MSK
 
-1. Start a local Kafka cluster
-2. Create the target topic:
-   ```bash
-   kafka-topics --create --topic stock-prices --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
-   ```
-3. Run the application:
-   ```bash
-   mvn exec:java -Dexec.mainClass="com.amazonaws.services.msf.FetchSecretsJob"
-   ```
+To run this application on Amazon Managed Service for Apache Flink, you need an Amazon MSK cluster configured for
+SASL/SCRAM authentication. See the [MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html)
+for details on how to set it up.
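+
+As a minimal sketch, the secret can be created and associated with the cluster using the AWS CLI
+(the credentials, key ID, cluster ARN, and secret ARN below are placeholders):
+
+```bash
+# MSK requires SASL/SCRAM secrets to be named with the "AmazonMSK_" prefix
+# and to be encrypted with a customer managed key (CMK)
+aws secretsmanager create-secret \
+    --name AmazonMSK_myCredentials \
+    --secret-string '{"username":"msk-user","password":"msk-user-password"}' \
+    --kms-key-id <key-id>
+
+# Associate the secret with the MSK cluster
+aws kafka batch-associate-scram-secret \
+    --cluster-arn <cluster-arn> \
+    --secret-arn-list <secret-arn>
+```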
+
+The cluster must contain a topic named `stock-prices`, or allow automatic topic creation.
+
+If you set up any Kafka ACLs, the user must have permission to write to this topic.
+
+#### Managed Flink Application Service Role
+
+The IAM Service Role attached to the Managed Flink application must have sufficient permissions to fetch the credentials
+from AWS Secrets Manager. See the [AWS Secrets Manager documentation](https://docs.aws.amazon.com/secretsmanager/latest/userguide/determine-acccess_examine-iam-policies.html)
+for further details.
+
+MSK SASL/SCRAM credentials must be encrypted with a customer managed key (CMK). The application Service Role must also
+provide permissions to use the CMK to decrypt the secret (`kms:Decrypt`).
+
+Here is an example of an IAM Policy that allows the application to fetch and decrypt the secret:
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Sid": "AllowFetchSecret",
+      "Effect": "Allow",
+      "Action": "secretsmanager:GetSecretValue",
+      "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>-*"
+    },
+    {
+      "Sid": "AllowDecryptSecret",
+      "Effect": "Allow",
+      "Action": "kms:Decrypt",
+      "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>"
+    }
+  ]
+}
+```
+
+⚠️ Note that the KMS Key Policy may also restrict access to the CMK.
+If you are using a restrictive Key Policy, you also need to allow your Managed Flink application to decrypt.
+Add the following snippet to the KMS Key Policy, in addition to the other permissions:
+
+```json
+{
+  "Sid": "AllowDecrypting",
+  "Effect": "Allow",
+  "Principal": {
+    "Service": "kinesisanalytics.amazonaws.com"
+  },
+  "Action": "kms:Decrypt",
+  "Resource": "*"
+}
+```
+
+#### Managed Flink Application VPC Networking
+
+To connect to the MSK cluster, the Managed Flink application must have VPC networking configured and must be able to
+reach the cluster. For this example, the simplest setup is to use the same VPC, Subnets, and Security Group as the MSK cluster.
+
+### Runtime Configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.
+
+When running locally, the configuration is read from the
+[`flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file in the resources folder.
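+
+When deploying the application, the same property groups are defined as *Runtime Properties* of the Managed Flink application.
+A hypothetical sketch of the configuration, in the same format as the local development file
+(broker addresses and secret name are placeholders):
+
+```json
+[
+  { "PropertyGroupId": "DataGen", "PropertyMap": { "records.per.second": "10" } },
+  { "PropertyGroupId": "Output0", "PropertyMap": { "bootstrap.servers": "<broker-1>:9096,<broker-2>:9096", "topic": "stock-prices" } },
+  { "PropertyGroupId": "AuthProperties", "PropertyMap": { "secret.name": "AmazonMSK_myCredentials" } }
+]
+```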
+
+Runtime parameters:
+
+| Group ID         | Key                  | Description                                                               |
+|------------------|----------------------|---------------------------------------------------------------------------|
+| `DataGen`        | `records.per.second` | Number of stock price records to generate per second (default: 10)       |
+| `Output0`        | `bootstrap.servers`  | Kafka bootstrap servers                                                   |
+| `Output0`        | `topic`              | Target Kafka topic (default: "stock-prices")                              |
+| `AuthProperties` | `secret.name`        | AWS Secrets Manager secret name containing username/password credentials |
+
+The `bootstrap.servers` must point to the cluster's SASL/SCRAM endpoints (port 9096).
+
+### Testing Locally
+
+The application cannot be run locally unless your machine has network connectivity to an MSK cluster supporting
+SASL/SCRAM authentication, for example via VPN.
+
+Fetching the secret from Secrets Manager does work from your machine, as long as you have an authenticated AWS CLI profile
+that is allowed to fetch the secret, and you let the application use that profile, for example via your IDE's AWS plugin.
+
+### Known Limitations
+
+Credentials are fetched only once, when the job starts.
+Flink does not have any easy way to dynamically update an operator, for example the Kafka sink, while the job is running.
+
+If you implement any credential rotation, the new credentials will not be used by the application unless you restart the job.
diff --git a/java/FetchSecrets/pom.xml b/java/FetchSecrets/pom.xml
index afd74b20..b7e616be 100644
--- a/java/FetchSecrets/pom.xml
+++ b/java/FetchSecrets/pom.xml
@@ -22,6 +22,18 @@
         <log4j.version>2.17.2</log4j.version>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <!-- AWS SDK v2 BOM -->
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.20.162</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <!-- Apache Flink core dependencies, provided by the runtime -->
         <dependency>
@@ -51,6 +63,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <!-- AWS SDK v2 Secrets Manager client -->
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>secretsmanager</artifactId>
+        </dependency>
+
         <!-- DataGen connector -->
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -88,6 +106,15 @@
             <artifactId>log4j-core</artifactId>
             <version>${log4j.version}</version>
         </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
index d8dd8cf6..b63d6ff5 100644
--- a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/FetchSecretsJob.java
@@ -3,10 +3,12 @@
 import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
 import com.amazonaws.services.msf.domain.StockPrice;
 import com.amazonaws.services.msf.domain.StockPriceGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
-import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -14,8 +16,12 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
+import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;
+import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse;
 
 import java.io.IOException;
 import java.util.Map;
@@ -45,9 +51,41 @@ private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env)
         }
     }
 
+    /**
+     * Fetch the secret from Secrets Manager.
+     * In the case of MSK SASL/SCRAM credentials, the secret is a JSON object with the keys `username` and `password`.
+     *
+     * @param authProperties The properties containing the `secret.name`
+     * @return A Tuple2 containing the username and password
+     */
+    private static Tuple2<String, String> fetchCredentialsFromSecretsManager(Properties authProperties) {
+        String secretName = Preconditions.checkNotNull(authProperties.getProperty("secret.name"), "Missing secret name");
+
+        try (SecretsManagerClient client = SecretsManagerClient.create()) {
+            GetSecretValueRequest request = GetSecretValueRequest.builder()
+                    .secretId(secretName)
+                    .build();
+
+            GetSecretValueResponse response = client.getSecretValue(request);
+            String secretString = response.secretString();
+
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode secretJson = mapper.readTree(secretString);
+
+            String username = secretJson.get("username").asText();
+            String password = secretJson.get("password").asText();
+
+            LOG.info("Successfully fetched secret - username: {}, password: {}", username, "****");
+            return new Tuple2<>(username, password);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to fetch credentials from Secrets Manager", e);
+        }
+    }
+
     private static DataGeneratorSource<StockPrice> createDataGeneratorSource(Properties dataGenProperties) {
         int recordsPerSecond = Integer.parseInt(dataGenProperties.getProperty("records.per.second", String.valueOf(DEFAULT_RECORDS_PER_SECOND)));
-        
+
         return new DataGeneratorSource<>(
                 new StockPriceGenerator(),
                 Long.MAX_VALUE,
@@ -56,10 +94,22 @@ private static DataGeneratorSource<StockPrice> createDataGeneratorSource(Properties dataGenProperties)
         );
     }
 
-    private static KafkaSink<StockPrice> createKafkaSink(Properties outputProperties) {
+    private static KafkaSink<StockPrice> createKafkaSink(Properties outputProperties, Tuple2<String, String> saslScramCredentials) {
+        // Copy the output properties into the Kafka producer configuration
+        Properties kafkaProducerConfig = new Properties();
+        kafkaProducerConfig.putAll(outputProperties);
+
+        // Add to the Kafka producer properties the parameters that enable SASL/SCRAM auth
+        LOG.info("Setting up Kafka SASL/SCRAM authentication");
+        kafkaProducerConfig.setProperty("security.protocol", "SASL_SSL");
+        kafkaProducerConfig.setProperty("sasl.mechanism", "SCRAM-SHA-512");
+        String jaasConfig = String.format(
+                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
+                saslScramCredentials.f0,
+                saslScramCredentials.f1);
+        kafkaProducerConfig.setProperty("sasl.jaas.config", jaasConfig);
+
         String topic = outputProperties.getProperty("topic", DEFAULT_TOPIC);
 
         KafkaRecordSerializationSchema<StockPrice> recordSerializationSchema =
                 KafkaRecordSerializationSchema.<StockPrice>builder()
                         .setTopic(topic)
                         .setKeySerializationSchema(stock -> stock.getSymbol().getBytes())
@@ -68,28 +118,26 @@ private static KafkaSink<StockPrice> createKafkaSink(Properties outputProperties)
 
         return KafkaSink.<StockPrice>builder()
                 .setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
-                .setKafkaProducerConfig(outputProperties)
+                .setKafkaProducerConfig(kafkaProducerConfig)
                 .setRecordSerializer(recordSerializationSchema)
-                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                 .build();
     }
 
     public static void main(String[] args) throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        if (isLocal(env)) {
-            env.enableCheckpointing(10_000);
-            env.setParallelism(1);
-        }
-
         final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
         LOG.info("Application properties: {}", applicationProperties);
 
         Properties authProperties = applicationProperties.getOrDefault("AuthProperties", new Properties());
         Properties dataGenProperties = applicationProperties.getOrDefault("DataGen", new Properties());
         Properties outputProperties = applicationProperties.get("Output0");
-        outputProperties.putAll(authProperties);
 
+        // Fetch the credentials from Secrets Manager
+        // Note that the credentials are fetched only once, when the job starts
+        Tuple2<String, String> credentials = fetchCredentialsFromSecretsManager(authProperties);
+
+        // Create the data generator
         DataGeneratorSource<StockPrice> source = createDataGeneratorSource(dataGenProperties);
         DataStream<StockPrice> stockPriceStream = env.fromSource(
                 source,
@@ -97,7 +145,8 @@ public static void main(String[] args) throws Exception
                 "Stock Price Generator"
         );
 
-        KafkaSink<StockPrice> sink = createKafkaSink(outputProperties);
+        // Create the Kafka sink, passing the credentials
+        KafkaSink<StockPrice> sink = createKafkaSink(outputProperties, credentials);
         stockPriceStream.sinkTo(sink);
 
         if (isLocal(env)) {
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
index 3e754f92..f662b111 100644
--- a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java
@@ -1,17 +1,13 @@
 package com.amazonaws.services.msf.domain;
 
-import java.math.BigDecimal;
-import java.time.Instant;
-import java.util.Objects;
-
 public class StockPrice {
     private String symbol;
-    private Instant timestamp;
-    private BigDecimal price;
+    private String timestamp;
+    private double price;
 
     public StockPrice() {}
 
-    public StockPrice(String symbol, Instant timestamp, BigDecimal price) {
+    public StockPrice(String symbol, String timestamp, double price) {
         this.symbol = symbol;
         this.timestamp = timestamp;
         this.price = price;
@@ -25,42 +21,27 @@ public void setSymbol(String symbol)
         this.symbol = symbol;
     }
 
-    public Instant getTimestamp() {
+    public String getTimestamp() {
         return timestamp;
     }
 
-    public void setTimestamp(Instant timestamp) {
+    public void setTimestamp(String timestamp) {
         this.timestamp = timestamp;
     }
 
-    public BigDecimal getPrice() {
+    public double getPrice() {
         return price;
     }
 
-    public void setPrice(BigDecimal price) {
+    public void setPrice(double price) {
         this.price = price;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        StockPrice that = (StockPrice) o;
-        return Objects.equals(symbol, that.symbol) &&
-                Objects.equals(timestamp, that.timestamp) &&
-                Objects.equals(price, that.price);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(symbol, timestamp, price);
-    }
-
     @Override
     public String toString() {
         return "StockPrice{" +
                 "symbol='" + symbol + '\'' +
-                ", timestamp='" + timestamp.toString() + '\'' +
+                ", timestamp='" + timestamp + '\'' +
                 ", price=" + price +
                 '}';
     }
diff --git a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
index b9dae09c..d76291ad 100644
--- a/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
+++ b/java/FetchSecrets/src/main/java/com/amazonaws/services/msf/domain/StockPriceGenerator.java
@@ -2,8 +2,6 @@
 
 import org.apache.flink.connector.datagen.source.GeneratorFunction;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
 import java.time.Instant;
 import java.util.Random;
 
@@ -16,9 +14,8 @@ public class StockPriceGenerator implements GeneratorFunction<Long, StockPrice>
     public StockPrice map(Long value) throws Exception {
         String symbol = SYMBOLS[random.nextInt(SYMBOLS.length)];
         double price = 50 + random.nextDouble() * 450; // Price between $50-$500
-        BigDecimal priceDecimal = BigDecimal.valueOf(price).setScale(2, RoundingMode.HALF_UP);
-        Instant timestamp = Instant.now();
+        String timestamp = Instant.now().toString();
 
-        return new StockPrice(symbol, timestamp, priceDecimal);
+        return new StockPrice(symbol, timestamp, price);
     }
 }
diff --git a/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json b/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
index 26884542..f2e5611a 100644
--- a/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
+++ b/java/FetchSecrets/src/main/resources/flink-application-properties-dev.json
@@ -11,5 +11,11 @@
       "bootstrap.servers": "localhost:9092",
       "topic": "stock-prices"
     }
+  },
+  {
+    "PropertyGroupId": "AuthProperties",
+    "PropertyMap": {
+      "secret.name": "AmazonMSK_myCredentials"
+    }
   }
 ]
diff --git a/java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java b/java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java
new file mode 100644
index 00000000..c7a77ec8
--- /dev/null
+++ b/java/FetchSecrets/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorTest.java
@@ -0,0 +1,38 @@
+package com.amazonaws.services.msf.domain;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class StockPriceGeneratorTest {
+
+    private static final List<String> EXPECTED_SYMBOLS = Arrays.asList(
+            "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "NFLX"
+    );
+
+    @Test
+    void testGenerateStockPrice() throws Exception {
+        StockPriceGenerator generator = new StockPriceGenerator();
+
+        StockPrice stockPrice = generator.map(1L);
+
+        assertNotNull(stockPrice);
+        assertNotNull(stockPrice.getSymbol());
+        assertNotNull(stockPrice.getTimestamp());
+        assertTrue(stockPrice.getPrice() > 0);
+
+        // Verify the symbol is one of the expected values
+        assertTrue(EXPECTED_SYMBOLS.contains(stockPrice.getSymbol()));
+
+        // Verify the price is in the expected range ($50-$500)
+        assertTrue(stockPrice.getPrice() >= 50.0);
+        assertTrue(stockPrice.getPrice() <= 500.0);
+
+        // Verify the timestamp is a valid ISO-8601 instant
+        assertDoesNotThrow(() -> Instant.parse(stockPrice.getTimestamp()));
+    }
+}

From cdee6e70096f841e98b9d986fcfc708aa326c1a3 Mon Sep 17 00:00:00 2001
From: Lorenzo Nicora
Date: Wed, 20 Aug 2025 15:29:55 +0100
Subject: [PATCH 3/5] Add entry to top-level README

---
 README.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index bbaa3b8b..3ad088f8 100644
--- a/README.md
+++ b/README.md
@@ -38,8 +38,9 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f
 - [**Serialization**](./java/Serialization) - Serialization of record and state
 - [**Windowing**](./java/Windowing) - Time-based window aggregation examples
 - [**Side Outputs**](./java/SideOutputs) - Using side outputs for data routing and filtering
-- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls\
+- [**Async I/O**](./java/AsyncIO) - Asynchronous I/O patterns with retries for external API calls
 - [**Custom Metrics**](./java/CustomMetrics) - Creating and publishing custom application metrics
+- [**Fetching credentials from Secrets Manager**](./java/FetchSecrets) - Fetching credentials from AWS Secrets Manager at application start
 
 #### Utilities
 - [**Flink Data Generator (JSON)**](java/FlinkDataGenerator) - How to use a Flink application as data generator, for functional and load testing.

From 752d50955e516c34571994b7aee851850a9a2d74 Mon Sep 17 00:00:00 2001
From: Lorenzo Nicora
Date: Wed, 20 Aug 2025 15:37:13 +0100
Subject: [PATCH 4/5] Amend the READMEs of examples which pass credentials as
 parameters

---
 java/FlinkCDC/FlinkCDCSQLServerSource/README.md | 9 +++++++++
 java/JdbcSink/README.md                         | 3 ++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
index b1658320..d0935f3a 100644
--- a/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
+++ b/java/FlinkCDC/FlinkCDCSQLServerSource/README.md
@@ -139,6 +139,15 @@ Runtime parameters:
 | `JdbcSink` | `username` | Destination database user     |
 | `JdbcSink` | `password` | Destination database password |
 
+
+#### Passing credentials
+
+For simplicity, this example passes the database credentials as runtime parameters.
+This is **not best practice** for production workloads.
+
+We recommend fetching credentials at application start, for example from AWS Secrets Manager, as demonstrated in the
+[Fetch Secrets](../../FetchSecrets) example.
+
 ### Known limitations
 
 Using the SQL interface of Flink CDC Sources greatly simplifies the implementation of a passthrough application.
diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md
index 8587e60e..71375105 100644
--- a/java/JdbcSink/README.md
+++ b/java/JdbcSink/README.md
@@ -124,7 +124,8 @@ To run the application in Amazon Managed Service for Apache Flink ensure the app
 ### Security Considerations
 
 For production deployments:
-1. Store database credentials in AWS Secrets Manager.
+1. Store database credentials in AWS Secrets Manager. The [Fetch Secrets](../FetchSecrets) example shows how you can fetch
+   a secret at application start.
 2. Use VPC endpoints for secure database connectivity.
 3. Enable SSL/TLS for database connections.

From 464af68ce2cd5eb3eb24d99bf5c5659a6a3c27cd Mon Sep 17 00:00:00 2001
From: Lorenzo Nicora
Date: Wed, 20 Aug 2025 15:45:51 +0100
Subject: [PATCH 5/5] Amend other READMEs showing MSK SASL/SCRAM

---
 .../Kafka-SASL_SSL-ConfigProviders/README.md  | 3 +++
 java/KafkaConfigProviders/README.md           | 4 ++++
 2 files changed, 7 insertions(+)

diff --git a/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md b/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md
index 60eec0e0..a8ea6851 100644
--- a/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md
+++ b/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md
@@ -12,6 +12,9 @@ For simplicity, the application generates random data internally, using the Data
 set up a KafkaSink with SASL/SCRAM authentication. The configuration of a KafkaSource is identical to the sink,
 with regard to SASL/SCRAM authentication.
 
+> Note: SASL/SCRAM authentication with MSK can also be implemented by fetching the credentials from AWS Secrets Manager
+> at application start, using the AWS SDK, as demonstrated in the [Fetch Secrets](../../FetchSecrets) example.
+> MSK TLS uses a certificate signed by an AWS CA that is recognized by the JVM, so passing a truststore is not required.
 
 ### High level approach
 
diff --git a/java/KafkaConfigProviders/README.md b/java/KafkaConfigProviders/README.md
index db4a2bfd..febb9eea 100644
--- a/java/KafkaConfigProviders/README.md
+++ b/java/KafkaConfigProviders/README.md
@@ -13,3 +13,7 @@ without embedding sensitive information in the application JAR, leveraging [MSK
 ### SASL Authentication
 
 - [**Kafka SASL/SCRAM**](./Kafka-SASL_SSL-ConfigProviders) - Using Config Providers to fetch SASL/SCRAM credentials from AWS Secrets Manager
+
+Note: SASL/SCRAM authentication with MSK can also be implemented by fetching the credentials from AWS Secrets Manager at application
+start, using the AWS SDK, as demonstrated in the [Fetch Secrets](../FetchSecrets) example.
+MSK TLS uses a certificate signed by an AWS CA that is recognized by the JVM, so passing a truststore is not required.
\ No newline at end of file