Commit 625bb36

voonhous authored and cshuo committed
Checkpoint 55 - Address comments
1 parent 57c29dc commit 625bb36

9 files changed: +174 -260 lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ limitations under the License.
 
         <dependency>
             <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-flink1.20.x</artifactId>
+            <artifactId>hudi-flink${flink.major.version}.x</artifactId>
             <version>${hudi.version}</version>
         </dependency>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/HudiDataSinkFactory.java

Lines changed: 11 additions & 2 deletions
@@ -57,6 +57,17 @@ public HudiDataSink createDataSink(Context context) {
         FactoryHelper.DefaultContext factoryContext = (FactoryHelper.DefaultContext) context;
         Configuration config = factoryContext.getFactoryConfiguration();
 
+        // Validate that only BUCKET index type is used
+        String indexType = config.get(HudiConfig.INDEX_TYPE);
+        if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Unsupported index type '%s'. Currently only 'BUCKET' index type is supported. "
+                                    + "Other index types (e.g., FLINK_STATE, BLOOM, SIMPLE) are not yet implemented "
+                                    + "for multi-table CDC pipelines.",
+                            indexType));
+        }
+
         String schemaOperatorUid =
                 context.getPipelineConfiguration()
                         .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);

@@ -69,8 +80,6 @@ public Set<ConfigOption<?>> requiredOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(HudiConfig.PATH);
         options.add(HudiConfig.RECORD_KEY_FIELD);
-        // options.add(HudiConfig.PRECOMBINE_FIELD);
-        // options.add(HudiConfig.BUCKET_INDEX_NUM_BUCKETS);
         return options;
     }

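A quick standalone sketch of the new guard above (plain strings stand in for Flink's Configuration and HudiConfig.INDEX_TYPE; the class and method names here are illustrative, not part of the connector):

public class IndexTypeGuardSketch {
    static void validate(String indexType) {
        // Mirrors the factory check: an unset value passes, and matching is case-insensitive.
        if (indexType != null && !indexType.equalsIgnoreCase("BUCKET")) {
            throw new IllegalArgumentException(
                    String.format(
                            "Unsupported index type '%s'. Currently only 'BUCKET' is supported.",
                            indexType));
        }
    }

    public static void main(String[] args) {
        validate(null);     // accepted: no explicit index type configured
        validate("bucket"); // accepted: comparison ignores case
        validate("BLOOM");  // throws IllegalArgumentException
    }
}
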
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/bucket/BucketAssignOperator.java

Lines changed: 16 additions & 13 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.hudi.sink.bucket;
 
+import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;

@@ -83,12 +84,12 @@ public class BucketAssignOperator extends AbstractStreamOperator<BucketWrapper>
     /** Cache of schemas per table for bucket calculation. */
     private final Map<TableId, Schema> schemaCache = new HashMap<>();
 
-    /** Cache of primary key fields per table. */
-    private final Map<TableId, List<String>> primaryKeyCache = new HashMap<>();
-
     /** RowDataKeyGen cache per table for key and partition extraction. */
     private final Map<TableId, RowDataKeyGen> keyGenCache = new HashMap<>();
 
+    /** Field getter cache per table - lazily created and invalidated on schema changes. */
+    private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterCache = new HashMap<>();
+
     public BucketAssignOperator(Configuration conf, String schemaOperatorUid) {
         this.numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
         this.schemaOperatorUid = schemaOperatorUid;

@@ -132,8 +133,8 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
             schemaCache.put(schemaEvent.tableId(), newSchema);
 
             // Clear caches when schema changes
-            primaryKeyCache.remove(schemaEvent.tableId());
             keyGenCache.remove(schemaEvent.tableId());
+            fieldGetterCache.remove(schemaEvent.tableId());
 
             // Broadcast to all tasks
             for (int i = 0; i < totalTasksNumber; i++) {

@@ -211,8 +212,7 @@ private int calculateTaskIndex(DataChangeEvent event) {
         final Schema finalSchema = schema;
 
         // Get or cache primary keys
-        List<String> primaryKeys =
-                primaryKeyCache.computeIfAbsent(tableId, k -> finalSchema.primaryKeys());
+        List<String> primaryKeys = finalSchema.primaryKeys();
 
         if (primaryKeys.isEmpty()) {
             throw new IllegalStateException(

@@ -223,11 +223,14 @@ private int calculateTaskIndex(DataChangeEvent event) {
         RowDataKeyGen keyGen =
                 keyGenCache.computeIfAbsent(tableId, k -> RowDataUtils.createKeyGen(finalSchema));
 
+        // Get or create field getters for this table, lazily cached
+        List<RecordData.FieldGetter> fieldGetters =
+                fieldGetterCache.computeIfAbsent(
+                        tableId,
+                        k -> RowDataUtils.createFieldGetters(finalSchema, ZoneId.systemDefault()));
+
         // Convert DataChangeEvent to RowData for key extraction
-        RowData rowData =
-                RowDataUtils.convertDataChangeEventToRowData(
-                        event,
-                        RowDataUtils.createFieldGetters(finalSchema, ZoneId.systemDefault()));
+        RowData rowData = RowDataUtils.convertDataChangeEventToRowData(event, fieldGetters);
 
         // Use RowDataKeyGen to extract record key and partition path
         String recordKey = keyGen.getRecordKey(rowData);

@@ -237,9 +240,9 @@ private int calculateTaskIndex(DataChangeEvent event) {
         String tableIndexKeyFields = String.join(",", primaryKeys);
         int bucketNumber = BucketIdentifier.getBucketId(recordKey, tableIndexKeyFields, numBuckets);
 
+        // partitionIndexFunc is designed for a single table; events may come from different
+        // tables, so prefix the partition with the tableId, e.g. tableId + "_" + partition.
         // Use partition function to map bucket to task index for balanced distribution
-        int taskIndex = partitionIndexFunc.apply(numBuckets, partition, bucketNumber);
-
-        return taskIndex;
+        return partitionIndexFunc.apply(numBuckets, tableId + "_" + partition, bucketNumber);
     }
 }

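The tableId prefix added above matters because partitionIndexFunc keys its distribution on the partition string alone; with many tables flowing through one operator, the same partition value and bucket number from two tables would otherwise map identically. A minimal sketch of the effect, using a hypothetical hash-based stand-in for the real partitionIndexFunc:

public class BucketToTaskSketch {
    // Hypothetical stand-in for partitionIndexFunc: maps (partition, bucket) onto task slots.
    static int taskIndex(String partition, int bucketNumber, int numTasks) {
        int h = (partition.hashCode() * 31 + bucketNumber) & Integer.MAX_VALUE;
        return h % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        int bucket = 2;
        // Unprefixed: identical partition values from different tables collide on one slot.
        System.out.println(taskIndex("dt=2024-01-01", bucket, numTasks));
        // Prefixed with the tableId, each table is distributed independently.
        System.out.println(taskIndex("db.orders_dt=2024-01-01", bucket, numTasks));
        System.out.println(taskIndex("db.users_dt=2024-01-01", bucket, numTasks));
    }
}
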
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java

Lines changed: 27 additions & 0 deletions
@@ -63,7 +63,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;

@@ -489,6 +493,9 @@ private void handleCreateTableEvent(CreateTableOperatorEvent createTableOperator
         String tablePath = tableConfig.getString(FlinkOptions.PATH);
         pathToTableId.put(tablePath, tId);
 
+        // Create physical directory for Hudi table before initializing
+        createHudiTablePath(tableConfig);
+
         StreamerUtil.initTableIfNotExists(tableConfig);
         HoodieFlinkWriteClient<?> writeClient =
                 FlinkWriteClients.createWriteClient(tableConfig);

@@ -920,6 +927,26 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
     }
 
     // --- Helper Methods ---
+
+    /**
+     * Creates the physical directory for a Hudi table if it doesn't exist. This must be done on the
+     * coordinator side to avoid race conditions when multiple task managers try to create the same
+     * directory simultaneously.
+     *
+     * @param config The table-specific configuration containing the path
+     * @throws IOException if directory creation fails
+     */
+    private static void createHudiTablePath(Configuration config) throws IOException {
+        String tablePath = config.get(FlinkOptions.PATH);
+        Path path = Paths.get(tablePath);
+        if (!Files.exists(path)) {
+            Files.createDirectories(path);
+            LOG.info("Created physical directory for Hudi table at: {}", tablePath);
+        } else {
+            LOG.debug("Hudi table directory already exists at: {}", tablePath);
+        }
+    }
+
     private Configuration createTableSpecificConfig(TableId tableId) {
         Configuration tableConfig = new Configuration(baseConfig);
         String coordinatorPath = baseConfig.getString(FlinkOptions.PATH);

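Routing directory creation through the coordinator serializes it in a single process, which is the race-avoidance point made in the Javadoc above. Note that Files.createDirectories is already a no-op for an existing directory, so the exists() check mainly chooses the log level. A self-contained sketch of the pattern (the path below is hypothetical):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TablePathSketch {
    public static void main(String[] args) throws IOException {
        Path tablePath = Paths.get("/tmp/hudi/demo_table"); // hypothetical local table path
        if (!Files.exists(tablePath)) {
            Files.createDirectories(tablePath);
            System.out.println("Created " + tablePath);
        } else {
            System.out.println("Already exists: " + tablePath);
        }
        // Safe to repeat: createDirectories does not fail when the directory exists.
        Files.createDirectories(tablePath);
    }
}
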
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/HudiRecordEventSerializer.java

Lines changed: 2 additions & 11 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.connectors.hudi.sink.event;
 
-import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;

@@ -32,7 +31,6 @@
 
 import java.time.ZoneId;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**

@@ -56,9 +54,6 @@ public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {
     /** Schema cache per table - populated from CreateTableEvent and SchemaChangeEvent. */
     private final Map<TableId, Schema> schemaMaps;
 
-    /** Field getter cache per table for efficient conversion. */
-    private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterCache;
-
     /** RowDataKeyGen cache per table for key and partition extraction. */
     private final Map<TableId, RowDataKeyGen> keyGenCache;
 

@@ -67,7 +62,6 @@ public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {
 
     public HudiRecordEventSerializer(ZoneId zoneId) {
         this.schemaMaps = new HashMap<>();
-        this.fieldGetterCache = new HashMap<>();
         this.keyGenCache = new HashMap<>();
         this.zoneId = zoneId;
     }

@@ -87,8 +81,7 @@ public HoodieFlinkInternalRow serialize(Event event, String fileId, String insta
         if (event instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) event;
             schemaMaps.put(createTableEvent.tableId(), createTableEvent.getSchema());
-            // Clear caches for this table since schema changed
-            fieldGetterCache.remove(createTableEvent.tableId());
+            // Clear keyGenCache for this table since schema changed
             keyGenCache.remove(createTableEvent.tableId());
             // Schema events don't produce records
             return null;

@@ -102,8 +95,7 @@ public HoodieFlinkInternalRow serialize(Event event, String fileId, String insta
             Schema newSchema =
                     SchemaUtils.applySchemaChangeEvent(existingSchema, schemaChangeEvent);
             schemaMaps.put(schemaChangeEvent.tableId(), newSchema);
-            // Clear caches for this table since schema changed
-            fieldGetterCache.remove(schemaChangeEvent.tableId());
+            // Clear keyGenCache for this table since schema changed
             keyGenCache.remove(schemaChangeEvent.tableId());
         }
         // Schema events don't produce records

@@ -180,7 +172,6 @@ public void setSchema(TableId tableId, Schema schema) {
         schemaMaps.put(tableId, schema);
         // Clear cached field getters and key gens for this table so they get recreated with the new
         // schema
-        fieldGetterCache.remove(tableId);
         keyGenCache.remove(tableId);
     }
 }

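With the field getter cache gone (it now lives in BucketAssignOperator), the serializer keeps only schemaMaps and keyGenCache, and the invalidate-on-schema-change pattern is unchanged. A minimal sketch of that pattern, with Strings standing in for Schema and RowDataKeyGen (names illustrative):

import java.util.HashMap;
import java.util.Map;

public class PerTableCacheSketch {
    private final Map<String, String> schemaMaps = new HashMap<>();
    private final Map<String, String> keyGenCache = new HashMap<>();

    void onSchemaEvent(String tableId, String newSchema) {
        schemaMaps.put(tableId, newSchema);
        // Invalidate so the next lookup rebuilds against the new schema.
        keyGenCache.remove(tableId);
    }

    String keyGenFor(String tableId) {
        // Lazily rebuilt after invalidation.
        return keyGenCache.computeIfAbsent(tableId, t -> "keyGen(" + schemaMaps.get(t) + ")");
    }

    public static void main(String[] args) {
        PerTableCacheSketch cache = new PerTableCacheSketch();
        cache.onSchemaEvent("db.orders", "schema-v1");
        System.out.println(cache.keyGenFor("db.orders")); // keyGen(schema-v1)
        cache.onSchemaEvent("db.orders", "schema-v2");    // invalidates the key gen
        System.out.println(cache.keyGenFor("db.orders")); // keyGen(schema-v2)
    }
}
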
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/EventProcessorFunction.java

Lines changed: 22 additions & 6 deletions
@@ -18,8 +18,12 @@
 package org.apache.flink.cdc.connectors.hudi.sink.function;
 
 import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
 
 /**
  * Template interface for processing CDC events in a standardized way. Provides a consistent event

@@ -29,22 +33,34 @@
  * clear separation of concerns:
  *
  * <ul>
- *   <li>{@link #processDataChange(DataChangeEvent)} - Handles INSERT/UPDATE/DELETE operations
+ *   <li>{@link #processDataChange(DataChangeEvent, ProcessFunction.Context, Collector)} - Handles
+ *       DML operations (INSERT, UPDATE, DELETE)
 *   <li>{@link #processSchemaChange(SchemaChangeEvent)} - Handles DDL operations (CREATE TABLE, ADD
 *       COLUMN, etc.)
 *   <li>{@link #processFlush(FlushEvent)} - Handles coordinated flushing of buffered data
 * </ul>
+ *
+ * <p>Implementations of this interface are used in multi-table CDC pipelines to route and process
+ * events from different source tables to their corresponding Hudi tables.
+ *
+ * @see MultiTableEventStreamWriteFunction
  */
 public interface EventProcessorFunction {
 
     /**
-     * Process data change events (INSERT/UPDATE/DELETE). This is where actual data is buffered and
-     * written.
+     * Process data change events (INSERT, UPDATE, DELETE operations).
      *
-     * @param event The data change event
-     * @throws Exception if processing fails
+     * <p>This method handles DML operations from the CDC stream, converting them into Hudi records
+     * and collecting them for writing to the appropriate table.
+     *
+     * @param event The data change event containing the operation type and data
+     * @param ctx The process function context for accessing runtime information
+     * @param out The collector for emitting processed RowData records
      */
-    void processDataChange(DataChangeEvent event) throws Exception;
+    void processDataChange(
+            DataChangeEvent event,
+            ProcessFunction<Event, RowData>.Context ctx,
+            Collector<RowData> out);
 
     /**
      * Process schema change events (CREATE TABLE, ADD COLUMN, etc.).

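The widened processDataChange signature lets implementations emit converted RowData through Flink's standard Collector rather than buffering internally. A skeleton of an implementing class under the new contract (the class name and empty bodies are illustrative, and this assumes the three methods listed in the Javadoc are the full interface):

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

public class NoOpEventProcessor implements EventProcessorFunction {

    @Override
    public void processDataChange(
            DataChangeEvent event,
            ProcessFunction<Event, RowData>.Context ctx,
            Collector<RowData> out) {
        // Convert the CDC event into RowData for its target table and emit it via `out`.
        // (Conversion elided; the connector does this with per-table schema caches.)
    }

    @Override
    public void processSchemaChange(SchemaChangeEvent event) {
        // Refresh per-table schema state here.
    }

    @Override
    public void processFlush(FlushEvent event) {
        // Flush buffered records for the affected table(s).
    }
}
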