Merged. Changes from 3 commits.
2 changes: 1 addition & 1 deletion java/Iceberg/IcebergDataStreamSink/README.md
@@ -2,7 +2,7 @@

* Flink version: 1.20.0
* Flink API: DataStream API
* Iceberg 1.8.1
* Iceberg 1.9.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
111 changes: 41 additions & 70 deletions java/Iceberg/IcebergDataStreamSink/pom.xml
@@ -14,16 +14,36 @@
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>

<flink.major.version>1.20</flink.major.version>
<flink.version>1.20.0</flink.version>
<avro.version>1.11.3</avro.version>
<hadoop.version>3.4.0</hadoop.version>
<iceberg.version>1.8.1</iceberg.version>
<iceberg.version>1.9.1</iceberg.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<log4j.version>2.23.1</log4j.version>
<junit5.version>5.8.1</junit5.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>1.12.782</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.29</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>


<dependencies>
<!-- Flink Core dependencies -->
<dependency>
@@ -44,26 +64,14 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink Iceberg uses DropWizard metrics -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
<dependency>
<groupId>com.amazonaws</groupId>
@@ -79,62 +87,26 @@
<version>${flink.version}</version>
</dependency>

<!--Iceberg dependencies -->
<!-- DO NOT include the iceberg-flink-runtime-* dependency, because it contains a shaded version of Avro -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.20</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<!-- Exclude to prevent SLF4J multiple-binding conflicts -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- S3 File System Support -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>

<!-- Logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<!-- Logging Dependencies -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
@@ -149,7 +121,6 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

New file: AvroGenericRecordToRowDataMapper.java (31 additions, package com.amazonaws.services.msf.iceberg)
@@ -0,0 +1,31 @@
package com.amazonaws.services.msf.iceberg;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.avro.AvroToRowDataConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class AvroGenericRecordToRowDataMapper implements MapFunction<GenericRecord, RowData> {
private final AvroToRowDataConverters.AvroToRowDataConverter converter;

AvroGenericRecordToRowDataMapper(RowType rowType) {
this.converter = AvroToRowDataConverters.createRowConverter(rowType);
}

public RowData map(GenericRecord genericRecord) throws Exception {
return (RowData)this.converter.convert(genericRecord);
}

public static AvroGenericRecordToRowDataMapper forAvroSchema(Schema avroSchema) {
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
RowType rowType = (RowType) logicalType;
return new AvroGenericRecordToRowDataMapper(rowType);
}
}
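For orientation, here is a minimal usage sketch (not part of the PR) showing how a DataStream of Avro GenericRecord could be mapped to RowData with this class; the stream and schema variables are hypothetical:

import com.amazonaws.services.msf.iceberg.AvroGenericRecordToRowDataMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class MapperUsageSketch {
    // Converts an Avro GenericRecord stream into RowData, the input type expected by FlinkSink.forRowData(...).
    static DataStream<RowData> toRowData(DataStream<GenericRecord> genericRecords, Schema avroSchema) {
        return genericRecords.map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema));
    }
}

In the actual sink builder the mapped stream is typically given an explicit RowData TypeInformation as well, which is consistent with the FlinkCompatibilityUtil import visible in the next file.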
Changes to the class that builds the Iceberg FlinkSink:
@@ -8,7 +8,6 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -19,9 +18,9 @@
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.shaded.org.apache.avro.Schema;

import java.util.Arrays;
import java.util.HashMap;
@@ -88,11 +87,12 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data
String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS);
List<String> equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+"));

Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString());
Review comment from @nicusX (Contributor), Jul 14, 2025:

It is worth adding a comment here, highlighting that this is using the shaded implementation of the Avro Schema. One may guess from the name, but better to be explicit.


// Convert Avro Schema to Iceberg Schema, this will be used for creating the table
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);
// Avro Generic Record to Row Data Mapper
MapFunction<GenericRecord, RowData> avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);
AvroGenericRecordToRowDataMapper avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);


// Catalog properties for using Glue Data Catalog
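A possible way to act on the review comment above would be to annotate the conversion explicitly; this is only a sketch and not part of the merged change:

// AvroSchemaUtil.toIceberg(...) from the iceberg-flink-runtime bundle expects Iceberg's shaded Avro type
// (org.apache.iceberg.shaded.org.apache.avro.Schema), so the unshaded Avro schema is re-parsed into the
// shaded Schema class before deriving the Iceberg schema used to create the table.
Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString());
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);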
Changes to the runtime application properties JSON:
@@ -8,12 +8,12 @@
{
"PropertyGroupId": "Iceberg",
"PropertyMap": {
"bucket.prefix": "s3://<my-bucket>/iceberg",
"catalog.db": "default",
"catalog.table": "prices_iceberg",
"bucket.prefix": "s3://<bucket-name>/iceberg",
"catalog.db": "iceberg",
"catalog.table": "prices_iceberg_datastream",
"partition.fields": "symbol",
"operation": "append",
"upsert.equality.fields": "symbol"
}
}
]
]
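For reference, a small sketch (assuming the Managed Service for Apache Flink runtime library already declared in the pom) of how the "Iceberg" property group above can be read at runtime; the wrapper class and method name are illustrative:

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class RuntimePropertiesSketch {
    // Loads the "Iceberg" property group defined in the JSON above when running in Managed Service for Apache Flink.
    static Properties loadIcebergProperties() throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return applicationProperties.get("Iceberg");
    }
}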