diff --git a/README.md b/README.md index 39ad86a..bbaa3b8 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f - [**SQS Sink**](./java/SQSSink) - Writing data to Amazon SQS - [**Prometheus Sink**](./java/PrometheusSink) - Sending metrics to Prometheus - [**Flink CDC**](./java/FlinkCDC) - Change Data Capture examples using Flink CDC +- [**JdbcSink**](./java/JdbcSink) - Writes to a relational database executing upsert statements #### Reading and writing files and transactional data lake formats - [**Iceberg**](./java/Iceberg) - Working with Apache Iceberg and Amazon S3 Tables diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md new file mode 100644 index 0000000..8587e60 --- /dev/null +++ b/java/JdbcSink/README.md @@ -0,0 +1,180 @@ +## Flink JDBC Sink + +This example demonstrates how to use the DataStream API JdbcSink to write to a relational database. + +* Flink version: 1.20 +* Flink API: DataStream +* Language: Java (11) +* Flink connectors: JDBC sink, DataGen + +This example demonstrates how to do UPSERT into a relational database. +The example uses the UPSERT syntax of PostgreSQL, but it can be easily adapted to the syntaxes of other databases or into +an append-only sink, with an INSERT INTO statement. + +#### Which JdbcSink? + +At the moment of publishing this example (August 2025) there are two different DataStream API JdbcSink implementations, +available with the version `3.3.0-1.20` of the JDBC connector. + +1. The new `org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink` which uses the Sink API V2 and + is initialized using a builder: `JdbcSink.builder()..build()` +2. The legacy `org.apache.flink.connector.jdbc.JdbcSink` which uses the legacy `SinkFunction` API, now deprecated. + The legacy sink is initialized with the syntax `JdbcSink.sink(...)` + +This example uses the new sink. + +At the moment of publishing this example (August 2025) the Apache Flink documentation +[still refers to the deprecated sink](https://nightlies.apache.org/flink/flink-docs-lts/docs/connectors/datastream/jdbc/#jdbcsinksink). + +### Data + +The application generates comprehensive `StockPrice` objects with realistic fake data: + +```json +{ + "symbol": "AAPL", + "timestamp": "2025-08-07T10:30:45", + "price": 150.25 +} +``` + +This data is written using upsert in the following database table, containing the latest price for every symbol. + +The sink uses the PostgreSQL upsert syntax: + +``` +INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?) + ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ? +``` + +This is specific to PostgreSQL, but the code can be adjusted to other databases as long as the SQL syntax supports doing +an upsert with a single SQL statement. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*. + +When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder. + +Runtime parameters: + +| Group ID | Key | Description | +|------------|----------------------|-------------------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.second` | Number of stock price records to generate per second (default: 10) | +| `JdbcSink` | `url` | PostgreSQL JDBC URL. 
e.g. `jdbc:postgresql://your-rds-endpoint:5432/your-database`. Note: the URL includes the database name. | +| `JdbcSink` | `table.name` | Destination table. e.g. `prices` (default: "prices") | +| `JdbcSink` | `username` | Database user with INSERT and UPDATE permissions | +| `JdbcSink` | `password` | Database password | +| `JdbcSink` | `batch.size` | Number of records to batch before executing the SQL statement (default: 100) | +| `JdbcSink` | `batch.interval.ms` | Maximum time in milliseconds to wait before executing a batch (default: 200) | +| `JdbcSink` | `max.retries` | Maximum number of retries for failed database operations (default: 5) | + + +### Database prerequisites + +When running on Amazon Managed Service for Apache Flink with databases on AWS, you need to set up the database manually, +ensuring you set up all the following: + +> You can find the SQL script that sets up the dockerized database by checking out the init script for +> [PostgreSQL](docker/postgres-init/init.sql). + +1. **PostgreSQL Database** + 1. The database name must match the `url` configured in the JDBC sink + 2. The destination table must have the following schema: + ```sql + CREATE TABLE prices ( + symbol VARCHAR(10) PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + price DECIMAL(10,2) NOT NULL + ); + ``` + 3. The database user must have SELECT, INSERT, and UPDATE permissions on the prices table + + +### Testing with local database using Docker Compose + +This example can be run locally using Docker. + +A [Docker Compose file](./docker/docker-compose.yml) is provided to run a local PostgreSQL database. +The local database is initialized by creating the database, user, and prices table with sample data. + +You can run the Flink application inside your IDE following the instructions in [Running in IntelliJ](#running-in-intellij). + +To start the local database run `docker compose up -d` in the `./docker` folder. + +Use `docker compose down -v` to shut it down, also removing the data volumes. + + +### Running in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. +Run the database locally using Docker Compose, as described [above](#testing-with-local-database-using-docker-compose). + +See [Running examples locally](../running-examples-locally.md) for details about running the application in the IDE. + + +### Running on Amazon Managed Service for Apache Flink + +To run the application in Amazon Managed Service for Apache Flink ensure the application configuration has the following: +* VPC networking +* The selected Subnets can route traffic to the PostgreSQL database +* The Security Group allows traffic from the application to the database + + +### Security Considerations + +For production deployments: +1. Store database credentials in AWS Secrets Manager. +2. Use VPC endpoints for secure database connectivity. +3. Enable SSL/TLS for database connections. + +> ⚠️ **Password rotation**: if the password of your database is rotated, the JdbcSink fails, causing the job to restart. +> If you fetch the password dynamically on application start (when you create the JdbcSink object) the job will be able +> to restart with the new password. Fetching the password on start is not shown in this example. + +### Implementation considerations + +#### At-least-once or exactly-once + +This implementation leverages the at-least-once mode of the JdbcSink. 
This is normally sufficient when the sink is +executing a single idempotent statement such as an UPSERT: any duplicate will just overwrite the same record. + +The JdbcSink also supports exactly-once mode which leverages XA transactions synchronized with Flink checkpoints, +and relies on XADataSource. This prevents duplicate writes in case of failure and restart from checkpoint. Note that it +does not prevent duplicates if you restart the application from an older Snapshot (Flink Savepoint), unless your SQL statement +implements some form of idempotency. + +#### No connection pooler? + +The JdbcSink does not support using any database connection pooler, such as HikariCP. + +The reason is that no connection pooling is required. The sink will open one database connection per parallelism (one per subtask), +and reuse these connections unless they get closed. + +#### Batching + +The JdbcSink batches writes to reduce the number of requests to the database. +The batch size and interval used in this example are for demonstrational purposes only. + +You should test your actual application with a realistic throughput and realistic data to optimize these values for your +workload. + + +#### Which flink-connector-jdbc-* dependency? + +To use JdbcSink in DataStream API, you need `flink-connector-jdbc-core` and the JDBC driver of the specific database. For example: +``` + + org.apache.flink + flink-connector-jdbc-core + 3.3.0-1.20 + + + + org.postgresql + postgresql + 42.7.2 + +``` + +Including `flink-connector-jdbc` would bring in unnecessary dependencies and increase the size of the uber-jar file. diff --git a/java/JdbcSink/docker/docker-compose.yml b/java/JdbcSink/docker/docker-compose.yml new file mode 100644 index 0000000..36d073e --- /dev/null +++ b/java/JdbcSink/docker/docker-compose.yml @@ -0,0 +1,25 @@ +services: + # PostgreSQL database + postgres: + image: postgres:15 + container_name: postgres + restart: always + environment: + POSTGRES_DB: testdb + POSTGRES_USER: flinkuser + POSTGRES_PASSWORD: flinkpassword + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./postgres-init:/docker-entrypoint-initdb.d + healthcheck: + test: ["CMD-SHELL", "pg_isready -U flinkuser -d testdb"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + +volumes: + postgres_data: + driver: local diff --git a/java/JdbcSink/docker/postgres-init/init.sql b/java/JdbcSink/docker/postgres-init/init.sql new file mode 100644 index 0000000..d62972e --- /dev/null +++ b/java/JdbcSink/docker/postgres-init/init.sql @@ -0,0 +1,26 @@ +-- Create prices table for JDBC sink +CREATE TABLE prices ( + symbol VARCHAR(10) PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + price DECIMAL(10,2) NOT NULL +); + +-- Insert some sample data for testing +INSERT INTO prices (symbol, timestamp, price) VALUES +('AAPL', NOW(), 150.25); + +-- Display table structure +\d prices; + +-- Display sample data +SELECT * FROM prices; + +-- Show table statistics +SELECT + schemaname, + tablename, + attname as column_name, + n_distinct, + correlation +FROM pg_stats +WHERE tablename = 'prices'; diff --git a/java/JdbcSink/pom.xml b/java/JdbcSink/pom.xml new file mode 100644 index 0000000..640014e --- /dev/null +++ b/java/JdbcSink/pom.xml @@ -0,0 +1,188 @@ + + + 4.0.0 + + com.amazonaws + flink-jdbc-sink + 1.0 + jar + + + UTF-8 + ${project.basedir}/target + ${project.name}-${project.version} + 11 + ${target.java.version} + ${target.java.version} + 1.20.0 + 3.3.0-1.20 + 42.7.2 + 1.2.0 + 2.23.1 + + + + + + + org.apache.flink + 
flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-datagen + ${flink.version} + + + + + + org.apache.flink + flink-connector-jdbc-core + ${flink.jdbc.connector.version} + + + + + org.postgresql + postgresql + ${postgresql.jdbc.driver.version} + + + + + com.github.javafaker + javafaker + 1.0.2 + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + junit + junit + 4.13.2 + test + + + + + ${buildDirectory} + ${jar.finalName} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.JdbcSinkJob + + + + + + + + + diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java new file mode 100644 index 0000000..92a0a11 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java @@ -0,0 +1,109 @@ +package com.amazonaws.services.msf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.function.Function; + +public class ConfigurationHelper { + private static final Logger LOG = LoggerFactory.getLogger(ConfigurationHelper.class); + + /** + * Generic method to extract and parse numeric parameters from Properties + * All parameters are validated to be positive (> 0) + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or invalid + * @param parser Function to parse the string value to the desired numeric type + * @param The numeric type (Integer, Long, etc.) + * @return The extracted and validated value or the default value + */ + private static T extractNumericParameter( + Properties properties, + String parameterName, + T defaultValue, + Function parser) { + + String parameterValue = properties.getProperty(parameterName); + if (parameterValue == null || parameterValue.trim().isEmpty()) { + return defaultValue; + } + + try { + T value = parser.apply(parameterValue.trim()); + + if (value.doubleValue() <= 0) { + throw new IllegalArgumentException( + String.format("Parameter %s with value %s must be positive", parameterName, value)); + } + + return value; + } catch (IllegalArgumentException e) { + LOG.error("Invalid {} value: '{}'. Must be a valid positive {} value. 
Using default: {}", + parameterName, parameterValue, defaultValue.getClass().getSimpleName().toLowerCase(), defaultValue); + return defaultValue; + } + } + + /** + * Extract a required string parameter from Properties + * Throws IllegalArgumentException if the parameter is missing or empty + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param errorMessage The error message to use if parameter is missing + * @return The extracted string value (never null or empty) + * @throws IllegalArgumentException if the parameter is missing or empty + */ + static String extractRequiredStringParameter(Properties properties, String parameterName, String errorMessage) { + String parameterValue = properties.getProperty(parameterName); + if (parameterValue == null || parameterValue.trim().isEmpty()) { + throw new IllegalArgumentException(errorMessage); + } + return parameterValue.trim(); + } + + /** + * Extract an optional string parameter from Properties with default fallback + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or empty + * @return The extracted string value or the default value + */ + static String extractStringParameter(Properties properties, String parameterName, String defaultValue) { + String parameterValue = properties.getProperty(parameterName); + if (parameterValue == null || parameterValue.trim().isEmpty()) { + return defaultValue; + } + return parameterValue.trim(); + } + + /** + * Extract an integer parameter from Properties with validation and default fallback + * The parameter must be positive (> 0) + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or invalid + * @return The extracted integer value or the default value + */ + static int extractIntParameter(Properties properties, String parameterName, int defaultValue) { + return extractNumericParameter(properties, parameterName, defaultValue, Integer::parseInt); + } + + /** + * Extract a long parameter from Properties with validation and default fallback + * The parameter must be positive (> 0) + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or invalid + * @return The extracted long value or the default value + */ + static long extractLongParameter(Properties properties, String parameterName, long defaultValue) { + return extractNumericParameter(properties, parameterName, defaultValue, Long::parseLong); + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java new file mode 100644 index 0000000..d90b09f --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java @@ -0,0 +1,173 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.domain.StockPrice; +import com.amazonaws.services.msf.domain.StockPriceGeneratorFunction; +import com.amazonaws.services.msf.jdbc.StockPriceUpsertQueryStatement; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import 
org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import static com.amazonaws.services.msf.ConfigurationHelper.*; + +/** + * A Flink application that generates random stock price data using DataGeneratorSource + * and writes it to a PostgreSQL database using the JDBC connector. + */ +public class JdbcSinkJob { + private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkJob.class); + + // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + // Default values for configuration + private static final int DEFAULT_RECORDS_PER_SECOND = 10; + private static final int DEFAULT_BATCH_SIZE = 100; + private static final long DEFAULT_BATCH_INTERVAL_MS = 200L; + private static final int DEFAULT_MAX_RETRIES = 5; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + JdbcSinkJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + /** + * Create a DataGeneratorSource with configurable rate from DataGen properties + */ + private static DataGeneratorSource createDataGeneratorSource( + Properties dataGenProperties, + GeneratorFunction generatorFunction, + TypeInformation typeInformation) { + + Preconditions.checkNotNull(dataGenProperties, "DataGen configuration group missing"); + + int recordsPerSecond = extractIntParameter(dataGenProperties, "records.per.second", DEFAULT_RECORDS_PER_SECOND); + + + return new DataGeneratorSource( + generatorFunction, + Long.MAX_VALUE, // Generate (practically) unlimited records + RateLimiterStrategy.perSecond(recordsPerSecond), // Configurable rate + typeInformation // Explicit type information + ); + } + + + /** + * Create the JDBC Sink + */ + private static JdbcSink createUpsertJdbcSink(Properties sinkProperties) { + 
Preconditions.checkNotNull(sinkProperties, "JdbcSink configuration group missing"); + + // This example is designed for PostgreSQL. Switching to a different RDBMS requires modifying + // StockPriceUpsertQueryStatement implementation which depends on the upsert syntax of the specific RDBMS. + String jdbcDriver = "org.postgresql.Driver"; + + String jdbcUrl = extractRequiredStringParameter(sinkProperties, "url", "JDBC URL is required"); + String dbUser = extractRequiredStringParameter(sinkProperties, "username", "JDBC username is required"); + // In the real application the password should have been encrypted or fetched at runtime + String dbPassword = extractRequiredStringParameter(sinkProperties, "password", "JDBC password is required"); + + String tableName = extractStringParameter(sinkProperties, "table.name", "prices"); + + int batchSize = extractIntParameter(sinkProperties, "batch.size", DEFAULT_BATCH_SIZE); + long batchIntervalMs = extractLongParameter(sinkProperties, "batch.interval.ms", DEFAULT_BATCH_INTERVAL_MS); + int maxRetries = extractIntParameter(sinkProperties, "max.retries", DEFAULT_MAX_RETRIES); + + LOG.info("JDBC Sink configuration - batchSize: {}, batchIntervalMs: {}, maxRetries: {}", + batchSize, batchIntervalMs, maxRetries); + + return JdbcSink.builder() + // The JdbcQueryStatement implementation provides the SQL statement template and converts the input record + // into parameters passed to the statement. + .withQueryStatement(new StockPriceUpsertQueryStatement(tableName)) + .withExecutionOptions(JdbcExecutionOptions.builder() + .withBatchSize(batchSize) + .withBatchIntervalMs(batchIntervalMs) + .withMaxRetries(maxRetries) + .build()) + // The SimpleJdbcConnectionProvider is good enough in this case. The connector will open one db connection per parallelism + // and reuse the same connection on every write. 
There is no need of a connection pooler + .buildAtLeastOnce(new SimpleJdbcConnectionProvider(new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(jdbcUrl) + .withDriverName(jdbcDriver) + .withUsername(dbUser) + .withPassword(dbPassword) + .build()) + ); + } + + + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + + // Load application properties + final Map applicationProperties = loadApplicationProperties(env); + LOG.info("Application properties: {}", applicationProperties); + + // Create a DataGeneratorSource that generates StockPrice objects + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataGeneratorSource source = createDataGeneratorSource( + dataGenProperties, + new StockPriceGeneratorFunction(), + TypeInformation.of(StockPrice.class) + ); + + // Create the data stream from the source + DataStream stockPriceStream = env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "Stock Price Data Generator" + ).uid("stock-price-data-generator"); + + // Create the JDBC sink + Properties sinkProperties = applicationProperties.get("JdbcSink"); + JdbcSink jdbcSink = createUpsertJdbcSink(sinkProperties); + + // Attach the sink + stockPriceStream.sinkTo(jdbcSink).uid("jdbc-sink").name("PostgreSQL Sink"); + + // Add print sink for local testing + if (isLocal(env)) { + stockPriceStream.print().uid("print-sink").name("Print Sink"); + LOG.info("Print sink configured for local testing"); + } + + LOG.info("JDBC sink configured"); + + // Execute the job + env.execute("Flink JDBC Sink Job - Stock Prices"); + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java new file mode 100644 index 0000000..5c0bc8e --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java @@ -0,0 +1,69 @@ +package com.amazonaws.services.msf.domain; + +import java.math.BigDecimal; +import java.time.Instant; +import java.util.Objects; + +public class StockPrice { + private String symbol; + + private Instant timestamp; + + private BigDecimal price; + + public StockPrice() {} + + public StockPrice(String symbol, Instant timestamp, BigDecimal price) { + this.symbol = symbol; + this.timestamp = timestamp; + this.price = price; + } + + public String getSymbol() { + return symbol; + } + + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + public Instant getTimestamp() { + return timestamp; + } + + public void setTimestamp(Instant timestamp) { + this.timestamp = timestamp; + } + + public BigDecimal getPrice() { + return price; + } + + public void setPrice(BigDecimal price) { + this.price = price; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StockPrice that = (StockPrice) o; + return Objects.equals(symbol, that.symbol) && + Objects.equals(timestamp, that.timestamp) && + Objects.equals(price, that.price); + } + + @Override + public int hashCode() { + return Objects.hash(symbol, timestamp, price); + } + + @Override + public String toString() { + return "StockPrice{" + + "symbol='" + symbol + '\'' + + ", timestamp='" + timestamp.toString() + '\'' + + ", price=" + price + + '}'; + } +} diff --git 
a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..b4aceb5 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java @@ -0,0 +1,45 @@ +package com.amazonaws.services.msf.domain; + +import com.github.javafaker.Faker; +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; + +/** + * Generator function that creates realistic fake StockPrice objects using JavaFaker. + * Implements GeneratorFunction to work with DataGeneratorSource. + */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + // JavaFaker instance for generating fake data + private static final Faker faker = new Faker(Locale.ENGLISH); + + + @Override + public StockPrice map(Long value) throws Exception { + + // Use JavaFaker's Stock class to generate realistic NASDAQ ticker symbolsy + String symbol = faker.stock().nsdqSymbol(); + + // Generate realistic stock price between $1.00 and $500.00 + // Using faker to generate a base price and then applying some randomness + double basePrice = faker.number().randomDouble(2, 1, 500); + + // Add some volatility to make prices more realistic + // Apply a small random change (-5% to +5%) + double volatilityPercent = faker.number().randomDouble(4, -5, 5); + double finalPrice = basePrice * (1 + volatilityPercent / 100.0); + + // Ensure price is positive and round to 2 decimal places + finalPrice = Math.max(0.01, finalPrice); + BigDecimal price = BigDecimal.valueOf(finalPrice).setScale(2, RoundingMode.HALF_UP); + + Instant timestamp = Instant.now(); + return new StockPrice(symbol, timestamp, price); + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPriceUpsertQueryStatement.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPriceUpsertQueryStatement.java new file mode 100644 index 0000000..7b5483d --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPriceUpsertQueryStatement.java @@ -0,0 +1,91 @@ +package com.amazonaws.services.msf.jdbc; + +import com.amazonaws.services.msf.domain.StockPrice; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; + +/** + * Query statement for an upsert of a StockPrice, leveraging the PostgreSQL upsert syntax INSERT INTO...ON CONFLICT...DO UPDATE... + *
+ * You can adapt this class to the upsert syntaxes of other databases, such as INSERT INTO...ON DUPLICATE KEY UPDATE... for + * MySQL, or MERGE INTO... for SQL Server. + *
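+ * For example, a MySQL variant of the same upsert (a sketch that assumes the same table layout and the same
+ * five positional parameters) could look like:
+ * <pre>{@code
+ * INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?)
+ *   ON DUPLICATE KEY UPDATE price = ?, timestamp = ?
+ * }</pre>
+ *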
+ * This class wraps both the parameterized SQL statement to be executed and the logic that sets the statement
+ * parameters on the prepared statement from the input record.
+ *
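+ * A minimal construction sketch (the table name {@code "prices"} is only an example):
+ * <pre>{@code
+ * JdbcQueryStatement<StockPrice> upsert = new StockPriceUpsertQueryStatement("prices");
+ * }</pre>
+ *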
+ * The table name can be decided when the sink is instantiated. + */ +public class StockPriceUpsertQueryStatement implements JdbcQueryStatement { + + /** + * Template for the SQL statement executing the upsert. Depends on the specific RDBMS syntax. + * The name of the table is parametric (`%s`) + */ + private static final String UPSERT_QUERY_TEMPLATE = + "INSERT INTO %s (symbol, price, timestamp) VALUES (?, ?, ?) " + + "ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ?"; + + private final String sql; + private final JdbcStatementBuilder statementBuilder = new JdbcStatementBuilder() { + + /** + * Replace the positional parameters in the prepared statement. + * The implementation of this method depends on the SQL statement which, in turn, is specific of the RDBMS. + * + * @param preparedStatement the prepared statement + * @param stockPrice the StockPrice to upsert + * @throws SQLException exception thrown replacing parameters + */ + @Override + public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException { + String symbol = stockPrice.getSymbol(); + Timestamp timestamp = Timestamp.from(stockPrice.getTimestamp()); + BigDecimal price = stockPrice.getPrice(); + + // Replace the parameters positionally (note that some parameters are repeated in the SQL statement) + preparedStatement.setString(1, symbol); + preparedStatement.setBigDecimal(2, price); + preparedStatement.setTimestamp(3, timestamp); + preparedStatement.setBigDecimal(4, price); + preparedStatement.setTimestamp(5, timestamp); + } + }; + + /** + * Create an UPSERT stock price query statement for a given table name. + * Note that, while the values are passed at runtime, the table name must be defined when the sink is instantiated, + * on start. + * + * @param tableName name of the table + */ + public StockPriceUpsertQueryStatement(String tableName) { + this.sql = String.format(UPSERT_QUERY_TEMPLATE, tableName); + } + + /** + * Returns the SQL of the PreparedStatement + * + * @return SQL + */ + @Override + public String query() { + return sql; + } + + /** + * Called by the sink for every record to be upserted. 
+ * The PreparedStatement is mutated, replacing the parameters extracted from the record + * + * @param preparedStatement prepared statement + * @param stockPrice record + * @throws SQLException any exception thrown during parameter replacement + */ + @Override + public void statement(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException { + statementBuilder.accept(preparedStatement, stockPrice); + } +} diff --git a/java/JdbcSink/src/main/resources/flink-application-properties-dev.json b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..23d4ce4 --- /dev/null +++ b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "5" + } + }, + { + "PropertyGroupId": "JdbcSink", + "PropertyMap": { + "url": "jdbc:postgresql://localhost:5432/testdb", + "username": "flinkuser", + "password": "flinkpassword", + "table.name": "prices", + "batch.size": "100", + "batch.interval.ms": "200", + "max.retries": "5" + } + } +] diff --git a/java/JdbcSink/src/main/resources/log4j2.properties b/java/JdbcSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..e09c2b4 --- /dev/null +++ b/java/JdbcSink/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = Console +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/java/pom.xml b/java/pom.xml index 0cba628..26ca9bc 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -45,5 +45,6 @@ S3AvroSource FlinkCDC/FlinkCDCSQLServerSource FlinkDataGenerator + JdbcSink \ No newline at end of file