From e08038ded4822109658a8330c34604746d874565 Mon Sep 17 00:00:00 2001 From: Francisco Date: Tue, 27 May 2025 15:33:36 +0200 Subject: [PATCH 1/3] first commit --- java/Iceberg/IcebergSQLJSONGlue/README.md | 78 ++++++ java/Iceberg/IcebergSQLJSONGlue/pom.xml | 225 ++++++++++++++++++ .../main/java/GlueTableSQLJSONExample.java | 189 +++++++++++++++ .../src/main/java/StockPrice.java | 60 +++++ .../java/StockPriceGeneratorFunction.java | 24 ++ .../flink-application-properties-dev.json | 16 ++ .../src/main/resources/log4j2.properties | 13 + .../src/main/resources/price.avsc | 23 ++ 8 files changed, 628 insertions(+) create mode 100644 java/Iceberg/IcebergSQLJSONGlue/README.md create mode 100644 java/Iceberg/IcebergSQLJSONGlue/pom.xml create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/resources/log4j2.properties create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc diff --git a/java/Iceberg/IcebergSQLJSONGlue/README.md b/java/Iceberg/IcebergSQLJSONGlue/README.md new file mode 100644 index 0000000..ca05aa6 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/README.md @@ -0,0 +1,78 @@ +# Flink Iceberg Sink using SQL API + +* Flink version: 1.20.0 +* Flink API: SQL API +* Iceberg 1.8.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrates how to use +[Flink SQL API with Iceberg](https://iceberg.apache.org/docs/latest/flink-writes/) and the Glue Data Catalog. + +For simplicity, the application generates synthetic data, random stock prices, internally. +Data is generated as POJO objects, simulating a real source, for example a Kafka Source, that receives records +that can be converted to table format for SQL operations. + +### Prerequisites + +The application expects the following resources: +* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). + The application creates the Table, but the Catalog must exist already. +* An S3 bucket to write the Iceberg table. + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | +| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | +| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. | +| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpoints programmatically, every 30 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + +### Known limitations + +At the moment there are current limitations concerning Flink Iceberg integration: +* Doesn't support Iceberg Table with hidden partitioning +* Doesn't support adding columns, removing columns, renaming columns or changing columns. + +### Schema and schema evolution + +The application uses a predefined schema for the stock price data with the following fields: +* `timestamp`: STRING - ISO timestamp of the record +* `symbol`: STRING - Stock symbol (e.g., AAPL, AMZN) +* `price`: FLOAT - Stock price (0-10 range) +* `volumes`: INT - Trade volumes (0-1000000 range) + +This schema matches the AVRO schema used in the DataStream API example for consistency. +The equivalent AVRO schema definition is available in [price.avsc](./src/main/resources/price.avsc) for reference. + +Schema changes would require updating both the POJO class and the SQL table definition. +Unlike the DataStream approach with AVRO, this SQL approach requires explicit schema management. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. diff --git a/java/Iceberg/IcebergSQLJSONGlue/pom.xml b/java/Iceberg/IcebergSQLJSONGlue/pom.xml new file mode 100644 index 0000000..1df90e1 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/pom.xml @@ -0,0 +1,225 @@ + + + 4.0.0 + + com.amazonaws + iceberg-sql-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.20.0 + 1.11.3 + 2.12 + 3.4.0 + 1.8.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink-1.20 + ${iceberg.version} + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + GlueTableSQLJSONExample + + + + + + + + + diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java new file mode 100644 index 0000000..15b0c42 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java @@ -0,0 +1,189 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this + * software and associated documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class GlueTableSQLJSONExample { + // Constants + private static final String CATALOG_NAME = "glue"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final Logger LOG = LoggerFactory.getLogger(GlueTableSQLJSONExample.class); + + // Configuration properties + private static String s3BucketPrefix; + private static String glueDatabase; + private static String glueTable; + + public static void main(String[] args) throws Exception { + // 1. Initialize environments - using standard environment instead of WebUI for production consistency + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. Load properties and configure environment + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + // Configure local development settings if needed + if (isLocal(env)) { + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + // 3. Setup configuration properties with validation + setupIcebergProperties(icebergProperties); + Catalog glueCatalog = createGlueCatalog(tableEnv); + tableEnv.registerCatalog(CATALOG_NAME,glueCatalog); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + String sinkTableName = CATALOG_NAME + "." + glueDatabase + "." + glueTable; + + // Define and create table with schema matching AVRO schema from DataStream example + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" + + "`timestamp` STRING, " + + "symbol STRING," + + "price FLOAT," + + "volumes INT" + + ") PARTITIONED BY (symbol) "; + + LOG.info("Creating table with statement: {}", createTableStatement); + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations - Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT `timestamp`, symbol, price, volumes FROM stockPriceTable"; + LOG.info("Executing insert statement: {}", insertQuery); + TableResult insertResult = tableEnv.executeSql(insertQuery); + + // Keep the job running to continuously insert data + LOG.info("Application started successfully. Inserting data into Iceberg table: {}", sinkTableName); + + } + + private static void setupIcebergProperties(Properties icebergProperties) { + s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); + glueDatabase = icebergProperties.getProperty("catalog.db", "default"); + glueTable = icebergProperties.getProperty("catalog.table", "prices_iceberg"); + + Preconditions.checkNotNull(s3BucketPrefix, "You must supply an s3 bucket prefix for the warehouse."); + Preconditions.checkNotNull(glueDatabase, "You must supply a database name"); + Preconditions.checkNotNull(glueTable, "You must supply a table name"); + + // Validate S3 URI format + validateURI(s3BucketPrefix); + + LOG.info("Iceberg configuration: bucket={}, database={}, table={}", + s3BucketPrefix, glueDatabase, glueTable); + } + + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + /** + * Defines a config object with Glue specific catalog and io implementations + * Then, uses that to create the Flink catalog + */ + private static Catalog createGlueCatalog(StreamTableEnvironment tableEnv) { + + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3BucketPrefix); + catalogProperties.put("impl", "org.apache.iceberg.aws.glue.GlueCatalog"); + //Loading Glue Data Catalog + CatalogLoader glueCatalogLoader = CatalogLoader.custom( + CATALOG_NAME, + catalogProperties, + new org.apache.hadoop.conf.Configuration(), + "org.apache.iceberg.aws.glue.GlueCatalog"); + + + FlinkCatalog flinkCatalog = new FlinkCatalog(CATALOG_NAME,glueDatabase, Namespace.empty(),glueCatalogLoader,true,1000); + return flinkCatalog; + } + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime + * or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(GlueTableSQLJSONExample.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void validateURI(String uri) { + String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$"; + Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern), + "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri); + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java new file mode 100644 index 0000000..c386c2a --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java @@ -0,0 +1,60 @@ +import java.time.Instant; + +public class StockPrice { + private String timestamp; + private String symbol; + private Float price; + private Integer volumes; + + public StockPrice() { + } + + public StockPrice(String timestamp, String symbol, Float price, Integer volumes) { + this.timestamp = timestamp; + this.symbol = symbol; + this.price = price; + this.volumes = volumes; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getSymbol() { + return symbol; + } + + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + public Float getPrice() { + return price; + } + + public void setPrice(Float price) { + this.price = price; + } + + public Integer getVolumes() { + return volumes; + } + + public void setVolumes(Integer volumes) { + this.volumes = volumes; + } + + @Override + public String toString() { + return "StockPrice{" + + "timestamp='" + timestamp + '\'' + + ", symbol='" + symbol + '\'' + + ", price=" + price + + ", volumes=" + volumes + + '}'; + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..38e236b --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java @@ -0,0 +1,24 @@ +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as StockPrice POJOs. + * + * The generator mimics the behavior of AvroGenericStockTradeGeneratorFunction + * from the IcebergDataStreamSink example. + */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + @Override + public StockPrice map(Long sequence) throws Exception { + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + return new StockPrice(timestamp, symbol, price, volumes); + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..67e0aa1 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,16 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 10.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "bucket.prefix": "s3://iceberg-performance-026090544291/iceberg", + "catalog.db": "iceberg", + "catalog.table": "prices_iceberg" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/log4j2.properties b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/log4j2.properties new file mode 100644 index 0000000..a6cccce --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/log4j2.properties @@ -0,0 +1,13 @@ +# Log4j2 configuration +status = warn +name = PropertiesConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = ConsoleAppender +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# Root logger configuration +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc new file mode 100644 index 0000000..6303e0d --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "Price", + "namespace": "com.amazonaws.services.msf.avro", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "symbol", + "type": "string" + }, + { + "name": "price", + "type": "float" + }, + { + "name": "volumes", + "type": "int" + } + ] +} \ No newline at end of file From fd81e7f359bb20a4c9fde3bc83368fe27c5e9921 Mon Sep 17 00:00:00 2001 From: Francisco Date: Tue, 27 May 2025 15:35:39 +0200 Subject: [PATCH 2/3] updating properties --- .../src/main/resources/flink-application-properties-dev.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json index 67e0aa1..79ec4f1 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json @@ -8,7 +8,7 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - "bucket.prefix": "s3://iceberg-performance-026090544291/iceberg", + "bucket.prefix": "s3:///iceberg", "catalog.db": "iceberg", "catalog.table": "prices_iceberg" } From 68c7ce6f2051072fb62f8dc609d9baed50998c85 Mon Sep 17 00:00:00 2001 From: Francisco Date: Thu, 10 Jul 2025 16:22:20 +0200 Subject: [PATCH 3/3] Adding Hadoop Utils --- java/Iceberg/IcebergSQLJSONGlue/README.md | 25 ++- java/Iceberg/IcebergSQLJSONGlue/pom.xml | 129 ++++------- .../msf}/GlueTableSQLJSONExample.java | 206 ++++++++---------- .../services/msf/pojo}/StockPrice.java | 4 +- .../source}/StockPriceGeneratorFunction.java | 5 +- .../flink/runtime/util/HadoopUtils.java | 120 ++++++++++ .../flink-application-properties-dev.json | 4 +- .../src/main/resources/price.avsc | 23 -- 8 files changed, 284 insertions(+), 232 deletions(-) rename java/Iceberg/IcebergSQLJSONGlue/src/main/java/{ => com/amazonaws/services/msf}/GlueTableSQLJSONExample.java (70%) rename java/Iceberg/IcebergSQLJSONGlue/src/main/java/{ => com/amazonaws/services/msf/pojo}/StockPrice.java (92%) rename java/Iceberg/IcebergSQLJSONGlue/src/main/java/{ => com/amazonaws/services/msf/source}/StockPriceGeneratorFunction.java (86%) create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java delete mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc diff --git a/java/Iceberg/IcebergSQLJSONGlue/README.md b/java/Iceberg/IcebergSQLJSONGlue/README.md index ca05aa6..045e9bc 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/README.md +++ b/java/Iceberg/IcebergSQLJSONGlue/README.md @@ -1,8 +1,8 @@ # Flink Iceberg Sink using SQL API -* Flink version: 1.20.0 +* Flink version: 1.20.1 * Flink API: SQL API -* Iceberg 1.8.1 +* Iceberg 1.9.1 * Language: Java (11) * Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) @@ -57,7 +57,21 @@ At the moment there are current limitations concerning Flink Iceberg integration * Doesn't support Iceberg Table with hidden partitioning * Doesn't support adding columns, removing columns, renaming columns or changing columns. -### Schema and schema evolution +### Hadoop Library Availability Challenge + +When integrating Flink with Iceberg, there's a common configuration challenge that affects most Flink deployments: + +#### The Challenge +* When using Flink SQL's `CREATE CATALOG` statements, Hadoop libraries must be available on the system classpath +* However, standard Flink distributions use shaded dependencies that can create class loading conflicts with Hadoop's expectations +* This is particularly relevant for TaskManagers (which is the case for most generic Flink clusters, except EMR) + +#### Solution Approaches +1. **For SQL Applications (This Example)** + * If Hadoop is not pre-installed in the cluster, you'll need to create a custom HadoopUtils class and properly configure Maven dependencies + * This example includes the necessary configuration to handle these dependencies + +### Schema The application uses a predefined schema for the stock price data with the following fields: * `timestamp`: STRING - ISO timestamp of the record @@ -65,11 +79,6 @@ The application uses a predefined schema for the stock price data with the follo * `price`: FLOAT - Stock price (0-10 range) * `volumes`: INT - Trade volumes (0-1000000 range) -This schema matches the AVRO schema used in the DataStream API example for consistency. -The equivalent AVRO schema definition is available in [price.avsc](./src/main/resources/price.avsc) for reference. - -Schema changes would require updating both the POJO class and the SQL table definition. -Unlike the DataStream approach with AVRO, this SQL approach requires explicit schema management. ### Running locally, in IntelliJ diff --git a/java/Iceberg/IcebergSQLJSONGlue/pom.xml b/java/Iceberg/IcebergSQLJSONGlue/pom.xml index 1df90e1..627ec0a 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/pom.xml +++ b/java/Iceberg/IcebergSQLJSONGlue/pom.xml @@ -14,116 +14,71 @@ 11 ${target.java.version} ${target.java.version} - - 1.20.0 - 1.11.3 + 1.20 + 1.20.1 2.12 - 3.4.0 - 1.8.1 + 1.9.1 1.2.0 2.23.1 5.8.1 + + + + + com.amazonaws + aws-java-sdk-bom + + 1.12.782 + pom + import + + + + - - - org.apache.flink - flink-runtime-web - ${flink.version} - provided - org.apache.flink flink-streaming-java ${flink.version} provided - - org.apache.flink - flink-table-runtime - ${flink.version} - provided - org.apache.flink flink-table-api-java-bridge ${flink.version} + org.apache.flink - flink-table-common - ${flink.version} - - - org.apache.flink - flink-metrics-dropwizard + flink-table-planner_${scala.version} ${flink.version} + provided org.apache.flink - flink-avro + flink-clients ${flink.version} - org.apache.flink - flink-table-planner_${scala.version} + flink-connector-datagen ${flink.version} provided - com.amazonaws aws-kinesisanalytics-runtime ${kda.runtime.version} provided - - org.apache.flink - flink-connector-files - ${flink.version} - provided - - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - - - org.apache.avro - avro - - - org.slf4j - slf4j-reload4j - - - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - - - org.apache.hadoop - hadoop-mapreduce-client-core - ${hadoop.version} - - - - org.apache.iceberg - iceberg-core - ${iceberg.version} - - org.apache.iceberg - iceberg-flink + iceberg-flink-runtime-${flink.major.version} ${iceberg.version} @@ -131,25 +86,15 @@ iceberg-aws-bundle ${iceberg.version} + + - org.apache.iceberg - iceberg-aws - ${iceberg.version} - - - org.apache.iceberg - iceberg-flink-1.20 - ${iceberg.version} - - - - org.junit.jupiter - junit-jupiter - ${junit5.version} - test + org.apache.flink + flink-s3-fs-hadoop + ${flink.version} - + org.apache.logging.log4j log4j-slf4j-impl @@ -184,7 +129,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.2.1 + 3.6.0 package @@ -213,9 +158,21 @@ - GlueTableSQLJSONExample + com.amazonaws.services.msf.GlueTableSQLJSONExample + + + + org.apache.hadoop.conf + shaded.org.apache.hadoop.conf + + + org.apache.flink.runtime.util.HadoopUtils + shadow.org.apache.flink.runtime.util.HadoopUtils + + diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/GlueTableSQLJSONExample.java similarity index 70% rename from java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java rename to java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/GlueTableSQLJSONExample.java index 15b0c42..dfe64f9 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/GlueTableSQLJSONExample.java @@ -1,4 +1,4 @@ -/* +package com.amazonaws.services.msf;/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: MIT-0 * @@ -17,10 +17,11 @@ */ import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.pojo.StockPrice; +import com.amazonaws.services.msf.source.StockPriceGeneratorFunction; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; -import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; @@ -28,18 +29,11 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.util.Preconditions; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.flink.CatalogLoader; -import org.apache.iceberg.flink.FlinkCatalog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -50,71 +44,60 @@ public class GlueTableSQLJSONExample { private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; private static final Logger LOG = LoggerFactory.getLogger(GlueTableSQLJSONExample.class); - // Configuration properties - private static String s3BucketPrefix; - private static String glueDatabase; - private static String glueTable; - - public static void main(String[] args) throws Exception { - // 1. Initialize environments - using standard environment instead of WebUI for production consistency - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } - // 2. Load properties and configure environment - Map applicationProperties = loadApplicationProperties(env); - Properties icebergProperties = applicationProperties.get("Iceberg"); + private static void validateURI(String uri) { + String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$"; + Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern), + "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri); + } - // Configure local development settings if needed + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (isLocal(env)) { - env.enableCheckpointing(30000); - env.setParallelism(2); + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(GlueTableSQLJSONExample.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); } + } - // 3. Setup configuration properties with validation - setupIcebergProperties(icebergProperties); - Catalog glueCatalog = createGlueCatalog(tableEnv); - tableEnv.registerCatalog(CATALOG_NAME,glueCatalog); - - // 4. Create data generator source - Properties dataGenProperties = applicationProperties.get("DataGen"); - DataStream stockPriceDataStream = env.fromSource( - createDataGenerator(dataGenProperties), - WatermarkStrategy.noWatermarks(), - "DataGen"); - + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); - // 5. Convert DataStream to Table and create view - Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); - tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } - String sinkTableName = CATALOG_NAME + "." + glueDatabase + "." + glueTable; + private static String createCatalogStatement(String s3BucketPrefix) { + return "CREATE CATALOG " + CATALOG_NAME + " WITH (" + + "'type' = 'iceberg', " + + "'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog'," + + "'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'," + + "'warehouse' = '" + s3BucketPrefix + "')"; + } - // Define and create table with schema matching AVRO schema from DataStream example - String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" + + private static String createTableStatement(String sinkTableName) { + return "CREATE TABLE IF NOT EXISTS " + sinkTableName + " (" + "`timestamp` STRING, " + "symbol STRING," + "price FLOAT," + "volumes INT" + ") PARTITIONED BY (symbol) "; - - LOG.info("Creating table with statement: {}", createTableStatement); - tableEnv.executeSql(createTableStatement); - - // 7. Execute SQL operations - Insert data from stock price stream - String insertQuery = "INSERT INTO " + sinkTableName + - " SELECT `timestamp`, symbol, price, volumes FROM stockPriceTable"; - LOG.info("Executing insert statement: {}", insertQuery); - TableResult insertResult = tableEnv.executeSql(insertQuery); - - // Keep the job running to continuously insert data - LOG.info("Application started successfully. Inserting data into Iceberg table: {}", sinkTableName); - } - private static void setupIcebergProperties(Properties icebergProperties) { - s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); - glueDatabase = icebergProperties.getProperty("catalog.db", "default"); - glueTable = icebergProperties.getProperty("catalog.table", "prices_iceberg"); + private static IcebergConfig setupIcebergProperties(Properties icebergProperties) { + String s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); + String glueDatabase = icebergProperties.getProperty("catalog.db", "default"); + String glueTable = icebergProperties.getProperty("catalog.table", "prices_iceberg"); Preconditions.checkNotNull(s3BucketPrefix, "You must supply an s3 bucket prefix for the warehouse."); Preconditions.checkNotNull(glueDatabase, "You must supply a database name"); @@ -125,65 +108,68 @@ private static void setupIcebergProperties(Properties icebergProperties) { LOG.info("Iceberg configuration: bucket={}, database={}, table={}", s3BucketPrefix, glueDatabase, glueTable); - } - private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { - double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); - Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); - - LOG.info("Data generator: {} record/sec", recordsPerSecond); - return new DataGeneratorSource(new StockPriceGeneratorFunction(), - Long.MAX_VALUE, - RateLimiterStrategy.perSecond(recordsPerSecond), - TypeInformation.of(StockPrice.class)); + return new IcebergConfig(s3BucketPrefix, glueDatabase, glueTable); } - /** - * Defines a config object with Glue specific catalog and io implementations - * Then, uses that to create the Flink catalog - */ - private static Catalog createGlueCatalog(StreamTableEnvironment tableEnv) { - - Map catalogProperties = new HashMap<>(); - catalogProperties.put("type", "iceberg"); - catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); - catalogProperties.put("warehouse", s3BucketPrefix); - catalogProperties.put("impl", "org.apache.iceberg.aws.glue.GlueCatalog"); - //Loading Glue Data Catalog - CatalogLoader glueCatalogLoader = CatalogLoader.custom( - CATALOG_NAME, - catalogProperties, - new org.apache.hadoop.conf.Configuration(), - "org.apache.iceberg.aws.glue.GlueCatalog"); - - - FlinkCatalog flinkCatalog = new FlinkCatalog(CATALOG_NAME,glueDatabase, Namespace.empty(),glueCatalogLoader,true,1000); - return flinkCatalog; - } + private static class IcebergConfig { + final String s3BucketPrefix; + final String glueDatabase; + final String glueTable; - private static boolean isLocal(StreamExecutionEnvironment env) { - return env instanceof LocalStreamEnvironment; + IcebergConfig(String s3BucketPrefix, String glueDatabase, String glueTable) { + this.s3BucketPrefix = s3BucketPrefix; + this.glueDatabase = glueDatabase; + this.glueTable = glueTable; + } } - /** - * Load application properties from Amazon Managed Service for Apache Flink runtime - * or from a local resource, when the environment is local - */ - private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + public static void main(String[] args) throws Exception { + // 1. Initialize environments - using standard environment instead of WebUI for production consistency + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2 If running local, we need to enable Checkpoints. Iceberg commits data with every checkpoint if (isLocal(env)) { - LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); - return KinesisAnalyticsRuntime.getApplicationProperties( - Objects.requireNonNull(GlueTableSQLJSONExample.class.getClassLoader() - .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); - } else { - LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); - return KinesisAnalyticsRuntime.getApplicationProperties(); + env.enableCheckpointing(60000); } - } - public static void validateURI(String uri) { - String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$"; - Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern), - "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri); + // 3. Setup configuration properties with validation + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + IcebergConfig config = setupIcebergProperties(icebergProperties); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + String sinkTableName = CATALOG_NAME + "." + config.glueDatabase + "." + config.glueTable; + + // Create catalog and configure it + tableEnv.executeSql(createCatalogStatement(config.s3BucketPrefix)); + tableEnv.executeSql("USE CATALOG " + CATALOG_NAME); + tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + config.glueDatabase); + tableEnv.executeSql("USE " + config.glueDatabase); + + // Create table + String createTableStatement = createTableStatement(sinkTableName); + LOG.info("Creating table with statement: {}", createTableStatement); + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations - Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT `timestamp`, symbol, price, volumes FROM default_catalog.default_database.stockPriceTable"; + LOG.info("Executing insert statement: {}", insertQuery); + TableResult insertResult = tableEnv.executeSql(insertQuery); + + // Keep the job running to continuously insert data + LOG.info("Application started successfully. Inserting data into Iceberg table: {}", sinkTableName); } } diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/pojo/StockPrice.java similarity index 92% rename from java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java rename to java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/pojo/StockPrice.java index c386c2a..3b4f923 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/pojo/StockPrice.java @@ -1,4 +1,4 @@ -import java.time.Instant; +package com.amazonaws.services.msf.pojo; public class StockPrice { private String timestamp; @@ -50,7 +50,7 @@ public void setVolumes(Integer volumes) { @Override public String toString() { - return "StockPrice{" + + return "com.amazonaws.services.msf.pojo.StockPrice{" + "timestamp='" + timestamp + '\'' + ", symbol='" + symbol + '\'' + ", price=" + price + diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java similarity index 86% rename from java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java rename to java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java index 38e236b..1cf5554 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/com/amazonaws/services/msf/source/StockPriceGeneratorFunction.java @@ -1,9 +1,12 @@ +package com.amazonaws.services.msf.source; + +import com.amazonaws.services.msf.pojo.StockPrice; import org.apache.commons.lang3.RandomUtils; import org.apache.flink.connector.datagen.source.GeneratorFunction; import java.time.Instant; /** - * Function used by DataGen source to generate random records as StockPrice POJOs. + * Function used by DataGen source to generate random records as com.amazonaws.services.msf.pojo.StockPrice POJOs. * * The generator mimics the behavior of AvroGenericStockTradeGeneratorFunction * from the IcebergDataStreamSink example. diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java new file mode 100644 index 0000000..b177d06 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java @@ -0,0 +1,120 @@ +package org.apache.flink.runtime.util; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + + +/** + * This class is a copy of org.apache.flink.runtime.util.HadoopUtils with the getHadoopConfiguration() method replaced to + * return an org.apache.hadoop.conf.Configuration instead of org.apache.hadoop.hdfs.HdfsConfiguration. + * + * This class is then shaded, along with org.apache.hadoop.conf.*, to avoid conflicts with the same classes provided by + * org.apache.flink:flink-s3-fs-hadoop, which is normally installed as plugin in Flink when S3. + * + * Other methods are copied from the original class. + */ +public class HadoopUtils { + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + + /** + * This method has been re-implemented to always return a org.apache.hadoop.conf.Configuration + */ + public static Configuration getHadoopConfiguration( + org.apache.flink.configuration.Configuration flinkConfiguration) { + return new Configuration(false); + } + + public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi) { + return UserGroupInformation.isSecurityEnabled() + && ugi.getAuthenticationMethod() + == UserGroupInformation.AuthenticationMethod.KERBEROS; + } + + + public static boolean areKerberosCredentialsValid( + UserGroupInformation ugi, boolean useTicketCache) { + Preconditions.checkState(isKerberosSecurityEnabled(ugi)); + + // note: UGI::hasKerberosCredentials inaccurately reports false + // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786), + // so we check only in ticket cache scenario. + if (useTicketCache && !ugi.hasKerberosCredentials()) { + if (hasHDFSDelegationToken(ugi)) { + LOG.warn( + "Hadoop security is enabled but current login user does not have Kerberos credentials, " + + "use delegation token instead. Flink application will terminate after token expires."); + return true; + } else { + LOG.error( + "Hadoop security is enabled, but current login user has neither Kerberos credentials " + + "nor delegation tokens!"); + return false; + } + } + + return true; + } + + /** + * Indicates whether the user has an HDFS delegation token. + */ + public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) { + Collection> usrTok = ugi.getTokens(); + for (Token token : usrTok) { + if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) { + return true; + } + } + return false; + } + + /** + * Checks if the Hadoop dependency is at least the given version. + */ + public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException { + final Tuple2 hadoopVersion = getMajorMinorBundledHadoopVersion(); + int maj = hadoopVersion.f0; + int min = hadoopVersion.f1; + + return maj > major || (maj == major && min >= minor); + } + + /** + * Checks if the Hadoop dependency is at most the given version. + */ + public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException { + final Tuple2 hadoopVersion = getMajorMinorBundledHadoopVersion(); + int maj = hadoopVersion.f0; + int min = hadoopVersion.f1; + + return maj < major || (maj == major && min < minor); + } + + private static Tuple2 getMajorMinorBundledHadoopVersion() { + String versionString = VersionInfo.getVersion(); + String[] versionParts = versionString.split("\\."); + + if (versionParts.length < 2) { + throw new FlinkRuntimeException( + "Cannot determine version of Hadoop, unexpected version string: " + + versionString); + } + + int maj = Integer.parseInt(versionParts[0]); + int min = Integer.parseInt(versionParts[1]); + return Tuple2.of(maj, min); + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json index 79ec4f1..14f4d2a 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json @@ -8,9 +8,9 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - "bucket.prefix": "s3:///iceberg", + "bucket.prefix": "s3://iceberg-performance-026090544291/iceberg", "catalog.db": "iceberg", - "catalog.table": "prices_iceberg" + "catalog.table": "prices_iceberg_test_2" } } ] \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc deleted file mode 100644 index 6303e0d..0000000 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/price.avsc +++ /dev/null @@ -1,23 +0,0 @@ -{ - "type": "record", - "name": "Price", - "namespace": "com.amazonaws.services.msf.avro", - "fields": [ - { - "name": "timestamp", - "type": "string" - }, - { - "name": "symbol", - "type": "string" - }, - { - "name": "price", - "type": "float" - }, - { - "name": "volumes", - "type": "int" - } - ] -} \ No newline at end of file