From dc4ac4a24d010633804dffebdac89fad5563433f Mon Sep 17 00:00:00 2001
From: Lorenzo Nicora
Date: Tue, 15 Jul 2025 18:44:08 +0200
Subject: [PATCH 1/7] Jdbc Sink example WIP

---
 java/JdbcSink/README.md                      | 299 ++++++++++++++++++
 java/JdbcSink/docker/docker-compose.yml      |  25 ++
 java/JdbcSink/docker/postgres-init/init.sql  |  43 +++
 java/JdbcSink/monitor.sh                     | 138 ++++++++
 java/JdbcSink/pom.xml                        | 199 ++++++++++++
 .../amazonaws/services/msf/JdbcSinkJob.java  | 245 ++++++++++++++
 .../amazonaws/services/msf/domain/User.java  | 196 ++++++++++++
 .../msf/domain/UserGeneratorFunction.java    |  65 ++++
 .../flink-application-properties-dev.json    |  17 +
 .../src/main/resources/log4j2.properties     |   7 +
 java/JdbcSink/test-local.sh                  |  81 +++++
 java/pom.xml                                 |   1 +
 12 files changed, 1316 insertions(+)
 create mode 100644 java/JdbcSink/README.md
 create mode 100644 java/JdbcSink/docker/docker-compose.yml
 create mode 100644 java/JdbcSink/docker/postgres-init/init.sql
 create mode 100755 java/JdbcSink/monitor.sh
 create mode 100644 java/JdbcSink/pom.xml
 create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java
 create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java
 create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java
 create mode 100644 java/JdbcSink/src/main/resources/flink-application-properties-dev.json
 create mode 100644 java/JdbcSink/src/main/resources/log4j2.properties
 create mode 100755 java/JdbcSink/test-local.sh

diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md
new file mode 100644
index 0000000..4c3cd88
--- /dev/null
+++ b/java/JdbcSink/README.md
@@ -0,0 +1,299 @@
+# Flink JDBC Sink Example with JavaFaker
+
+This example demonstrates how to use Apache Flink's DataStream API to generate realistic fake user data using JavaFaker and write it to a PostgreSQL database using the JDBC connector.
+
+## Overview
+
+The application:
+1. Generates realistic fake user data using JavaFaker library
+2. Uses the DataGen connector for controlled data generation rates
+3. Writes the data to a PostgreSQL database using the JDBC connector
+4. Supports configurable data generation rates
+5. 
Includes proper error handling and retry mechanisms + +## Architecture + +``` +DataGeneratorSource -> JavaFaker -> DataStream -> JDBC Sink -> PostgreSQL +``` + +## Generated Data Schema + +The application generates comprehensive `User` objects with realistic fake data: + +```json +{ + "user_id": 1, + "first_name": "John", + "last_name": "Smith", + "email": "john.smith@example.com", + "phone_number": "+1-555-123-4567", + "address": "123 Main Street", + "city": "New York", + "country": "United States", + "job_title": "Software Engineer", + "company": "Tech Corp", + "date_of_birth": "1990-05-15", + "created_at": "2024-07-15T10:30:45" +} +``` + +## JavaFaker Integration + +The application uses [JavaFaker](https://github.com/DiUS/java-faker) to generate realistic fake data: + +- **Personal Information**: Names, emails, phone numbers, addresses +- **Location Data**: Cities, countries with realistic combinations +- **Professional Data**: Job titles, company names +- **Demographics**: Date of birth (ensuring users are 18+ years old) +- **Timestamps**: ISO formatted creation timestamps + +## Database Schema + +The PostgreSQL table structure accommodates comprehensive user information: + +```sql +CREATE TABLE users ( + user_id INT PRIMARY KEY, + first_name VARCHAR(50) NOT NULL, + last_name VARCHAR(50) NOT NULL, + email VARCHAR(100) NOT NULL UNIQUE, + phone_number VARCHAR(20), + address VARCHAR(200), + city VARCHAR(50), + country VARCHAR(50), + job_title VARCHAR(100), + company VARCHAR(100), + date_of_birth DATE, + created_at TIMESTAMP NOT NULL +); +``` + +### Indexes + +The table includes optimized indexes for common query patterns: +- Email (unique constraint + index) +- Creation timestamp +- Name combinations +- Location (city, country) +- Company and job title + +## Configuration + +The application uses two property groups: + +### DataGen Properties +- `records.per.second`: Number of records to generate per second (default: 10) + +### JdbcSink Properties +- `url`: JDBC connection URL +- `username`: Database username +- `password`: Database password +- `table.name`: Target table name (default: "users") + +## Local Development + +### Prerequisites +- Java 11 or higher +- Maven 3.6 or higher +- Docker and Docker Compose + +### Running Locally + +1. **Start PostgreSQL database:** + ```bash + cd docker + docker-compose up -d + ``` + +2. **Verify database is running:** + ```bash + docker-compose logs postgres + ``` + +3. **Connect to database (optional):** + ```bash + docker exec -it postgres_jdbc_sink psql -U flinkuser -d testdb + ``` + +4. **Build the application:** + ```bash + mvn clean package + ``` + +5. 
**Run the application:** + ```bash + mvn exec:java -Dexec.mainClass="com.amazonaws.services.msf.JdbcSinkJob" + ``` + +### Quick Start + +Use the automated setup script: +```bash +./test-local.sh +``` + +### Configuration for Local Development + +The local configuration is defined in `src/main/resources/flink-application-properties-dev.json`: + +```json +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "5" + } + }, + { + "PropertyGroupId": "JdbcSink", + "PropertyMap": { + "url": "jdbc:postgresql://localhost:5432/testdb", + "username": "flinkuser", + "password": "flinkpassword", + "table.name": "users" + } + } +] +``` + +### Monitoring + +Use the enhanced monitoring dashboard: +```bash +./monitor.sh +``` + +The monitoring script provides insights into: +- **Record counts and generation rates** +- **Geographic distribution** (countries, cities) +- **Professional data** (companies, job titles) +- **Demographics** (age distribution) +- **Data quality metrics** +- **Email domain analysis** + +Sample monitoring output: +```sql +-- View recent realistic records +SELECT user_id, first_name, last_name, city, country, job_title, company +FROM users ORDER BY created_at DESC LIMIT 5; + +-- Analyze geographic distribution +SELECT country, COUNT(*) as count +FROM users GROUP BY country ORDER BY count DESC LIMIT 10; + +-- Check age distribution +SELECT + CASE + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 18 AND 25 THEN '18-25' + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 26 AND 35 THEN '26-35' + -- ... more age groups + END as age_group, + COUNT(*) as count +FROM users GROUP BY age_group; +``` + +## Deployment to Amazon Managed Service for Apache Flink + +### Application Properties + +Configure the following application properties in your Managed Flink application: + +```json +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "100" + } + }, + { + "PropertyGroupId": "JdbcSink", + "PropertyMap": { + "url": "jdbc:postgresql://your-rds-endpoint:5432/your-database", + "username": "your-username", + "password": "your-password", + "table.name": "users" + } + } +] +``` + +### Security Considerations + +For production deployments: +1. Store database credentials in AWS Secrets Manager +2. Use VPC endpoints for secure database connectivity +3. Enable SSL/TLS for database connections +4. Configure appropriate IAM roles and policies +5. Consider data privacy implications of generated fake data + +### Performance Tuning + +The JDBC sink is configured with: +- Batch size: 1000 records +- Batch interval: 200ms +- Max retries: 5 + +JavaFaker performance considerations: +- Faker instances are thread-safe and reusable +- Data generation is deterministic with consistent quality +- Memory usage is minimal for the faker instance + +**Note on JDBC API**: This example currently uses the `JdbcSink.sink()` method which may be deprecated in favor of `org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink`. However, the new API is not yet available in the current JDBC connector version (3.3.0-1.20). The implementation will be updated to use the newer non-deprecated API once it becomes available in stable form. 
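+
+For reference, the snippet below is a minimal sketch of how the `JdbcSink.sink()` call described above is typically wired with these execution options. The connection settings are the ones from the local Docker setup in this example, and only a subset of the `users` columns is shown for brevity; `userStream` is the `DataStream<User>` produced by the data generator.
+
+```java
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+// Sketch only: inserts a subset of the columns to keep the example short.
+SinkFunction<User> sink = JdbcSink.sink(
+        "INSERT INTO users (user_id, first_name, last_name, email, created_at) VALUES (?, ?, ?, ?, ?)",
+        (PreparedStatement stmt, User user) -> {
+            stmt.setInt(1, user.getUserId());
+            stmt.setString(2, user.getFirstName());
+            stmt.setString(3, user.getLastName());
+            stmt.setString(4, user.getEmail());
+            // created_at is generated in ISO_LOCAL_DATE_TIME format
+            stmt.setTimestamp(5, Timestamp.valueOf(LocalDateTime.parse(user.getCreatedAt())));
+        },
+        JdbcExecutionOptions.builder()
+                .withBatchSize(1000)       // batch size
+                .withBatchIntervalMs(200)  // batch interval (ms)
+                .withMaxRetries(5)         // max retries
+                .build(),
+        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withUrl("jdbc:postgresql://localhost:5432/testdb")
+                .withDriverName("org.postgresql.Driver")
+                .withUsername("flinkuser")
+                .withPassword("flinkpassword")
+                .build());
+
+// Attach the legacy SinkFunction to the stream of generated users
+userStream.addSink(sink).name("PostgreSQL Sink");
+```
+
+A full statement builder covering all twelve columns is included (commented out) in `JdbcSinkJob.java`.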
+ +## Data Quality and Realism + +JavaFaker provides: +- **Realistic names** from various cultures and backgrounds +- **Valid email formats** with diverse domains +- **Proper phone number formats** for different regions +- **Real city and country combinations** +- **Believable job titles and company names** +- **Consistent age ranges** (18+ years old) + +## Troubleshooting + +### Common Issues + +1. **Connection refused to PostgreSQL:** + - Ensure Docker container is running: `docker-compose ps` + - Check port availability: `netstat -an | grep 5432` + +2. **Table does not exist:** + - Verify initialization script ran: `docker-compose logs postgres` + - Check if new schema was applied correctly + +3. **JavaFaker dependency issues:** + - Ensure Maven downloaded dependencies: `mvn dependency:resolve` + - Check for version conflicts: `mvn dependency:tree` + +4. **Data generation performance:** + - JavaFaker is optimized for realistic data generation + - Consider adjusting `records.per.second` for your use case + +### Logs + +Check application logs for: +- JDBC connection status +- Batch execution metrics +- JavaFaker data generation performance +- Error messages and stack traces + +## Dependencies + +Key dependencies used in this example: +- `flink-connector-datagen`: For controlled data generation +- `flink-connector-jdbc`: For JDBC database connectivity +- `postgresql`: PostgreSQL JDBC driver +- `javafaker`: For realistic fake data generation +- `aws-kinesisanalytics-runtime`: For reading application properties + +## Related Examples + +- **FlinkDataGenerator**: Shows basic DataGen connector usage +- **FlinkCDC/FlinkCDCSQLServerSource**: Demonstrates CDC source with JDBC sink using Table API + +## License + +This example is provided under the same license as the parent repository. 
diff --git a/java/JdbcSink/docker/docker-compose.yml b/java/JdbcSink/docker/docker-compose.yml new file mode 100644 index 0000000..a77bac3 --- /dev/null +++ b/java/JdbcSink/docker/docker-compose.yml @@ -0,0 +1,25 @@ +services: + # PostgreSQL database + postgres: + image: postgres:15 + container_name: postgres_jdbc_sink + restart: always + environment: + POSTGRES_DB: testdb + POSTGRES_USER: flinkuser + POSTGRES_PASSWORD: flinkpassword + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./postgres-init:/docker-entrypoint-initdb.d + healthcheck: + test: ["CMD-SHELL", "pg_isready -U flinkuser -d testdb"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + +volumes: + postgres_data: + driver: local diff --git a/java/JdbcSink/docker/postgres-init/init.sql b/java/JdbcSink/docker/postgres-init/init.sql new file mode 100644 index 0000000..452ee50 --- /dev/null +++ b/java/JdbcSink/docker/postgres-init/init.sql @@ -0,0 +1,43 @@ +-- Create users table for JDBC sink with comprehensive user information +CREATE TABLE users ( + user_id INT PRIMARY KEY, + first_name VARCHAR(50) NOT NULL, + last_name VARCHAR(50) NOT NULL, + email VARCHAR(100) NOT NULL UNIQUE, + phone_number VARCHAR(20), + address VARCHAR(200), + city VARCHAR(50), + country VARCHAR(50), + job_title VARCHAR(100), + company VARCHAR(100), + date_of_birth DATE, + created_at TIMESTAMP NOT NULL +); + +-- Create indexes for better query performance +CREATE INDEX idx_users_email ON users(email); +CREATE INDEX idx_users_created_at ON users(created_at); +CREATE INDEX idx_users_name ON users(first_name, last_name); +CREATE INDEX idx_users_location ON users(city, country); +CREATE INDEX idx_users_company ON users(company); +CREATE INDEX idx_users_job_title ON users(job_title); + +-- Insert some sample data for testing +INSERT INTO users (user_id, first_name, last_name, email, phone_number, address, city, country, job_title, company, date_of_birth, created_at) VALUES +(0, 'Sample', 'User', 'sample.user@example.com', '+1-555-123-4567', '123 Main St', 'Sample City', 'Sample Country', 'Software Engineer', 'Sample Corp', '1990-01-01', NOW()); + +-- Display table structure +\d users; + +-- Display sample data +SELECT * FROM users; + +-- Show table statistics +SELECT + schemaname, + tablename, + attname as column_name, + n_distinct, + correlation +FROM pg_stats +WHERE tablename = 'users'; diff --git a/java/JdbcSink/monitor.sh b/java/JdbcSink/monitor.sh new file mode 100755 index 0000000..8b9407b --- /dev/null +++ b/java/JdbcSink/monitor.sh @@ -0,0 +1,138 @@ +#!/bin/bash + +# Monitoring script for JdbcSink example with JavaFaker data +set -e + +echo "📊 JdbcSink Monitoring Dashboard (JavaFaker Edition)" +echo "====================================================" + +# Function to execute SQL and display results +execute_sql() { + local query="$1" + local description="$2" + + echo "" + echo "🔍 $description" + echo "---" + docker-compose -f docker/docker-compose.yml exec -T postgres psql -U flinkuser -d testdb -c "$query" +} + +# Check if PostgreSQL is running +if ! docker-compose -f docker/docker-compose.yml ps postgres | grep -q "Up"; then + echo "❌ PostgreSQL is not running. 
Please start it first:" + echo " cd docker && docker-compose up -d" + exit 1 +fi + +# Display current timestamp +echo "⏰ Current time: $(date)" + +# Total record count +execute_sql "SELECT COUNT(*) as total_records FROM users;" "Total Records" + +# Recent records with key information +execute_sql "SELECT user_id, first_name, last_name, email, city, country, job_title, company FROM users ORDER BY created_at DESC LIMIT 5;" "Latest 5 Records" + +# Records per minute (last 10 minutes) +execute_sql " +SELECT + DATE_TRUNC('minute', created_at) as minute, + COUNT(*) as records_per_minute +FROM users +WHERE created_at >= NOW() - INTERVAL '10 minutes' +GROUP BY DATE_TRUNC('minute', created_at) +ORDER BY minute DESC +LIMIT 10;" "Records per Minute (Last 10 minutes)" + +# Top countries +execute_sql " +SELECT + country, + COUNT(*) as count +FROM users +GROUP BY country +ORDER BY count DESC +LIMIT 10;" "Top Countries" + +# Top cities +execute_sql " +SELECT + city, + country, + COUNT(*) as count +FROM users +GROUP BY city, country +ORDER BY count DESC +LIMIT 10;" "Top Cities" + +# Top companies +execute_sql " +SELECT + company, + COUNT(*) as count +FROM users +GROUP BY company +ORDER BY count DESC +LIMIT 10;" "Top Companies" + +# Top job titles +execute_sql " +SELECT + job_title, + COUNT(*) as count +FROM users +GROUP BY job_title +ORDER BY count DESC +LIMIT 10;" "Top Job Titles" + +# Age distribution (approximate) +execute_sql " +SELECT + CASE + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 18 AND 25 THEN '18-25' + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 26 AND 35 THEN '26-35' + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 36 AND 45 THEN '36-45' + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 46 AND 55 THEN '46-55' + WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 56 AND 65 THEN '56-65' + ELSE '65+' + END as age_group, + COUNT(*) as count +FROM users +WHERE date_of_birth IS NOT NULL +GROUP BY age_group +ORDER BY age_group;" "Age Distribution" + +# Email domains +execute_sql " +SELECT + SPLIT_PART(email, '@', 2) as domain, + COUNT(*) as count +FROM users +GROUP BY SPLIT_PART(email, '@', 2) +ORDER BY count DESC +LIMIT 10;" "Email Domains Distribution" + +# First and last record timestamps +execute_sql " +SELECT + MIN(created_at) as first_record, + MAX(created_at) as last_record, + MAX(created_at) - MIN(created_at) as duration +FROM users;" "Time Range" + +# Data quality check +execute_sql " +SELECT + COUNT(*) as total_records, + COUNT(DISTINCT email) as unique_emails, + COUNT(CASE WHEN phone_number IS NOT NULL THEN 1 END) as records_with_phone, + COUNT(CASE WHEN address IS NOT NULL THEN 1 END) as records_with_address, + COUNT(CASE WHEN date_of_birth IS NOT NULL THEN 1 END) as records_with_dob, + ROUND(AVG(EXTRACT(YEAR FROM AGE(date_of_birth))), 1) as avg_age +FROM users;" "Data Quality Summary" + +echo "" +echo "🔄 To refresh this dashboard, run: ./monitor.sh" +echo "🛑 To stop monitoring: Ctrl+C" +echo "📝 To view logs: docker-compose -f docker/docker-compose.yml logs -f postgres" +echo "🎭 Powered by JavaFaker for realistic fake data generation" diff --git a/java/JdbcSink/pom.xml b/java/JdbcSink/pom.xml new file mode 100644 index 0000000..38c35c7 --- /dev/null +++ b/java/JdbcSink/pom.xml @@ -0,0 +1,199 @@ + + + 4.0.0 + + com.amazonaws + flink-jdbc-sink + 1.0 + jar + + + UTF-8 + ${project.basedir}/target + ${project.name}-${project.version} + 11 + ${target.java.version} + ${target.java.version} + 1.20.0 + 3.3.0-1.20 + 5.1.0 + 42.7.2 + 1.2.0 + 2.23.1 + + + + + + + 
org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-datagen + ${flink.version} + + + + + org.apache.flink + flink-connector-jdbc + ${flink.jdbc.connector.version} + + + + + org.postgresql + postgresql + ${postgresql.jdbc.driver.version} + + + + + com.zaxxer + HikariCP + ${hikari.version} + + + + + com.github.javafaker + javafaker + 1.0.2 + + + + + com.fasterxml.jackson.core + jackson-annotations + 2.15.2 + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + junit + junit + 4.13.2 + test + + + + + ${buildDirectory} + ${jar.finalName} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.JdbcSinkJob + + + + + + + + + diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java new file mode 100644 index 0000000..a3c53c1 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java @@ -0,0 +1,245 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.domain.User; +import com.amazonaws.services.msf.domain.UserGeneratorFunction; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Properties; + +/** + * A Flink application that generates random user data using DataGeneratorSource + * and writes it to a PostgreSQL database using the JDBC connector. 
+ */ +public class JdbcSinkJob { + private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkJob.class); + + // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + // Default values for configuration + private static final int DEFAULT_RECORDS_PER_SECOND = 10; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + JdbcSinkJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + /** + * Create a DataGeneratorSource with configurable rate from DataGen properties + * + * @param dataGenProperties Properties from the "DataGen" property group + * @param generatorFunction The generator function to use for data generation + * @param typeInformation Type information for the generated data type + * @param The type of data to generate + * @return Configured DataGeneratorSource + */ + private static DataGeneratorSource createDataGeneratorSource( + Properties dataGenProperties, + GeneratorFunction generatorFunction, + TypeInformation typeInformation) { + + int recordsPerSecond; + if (dataGenProperties != null) { + String recordsPerSecondStr = dataGenProperties.getProperty("records.per.second"); + if (recordsPerSecondStr != null && !recordsPerSecondStr.trim().isEmpty()) { + try { + recordsPerSecond = Integer.parseInt(recordsPerSecondStr.trim()); + } catch (NumberFormatException e) { + LOG.error("Invalid records.per.second value: '{}'. Must be a valid integer. ", recordsPerSecondStr); + throw e; + } + } else { + LOG.info("No records.per.second configured. Using default: {}", DEFAULT_RECORDS_PER_SECOND); + recordsPerSecond = DEFAULT_RECORDS_PER_SECOND; + } + } else { + LOG.info("No DataGen properties found. Using default records per second: {}", DEFAULT_RECORDS_PER_SECOND); + recordsPerSecond = DEFAULT_RECORDS_PER_SECOND; + } + + Preconditions.checkArgument(recordsPerSecond > 0, + "Invalid records.per.second value. 
Must be positive."); + + return new DataGeneratorSource( + generatorFunction, + Long.MAX_VALUE, // Generate (practically) unlimited records + RateLimiterStrategy.perSecond(recordsPerSecond), // Configurable rate + typeInformation // Explicit type information + ); + } + + private static JdbcSink createJdbcSink(Properties sinkProperties) { + + + return JdbcSink.builder() + .buildAtLeastOnce(); + } + + +// /** +// * Create a JDBC Sink for PostgreSQL using the non-deprecated API +// * +// * @param jdbcProperties Properties from the "JdbcSink" property group +// * @return an instance of SinkFunction for User objects +// */ +// private static SinkFunction createJdbcSink(Properties jdbcProperties) { +// String jdbcUrl = Preconditions.checkNotNull( +// jdbcProperties.getProperty("url"), +// "JDBC URL is required" +// ); +// String username = Preconditions.checkNotNull( +// jdbcProperties.getProperty("username"), +// "JDBC username is required" +// ); +// String password = Preconditions.checkNotNull( +// jdbcProperties.getProperty("password"), +// "JDBC password is required" +// ); +// String tableName = jdbcProperties.getProperty("table.name", "users"); +// +// // SQL statement for inserting user data with all fields +// String insertSQL = String.format( +// "INSERT INTO %s (user_id, first_name, last_name, email, phone_number, address, city, country, job_title, company, date_of_birth, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", +// tableName +// ); +// +// // JDBC statement builder +// JdbcStatementBuilder statementBuilder = new JdbcStatementBuilder() { +// @Override +// public void accept(PreparedStatement preparedStatement, User user) throws SQLException { +// preparedStatement.setInt(1, user.getUserId()); +// preparedStatement.setString(2, user.getFirstName()); +// preparedStatement.setString(3, user.getLastName()); +// preparedStatement.setString(4, user.getEmail()); +// preparedStatement.setString(5, user.getPhoneNumber()); +// preparedStatement.setString(6, user.getAddress()); +// preparedStatement.setString(7, user.getCity()); +// preparedStatement.setString(8, user.getCountry()); +// preparedStatement.setString(9, user.getJobTitle()); +// preparedStatement.setString(10, user.getCompany()); +// +// // Parse the date of birth and convert to SQL Date +// if (user.getDateOfBirth() != null && !user.getDateOfBirth().isEmpty()) { +// java.time.LocalDate birthDate = java.time.LocalDate.parse(user.getDateOfBirth(), DateTimeFormatter.ofPattern("yyyy-MM-dd")); +// preparedStatement.setDate(11, java.sql.Date.valueOf(birthDate)); +// } else { +// preparedStatement.setDate(11, null); +// } +// +// // Parse the ISO timestamp and convert to SQL Timestamp +// LocalDateTime dateTime = LocalDateTime.parse(user.getCreatedAt(), DateTimeFormatter.ISO_LOCAL_DATE_TIME); +// preparedStatement.setTimestamp(12, Timestamp.valueOf(dateTime)); +// } +// }; +// +// // JDBC connection options +// JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() +// .withUrl(jdbcUrl) +// .withDriverName("org.postgresql.Driver") +// .withUsername(username) +// .withPassword(password) +// .build(); +// +// // JDBC execution options +// JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() +// .withBatchSize(1000) +// .withBatchIntervalMs(200) +// .withMaxRetries(5) +// .build(); +// +// // Use the non-deprecated JdbcSink from org.apache.flink.connector.jdbc.sink +// return JdbcSink.builder() +// .setJdbcConnectionOptions(connectionOptions) +// 
.setJdbcExecutionOptions(executionOptions) +// .setJdbcStatementBuilder(statementBuilder) +// .setSql(insertSQL) +// .build(); +// } + + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Allows Flink to reuse objects across forwarded operators, as opposed to do a deep copy + // (this is safe because record objects are never mutated or passed by reference) + env.getConfig().enableObjectReuse(); + + LOG.info("Starting Flink JDBC Sink Job"); + + // Load application properties + final Map applicationProperties = loadApplicationProperties(env); + LOG.info("Application properties: {}", applicationProperties); + + // Create a DataGeneratorSource that generates User objects + DataGeneratorSource source = createDataGeneratorSource( + applicationProperties.get("DataGen"), + new UserGeneratorFunction(), + TypeInformation.of(User.class) + ); + + // Create the data stream from the source + DataStream userStream = env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "User Data Generator" + ).uid("user-data-generator"); + + // Check if JDBC sink is configured + Properties jdbcProperties = applicationProperties.get("JdbcSink"); + if (jdbcProperties == null) { + throw new IllegalArgumentException( + "JdbcSink configuration is required. Please provide 'JdbcSink' configuration group."); + } + + // Create JDBC sink + JdbcSink jdbcSink = createJdbcSink(jdbcProperties); + userStream.sinkTo(jdbcSink).uid("jdbc-sink").name("PostgreSQL Sink"); + + // Add print sink for local testing + if (isLocal(env)) { + userStream.print().uid("print-sink").name("Print Sink"); + LOG.info("Print sink configured for local testing"); + } + + LOG.info("JDBC sink configured"); + + // Execute the job + env.execute("Flink JDBC Sink Job"); + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java new file mode 100644 index 0000000..f913a09 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java @@ -0,0 +1,196 @@ +package com.amazonaws.services.msf.domain; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class User { + @JsonProperty("user_id") + private int userId; + + @JsonProperty("first_name") + private String firstName; + + @JsonProperty("last_name") + private String lastName; + + private String email; + + @JsonProperty("phone_number") + private String phoneNumber; + + private String address; + + private String city; + + private String country; + + @JsonProperty("job_title") + private String jobTitle; + + private String company; + + @JsonProperty("date_of_birth") + private String dateOfBirth; + + @JsonProperty("created_at") + private String createdAt; + + public User() {} + + public User(int userId, String firstName, String lastName, String email, String phoneNumber, + String address, String city, String country, String jobTitle, String company, + String dateOfBirth, String createdAt) { + this.userId = userId; + this.firstName = firstName; + this.lastName = lastName; + this.email = email; + this.phoneNumber = phoneNumber; + this.address = address; + this.city = city; + this.country = country; + this.jobTitle = jobTitle; + this.company = company; + this.dateOfBirth = dateOfBirth; + this.createdAt = createdAt; + } + + public int getUserId() { + return userId; + } + + public void 
setUserId(int userId) { + this.userId = userId; + } + + public String getFirstName() { + return firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + public String getLastName() { + return lastName; + } + + public void setLastName(String lastName) { + this.lastName = lastName; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getPhoneNumber() { + return phoneNumber; + } + + public void setPhoneNumber(String phoneNumber) { + this.phoneNumber = phoneNumber; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getCity() { + return city; + } + + public void setCity(String city) { + this.city = city; + } + + public String getCountry() { + return country; + } + + public void setCountry(String country) { + this.country = country; + } + + public String getJobTitle() { + return jobTitle; + } + + public void setJobTitle(String jobTitle) { + this.jobTitle = jobTitle; + } + + public String getCompany() { + return company; + } + + public void setCompany(String company) { + this.company = company; + } + + public String getDateOfBirth() { + return dateOfBirth; + } + + public void setDateOfBirth(String dateOfBirth) { + this.dateOfBirth = dateOfBirth; + } + + public String getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(String createdAt) { + this.createdAt = createdAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + User user = (User) o; + return userId == user.userId && + Objects.equals(firstName, user.firstName) && + Objects.equals(lastName, user.lastName) && + Objects.equals(email, user.email) && + Objects.equals(phoneNumber, user.phoneNumber) && + Objects.equals(address, user.address) && + Objects.equals(city, user.city) && + Objects.equals(country, user.country) && + Objects.equals(jobTitle, user.jobTitle) && + Objects.equals(company, user.company) && + Objects.equals(dateOfBirth, user.dateOfBirth) && + Objects.equals(createdAt, user.createdAt); + } + + @Override + public int hashCode() { + return Objects.hash(userId, firstName, lastName, email, phoneNumber, address, + city, country, jobTitle, company, dateOfBirth, createdAt); + } + + @Override + public String toString() { + return "User{" + + "user_id=" + userId + + ", first_name='" + firstName + '\'' + + ", last_name='" + lastName + '\'' + + ", email='" + email + '\'' + + ", phone_number='" + phoneNumber + '\'' + + ", address='" + address + '\'' + + ", city='" + city + '\'' + + ", country='" + country + '\'' + + ", job_title='" + jobTitle + '\'' + + ", company='" + company + '\'' + + ", date_of_birth='" + dateOfBirth + '\'' + + ", created_at='" + createdAt + '\'' + + '}'; + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java new file mode 100644 index 0000000..c6aa8f8 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java @@ -0,0 +1,65 @@ +package com.amazonaws.services.msf.domain; + +import com.github.javafaker.Faker; +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import 
java.util.concurrent.TimeUnit; + +/** + * Generator function that creates realistic fake User objects using JavaFaker. + * Implements GeneratorFunction to work with DataGeneratorSource. + */ +public class UserGeneratorFunction implements GeneratorFunction { + + // JavaFaker instance for generating fake data + private static final Faker faker = new Faker(Locale.ENGLISH); + + // Date formatter for ISO format timestamps + private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + @Override + public User map(Long value) throws Exception { + // Generate user ID based on the sequence value + int userId = value.intValue(); + + // Generate current timestamp in ISO format + String createdAt = LocalDateTime.now().format(ISO_FORMATTER); + + // Generate fake personal information + String firstName = faker.name().firstName(); + String lastName = faker.name().lastName(); + String email = faker.internet().emailAddress(); + String phoneNumber = faker.phoneNumber().phoneNumber(); + + // Generate fake address information + String address = faker.address().streetAddress(); + String city = faker.address().city(); + String country = faker.address().country(); + + // Generate fake professional information + String jobTitle = faker.job().title(); + String company = faker.company().name(); + + // Generate fake date of birth (between 18 and 80 years ago) + String dateOfBirth = faker.date() + .past(365 * 62, TimeUnit.DAYS) // Up to 62 years ago + .toInstant() + .atZone(java.time.ZoneId.systemDefault()) + .toLocalDate() + .format(DATE_FORMATTER); + + // Ensure the person is at least 18 years old + if (LocalDateTime.now().minusYears(18).toLocalDate() + .isBefore(java.time.LocalDate.parse(dateOfBirth, DATE_FORMATTER))) { + dateOfBirth = LocalDateTime.now().minusYears(18 + faker.number().numberBetween(0, 44)) + .toLocalDate().format(DATE_FORMATTER); + } + + return new User(userId, firstName, lastName, email, phoneNumber, address, + city, country, jobTitle, company, dateOfBirth, createdAt); + } +} diff --git a/java/JdbcSink/src/main/resources/flink-application-properties-dev.json b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..e5a6885 --- /dev/null +++ b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,17 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.second": "5" + } + }, + { + "PropertyGroupId": "JdbcSink", + "PropertyMap": { + "url": "jdbc:postgresql://localhost:5432/testdb", + "username": "flinkuser", + "password": "flinkpassword", + "table.name": "users" + } + } +] diff --git a/java/JdbcSink/src/main/resources/log4j2.properties b/java/JdbcSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..e09c2b4 --- /dev/null +++ b/java/JdbcSink/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = Console +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/java/JdbcSink/test-local.sh b/java/JdbcSink/test-local.sh new file mode 100755 index 0000000..103f3e0 --- /dev/null +++ b/java/JdbcSink/test-local.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +# Test script for JdbcSink example with JavaFaker +set -e + +echo "🚀 Starting JdbcSink 
Example Test (JavaFaker Edition)" + +# Check if Docker is running +if ! docker info > /dev/null 2>&1; then + echo "❌ Docker is not running. Please start Docker first." + exit 1 +fi + +echo "✅ Docker is running" + +# Navigate to docker directory +cd docker + +# Start PostgreSQL +echo "🐘 Starting PostgreSQL..." +docker-compose up -d + +# Wait for PostgreSQL to be ready +echo "⏳ Waiting for PostgreSQL to be ready..." +timeout=60 +counter=0 +while ! docker-compose exec -T postgres pg_isready -U flinkuser -d testdb > /dev/null 2>&1; do + if [ $counter -ge $timeout ]; then + echo "❌ PostgreSQL failed to start within $timeout seconds" + docker-compose logs postgres + exit 1 + fi + sleep 1 + counter=$((counter + 1)) +done + +echo "✅ PostgreSQL is ready" + +# Check if table was created +echo "🔍 Checking if users table was created..." +if docker-compose exec -T postgres psql -U flinkuser -d testdb -c "\dt" | grep -q users; then + echo "✅ Users table exists" +else + echo "❌ Users table was not created" + exit 1 +fi + +# Show table structure +echo "📋 Table structure:" +docker-compose exec -T postgres psql -U flinkuser -d testdb -c "\d users" + +# Show initial data +echo "📊 Initial data:" +docker-compose exec -T postgres psql -U flinkuser -d testdb -c "SELECT * FROM users;" + +# Go back to project root +cd .. + +# Build the project +echo "🔨 Building the project..." +mvn clean package -q + +if [ $? -eq 0 ]; then + echo "✅ Build successful" +else + echo "❌ Build failed" + exit 1 +fi + +echo "" +echo "🎉 Setup complete! You can now:" +echo " 1. Run the Flink job: mvn exec:java -Dexec.mainClass=\"com.amazonaws.services.msf.JdbcSinkJob\"" +echo " 2. Monitor data: ./monitor.sh" +echo " 3. Stop PostgreSQL: docker-compose -f docker/docker-compose.yml down" +echo "" +echo "📝 Database connection details:" +echo " URL: jdbc:postgresql://localhost:5432/testdb" +echo " Username: flinkuser" +echo " Password: flinkpassword" +echo "" +echo "🎭 This example uses JavaFaker to generate realistic fake user data!" 
diff --git a/java/pom.xml b/java/pom.xml index 0cba628..26ca9bc 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -45,5 +45,6 @@ S3AvroSource FlinkCDC/FlinkCDCSQLServerSource FlinkDataGenerator + JdbcSink \ No newline at end of file From 7e0d1c2a75ea0ac06fb8d287e793bde64524e007 Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 25 Jul 2025 17:04:14 +0100 Subject: [PATCH 2/7] Working JdbcSink UPSERT example using old API --- java/JdbcSink/README.md | 314 ++++-------------- java/JdbcSink/docker/docker-compose.yml | 2 +- java/JdbcSink/docker/postgres-init/init.sql | 37 +-- java/JdbcSink/monitor.sh | 138 -------- java/JdbcSink/pom.xml | 9 +- .../amazonaws/services/msf/JdbcSinkJob.java | 200 ++++++----- .../services/msf/domain/StockPrice.java | 68 ++++ .../domain/StockPriceGeneratorFunction.java | 47 +++ .../amazonaws/services/msf/domain/User.java | 196 ----------- .../msf/domain/UserGeneratorFunction.java | 65 ---- .../flink-application-properties-dev.json | 2 +- .../NamedParameterPreparedStatementTest.java | 82 +++++ java/JdbcSink/test-local.sh | 81 ----- 13 files changed, 365 insertions(+), 876 deletions(-) delete mode 100755 java/JdbcSink/monitor.sh create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java delete mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java delete mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java create mode 100644 java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java delete mode 100755 java/JdbcSink/test-local.sh diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md index 4c3cd88..efa0b40 100644 --- a/java/JdbcSink/README.md +++ b/java/JdbcSink/README.md @@ -1,223 +1,91 @@ -# Flink JDBC Sink Example with JavaFaker +## Flink JDBC Sink -This example demonstrates how to use Apache Flink's DataStream API to generate realistic fake user data using JavaFaker and write it to a PostgreSQL database using the JDBC connector. +This example demonstrates how to use Apache Flink's DataStream API JDBC Sink to execute UPSERT into a relational database. +The example leverages the UPSERT functionality of PostgreSQL. -## Overview +* Flink version: 1.20 +* Flink API: DataStream +* Language: Java (11) +* Flink connectors: JDBC sink, DataGen -The application: -1. Generates realistic fake user data using JavaFaker library -2. Uses the DataGen connector for controlled data generation rates -3. Writes the data to a PostgreSQL database using the JDBC connector -4. Supports configurable data generation rates -5. 
Includes proper error handling and retry mechanisms +### Generated Data Schema -## Architecture - -``` -DataGeneratorSource -> JavaFaker -> DataStream -> JDBC Sink -> PostgreSQL -``` - -## Generated Data Schema - -The application generates comprehensive `User` objects with realistic fake data: +The application generates comprehensive `StockPrice` objects with realistic fake data: ```json { - "user_id": 1, - "first_name": "John", - "last_name": "Smith", - "email": "john.smith@example.com", - "phone_number": "+1-555-123-4567", - "address": "123 Main Street", - "city": "New York", - "country": "United States", - "job_title": "Software Engineer", - "company": "Tech Corp", - "date_of_birth": "1990-05-15", - "created_at": "2024-07-15T10:30:45" + "price_id": 1, + "symbol": "AAPL", + "timestamp": "2024-07-25T10:30:45", + "price": 150.25 } ``` -## JavaFaker Integration - -The application uses [JavaFaker](https://github.com/DiUS/java-faker) to generate realistic fake data: - -- **Personal Information**: Names, emails, phone numbers, addresses -- **Location Data**: Cities, countries with realistic combinations -- **Professional Data**: Job titles, company names -- **Demographics**: Date of birth (ensuring users are 18+ years old) -- **Timestamps**: ISO formatted creation timestamps - -## Database Schema - -The PostgreSQL table structure accommodates comprehensive user information: - -```sql -CREATE TABLE users ( - user_id INT PRIMARY KEY, - first_name VARCHAR(50) NOT NULL, - last_name VARCHAR(50) NOT NULL, - email VARCHAR(100) NOT NULL UNIQUE, - phone_number VARCHAR(20), - address VARCHAR(200), - city VARCHAR(50), - country VARCHAR(50), - job_title VARCHAR(100), - company VARCHAR(100), - date_of_birth DATE, - created_at TIMESTAMP NOT NULL -); -``` - -### Indexes - -The table includes optimized indexes for common query patterns: -- Email (unique constraint + index) -- Creation timestamp -- Name combinations -- Location (city, country) -- Company and job title +### Database prerequisites -## Configuration +When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the database manually, ensuring you set up all the following: -The application uses two property groups: +> You can find the SQL script that sets up the dockerized database by checking out the init script for +> [PostgreSQL](docker/postgres-init/init.sql). -### DataGen Properties -- `records.per.second`: Number of records to generate per second (default: 10) +1. **PostgreSQL Database** + 1. The database name must match the `url` configured in the JDBC sink + 2. The destination table must have the following schema: + ```sql + CREATE TABLE prices ( + symbol VARCHAR(10) PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + price DECIMAL(10,2) NOT NULL + ); + ``` + 3. The database user must have SELECT, INSERT, and UPDATE permissions on the prices table -### JdbcSink Properties -- `url`: JDBC connection URL -- `username`: Database username -- `password`: Database password -- `table.name`: Target table name (default: "users") -## Local Development +## Testing with local database using Docker Compose -### Prerequisites -- Java 11 or higher -- Maven 3.6 or higher -- Docker and Docker Compose +This example can be run locally using Docker. -### Running Locally +A [Docker Compose file](./docker/docker-compose.yml) is provided to run a local PostgreSQL database. +The local database is initialized by creating the database, user, and prices table with sample data. -1. 
**Start PostgreSQL database:** - ```bash - cd docker - docker-compose up -d - ``` +You can run the Flink application inside your IDE following the instructions in [Running in IntelliJ](#running-in-intellij). -2. **Verify database is running:** - ```bash - docker-compose logs postgres - ``` +To start the local database run `docker compose up -d` in the `./docker` folder. -3. **Connect to database (optional):** - ```bash - docker exec -it postgres_jdbc_sink psql -U flinkuser -d testdb - ``` +Use `docker compose down -v` to shut it down, also removing the data volumes. -4. **Build the application:** - ```bash - mvn clean package - ``` +### Runtime configuration -5. **Run the application:** - ```bash - mvn exec:java -Dexec.mainClass="com.amazonaws.services.msf.JdbcSinkJob" - ``` +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*. -### Quick Start +When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder. -Use the automated setup script: -```bash -./test-local.sh -``` +Runtime parameters: -### Configuration for Local Development +| Group ID | Key | Description | +|------------|--------------------|----------------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.second` | Number of stock price records to generate per second (default: 10) | +| `JdbcSink` | `url` | PostgreSQL JDBC URL. e.g. `jdbc:postgresql://your-rds-endpoint:5432/your-database`. Note: the URL includes the database name. | +| `JdbcSink` | `table.name` | Destination table. e.g. `prices` (default: "prices") | +| `JdbcSink` | `username` | Database user with INSERT and UPDATE permissions | +| `JdbcSink` | `password` | Database password | -The local configuration is defined in `src/main/resources/flink-application-properties-dev.json`: -```json -[ - { - "PropertyGroupId": "DataGen", - "PropertyMap": { - "records.per.second": "5" - } - }, - { - "PropertyGroupId": "JdbcSink", - "PropertyMap": { - "url": "jdbc:postgresql://localhost:5432/testdb", - "username": "flinkuser", - "password": "flinkpassword", - "table.name": "users" - } - } -] -``` +### Running in IntelliJ -### Monitoring +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. +Run the database locally using Docker Compose, as described [above](#testing-with-local-database-using-docker-compose). -Use the enhanced monitoring dashboard: -```bash -./monitor.sh -``` - -The monitoring script provides insights into: -- **Record counts and generation rates** -- **Geographic distribution** (countries, cities) -- **Professional data** (companies, job titles) -- **Demographics** (age distribution) -- **Data quality metrics** -- **Email domain analysis** - -Sample monitoring output: -```sql --- View recent realistic records -SELECT user_id, first_name, last_name, city, country, job_title, company -FROM users ORDER BY created_at DESC LIMIT 5; - --- Analyze geographic distribution -SELECT country, COUNT(*) as count -FROM users GROUP BY country ORDER BY count DESC LIMIT 10; - --- Check age distribution -SELECT - CASE - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 18 AND 25 THEN '18-25' - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 26 AND 35 THEN '26-35' - -- ... 
more age groups - END as age_group, - COUNT(*) as count -FROM users GROUP BY age_group; -``` +See [Running examples locally](../running-examples-locally.md) for details about running the application in the IDE. -## Deployment to Amazon Managed Service for Apache Flink -### Application Properties +### Running on Amazon Managed Service for Apache Flink -Configure the following application properties in your Managed Flink application: +To run the application in Amazon Managed Service for Apache Flink make sure the application configuration has the following: +* VPC networking +* The selected Subnets can route traffic to the PostgreSQL database +* The Security Group allows traffic from the application to the database -```json -[ - { - "PropertyGroupId": "DataGen", - "PropertyMap": { - "records.per.second": "100" - } - }, - { - "PropertyGroupId": "JdbcSink", - "PropertyMap": { - "url": "jdbc:postgresql://your-rds-endpoint:5432/your-database", - "username": "your-username", - "password": "your-password", - "table.name": "users" - } - } -] -``` ### Security Considerations @@ -226,74 +94,4 @@ For production deployments: 2. Use VPC endpoints for secure database connectivity 3. Enable SSL/TLS for database connections 4. Configure appropriate IAM roles and policies -5. Consider data privacy implications of generated fake data - -### Performance Tuning - -The JDBC sink is configured with: -- Batch size: 1000 records -- Batch interval: 200ms -- Max retries: 5 - -JavaFaker performance considerations: -- Faker instances are thread-safe and reusable -- Data generation is deterministic with consistent quality -- Memory usage is minimal for the faker instance - -**Note on JDBC API**: This example currently uses the `JdbcSink.sink()` method which may be deprecated in favor of `org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink`. However, the new API is not yet available in the current JDBC connector version (3.3.0-1.20). The implementation will be updated to use the newer non-deprecated API once it becomes available in stable form. - -## Data Quality and Realism - -JavaFaker provides: -- **Realistic names** from various cultures and backgrounds -- **Valid email formats** with diverse domains -- **Proper phone number formats** for different regions -- **Real city and country combinations** -- **Believable job titles and company names** -- **Consistent age ranges** (18+ years old) - -## Troubleshooting - -### Common Issues - -1. **Connection refused to PostgreSQL:** - - Ensure Docker container is running: `docker-compose ps` - - Check port availability: `netstat -an | grep 5432` - -2. **Table does not exist:** - - Verify initialization script ran: `docker-compose logs postgres` - - Check if new schema was applied correctly - -3. **JavaFaker dependency issues:** - - Ensure Maven downloaded dependencies: `mvn dependency:resolve` - - Check for version conflicts: `mvn dependency:tree` - -4. 
**Data generation performance:** - - JavaFaker is optimized for realistic data generation - - Consider adjusting `records.per.second` for your use case - -### Logs - -Check application logs for: -- JDBC connection status -- Batch execution metrics -- JavaFaker data generation performance -- Error messages and stack traces - -## Dependencies - -Key dependencies used in this example: -- `flink-connector-datagen`: For controlled data generation -- `flink-connector-jdbc`: For JDBC database connectivity -- `postgresql`: PostgreSQL JDBC driver -- `javafaker`: For realistic fake data generation -- `aws-kinesisanalytics-runtime`: For reading application properties - -## Related Examples - -- **FlinkDataGenerator**: Shows basic DataGen connector usage -- **FlinkCDC/FlinkCDCSQLServerSource**: Demonstrates CDC source with JDBC sink using Table API - -## License - -This example is provided under the same license as the parent repository. +5. Use RDS with encryption at rest and in transit diff --git a/java/JdbcSink/docker/docker-compose.yml b/java/JdbcSink/docker/docker-compose.yml index a77bac3..36d073e 100644 --- a/java/JdbcSink/docker/docker-compose.yml +++ b/java/JdbcSink/docker/docker-compose.yml @@ -2,7 +2,7 @@ services: # PostgreSQL database postgres: image: postgres:15 - container_name: postgres_jdbc_sink + container_name: postgres restart: always environment: POSTGRES_DB: testdb diff --git a/java/JdbcSink/docker/postgres-init/init.sql b/java/JdbcSink/docker/postgres-init/init.sql index 452ee50..d62972e 100644 --- a/java/JdbcSink/docker/postgres-init/init.sql +++ b/java/JdbcSink/docker/postgres-init/init.sql @@ -1,36 +1,19 @@ --- Create users table for JDBC sink with comprehensive user information -CREATE TABLE users ( - user_id INT PRIMARY KEY, - first_name VARCHAR(50) NOT NULL, - last_name VARCHAR(50) NOT NULL, - email VARCHAR(100) NOT NULL UNIQUE, - phone_number VARCHAR(20), - address VARCHAR(200), - city VARCHAR(50), - country VARCHAR(50), - job_title VARCHAR(100), - company VARCHAR(100), - date_of_birth DATE, - created_at TIMESTAMP NOT NULL +-- Create prices table for JDBC sink +CREATE TABLE prices ( + symbol VARCHAR(10) PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + price DECIMAL(10,2) NOT NULL ); --- Create indexes for better query performance -CREATE INDEX idx_users_email ON users(email); -CREATE INDEX idx_users_created_at ON users(created_at); -CREATE INDEX idx_users_name ON users(first_name, last_name); -CREATE INDEX idx_users_location ON users(city, country); -CREATE INDEX idx_users_company ON users(company); -CREATE INDEX idx_users_job_title ON users(job_title); - -- Insert some sample data for testing -INSERT INTO users (user_id, first_name, last_name, email, phone_number, address, city, country, job_title, company, date_of_birth, created_at) VALUES -(0, 'Sample', 'User', 'sample.user@example.com', '+1-555-123-4567', '123 Main St', 'Sample City', 'Sample Country', 'Software Engineer', 'Sample Corp', '1990-01-01', NOW()); +INSERT INTO prices (symbol, timestamp, price) VALUES +('AAPL', NOW(), 150.25); -- Display table structure -\d users; +\d prices; -- Display sample data -SELECT * FROM users; +SELECT * FROM prices; -- Show table statistics SELECT @@ -40,4 +23,4 @@ SELECT n_distinct, correlation FROM pg_stats -WHERE tablename = 'users'; +WHERE tablename = 'prices'; diff --git a/java/JdbcSink/monitor.sh b/java/JdbcSink/monitor.sh deleted file mode 100755 index 8b9407b..0000000 --- a/java/JdbcSink/monitor.sh +++ /dev/null @@ -1,138 +0,0 @@ -#!/bin/bash - -# Monitoring script for 
JdbcSink example with JavaFaker data -set -e - -echo "📊 JdbcSink Monitoring Dashboard (JavaFaker Edition)" -echo "====================================================" - -# Function to execute SQL and display results -execute_sql() { - local query="$1" - local description="$2" - - echo "" - echo "🔍 $description" - echo "---" - docker-compose -f docker/docker-compose.yml exec -T postgres psql -U flinkuser -d testdb -c "$query" -} - -# Check if PostgreSQL is running -if ! docker-compose -f docker/docker-compose.yml ps postgres | grep -q "Up"; then - echo "❌ PostgreSQL is not running. Please start it first:" - echo " cd docker && docker-compose up -d" - exit 1 -fi - -# Display current timestamp -echo "⏰ Current time: $(date)" - -# Total record count -execute_sql "SELECT COUNT(*) as total_records FROM users;" "Total Records" - -# Recent records with key information -execute_sql "SELECT user_id, first_name, last_name, email, city, country, job_title, company FROM users ORDER BY created_at DESC LIMIT 5;" "Latest 5 Records" - -# Records per minute (last 10 minutes) -execute_sql " -SELECT - DATE_TRUNC('minute', created_at) as minute, - COUNT(*) as records_per_minute -FROM users -WHERE created_at >= NOW() - INTERVAL '10 minutes' -GROUP BY DATE_TRUNC('minute', created_at) -ORDER BY minute DESC -LIMIT 10;" "Records per Minute (Last 10 minutes)" - -# Top countries -execute_sql " -SELECT - country, - COUNT(*) as count -FROM users -GROUP BY country -ORDER BY count DESC -LIMIT 10;" "Top Countries" - -# Top cities -execute_sql " -SELECT - city, - country, - COUNT(*) as count -FROM users -GROUP BY city, country -ORDER BY count DESC -LIMIT 10;" "Top Cities" - -# Top companies -execute_sql " -SELECT - company, - COUNT(*) as count -FROM users -GROUP BY company -ORDER BY count DESC -LIMIT 10;" "Top Companies" - -# Top job titles -execute_sql " -SELECT - job_title, - COUNT(*) as count -FROM users -GROUP BY job_title -ORDER BY count DESC -LIMIT 10;" "Top Job Titles" - -# Age distribution (approximate) -execute_sql " -SELECT - CASE - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 18 AND 25 THEN '18-25' - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 26 AND 35 THEN '26-35' - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 36 AND 45 THEN '36-45' - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 46 AND 55 THEN '46-55' - WHEN EXTRACT(YEAR FROM AGE(date_of_birth)) BETWEEN 56 AND 65 THEN '56-65' - ELSE '65+' - END as age_group, - COUNT(*) as count -FROM users -WHERE date_of_birth IS NOT NULL -GROUP BY age_group -ORDER BY age_group;" "Age Distribution" - -# Email domains -execute_sql " -SELECT - SPLIT_PART(email, '@', 2) as domain, - COUNT(*) as count -FROM users -GROUP BY SPLIT_PART(email, '@', 2) -ORDER BY count DESC -LIMIT 10;" "Email Domains Distribution" - -# First and last record timestamps -execute_sql " -SELECT - MIN(created_at) as first_record, - MAX(created_at) as last_record, - MAX(created_at) - MIN(created_at) as duration -FROM users;" "Time Range" - -# Data quality check -execute_sql " -SELECT - COUNT(*) as total_records, - COUNT(DISTINCT email) as unique_emails, - COUNT(CASE WHEN phone_number IS NOT NULL THEN 1 END) as records_with_phone, - COUNT(CASE WHEN address IS NOT NULL THEN 1 END) as records_with_address, - COUNT(CASE WHEN date_of_birth IS NOT NULL THEN 1 END) as records_with_dob, - ROUND(AVG(EXTRACT(YEAR FROM AGE(date_of_birth))), 1) as avg_age -FROM users;" "Data Quality Summary" - -echo "" -echo "🔄 To refresh this dashboard, run: ./monitor.sh" -echo "🛑 To stop monitoring: Ctrl+C" 
-echo "📝 To view logs: docker-compose -f docker/docker-compose.yml logs -f postgres" -echo "🎭 Powered by JavaFaker for realistic fake data generation" diff --git a/java/JdbcSink/pom.xml b/java/JdbcSink/pom.xml index 38c35c7..8f124fe 100644 --- a/java/JdbcSink/pom.xml +++ b/java/JdbcSink/pom.xml @@ -18,7 +18,6 @@ ${target.java.version} 1.20.0 3.3.0-1.20 - 5.1.0 42.7.2 1.2.0 2.23.1 @@ -87,11 +86,11 @@ ${postgresql.jdbc.driver.version} - + - com.zaxxer - HikariCP - ${hikari.version} + com.axiomalaska + jdbc-named-parameters + 1.1 diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java index a3c53c1..a1764bd 100644 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java @@ -1,16 +1,18 @@ package com.amazonaws.services.msf; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; -import com.amazonaws.services.msf.domain.User; -import com.amazonaws.services.msf.domain.UserGeneratorFunction; +import com.amazonaws.services.msf.domain.StockPrice; +import com.amazonaws.services.msf.domain.StockPriceGeneratorFunction; +import com.axiomalaska.jdbc.NamedParameterPreparedStatement; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; -import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; -import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -20,6 +22,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; @@ -29,7 +33,7 @@ import java.util.Properties; /** - * A Flink application that generates random user data using DataGeneratorSource + * A Flink application that generates random stock price data using DataGeneratorSource * and writes it to a PostgreSQL database using the JDBC connector. 
*/ public class JdbcSinkJob { @@ -104,121 +108,109 @@ private static DataGeneratorSource createDataGeneratorSource( ); } - private static JdbcSink createJdbcSink(Properties sinkProperties) { - return JdbcSink.builder() - .buildAtLeastOnce(); - } + /** + * Create a JDBC Sink for PostgreSQL using NamedParameterPreparedStatement from axiomalaska library + * + * @param jdbcProperties Properties from the "JdbcSink" property group + * @return an instance of SinkFunction for StockPrice objects + */ + private static SinkFunction createJdbcSink(Properties jdbcProperties) { + String jdbcUrl = Preconditions.checkNotNull( + jdbcProperties.getProperty("url"), + "JDBC URL is required" + ); + String username = Preconditions.checkNotNull( + jdbcProperties.getProperty("username"), + "JDBC username is required" + ); + String password = Preconditions.checkNotNull( + jdbcProperties.getProperty("password"), + "JDBC password is required" + ); + String tableName = jdbcProperties.getProperty("table.name", "prices"); + // SQL statement leveraging PostgreSQL UPSERT syntax + String namedSQL = String.format( + "INSERT INTO %s (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + + "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp", + tableName + ); -// /** -// * Create a JDBC Sink for PostgreSQL using the non-deprecated API -// * -// * @param jdbcProperties Properties from the "JdbcSink" property group -// * @return an instance of SinkFunction for User objects -// */ -// private static SinkFunction createJdbcSink(Properties jdbcProperties) { -// String jdbcUrl = Preconditions.checkNotNull( -// jdbcProperties.getProperty("url"), -// "JDBC URL is required" -// ); -// String username = Preconditions.checkNotNull( -// jdbcProperties.getProperty("username"), -// "JDBC username is required" -// ); -// String password = Preconditions.checkNotNull( -// jdbcProperties.getProperty("password"), -// "JDBC password is required" -// ); -// String tableName = jdbcProperties.getProperty("table.name", "users"); -// -// // SQL statement for inserting user data with all fields -// String insertSQL = String.format( -// "INSERT INTO %s (user_id, first_name, last_name, email, phone_number, address, city, country, job_title, company, date_of_birth, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", -// tableName -// ); -// -// // JDBC statement builder -// JdbcStatementBuilder statementBuilder = new JdbcStatementBuilder() { -// @Override -// public void accept(PreparedStatement preparedStatement, User user) throws SQLException { -// preparedStatement.setInt(1, user.getUserId()); -// preparedStatement.setString(2, user.getFirstName()); -// preparedStatement.setString(3, user.getLastName()); -// preparedStatement.setString(4, user.getEmail()); -// preparedStatement.setString(5, user.getPhoneNumber()); -// preparedStatement.setString(6, user.getAddress()); -// preparedStatement.setString(7, user.getCity()); -// preparedStatement.setString(8, user.getCountry()); -// preparedStatement.setString(9, user.getJobTitle()); -// preparedStatement.setString(10, user.getCompany()); -// -// // Parse the date of birth and convert to SQL Date -// if (user.getDateOfBirth() != null && !user.getDateOfBirth().isEmpty()) { -// java.time.LocalDate birthDate = java.time.LocalDate.parse(user.getDateOfBirth(), DateTimeFormatter.ofPattern("yyyy-MM-dd")); -// preparedStatement.setDate(11, java.sql.Date.valueOf(birthDate)); -// } else { -// preparedStatement.setDate(11, null); -// } -// -// // Parse the ISO timestamp and 
convert to SQL Timestamp -// LocalDateTime dateTime = LocalDateTime.parse(user.getCreatedAt(), DateTimeFormatter.ISO_LOCAL_DATE_TIME); -// preparedStatement.setTimestamp(12, Timestamp.valueOf(dateTime)); -// } -// }; -// -// // JDBC connection options -// JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() -// .withUrl(jdbcUrl) -// .withDriverName("org.postgresql.Driver") -// .withUsername(username) -// .withPassword(password) -// .build(); -// -// // JDBC execution options -// JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() -// .withBatchSize(1000) -// .withBatchIntervalMs(200) -// .withMaxRetries(5) -// .build(); -// -// // Use the non-deprecated JdbcSink from org.apache.flink.connector.jdbc.sink -// return JdbcSink.builder() -// .setJdbcConnectionOptions(connectionOptions) -// .setJdbcExecutionOptions(executionOptions) -// .setJdbcStatementBuilder(statementBuilder) -// .setSql(insertSQL) -// .build(); -// } + LOG.info("Named SQL: {}", namedSQL); + + // JDBC connection options + JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(jdbcUrl) + .withDriverName("org.postgresql.Driver") + .withUsername(username) + .withPassword(password) + .build(); + + // JDBC execution options + JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() + .withBatchSize(1000) + .withBatchIntervalMs(200) + .withMaxRetries(5) + .build(); + + // JDBC statement builder using NamedParameterPreparedStatement from axiomalaska + JdbcStatementBuilder statementBuilder = new JdbcStatementBuilder() { + @Override + public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException { + // Get the connection from the PreparedStatement + Connection connection = preparedStatement.getConnection(); + + // Create NamedParameterPreparedStatement using the axiomalaska library + // This library creates its own PreparedStatement internally from the connection and named SQL + try (NamedParameterPreparedStatement namedStmt = NamedParameterPreparedStatement.createNamedParameterPreparedStatement(connection, namedSQL)) { + + // Set parameters by name using the axiomalaska library + namedStmt.setString("symbol", stockPrice.getSymbol()); + + // Parse the ISO timestamp and convert to SQL Timestamp + LocalDateTime dateTime = LocalDateTime.parse(stockPrice.getTimestamp(), DateTimeFormatter.ISO_LOCAL_DATE_TIME); + namedStmt.setTimestamp("timestamp", Timestamp.valueOf(dateTime)); + + namedStmt.setBigDecimal("price", stockPrice.getPrice()); + + // Execute the statement + namedStmt.executeUpdate(); + } + } + }; + + // We need to provide a dummy SQL for Flink's JDBC sink since we're handling execution ourselves + // The actual SQL execution is done by NamedParameterPreparedStatement in the statement builder + String dummySQL = "SELECT 1"; + + // Use the deprecated but working JdbcSink.sink() method + return JdbcSink.sink(dummySQL, statementBuilder, executionOptions, connectionOptions); + } public static void main(String[] args) throws Exception { // Set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // Allows Flink to reuse objects across forwarded operators, as opposed to do a deep copy - // (this is safe because record objects are never mutated or passed by reference) - env.getConfig().enableObjectReuse(); - - LOG.info("Starting Flink JDBC Sink Job"); // Load application properties final 
Map applicationProperties = loadApplicationProperties(env); LOG.info("Application properties: {}", applicationProperties); - // Create a DataGeneratorSource that generates User objects - DataGeneratorSource source = createDataGeneratorSource( + // Create a DataGeneratorSource that generates StockPrice objects + DataGeneratorSource source = createDataGeneratorSource( applicationProperties.get("DataGen"), - new UserGeneratorFunction(), - TypeInformation.of(User.class) + new StockPriceGeneratorFunction(), + TypeInformation.of(StockPrice.class) ); // Create the data stream from the source - DataStream userStream = env.fromSource( + DataStream stockPriceStream = env.fromSource( source, WatermarkStrategy.noWatermarks(), - "User Data Generator" - ).uid("user-data-generator"); + "Stock Price Data Generator" + ).uid("stock-price-data-generator"); // Check if JDBC sink is configured Properties jdbcProperties = applicationProperties.get("JdbcSink"); @@ -228,18 +220,18 @@ public static void main(String[] args) throws Exception { } // Create JDBC sink - JdbcSink jdbcSink = createJdbcSink(jdbcProperties); - userStream.sinkTo(jdbcSink).uid("jdbc-sink").name("PostgreSQL Sink"); + SinkFunction jdbcSink = createJdbcSink(jdbcProperties); + stockPriceStream.addSink(jdbcSink).uid("jdbc-sink").name("PostgreSQL Sink"); // Add print sink for local testing if (isLocal(env)) { - userStream.print().uid("print-sink").name("Print Sink"); + stockPriceStream.print().uid("print-sink").name("Print Sink"); LOG.info("Print sink configured for local testing"); } LOG.info("JDBC sink configured"); // Execute the job - env.execute("Flink JDBC Sink Job"); + env.execute("Flink JDBC Sink Job - Stock Prices"); } } diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java new file mode 100644 index 0000000..6488044 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java @@ -0,0 +1,68 @@ +package com.amazonaws.services.msf.domain; + +import java.math.BigDecimal; +import java.util.Objects; + +public class StockPrice { + private String symbol; + + private String timestamp; + + private BigDecimal price; + + public StockPrice() {} + + public StockPrice(String symbol, String timestamp, BigDecimal price) { + this.symbol = symbol; + this.timestamp = timestamp; + this.price = price; + } + + public String getSymbol() { + return symbol; + } + + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public BigDecimal getPrice() { + return price; + } + + public void setPrice(BigDecimal price) { + this.price = price; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StockPrice that = (StockPrice) o; + return Objects.equals(symbol, that.symbol) && + Objects.equals(timestamp, that.timestamp) && + Objects.equals(price, that.price); + } + + @Override + public int hashCode() { + return Objects.hash(symbol, timestamp, price); + } + + @Override + public String toString() { + return "StockPrice{" + + "symbol='" + symbol + '\'' + + ", timestamp='" + timestamp + '\'' + + ", price=" + price + + '}'; + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java 
b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..9aa4271 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java @@ -0,0 +1,47 @@ +package com.amazonaws.services.msf.domain; + +import com.github.javafaker.Faker; +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; + +/** + * Generator function that creates realistic fake StockPrice objects using JavaFaker. + * Implements GeneratorFunction to work with DataGeneratorSource. + */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + // JavaFaker instance for generating fake data + private static final Faker faker = new Faker(Locale.ENGLISH); + + // Date formatter for ISO format timestamps + private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + + @Override + public StockPrice map(Long value) throws Exception { + // Generate current timestamp in ISO format + String timestamp = LocalDateTime.now().format(ISO_FORMATTER); + + // Use JavaFaker's Stock class to generate realistic NASDAQ ticker symbolsy + String symbol = faker.stock().nsdqSymbol(); + + // Generate realistic stock price between $1.00 and $500.00 + // Using faker to generate a base price and then applying some randomness + double basePrice = faker.number().randomDouble(2, 1, 500); + + // Add some volatility to make prices more realistic + // Apply a small random change (-5% to +5%) + double volatilityPercent = faker.number().randomDouble(4, -5, 5); + double finalPrice = basePrice * (1 + volatilityPercent / 100.0); + + // Ensure price is positive and round to 2 decimal places + finalPrice = Math.max(0.01, finalPrice); + BigDecimal price = BigDecimal.valueOf(finalPrice).setScale(2, RoundingMode.HALF_UP); + + return new StockPrice(symbol, timestamp, price); + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java deleted file mode 100644 index f913a09..0000000 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/User.java +++ /dev/null @@ -1,196 +0,0 @@ -package com.amazonaws.services.msf.domain; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Objects; - -public class User { - @JsonProperty("user_id") - private int userId; - - @JsonProperty("first_name") - private String firstName; - - @JsonProperty("last_name") - private String lastName; - - private String email; - - @JsonProperty("phone_number") - private String phoneNumber; - - private String address; - - private String city; - - private String country; - - @JsonProperty("job_title") - private String jobTitle; - - private String company; - - @JsonProperty("date_of_birth") - private String dateOfBirth; - - @JsonProperty("created_at") - private String createdAt; - - public User() {} - - public User(int userId, String firstName, String lastName, String email, String phoneNumber, - String address, String city, String country, String jobTitle, String company, - String dateOfBirth, String createdAt) { - this.userId = userId; - this.firstName = firstName; - this.lastName = lastName; - this.email = email; - this.phoneNumber = phoneNumber; - this.address = address; - this.city = city; - this.country = country; 
- this.jobTitle = jobTitle; - this.company = company; - this.dateOfBirth = dateOfBirth; - this.createdAt = createdAt; - } - - public int getUserId() { - return userId; - } - - public void setUserId(int userId) { - this.userId = userId; - } - - public String getFirstName() { - return firstName; - } - - public void setFirstName(String firstName) { - this.firstName = firstName; - } - - public String getLastName() { - return lastName; - } - - public void setLastName(String lastName) { - this.lastName = lastName; - } - - public String getEmail() { - return email; - } - - public void setEmail(String email) { - this.email = email; - } - - public String getPhoneNumber() { - return phoneNumber; - } - - public void setPhoneNumber(String phoneNumber) { - this.phoneNumber = phoneNumber; - } - - public String getAddress() { - return address; - } - - public void setAddress(String address) { - this.address = address; - } - - public String getCity() { - return city; - } - - public void setCity(String city) { - this.city = city; - } - - public String getCountry() { - return country; - } - - public void setCountry(String country) { - this.country = country; - } - - public String getJobTitle() { - return jobTitle; - } - - public void setJobTitle(String jobTitle) { - this.jobTitle = jobTitle; - } - - public String getCompany() { - return company; - } - - public void setCompany(String company) { - this.company = company; - } - - public String getDateOfBirth() { - return dateOfBirth; - } - - public void setDateOfBirth(String dateOfBirth) { - this.dateOfBirth = dateOfBirth; - } - - public String getCreatedAt() { - return createdAt; - } - - public void setCreatedAt(String createdAt) { - this.createdAt = createdAt; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - User user = (User) o; - return userId == user.userId && - Objects.equals(firstName, user.firstName) && - Objects.equals(lastName, user.lastName) && - Objects.equals(email, user.email) && - Objects.equals(phoneNumber, user.phoneNumber) && - Objects.equals(address, user.address) && - Objects.equals(city, user.city) && - Objects.equals(country, user.country) && - Objects.equals(jobTitle, user.jobTitle) && - Objects.equals(company, user.company) && - Objects.equals(dateOfBirth, user.dateOfBirth) && - Objects.equals(createdAt, user.createdAt); - } - - @Override - public int hashCode() { - return Objects.hash(userId, firstName, lastName, email, phoneNumber, address, - city, country, jobTitle, company, dateOfBirth, createdAt); - } - - @Override - public String toString() { - return "User{" + - "user_id=" + userId + - ", first_name='" + firstName + '\'' + - ", last_name='" + lastName + '\'' + - ", email='" + email + '\'' + - ", phone_number='" + phoneNumber + '\'' + - ", address='" + address + '\'' + - ", city='" + city + '\'' + - ", country='" + country + '\'' + - ", job_title='" + jobTitle + '\'' + - ", company='" + company + '\'' + - ", date_of_birth='" + dateOfBirth + '\'' + - ", created_at='" + createdAt + '\'' + - '}'; - } -} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java deleted file mode 100644 index c6aa8f8..0000000 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/UserGeneratorFunction.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.amazonaws.services.msf.domain; - -import 
com.github.javafaker.Faker; -import org.apache.flink.connector.datagen.source.GeneratorFunction; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Locale; -import java.util.concurrent.TimeUnit; - -/** - * Generator function that creates realistic fake User objects using JavaFaker. - * Implements GeneratorFunction to work with DataGeneratorSource. - */ -public class UserGeneratorFunction implements GeneratorFunction { - - // JavaFaker instance for generating fake data - private static final Faker faker = new Faker(Locale.ENGLISH); - - // Date formatter for ISO format timestamps - private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; - private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - - @Override - public User map(Long value) throws Exception { - // Generate user ID based on the sequence value - int userId = value.intValue(); - - // Generate current timestamp in ISO format - String createdAt = LocalDateTime.now().format(ISO_FORMATTER); - - // Generate fake personal information - String firstName = faker.name().firstName(); - String lastName = faker.name().lastName(); - String email = faker.internet().emailAddress(); - String phoneNumber = faker.phoneNumber().phoneNumber(); - - // Generate fake address information - String address = faker.address().streetAddress(); - String city = faker.address().city(); - String country = faker.address().country(); - - // Generate fake professional information - String jobTitle = faker.job().title(); - String company = faker.company().name(); - - // Generate fake date of birth (between 18 and 80 years ago) - String dateOfBirth = faker.date() - .past(365 * 62, TimeUnit.DAYS) // Up to 62 years ago - .toInstant() - .atZone(java.time.ZoneId.systemDefault()) - .toLocalDate() - .format(DATE_FORMATTER); - - // Ensure the person is at least 18 years old - if (LocalDateTime.now().minusYears(18).toLocalDate() - .isBefore(java.time.LocalDate.parse(dateOfBirth, DATE_FORMATTER))) { - dateOfBirth = LocalDateTime.now().minusYears(18 + faker.number().numberBetween(0, 44)) - .toLocalDate().format(DATE_FORMATTER); - } - - return new User(userId, firstName, lastName, email, phoneNumber, address, - city, country, jobTitle, company, dateOfBirth, createdAt); - } -} diff --git a/java/JdbcSink/src/main/resources/flink-application-properties-dev.json b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json index e5a6885..43f22a5 100644 --- a/java/JdbcSink/src/main/resources/flink-application-properties-dev.json +++ b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json @@ -11,7 +11,7 @@ "url": "jdbc:postgresql://localhost:5432/testdb", "username": "flinkuser", "password": "flinkpassword", - "table.name": "users" + "table.name": "prices" } } ] diff --git a/java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java b/java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java new file mode 100644 index 0000000..a254ace --- /dev/null +++ b/java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java @@ -0,0 +1,82 @@ +package com.amazonaws.services.msf; + +import com.axiomalaska.jdbc.NamedParameterPreparedStatement; +import org.junit.Test; +import static org.junit.Assert.*; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * Unit tests for 
NamedParameterPreparedStatement integration + * Tests the usage of axiomalaska library for named parameters + */ +public class NamedParameterPreparedStatementTest { + + @Test + public void testNamedParameterPreparedStatementCreation() { + String namedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; + + // Test that we can create the SQL string correctly + assertNotNull("Named SQL should not be null", namedSQL); + assertTrue("SQL should contain named parameters", namedSQL.contains(":symbol")); + assertTrue("SQL should contain named parameters", namedSQL.contains(":timestamp")); + assertTrue("SQL should contain named parameters", namedSQL.contains(":price")); + assertTrue("SQL should contain UPSERT syntax", namedSQL.contains("ON CONFLICT")); + } + + @Test + public void testSQLStructure() { + String tableName = "prices"; + String namedSQL = String.format( + "INSERT INTO %s (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + + "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp", + tableName + ); + + String expectedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + + "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; + assertEquals("SQL should be formatted correctly", expectedSQL, namedSQL); + } + + @Test + public void testTableNameSubstitution() { + String tableName = "test_prices"; + String namedSQL = String.format( + "INSERT INTO %s (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + + "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp", + tableName + ); + + assertTrue("SQL should contain the correct table name", namedSQL.contains("test_prices")); + assertFalse("SQL should not contain placeholder", namedSQL.contains("%s")); + } + + @Test + public void testParameterNames() { + String namedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + + "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; + + // Test that all expected parameter names are present + String[] expectedParams = {":symbol", ":timestamp", ":price"}; + + for (String param : expectedParams) { + assertTrue("SQL should contain parameter " + param, namedSQL.contains(param)); + } + } + + @Test + public void testSQLSyntax() { + String namedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + + "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; + + // Basic SQL syntax checks + assertTrue("SQL should start with INSERT", namedSQL.startsWith("INSERT")); + assertTrue("SQL should contain INTO", namedSQL.contains("INTO")); + assertTrue("SQL should contain VALUES", namedSQL.contains("VALUES")); + assertTrue("SQL should contain ON CONFLICT for UPSERT", namedSQL.contains("ON CONFLICT")); + assertTrue("SQL should contain DO UPDATE SET", namedSQL.contains("DO UPDATE SET")); + assertTrue("SQL should have proper parentheses", namedSQL.contains("(") && namedSQL.contains(")")); + } +} diff --git a/java/JdbcSink/test-local.sh b/java/JdbcSink/test-local.sh deleted file mode 100755 index 103f3e0..0000000 --- a/java/JdbcSink/test-local.sh +++ /dev/null @@ -1,81 +0,0 @@ -#!/bin/bash - -# Test script for JdbcSink example with JavaFaker -set -e - -echo "🚀 Starting JdbcSink Example Test (JavaFaker Edition)" - -# Check if Docker is running -if ! 
docker info > /dev/null 2>&1; then - echo "❌ Docker is not running. Please start Docker first." - exit 1 -fi - -echo "✅ Docker is running" - -# Navigate to docker directory -cd docker - -# Start PostgreSQL -echo "🐘 Starting PostgreSQL..." -docker-compose up -d - -# Wait for PostgreSQL to be ready -echo "⏳ Waiting for PostgreSQL to be ready..." -timeout=60 -counter=0 -while ! docker-compose exec -T postgres pg_isready -U flinkuser -d testdb > /dev/null 2>&1; do - if [ $counter -ge $timeout ]; then - echo "❌ PostgreSQL failed to start within $timeout seconds" - docker-compose logs postgres - exit 1 - fi - sleep 1 - counter=$((counter + 1)) -done - -echo "✅ PostgreSQL is ready" - -# Check if table was created -echo "🔍 Checking if users table was created..." -if docker-compose exec -T postgres psql -U flinkuser -d testdb -c "\dt" | grep -q users; then - echo "✅ Users table exists" -else - echo "❌ Users table was not created" - exit 1 -fi - -# Show table structure -echo "📋 Table structure:" -docker-compose exec -T postgres psql -U flinkuser -d testdb -c "\d users" - -# Show initial data -echo "📊 Initial data:" -docker-compose exec -T postgres psql -U flinkuser -d testdb -c "SELECT * FROM users;" - -# Go back to project root -cd .. - -# Build the project -echo "🔨 Building the project..." -mvn clean package -q - -if [ $? -eq 0 ]; then - echo "✅ Build successful" -else - echo "❌ Build failed" - exit 1 -fi - -echo "" -echo "🎉 Setup complete! You can now:" -echo " 1. Run the Flink job: mvn exec:java -Dexec.mainClass=\"com.amazonaws.services.msf.JdbcSinkJob\"" -echo " 2. Monitor data: ./monitor.sh" -echo " 3. Stop PostgreSQL: docker-compose -f docker/docker-compose.yml down" -echo "" -echo "📝 Database connection details:" -echo " URL: jdbc:postgresql://localhost:5432/testdb" -echo " Username: flinkuser" -echo " Password: flinkpassword" -echo "" -echo "🎭 This example uses JavaFaker to generate realistic fake user data!" 
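As a quick local check of what the sink writes, a query along these lines can be run against the dockerized PostgreSQL (a sketch only; it assumes the `flinkuser`/`testdb` credentials from `docker/docker-compose.yml` and the `prices` table created by `docker/postgres-init/init.sql`):

```sql
-- Spot-check the latest prices written by the sink, most recent first
SELECT symbol, price, timestamp
FROM prices
ORDER BY timestamp DESC
LIMIT 10;

-- The table should hold one row per symbol (symbol is the primary key, and the sink upserts)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT symbol) AS symbol_count
FROM prices;
```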
From 0386d0a411f1ca4cb59015b069382a2709a82e4b Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Thu, 31 Jul 2025 17:58:42 +0100 Subject: [PATCH 3/7] Refactored to use new JdbcSink --- .../amazonaws/services/msf/JdbcSinkJob.java | 140 ++++++------------ .../services/msf/domain/StockPrice.java | 11 +- .../domain/StockPriceGeneratorFunction.java | 10 +- ...tockPricePostgresUpsertQueryStatement.java | 88 +++++++++++ 4 files changed, 143 insertions(+), 106 deletions(-) create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPricePostgresUpsertQueryStatement.java diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java index a1764bd..99c4711 100644 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java @@ -3,7 +3,7 @@ import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import com.amazonaws.services.msf.domain.StockPrice; import com.amazonaws.services.msf.domain.StockPriceGeneratorFunction; -import com.axiomalaska.jdbc.NamedParameterPreparedStatement; +import com.amazonaws.services.msf.jdbc.StockPricePostgresUpsertQueryStatement; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; @@ -11,24 +11,16 @@ import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; -import org.apache.flink.connector.jdbc.JdbcStatementBuilder; -import org.apache.flink.connector.jdbc.JdbcSink; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.Properties; @@ -66,12 +58,6 @@ private static Map loadApplicationProperties(StreamExecution /** * Create a DataGeneratorSource with configurable rate from DataGen properties - * - * @param dataGenProperties Properties from the "DataGen" property group - * @param generatorFunction The generator function to use for data generation - * @param typeInformation Type information for the generated data type - * @param The type of data to generate - * @return Configured DataGeneratorSource */ private static DataGeneratorSource createDataGeneratorSource( Properties dataGenProperties, @@ -109,86 +95,53 @@ private static DataGeneratorSource createDataGeneratorSource( } - /** - * Create a JDBC Sink for PostgreSQL using NamedParameterPreparedStatement from axiomalaska library - * - * @param jdbcProperties Properties from the "JdbcSink" property group - * @return an instance of 
SinkFunction for StockPrice objects + * Create the JDBC Sink */ - private static SinkFunction createJdbcSink(Properties jdbcProperties) { + private static JdbcSink createUpsertJdbcSink(Properties sinkProperties) { + Preconditions.checkNotNull(sinkProperties, "JdbcSink configuration group missing"); + + // This example is designed for PostgreSQL. Switching to a different RDBMS requires modifying the JdbcQueryStatement + // implementation which depends on the upsert syntax of the specific RDBMS. + String jdbcDriver = "org.postgresql.Driver"; + + String jdbcUrl = Preconditions.checkNotNull( - jdbcProperties.getProperty("url"), + sinkProperties.getProperty("url"), "JDBC URL is required" ); - String username = Preconditions.checkNotNull( - jdbcProperties.getProperty("username"), + String dbUser = Preconditions.checkNotNull( + sinkProperties.getProperty("username"), "JDBC username is required" ); - String password = Preconditions.checkNotNull( - jdbcProperties.getProperty("password"), + // In the real application the password should have been encrypted or fetched at runtime + String dbPassword = Preconditions.checkNotNull( + sinkProperties.getProperty("password"), "JDBC password is required" ); - String tableName = jdbcProperties.getProperty("table.name", "prices"); - - // SQL statement leveraging PostgreSQL UPSERT syntax - String namedSQL = String.format( - "INSERT INTO %s (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + - "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp", - tableName - ); - LOG.info("Named SQL: {}", namedSQL); - - // JDBC connection options - JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() - .withUrl(jdbcUrl) - .withDriverName("org.postgresql.Driver") - .withUsername(username) - .withPassword(password) - .build(); - - // JDBC execution options - JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() - .withBatchSize(1000) - .withBatchIntervalMs(200) - .withMaxRetries(5) - .build(); - - // JDBC statement builder using NamedParameterPreparedStatement from axiomalaska - JdbcStatementBuilder statementBuilder = new JdbcStatementBuilder() { - @Override - public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException { - // Get the connection from the PreparedStatement - Connection connection = preparedStatement.getConnection(); - - // Create NamedParameterPreparedStatement using the axiomalaska library - // This library creates its own PreparedStatement internally from the connection and named SQL - try (NamedParameterPreparedStatement namedStmt = NamedParameterPreparedStatement.createNamedParameterPreparedStatement(connection, namedSQL)) { - - // Set parameters by name using the axiomalaska library - namedStmt.setString("symbol", stockPrice.getSymbol()); - - // Parse the ISO timestamp and convert to SQL Timestamp - LocalDateTime dateTime = LocalDateTime.parse(stockPrice.getTimestamp(), DateTimeFormatter.ISO_LOCAL_DATE_TIME); - namedStmt.setTimestamp("timestamp", Timestamp.valueOf(dateTime)); - - namedStmt.setBigDecimal("price", stockPrice.getPrice()); - - // Execute the statement - namedStmt.executeUpdate(); - } - } - }; - - // We need to provide a dummy SQL for Flink's JDBC sink since we're handling execution ourselves - // The actual SQL execution is done by NamedParameterPreparedStatement in the statement builder - String dummySQL = "SELECT 1"; - - // Use the deprecated but working JdbcSink.sink() method - return 
JdbcSink.sink(dummySQL, statementBuilder, executionOptions, connectionOptions); + String tableName = sinkProperties.getProperty("table.name", "prices"); + + return JdbcSink.builder() + // The JdbcQueryStatement implementation provides the SQL statement template and converts the input record + // into parameters passed to the statement. + .withQueryStatement(new StockPricePostgresUpsertQueryStatement(tableName)) + .withExecutionOptions(JdbcExecutionOptions.builder() + .withBatchSize(100) + .withBatchIntervalMs(200L) + .withMaxRetries(5) + .build()) + // Using a simple connection provider which does not reuse connection + .buildAtLeastOnce(new SimpleJdbcConnectionProvider(new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(jdbcUrl) + .withDriverName(jdbcDriver) + .withUsername(dbUser) + .withPassword(dbPassword) + .build()) + ); } + public static void main(String[] args) throws Exception { // Set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -199,8 +152,9 @@ public static void main(String[] args) throws Exception { LOG.info("Application properties: {}", applicationProperties); // Create a DataGeneratorSource that generates StockPrice objects + Properties dataGenProperties = applicationProperties.get("DataGen"); DataGeneratorSource source = createDataGeneratorSource( - applicationProperties.get("DataGen"), + dataGenProperties, new StockPriceGeneratorFunction(), TypeInformation.of(StockPrice.class) ); @@ -212,16 +166,12 @@ public static void main(String[] args) throws Exception { "Stock Price Data Generator" ).uid("stock-price-data-generator"); - // Check if JDBC sink is configured - Properties jdbcProperties = applicationProperties.get("JdbcSink"); - if (jdbcProperties == null) { - throw new IllegalArgumentException( - "JdbcSink configuration is required. 
Please provide 'JdbcSink' configuration group."); - } + // Create the JDBC sink + Properties sinkProperties = applicationProperties.get("JdbcSink"); + JdbcSink jdbcSink = createUpsertJdbcSink(sinkProperties); - // Create JDBC sink - SinkFunction jdbcSink = createJdbcSink(jdbcProperties); - stockPriceStream.addSink(jdbcSink).uid("jdbc-sink").name("PostgreSQL Sink"); + // Attach the sink + stockPriceStream.sinkTo(jdbcSink).uid("jdbc-sink").name("PostgreSQL Sink"); // Add print sink for local testing if (isLocal(env)) { diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java index 6488044..5c0bc8e 100644 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java @@ -1,18 +1,19 @@ package com.amazonaws.services.msf.domain; import java.math.BigDecimal; +import java.time.Instant; import java.util.Objects; public class StockPrice { private String symbol; - private String timestamp; + private Instant timestamp; private BigDecimal price; public StockPrice() {} - public StockPrice(String symbol, String timestamp, BigDecimal price) { + public StockPrice(String symbol, Instant timestamp, BigDecimal price) { this.symbol = symbol; this.timestamp = timestamp; this.price = price; @@ -26,11 +27,11 @@ public void setSymbol(String symbol) { this.symbol = symbol; } - public String getTimestamp() { + public Instant getTimestamp() { return timestamp; } - public void setTimestamp(String timestamp) { + public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; } @@ -61,7 +62,7 @@ public int hashCode() { public String toString() { return "StockPrice{" + "symbol='" + symbol + '\'' + - ", timestamp='" + timestamp + '\'' + + ", timestamp='" + timestamp.toString() + '\'' + ", price=" + price + '}'; } diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java index 9aa4271..b4aceb5 100644 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunction.java @@ -5,6 +5,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.Instant; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Locale; @@ -18,13 +19,9 @@ public class StockPriceGeneratorFunction implements GeneratorFunction + * This class wraps both the parametrized SQL statement to be executed and replacing the parameters in the prepared statement. + *

+ * The table name can be decided when the sink is instantiated. + */ +public class StockPricePostgresUpsertQueryStatement implements JdbcQueryStatement { + + /** + * Template for the SQL statement executing the upsert. Depends on the specific RDBMS syntax. + * The name of the table is parametric (`%s`) + */ + private static final String UPSERT_QUERY_TEMPLATE = + "INSERT INTO %s (symbol, price, timestamp) VALUES (?, ?, ?) " + + "ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ?"; + + private final String sql; + private final JdbcStatementBuilder statementBuilder = new JdbcStatementBuilder() { + + /** + * Replace the positional parameters in the prepared statement. + * The implementation of this method depends on the SQL statement which, in turn, is specific of the RDBMS. + * + * @param preparedStatement the prepared statement + * @param stockPrice the StockPrice to upsert + * @throws SQLException exception thrown replacing parameters + */ + @Override + public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException { + String symbol = stockPrice.getSymbol(); + Timestamp timestamp = Timestamp.from(stockPrice.getTimestamp()); + BigDecimal price = stockPrice.getPrice(); + + // Replace the parameters positionally + preparedStatement.setString(1, symbol); + preparedStatement.setBigDecimal(2, price); + preparedStatement.setTimestamp(3, timestamp); + preparedStatement.setBigDecimal(4, price); + preparedStatement.setTimestamp(5, timestamp); + } + }; + + /** + * Create an UPSERT stock price query statement for a given table name. + * Note that, while the values are passed at runtime, the table name must be defined when the sink is instantiated, + * on start. + * + * @param tableName name of the table + */ + public StockPricePostgresUpsertQueryStatement(String tableName) { + this.sql = String.format(UPSERT_QUERY_TEMPLATE, tableName); + } + + /** + * Returns the SQL of the PreparedStatement + * + * @return SQL + */ + @Override + public String query() { + return sql; + } + + /** + * Called by the sink for every record to be upserted. + * The PreparedStatement is mutated, replacing the parameters extracted from the record + * + * @param preparedStatement prepared statement + * @param stockPrice record + * @throws SQLException any exception thrown during parameter replacement + */ + @Override + public void statement(PreparedStatement preparedStatement, StockPrice stockPrice) throws SQLException { + statementBuilder.accept(preparedStatement, stockPrice); + } +} From 2d164829233ff77217d4ec63cff068043a35366e Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Thu, 31 Jul 2025 18:07:13 +0100 Subject: [PATCH 4/7] Removed unused dependencies --- java/JdbcSink/README.md | 26 +++++- java/JdbcSink/pom.xml | 13 --- .../NamedParameterPreparedStatementTest.java | 82 ------------------- 3 files changed, 23 insertions(+), 98 deletions(-) delete mode 100644 java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md index efa0b40..ee545bf 100644 --- a/java/JdbcSink/README.md +++ b/java/JdbcSink/README.md @@ -8,22 +8,42 @@ The example leverages the UPSERT functionality of PostgreSQL. 
* Language: Java (11) * Flink connectors: JDBC sink, DataGen -### Generated Data Schema +### Data The application generates comprehensive `StockPrice` objects with realistic fake data: ```json { - "price_id": 1, "symbol": "AAPL", "timestamp": "2024-07-25T10:30:45", "price": 150.25 } ``` +This data are written doing upsert in the following database table, containing the latest price for every symbol. + +```sql +CREATE TABLE prices ( + symbol VARCHAR(10) PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + price DECIMAL(10,2) NOT NULL +); +``` + +The sink uses the PostgreSQL upsert syntax: + +``` +INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?) + ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ? +``` + +This is specific to PostgreSQL, but the code can be adjusted to other databases as long as the SQL syntax support doing +an upsert with a single SQL statement. + ### Database prerequisites -When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the database manually, ensuring you set up all the following: +When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the database manually, +ensuring you set up all the following: > You can find the SQL script that sets up the dockerized database by checking out the init script for > [PostgreSQL](docker/postgres-init/init.sql). diff --git a/java/JdbcSink/pom.xml b/java/JdbcSink/pom.xml index 8f124fe..b38468d 100644 --- a/java/JdbcSink/pom.xml +++ b/java/JdbcSink/pom.xml @@ -86,13 +86,6 @@ ${postgresql.jdbc.driver.version} - - - com.axiomalaska - jdbc-named-parameters - 1.1 - - com.github.javafaker @@ -100,12 +93,6 @@ 1.0.2 - - - com.fasterxml.jackson.core - jackson-annotations - 2.15.2 - diff --git a/java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java b/java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java deleted file mode 100644 index a254ace..0000000 --- a/java/JdbcSink/src/test/java/com/amazonaws/services/msf/NamedParameterPreparedStatementTest.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.amazonaws.services.msf; - -import com.axiomalaska.jdbc.NamedParameterPreparedStatement; -import org.junit.Test; -import static org.junit.Assert.*; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -/** - * Unit tests for NamedParameterPreparedStatement integration - * Tests the usage of axiomalaska library for named parameters - */ -public class NamedParameterPreparedStatementTest { - - @Test - public void testNamedParameterPreparedStatementCreation() { - String namedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; - - // Test that we can create the SQL string correctly - assertNotNull("Named SQL should not be null", namedSQL); - assertTrue("SQL should contain named parameters", namedSQL.contains(":symbol")); - assertTrue("SQL should contain named parameters", namedSQL.contains(":timestamp")); - assertTrue("SQL should contain named parameters", namedSQL.contains(":price")); - assertTrue("SQL should contain UPSERT syntax", namedSQL.contains("ON CONFLICT")); - } - - @Test - public void testSQLStructure() { - String tableName = "prices"; - String namedSQL = String.format( - "INSERT INTO %s (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + - "ON CONFLICT(symbol) DO UPDATE SET price = :price, 
timestamp = :timestamp", - tableName - ); - - String expectedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + - "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; - assertEquals("SQL should be formatted correctly", expectedSQL, namedSQL); - } - - @Test - public void testTableNameSubstitution() { - String tableName = "test_prices"; - String namedSQL = String.format( - "INSERT INTO %s (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + - "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp", - tableName - ); - - assertTrue("SQL should contain the correct table name", namedSQL.contains("test_prices")); - assertFalse("SQL should not contain placeholder", namedSQL.contains("%s")); - } - - @Test - public void testParameterNames() { - String namedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + - "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; - - // Test that all expected parameter names are present - String[] expectedParams = {":symbol", ":timestamp", ":price"}; - - for (String param : expectedParams) { - assertTrue("SQL should contain parameter " + param, namedSQL.contains(param)); - } - } - - @Test - public void testSQLSyntax() { - String namedSQL = "INSERT INTO prices (symbol, timestamp, price) VALUES (:symbol, :timestamp, :price) " + - "ON CONFLICT(symbol) DO UPDATE SET price = :price, timestamp = :timestamp"; - - // Basic SQL syntax checks - assertTrue("SQL should start with INSERT", namedSQL.startsWith("INSERT")); - assertTrue("SQL should contain INTO", namedSQL.contains("INTO")); - assertTrue("SQL should contain VALUES", namedSQL.contains("VALUES")); - assertTrue("SQL should contain ON CONFLICT for UPSERT", namedSQL.contains("ON CONFLICT")); - assertTrue("SQL should contain DO UPDATE SET", namedSQL.contains("DO UPDATE SET")); - assertTrue("SQL should have proper parentheses", namedSQL.contains("(") && namedSQL.contains(")")); - } -} From 2d0113197b214f19c2402a9fe05c23faf1b75efd Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 8 Aug 2025 09:13:05 +0100 Subject: [PATCH 5/7] Finalize the example --- README.md | 1 + java/JdbcSink/README.md | 105 +++++++++++------ .../services/msf/ConfigurationHelper.java | 109 ++++++++++++++++++ .../amazonaws/services/msf/JdbcSinkJob.java | 70 +++++------ ...va => StockPriceUpsertQueryStatement.java} | 11 +- .../flink-application-properties-dev.json | 5 +- 6 files changed, 221 insertions(+), 80 deletions(-) create mode 100644 java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java rename java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/{StockPricePostgresUpsertQueryStatement.java => StockPriceUpsertQueryStatement.java} (86%) diff --git a/README.md b/README.md index 39ad86a..bbaa3b8 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service f - [**SQS Sink**](./java/SQSSink) - Writing data to Amazon SQS - [**Prometheus Sink**](./java/PrometheusSink) - Sending metrics to Prometheus - [**Flink CDC**](./java/FlinkCDC) - Change Data Capture examples using Flink CDC +- [**JdbcSink**](./java/JdbcSink) - Writes to a relational database executing upsert statements #### Reading and writing files and transactional data lake formats - [**Iceberg**](./java/Iceberg) - Working with Apache Iceberg and Amazon S3 Tables diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md 
index ee545bf..d283c9a 100644 --- a/java/JdbcSink/README.md +++ b/java/JdbcSink/README.md @@ -1,13 +1,31 @@ ## Flink JDBC Sink -This example demonstrates how to use Apache Flink's DataStream API JDBC Sink to execute UPSERT into a relational database. -The example leverages the UPSERT functionality of PostgreSQL. +This example demonstrates how to use the DataStream API JdbcSink to write into a relational database. * Flink version: 1.20 * Flink API: DataStream * Language: Java (11) * Flink connectors: JDBC sink, DataGen +This example demonstrates how to do UPSERT into a relational database. +The example uses the UPSERT syntax of PostgreSQL, but it can be easily adapted to the syntaxes of other databases or into +an append-only sink, with an INSERT INTO statement. + +#### Which JdbcSink? + +At the moment of publishing this example (August 2025) there are two different DataStream API JdbcSink implementations, +available with the `org.apache.flink:flink-connector-jdbc:3.3.0-1.20` dependency. + +1. The new `org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink` which uses the Sink API V2 and + is initialized using a builder: `JdbcSink.builder()..build()` +2. The legacy `org.apache.flink.connector.jdbc.JdbcSink` which uses the legacy `SinkFunction` API now deprecated. + The legacy sink is initialized with the syntax `JdbcSink.sink(...)` + +This example uses the new sink. + +At the moment of publishing this example (August 2025) the Apache Flink documentation +[still refers to the deprecated sink](https://nightlies.apache.org/flink/flink-docs-lts/docs/connectors/datastream/jdbc/#jdbcsinksink). + ### Data The application generates comprehensive `StockPrice` objects with realistic fake data: @@ -15,20 +33,12 @@ The application generates comprehensive `StockPrice` objects with realistic fake ```json { "symbol": "AAPL", - "timestamp": "2024-07-25T10:30:45", + "timestamp": "2025-08-07T10:30:45", "price": 150.25 } ``` -This data are written doing upsert in the following database table, containing the latest price for every symbol. - -```sql -CREATE TABLE prices ( - symbol VARCHAR(10) PRIMARY KEY, - timestamp TIMESTAMP NOT NULL, - price DECIMAL(10,2) NOT NULL -); -``` +This data is written using upsert in the following database table, containing the latest price for every symbol. The sink uses the PostgreSQL upsert syntax: @@ -37,9 +47,29 @@ INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?) ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ? ``` -This is specific to PostgreSQL, but the code can be adjusted to other databases as long as the SQL syntax support doing +This is specific to PostgreSQL, but the code can be adjusted to other databases as long as the SQL syntax supports doing an upsert with a single SQL statement. +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*. + +When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder. + +Runtime parameters: + +| Group ID | Key | Description | +|------------|----------------------|-------------------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.second` | Number of stock price records to generate per second (default: 10) | +| `JdbcSink` | `url` | PostgreSQL JDBC URL. e.g. 
`jdbc:postgresql://your-rds-endpoint:5432/your-database`. Note: the URL includes the database name. | +| `JdbcSink` | `table.name` | Destination table. e.g. `prices` (default: "prices") | +| `JdbcSink` | `username` | Database user with INSERT and UPDATE permissions | +| `JdbcSink` | `password` | Database password | +| `JdbcSink` | `batch.size` | Number of records to batch before executing the SQL statement (default: 100) | +| `JdbcSink` | `batch.interval.ms` | Maximum time in milliseconds to wait before executing a batch (default: 200) | +| `JdbcSink` | `max.retries` | Maximum number of retries for failed database operations (default: 5) | + + ### Database prerequisites When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the database manually, @@ -61,7 +91,7 @@ ensuring you set up all the following: 3. The database user must have SELECT, INSERT, and UPDATE permissions on the prices table -## Testing with local database using Docker Compose +### Testing with local database using Docker Compose This example can be run locally using Docker. @@ -74,22 +104,6 @@ To start the local database run `docker compose up -d` in the `./docker` folder. Use `docker compose down -v` to shut it down, also removing the data volumes. -### Runtime configuration - -When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*. - -When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder. - -Runtime parameters: - -| Group ID | Key | Description | -|------------|--------------------|----------------------------------------------------------------------------------------------------------------------------| -| `DataGen` | `records.per.second` | Number of stock price records to generate per second (default: 10) | -| `JdbcSink` | `url` | PostgreSQL JDBC URL. e.g. `jdbc:postgresql://your-rds-endpoint:5432/your-database`. Note: the URL includes the database name. | -| `JdbcSink` | `table.name` | Destination table. e.g. `prices` (default: "prices") | -| `JdbcSink` | `username` | Database user with INSERT and UPDATE permissions | -| `JdbcSink` | `password` | Database password | - ### Running in IntelliJ @@ -110,8 +124,33 @@ To run the application in Amazon Managed Service for Apache Flink make sure the ### Security Considerations For production deployments: -1. Store database credentials in AWS Secrets Manager +1. Store database credentials in AWS Secrets Manager. 2. Use VPC endpoints for secure database connectivity 3. Enable SSL/TLS for database connections -4. Configure appropriate IAM roles and policies -5. Use RDS with encryption at rest and in transit + +### Implementation considerations + +#### At-least-once or exactly-once + +This implementation leverages the at-least-once mode of the JdbcSink. This is normally sufficient when the sink is +executing a single idempotent statement such as an UPSERT: any duplicate will just overwrite the same record. + +The JdbcSink also supports exactly-once mode which leverages XA transactions synchronized with Flink checkpoints, +and relies on XADataSource. This prevents duplicate writes in case of failure and restart from checkpoint. Note that it +does not prevent duplicates if you restart the application from an older Snapshot (Flink Savepoint), unless your SQL statement +implements some form of idempotency. 
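+
+One way to add such idempotency (shown here only as a sketch, not what this example implements) is to overwrite an existing row only when the incoming record carries a newer timestamp, using the PostgreSQL `EXCLUDED` pseudo-table:
+
+```sql
+-- Hypothetical variant of the upsert: an existing row is updated only when the incoming
+-- record is newer, so replaying old data (e.g. after restoring an old savepoint) cannot
+-- roll prices back
+INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?)
+ON CONFLICT (symbol) DO UPDATE
+SET price = EXCLUDED.price, timestamp = EXCLUDED.timestamp
+WHERE prices.timestamp < EXCLUDED.timestamp;
+```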
+ +#### No connection pooler? + +The JdbcSink does not support using any database connection pooler, such as HikariCP. + +The reason is that no connection pooling is required. The sink will open one database connection per parallelism (one per subtask), +and reuse these connections unless they get closed. + +#### Batching + +The JdbcSink batches writes to reduce the number of requests to the database. +The batch size and interval used in this example are for demonstrational purposes only. + +You should test your actual application with a realistic throughput and realistic data to optimize these values for your +workload. diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java new file mode 100644 index 0000000..92a0a11 --- /dev/null +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/ConfigurationHelper.java @@ -0,0 +1,109 @@ +package com.amazonaws.services.msf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.function.Function; + +public class ConfigurationHelper { + private static final Logger LOG = LoggerFactory.getLogger(ConfigurationHelper.class); + + /** + * Generic method to extract and parse numeric parameters from Properties + * All parameters are validated to be positive (> 0) + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or invalid + * @param parser Function to parse the string value to the desired numeric type + * @param The numeric type (Integer, Long, etc.) + * @return The extracted and validated value or the default value + */ + private static T extractNumericParameter( + Properties properties, + String parameterName, + T defaultValue, + Function parser) { + + String parameterValue = properties.getProperty(parameterName); + if (parameterValue == null || parameterValue.trim().isEmpty()) { + return defaultValue; + } + + try { + T value = parser.apply(parameterValue.trim()); + + if (value.doubleValue() <= 0) { + throw new IllegalArgumentException( + String.format("Parameter %s with value %s must be positive", parameterName, value)); + } + + return value; + } catch (IllegalArgumentException e) { + LOG.error("Invalid {} value: '{}'. Must be a valid positive {} value. 
Using default: {}", + parameterName, parameterValue, defaultValue.getClass().getSimpleName().toLowerCase(), defaultValue); + return defaultValue; + } + } + + /** + * Extract a required string parameter from Properties + * Throws IllegalArgumentException if the parameter is missing or empty + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param errorMessage The error message to use if parameter is missing + * @return The extracted string value (never null or empty) + * @throws IllegalArgumentException if the parameter is missing or empty + */ + static String extractRequiredStringParameter(Properties properties, String parameterName, String errorMessage) { + String parameterValue = properties.getProperty(parameterName); + if (parameterValue == null || parameterValue.trim().isEmpty()) { + throw new IllegalArgumentException(errorMessage); + } + return parameterValue.trim(); + } + + /** + * Extract an optional string parameter from Properties with default fallback + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or empty + * @return The extracted string value or the default value + */ + static String extractStringParameter(Properties properties, String parameterName, String defaultValue) { + String parameterValue = properties.getProperty(parameterName); + if (parameterValue == null || parameterValue.trim().isEmpty()) { + return defaultValue; + } + return parameterValue.trim(); + } + + /** + * Extract an integer parameter from Properties with validation and default fallback + * The parameter must be positive (> 0) + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or invalid + * @return The extracted integer value or the default value + */ + static int extractIntParameter(Properties properties, String parameterName, int defaultValue) { + return extractNumericParameter(properties, parameterName, defaultValue, Integer::parseInt); + } + + /** + * Extract a long parameter from Properties with validation and default fallback + * The parameter must be positive (> 0) + * + * @param properties The Properties object to extract from + * @param parameterName The name of the parameter to extract + * @param defaultValue The default value to use if parameter is missing or invalid + * @return The extracted long value or the default value + */ + static long extractLongParameter(Properties properties, String parameterName, long defaultValue) { + return extractNumericParameter(properties, parameterName, defaultValue, Long::parseLong); + } +} diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java index 99c4711..d90b09f 100644 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/JdbcSinkJob.java @@ -3,7 +3,7 @@ import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import com.amazonaws.services.msf.domain.StockPrice; import com.amazonaws.services.msf.domain.StockPriceGeneratorFunction; -import com.amazonaws.services.msf.jdbc.StockPricePostgresUpsertQueryStatement; +import 
com.amazonaws.services.msf.jdbc.StockPriceUpsertQueryStatement; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Properties; +import static com.amazonaws.services.msf.ConfigurationHelper.*; + /** * A Flink application that generates random stock price data using DataGeneratorSource * and writes it to a PostgreSQL database using the JDBC connector. @@ -36,6 +38,9 @@ public class JdbcSinkJob { // Default values for configuration private static final int DEFAULT_RECORDS_PER_SECOND = 10; + private static final int DEFAULT_BATCH_SIZE = 100; + private static final long DEFAULT_BATCH_INTERVAL_MS = 200L; + private static final int DEFAULT_MAX_RETRIES = 5; private static boolean isLocal(StreamExecutionEnvironment env) { return env instanceof LocalStreamEnvironment; @@ -64,27 +69,10 @@ private static DataGeneratorSource createDataGeneratorSource( GeneratorFunction generatorFunction, TypeInformation typeInformation) { - int recordsPerSecond; - if (dataGenProperties != null) { - String recordsPerSecondStr = dataGenProperties.getProperty("records.per.second"); - if (recordsPerSecondStr != null && !recordsPerSecondStr.trim().isEmpty()) { - try { - recordsPerSecond = Integer.parseInt(recordsPerSecondStr.trim()); - } catch (NumberFormatException e) { - LOG.error("Invalid records.per.second value: '{}'. Must be a valid integer. ", recordsPerSecondStr); - throw e; - } - } else { - LOG.info("No records.per.second configured. Using default: {}", DEFAULT_RECORDS_PER_SECOND); - recordsPerSecond = DEFAULT_RECORDS_PER_SECOND; - } - } else { - LOG.info("No DataGen properties found. Using default records per second: {}", DEFAULT_RECORDS_PER_SECOND); - recordsPerSecond = DEFAULT_RECORDS_PER_SECOND; - } + Preconditions.checkNotNull(dataGenProperties, "DataGen configuration group missing"); + + int recordsPerSecond = extractIntParameter(dataGenProperties, "records.per.second", DEFAULT_RECORDS_PER_SECOND); - Preconditions.checkArgument(recordsPerSecond > 0, - "Invalid records.per.second value. Must be positive."); return new DataGeneratorSource( generatorFunction, @@ -101,37 +89,35 @@ private static DataGeneratorSource createDataGeneratorSource( private static JdbcSink createUpsertJdbcSink(Properties sinkProperties) { Preconditions.checkNotNull(sinkProperties, "JdbcSink configuration group missing"); - // This example is designed for PostgreSQL. Switching to a different RDBMS requires modifying the JdbcQueryStatement - // implementation which depends on the upsert syntax of the specific RDBMS. + // This example is designed for PostgreSQL. Switching to a different RDBMS requires modifying + // StockPriceUpsertQueryStatement implementation which depends on the upsert syntax of the specific RDBMS. 
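+        // Note: targeting a different database also means changing the JDBC driver class below
+        // (for example, com.mysql.cj.jdbc.Driver for MySQL) and adding the matching driver dependency.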
String jdbcDriver = "org.postgresql.Driver"; - - String jdbcUrl = Preconditions.checkNotNull( - sinkProperties.getProperty("url"), - "JDBC URL is required" - ); - String dbUser = Preconditions.checkNotNull( - sinkProperties.getProperty("username"), - "JDBC username is required" - ); + String jdbcUrl = extractRequiredStringParameter(sinkProperties, "url", "JDBC URL is required"); + String dbUser = extractRequiredStringParameter(sinkProperties, "username", "JDBC username is required"); // In the real application the password should have been encrypted or fetched at runtime - String dbPassword = Preconditions.checkNotNull( - sinkProperties.getProperty("password"), - "JDBC password is required" - ); + String dbPassword = extractRequiredStringParameter(sinkProperties, "password", "JDBC password is required"); + + String tableName = extractStringParameter(sinkProperties, "table.name", "prices"); + + int batchSize = extractIntParameter(sinkProperties, "batch.size", DEFAULT_BATCH_SIZE); + long batchIntervalMs = extractLongParameter(sinkProperties, "batch.interval.ms", DEFAULT_BATCH_INTERVAL_MS); + int maxRetries = extractIntParameter(sinkProperties, "max.retries", DEFAULT_MAX_RETRIES); - String tableName = sinkProperties.getProperty("table.name", "prices"); + LOG.info("JDBC Sink configuration - batchSize: {}, batchIntervalMs: {}, maxRetries: {}", + batchSize, batchIntervalMs, maxRetries); return JdbcSink.builder() // The JdbcQueryStatement implementation provides the SQL statement template and converts the input record // into parameters passed to the statement. - .withQueryStatement(new StockPricePostgresUpsertQueryStatement(tableName)) + .withQueryStatement(new StockPriceUpsertQueryStatement(tableName)) .withExecutionOptions(JdbcExecutionOptions.builder() - .withBatchSize(100) - .withBatchIntervalMs(200L) - .withMaxRetries(5) + .withBatchSize(batchSize) + .withBatchIntervalMs(batchIntervalMs) + .withMaxRetries(maxRetries) .build()) - // Using a simple connection provider which does not reuse connection + // The SimpleJdbcConnectionProvider is good enough in this case. The connector will open one db connection per parallelism + // and reuse the same connection on every write. There is no need of a connection pooler .buildAtLeastOnce(new SimpleJdbcConnectionProvider(new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(jdbcUrl) .withDriverName(jdbcDriver) diff --git a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPricePostgresUpsertQueryStatement.java b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPriceUpsertQueryStatement.java similarity index 86% rename from java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPricePostgresUpsertQueryStatement.java rename to java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPriceUpsertQueryStatement.java index cd9d15b..7b5483d 100644 --- a/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPricePostgresUpsertQueryStatement.java +++ b/java/JdbcSink/src/main/java/com/amazonaws/services/msf/jdbc/StockPriceUpsertQueryStatement.java @@ -10,13 +10,16 @@ import java.sql.Timestamp; /** - * Query statement for an upsert of a StockPrice, leveraging the PostgreSQL upsert syntax. + * Query statement for an upsert of a StockPrice, leveraging the PostgreSQL upsert syntax INSERT INTO...ON CONFLICT...DO UPDATE... + *

+ * You can adapt this class to the upsert syntaxes of other databases, such as INSERT INTO...ON DUPLICATE KEY UPDATE... for + * MySQL, or MERGE INTO... for SQL Server. *

* This class wraps both the parametrized SQL statement to be executed and the logic that sets the parameters on the prepared statement. *

* The table name can be decided when the sink is instantiated. */ -public class StockPricePostgresUpsertQueryStatement implements JdbcQueryStatement { +public class StockPriceUpsertQueryStatement implements JdbcQueryStatement { /** * Template for the SQL statement executing the upsert. Depends on the specific RDBMS syntax. @@ -43,7 +46,7 @@ public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) t Timestamp timestamp = Timestamp.from(stockPrice.getTimestamp()); BigDecimal price = stockPrice.getPrice(); - // Replace the parameters positionally + // Replace the parameters positionally (note that some parameters are repeated in the SQL statement) preparedStatement.setString(1, symbol); preparedStatement.setBigDecimal(2, price); preparedStatement.setTimestamp(3, timestamp); @@ -59,7 +62,7 @@ public void accept(PreparedStatement preparedStatement, StockPrice stockPrice) t * * @param tableName name of the table */ - public StockPricePostgresUpsertQueryStatement(String tableName) { + public StockPriceUpsertQueryStatement(String tableName) { this.sql = String.format(UPSERT_QUERY_TEMPLATE, tableName); } diff --git a/java/JdbcSink/src/main/resources/flink-application-properties-dev.json b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json index 43f22a5..23d4ce4 100644 --- a/java/JdbcSink/src/main/resources/flink-application-properties-dev.json +++ b/java/JdbcSink/src/main/resources/flink-application-properties-dev.json @@ -11,7 +11,10 @@ "url": "jdbc:postgresql://localhost:5432/testdb", "username": "flinkuser", "password": "flinkpassword", - "table.name": "prices" + "table.name": "prices", + "batch.size": "100", + "batch.interval.ms": "200", + "max.retries": "5" } } ] From b9d9cebfa14c08cff14b24f415c7b74afbb233dc Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 8 Aug 2025 09:50:16 +0100 Subject: [PATCH 6/7] Simplify dependencies. Add comment about password rotation --- java/JdbcSink/README.md | 26 ++++++++++++++++++++++++++ java/JdbcSink/pom.xml | 5 ++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md index d283c9a..9a43b5b 100644 --- a/java/JdbcSink/README.md +++ b/java/JdbcSink/README.md @@ -128,6 +128,10 @@ For production deployments: 2. Use VPC endpoints for secure database connectivity 3. Enable SSL/TLS for database connections +> ⚠️ **Password rotation**: if the password of your database is rotated, the JdbcSink fails causing the job to restart. +> If you fetch the password dynamically on application start (when you create the JdbcSink object) the job will be able +> to restart with the new password. Fetching the password on start is not shown in this example. + ### Implementation considerations #### At-least-once or exactly-once @@ -154,3 +158,25 @@ The batch size and interval used in this example are for demonstrational purpose You should test your actual application with a realistic throughput and realistic data to optimize these values for your workload. + + +#### Which flink-connector-jdbc-* dependency? + +For using JdbcSink in DataStream API you need `flink-connector-jdbc-core` and the JDBC Driver of the +specific database. For example +``` + + org.apache.flink + flink-connector-jdbc-core + 3.3.0-1.20 + + + + org.postgresql + postgresql + 42.7.2 + +``` + +Including `flink-connector-jdbc` would bring in unnecessary dependencies, and make the uber-jar file increasing the size +of the uber-jar. 
diff --git a/java/JdbcSink/pom.xml b/java/JdbcSink/pom.xml index b38468d..640014e 100644 --- a/java/JdbcSink/pom.xml +++ b/java/JdbcSink/pom.xml @@ -73,9 +73,12 @@ + org.apache.flink - flink-connector-jdbc + flink-connector-jdbc-core ${flink.jdbc.connector.version} From fc0a61ebcec10b625af24fbaa1d25cb0134a6d6c Mon Sep 17 00:00:00 2001 From: Lorenzo Nicora Date: Fri, 8 Aug 2025 09:53:01 +0100 Subject: [PATCH 7/7] Fix readme --- java/JdbcSink/README.md | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/java/JdbcSink/README.md b/java/JdbcSink/README.md index 9a43b5b..8587e60 100644 --- a/java/JdbcSink/README.md +++ b/java/JdbcSink/README.md @@ -1,6 +1,6 @@ ## Flink JDBC Sink -This example demonstrates how to use the DataStream API JdbcSink to write into a relational database. +This example demonstrates how to use the DataStream API JdbcSink to write to a relational database. * Flink version: 1.20 * Flink API: DataStream @@ -14,11 +14,11 @@ an append-only sink, with an INSERT INTO statement. #### Which JdbcSink? At the moment of publishing this example (August 2025) there are two different DataStream API JdbcSink implementations, -available with the `org.apache.flink:flink-connector-jdbc:3.3.0-1.20` dependency. +available with the version `3.3.0-1.20` of the JDBC connector. 1. The new `org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink` which uses the Sink API V2 and is initialized using a builder: `JdbcSink.builder()..build()` -2. The legacy `org.apache.flink.connector.jdbc.JdbcSink` which uses the legacy `SinkFunction` API now deprecated. +2. The legacy `org.apache.flink.connector.jdbc.JdbcSink` which uses the legacy `SinkFunction` API, now deprecated. The legacy sink is initialized with the syntax `JdbcSink.sink(...)` This example uses the new sink. @@ -72,7 +72,7 @@ Runtime parameters: ### Database prerequisites -When running on Amazon Managed Service for Apache Flink and with databases on AWS, you need to set up the database manually, +When running on Amazon Managed Service for Apache Flink with databases on AWS, you need to set up the database manually, ensuring you set up all the following: > You can find the SQL script that sets up the dockerized database by checking out the init script for @@ -115,7 +115,7 @@ See [Running examples locally](../running-examples-locally.md) for details about ### Running on Amazon Managed Service for Apache Flink -To run the application in Amazon Managed Service for Apache Flink make sure the application configuration has the following: +To run the application in Amazon Managed Service for Apache Flink ensure the application configuration has the following: * VPC networking * The selected Subnets can route traffic to the PostgreSQL database * The Security Group allows traffic from the application to the database @@ -125,10 +125,10 @@ To run the application in Amazon Managed Service for Apache Flink make sure the For production deployments: 1. Store database credentials in AWS Secrets Manager. -2. Use VPC endpoints for secure database connectivity -3. Enable SSL/TLS for database connections +2. Use VPC endpoints for secure database connectivity. +3. Enable SSL/TLS for database connections. -> ⚠️ **Password rotation**: if the password of your database is rotated, the JdbcSink fails causing the job to restart. +> ⚠️ **Password rotation**: if the password of your database is rotated, the JdbcSink fails, causing the job to restart. 
> If you fetch the password dynamically on application start (when you create the JdbcSink object) the job will be able > to restart with the new password. Fetching the password on start is not shown in this example. @@ -162,8 +162,7 @@ workload. #### Which flink-connector-jdbc-* dependency? -For using JdbcSink in DataStream API you need `flink-connector-jdbc-core` and the JDBC Driver of the -specific database. For example +To use JdbcSink in DataStream API, you need `flink-connector-jdbc-core` and the JDBC driver of the specific database. For example: ``` org.apache.flink @@ -178,5 +177,4 @@ specific database. For example ``` -Including `flink-connector-jdbc` would bring in unnecessary dependencies, and make the uber-jar file increasing the size -of the uber-jar. +Including `flink-connector-jdbc` would bring in unnecessary dependencies and increase the size of the uber-jar file.
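
The password rotation note recommends fetching the database password when the application starts, without showing how. A minimal sketch, assuming the AWS SDK v2 Secrets Manager client (`software.amazon.awssdk:secretsmanager`) is on the classpath and that the secret stores the plain password (the secret name and helper class are illustrative only):

```java
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

public final class DbSecretFetcher {
    // Hypothetical helper, not part of the example code: reads the current database password from
    // AWS Secrets Manager. Calling it in main(), before building the JdbcSink, means a job restart
    // (for example after a password rotation) picks up the new value.
    static String fetchDbPassword(String secretName) {
        try (SecretsManagerClient client = SecretsManagerClient.create()) {
            return client.getSecretValue(
                    GetSecretValueRequest.builder().secretId(secretName).build()
            ).secretString();
        }
    }
}
```

In a real application the secret name would be passed as a runtime property rather than hard-coded, and if the secret is a JSON document (as RDS-managed secrets are) the value returned by `secretString()` would also need to be parsed.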