Skip to content

Commit 306fe17

Browse files
Updating DataStream to use same dependencies as SQL (#132)
* Updating DataStream to use same dependencies as SQL * Update README.md * Update flink-application-properties-dev.json * removing data gen test
1 parent 427e4c3 commit 306fe17

File tree

6 files changed

+81
-105
lines changed

6 files changed

+81
-105
lines changed

java/Iceberg/IcebergDataStreamSink/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
* Flink version: 1.20.0
44
* Flink API: DataStream API
5-
* Iceberg 1.8.1
5+
* Iceberg 1.9.1
66
* Language: Java (11)
77
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
88
and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)

java/Iceberg/IcebergDataStreamSink/pom.xml

Lines changed: 41 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,36 @@
1414
<target.java.version>11</target.java.version>
1515
<maven.compiler.source>${target.java.version}</maven.compiler.source>
1616
<maven.compiler.target>${target.java.version}</maven.compiler.target>
17-
17+
<flink.major.version>1.20</flink.major.version>
1818
<flink.version>1.20.0</flink.version>
1919
<avro.version>1.11.3</avro.version>
20-
<hadoop.version>3.4.0</hadoop.version>
21-
<iceberg.version>1.8.1</iceberg.version>
20+
<iceberg.version>1.9.1</iceberg.version>
2221
<kda.runtime.version>1.2.0</kda.runtime.version>
2322
<log4j.version>2.23.1</log4j.version>
2423
<junit5.version>5.8.1</junit5.version>
2524
</properties>
2625

26+
<dependencyManagement>
27+
<dependencies>
28+
<dependency>
29+
<groupId>com.amazonaws</groupId>
30+
<artifactId>aws-java-sdk-bom</artifactId>
31+
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
32+
<version>1.12.782</version>
33+
<type>pom</type>
34+
<scope>import</scope>
35+
</dependency>
36+
<dependency>
37+
<groupId>software.amazon.awssdk</groupId>
38+
<artifactId>bom</artifactId>
39+
<version>2.28.29</version>
40+
<type>pom</type>
41+
<scope>import</scope>
42+
</dependency>
43+
</dependencies>
44+
</dependencyManagement>
45+
46+
2747
<dependencies>
2848
<!-- Flink Core dependencies -->
2949
<dependency>
@@ -44,26 +64,14 @@
4464
<version>${flink.version}</version>
4565
<scope>provided</scope>
4666
</dependency>
47-
<dependency>
48-
<groupId>org.apache.flink</groupId>
49-
<artifactId>flink-connector-files</artifactId>
50-
<version>${flink.version}</version>
51-
<scope>provided</scope>
52-
</dependency>
67+
5368
<dependency>
5469
<groupId>org.apache.flink</groupId>
5570
<artifactId>flink-table-runtime</artifactId>
5671
<version>${flink.version}</version>
5772
<scope>provided</scope>
5873
</dependency>
5974

60-
<!-- Flink Iceberg uses DropWizard metrics -->
61-
<dependency>
62-
<groupId>org.apache.flink</groupId>
63-
<artifactId>flink-metrics-dropwizard</artifactId>
64-
<version>${flink.version}</version>
65-
</dependency>
66-
6775
<!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
6876
<dependency>
6977
<groupId>com.amazonaws</groupId>
@@ -79,62 +87,26 @@
7987
<version>${flink.version}</version>
8088
</dependency>
8189

82-
<!--Iceberg dependencies -->
83-
<!-- DO NOT include the iceberg-flink-runtime-* dependency, because it contains a shaded version of Avro -->
84-
<dependency>
85-
<groupId>org.apache.iceberg</groupId>
86-
<artifactId>iceberg-core</artifactId>
87-
<version>${iceberg.version}</version>
88-
</dependency>
89-
<dependency>
90-
<groupId>org.apache.iceberg</groupId>
91-
<artifactId>iceberg-flink</artifactId>
92-
<version>${iceberg.version}</version>
93-
</dependency>
94-
<dependency>
95-
<groupId>org.apache.iceberg</groupId>
96-
<artifactId>iceberg-flink-1.20</artifactId>
97-
<version>${iceberg.version}</version>
98-
</dependency>
99-
<dependency>
100-
<groupId>org.apache.iceberg</groupId>
101-
<artifactId>iceberg-aws-bundle</artifactId>
102-
<version>${iceberg.version}</version>
103-
</dependency>
104-
<dependency>
105-
<groupId>org.apache.iceberg</groupId>
106-
<artifactId>iceberg-aws</artifactId>
107-
<version>${iceberg.version}</version>
108-
</dependency>
10990

110-
<dependency>
111-
<groupId>org.apache.hadoop</groupId>
112-
<artifactId>hadoop-client</artifactId>
113-
<version>${hadoop.version}</version>
114-
<exclusions>
115-
<exclusion>
116-
<groupId>org.apache.avro</groupId>
117-
<artifactId>avro</artifactId>
118-
</exclusion>
119-
<!-- exclude to prevent multiple of SLF4j binding conflict -->
120-
<exclusion>
121-
<groupId>org.slf4j</groupId>
122-
<artifactId>slf4j-reload4j</artifactId>
123-
</exclusion>
124-
</exclusions>
125-
</dependency>
91+
<dependency>
92+
<groupId>org.apache.iceberg</groupId>
93+
<artifactId>iceberg-flink-runtime-${flink.major.version}</artifactId>
94+
<version>${iceberg.version}</version>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.apache.iceberg</groupId>
98+
<artifactId>iceberg-aws-bundle</artifactId>
99+
<version>${iceberg.version}</version>
100+
</dependency>
126101

102+
<!-- S3 File System Support -->
103+
<dependency>
104+
<groupId>org.apache.flink</groupId>
105+
<artifactId>flink-s3-fs-hadoop</artifactId>
106+
<version>${flink.version}</version>
107+
</dependency>
127108

128-
<!-- Tests -->
129-
<dependency>
130-
<groupId>org.junit.jupiter</groupId>
131-
<artifactId>junit-jupiter</artifactId>
132-
<version>${junit5.version}</version>
133-
<scope>test</scope>
134-
</dependency>
135-
136-
<!-- Logging framework, to produce console output when running in the IDE. -->
137-
<!-- These dependencies are excluded from the application JAR by default. -->
109+
<!-- Logging Dependencies -->
138110
<dependency>
139111
<groupId>org.apache.logging.log4j</groupId>
140112
<artifactId>log4j-slf4j-impl</artifactId>
@@ -149,7 +121,6 @@
149121
<groupId>org.apache.logging.log4j</groupId>
150122
<artifactId>log4j-core</artifactId>
151123
<version>${log4j.version}</version>
152-
<scope>runtime</scope>
153124
</dependency>
154125
</dependencies>
155126

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.amazonaws.services.msf.iceberg;
2+
3+
import org.apache.flink.api.common.functions.MapFunction;
4+
import org.apache.flink.formats.avro.AvroToRowDataConverters;
5+
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
6+
import org.apache.flink.table.data.RowData;
7+
import org.apache.flink.table.types.DataType;
8+
import org.apache.flink.table.types.logical.LogicalType;
9+
import org.apache.flink.table.types.logical.RowType;
10+
import org.apache.flink.table.types.utils.TypeConversions;
11+
import org.apache.avro.Schema;
12+
import org.apache.avro.generic.GenericRecord;
13+
14+
public class AvroGenericRecordToRowDataMapper implements MapFunction<GenericRecord, RowData> {
15+
private final AvroToRowDataConverters.AvroToRowDataConverter converter;
16+
17+
AvroGenericRecordToRowDataMapper(RowType rowType) {
18+
this.converter = AvroToRowDataConverters.createRowConverter(rowType);
19+
}
20+
21+
public RowData map(GenericRecord genericRecord) throws Exception {
22+
return (RowData)this.converter.convert(genericRecord);
23+
}
24+
25+
public static AvroGenericRecordToRowDataMapper forAvroSchema(Schema avroSchema) {
26+
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
27+
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
28+
RowType rowType = (RowType) logicalType;
29+
return new AvroGenericRecordToRowDataMapper(rowType);
30+
}
31+
}

java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.apache.avro.generic.GenericRecord;
1010
import org.apache.iceberg.BaseTable;
11-
import org.apache.iceberg.NullOrder;
1211
import org.apache.iceberg.PartitionSpec;
1312
import org.apache.iceberg.Table;
1413
import org.apache.iceberg.TableMetadata;
@@ -19,9 +18,9 @@
1918
import org.apache.iceberg.flink.CatalogLoader;
2019
import org.apache.iceberg.flink.FlinkSchemaUtil;
2120
import org.apache.iceberg.flink.TableLoader;
22-
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
2321
import org.apache.iceberg.flink.sink.FlinkSink;
2422
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
23+
import org.apache.iceberg.shaded.org.apache.avro.Schema;
2524

2625
import java.util.Arrays;
2726
import java.util.HashMap;
@@ -88,11 +87,12 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data
8887
String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS);
8988
List<String> equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+"));
9089

90+
Schema shadedAvroSchema = new Schema.Parser().parse(avroSchema.toString());
9191

9292
// Convert Avro Schema to Iceberg Schema, this will be used for creating the table
93-
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
93+
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(shadedAvroSchema);
9494
// Avro Generic Record to Row Data Mapper
95-
MapFunction<GenericRecord, RowData> avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);
95+
AvroGenericRecordToRowDataMapper avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema);
9696

9797

9898
// Catalog properties for using Glue Data Catalog

java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
{
99
"PropertyGroupId": "Iceberg",
1010
"PropertyMap": {
11-
"bucket.prefix": "s3://<my-bucket>/iceberg",
12-
"catalog.db": "default",
13-
"catalog.table": "prices_iceberg",
11+
"bucket.prefix": "s3://<bucket-name>/iceberg",
12+
"catalog.db": "iceberg",
13+
"catalog.table": "prices_iceberg_datastream",
1414
"partition.fields": "symbol",
1515
"operation": "append",
1616
"upsert.equality.fields": "symbol"
1717
}
1818
}
19-
]
19+
]

java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments
 (0)