Commit 1ba8bd8

committed
Checkpoint 46 - Add partition path extractor
1 parent 6ac41d3 commit 1ba8bd8

File tree: 4 files changed, +208 -8 lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/bucket/BucketAssignOperator.java

Lines changed: 126 additions & 5 deletions
@@ -38,6 +38,7 @@
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.hash.BucketIndexUtil;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -174,9 +175,9 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
     }

     /**
-     * Calculate which task index should handle this event by:
-     * 1. Calculating the bucket number (0 to numBuckets-1) based on record key
-     * 2. Using partitionIndexFunc to map bucket -> task index for balanced distribution
+     * Calculate which task index should handle this event by: 1. Calculating the bucket number (0
+     * to numBuckets-1) based on record key 2. Using partitionIndexFunc to map bucket -> task index
+     * for balanced distribution
      *
      * @param event The DataChangeEvent to calculate task index for
      * @return The task index (0 to parallelism-1) that should handle this event
@@ -252,9 +253,10 @@ private int calculateTaskIndex(DataChangeEvent event) {
         String tableIndexKeyFields = String.join(",", primaryKeys);
         int bucketNumber = BucketIdentifier.getBucketId(recordKey, tableIndexKeyFields, numBuckets);

+        // Extract partition path from the event
+        String partition = extractPartitionPath(event, finalSchema, fieldGetters);
+
         // Use partition function to map bucket to task index for balanced distribution
-        // partition is "default" since we're not using Hudi partition fields in this context
-        String partition = "default";
         int taskIndex = partitionIndexFunc.apply(numBuckets, partition, bucketNumber);

         return taskIndex;
@@ -288,4 +290,123 @@ private String extractRecordKey(

         return String.join(",", recordKeyPairs);
     }
+
+    /**
+     * Extract partition path from the DataChangeEvent based on schema partition keys.
+     *
+     * <p>If the schema has partition keys defined:
+     *
+     * <ul>
+     *   <li>Extracts partition field values from the record data
+     *   <li>Formats them as "field1=value1/field2=value2" (Hive-style partitioning)
+     * </ul>
+     *
+     * <p>If no partition keys are defined, returns "default".
+     *
+     * @param event The DataChangeEvent to extract partition from
+     * @param schema The table schema containing partition key definitions
+     * @param fieldGetters Field getters for extracting values (not used currently, may be needed
+     *     for optimization)
+     * @return The partition path string
+     */
+    private String extractPartitionPath(
+            DataChangeEvent event, Schema schema, List<RecordData.FieldGetter> fieldGetters) {
+
+        // Check if schema has partition keys defined
+        List<String> partitionKeys = schema.partitionKeys();
+        if (partitionKeys == null || partitionKeys.isEmpty()) {
+            return "default";
+        }
+
+        // Get the record data to extract from (after for INSERT/UPDATE/REPLACE, before for DELETE)
+        RecordData recordData;
+        switch (event.op()) {
+            case INSERT:
+            case UPDATE:
+            case REPLACE:
+                recordData = event.after();
+                break;
+            case DELETE:
+                recordData = event.before();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation: " + event.op());
+        }
+
+        if (recordData == null) {
+            throw new IllegalStateException(
+                    "Cannot extract partition path: " + event.op() + " event has null data");
+        }
+
+        // Extract partition values and build partition path
+        List<String> partitionParts = new ArrayList<>(partitionKeys.size());
+        for (String partitionKey : partitionKeys) {
+            int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
+            if (fieldIndex == -1) {
+                throw new IllegalStateException(
+                        "Partition key field '"
+                                + partitionKey
+                                + "' not found in schema for table "
+                                + event.tableId());
+            }
+
+            // Get field value
+            Object fieldValue;
+            if (recordData.isNullAt(fieldIndex)) {
+                // Handle null partition values - use "__HIVE_DEFAULT_PARTITION__" as per Hive
+                // convention
+                fieldValue = "__HIVE_DEFAULT_PARTITION__";
+            } else {
+                // Get the field value based on the field type
+                DataType fieldType = schema.getColumns().get(fieldIndex).getType();
+                fieldValue = getFieldValue(recordData, fieldIndex, fieldType);
+            }
+
+            // Format as "key=value" (Hive-style partitioning)
+            partitionParts.add(partitionKey + "=" + fieldValue);
+        }
+
+        // Join partition parts with "/"
+        return String.join("/", partitionParts);
+    }
+
+    /**
+     * Extract field value from RecordData based on field type. This is a simplified version -
+     * complex types may need additional handling.
+     */
+    private Object getFieldValue(RecordData recordData, int fieldIndex, DataType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return recordData.getString(fieldIndex).toString();
+            case BOOLEAN:
+                return recordData.getBoolean(fieldIndex);
+            case TINYINT:
+                return recordData.getByte(fieldIndex);
+            case SMALLINT:
+                return recordData.getShort(fieldIndex);
+            case INTEGER:
+            case DATE:
+                return recordData.getInt(fieldIndex);
+            case BIGINT:
+                return recordData.getLong(fieldIndex);
+            case FLOAT:
+                return recordData.getFloat(fieldIndex);
+            case DOUBLE:
+                return recordData.getDouble(fieldIndex);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return recordData.getTimestamp(
+                        fieldIndex,
+                        org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision(fieldType));
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return recordData.getLocalZonedTimestampData(
+                        fieldIndex,
+                        org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision(fieldType));
+            default:
+                // For other types, create a field getter and use it
+                RecordData.FieldGetter fieldGetter =
+                        RecordData.createFieldGetter(fieldType, fieldIndex);
+                return fieldGetter.getFieldOrNull(recordData);
+        }
+    }
 }
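To make the intent of the new extractPartitionPath and calculateTaskIndex logic concrete, here is a minimal standalone sketch. It uses plain Java collections in place of Flink CDC's Schema/RecordData, and the modulo-based task mapping is only an illustrative stand-in for the partition-index function the operator obtains from Hudi; the class and method names below are hypothetical and not part of the connector.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only: plain Maps stand in for Flink CDC's Schema/RecordData,
// and taskIndex() is a simple hash, not Hudi's actual partition-index function.
public class PartitionPathSketch {

    // Build a Hive-style partition path such as "dt=2024-01-01/region=EU".
    static String partitionPath(List<String> partitionKeys, Map<String, Object> row) {
        if (partitionKeys == null || partitionKeys.isEmpty()) {
            return "default"; // same fallback the commit uses when no partition keys exist
        }
        return partitionKeys.stream()
                .map(key -> {
                    Object value = row.get(key);
                    // Null partition values follow the Hive default-partition convention.
                    return key + "=" + (value == null ? "__HIVE_DEFAULT_PARTITION__" : value);
                })
                .collect(Collectors.joining("/"));
    }

    // Hypothetical bucket -> task mapping for illustration; the operator delegates this
    // to a partition-index function, which may distribute buckets differently.
    static int taskIndex(int bucketNumber, String partitionPath, int parallelism) {
        int mixed = 31 * partitionPath.hashCode() + bucketNumber;
        return Math.floorMod(mixed, parallelism);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("dt", "2024-01-01");
        row.put("region", null);

        String path = partitionPath(Arrays.asList("dt", "region"), row);
        System.out.println(path); // dt=2024-01-01/region=__HIVE_DEFAULT_PARTITION__
        System.out.println(taskIndex(3, path, 4)); // some task in [0, 4)
    }
}

The important property, which the sketch preserves, is that records sharing a bucket number and partition path always map to the same writer subtask.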

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/EventBucketStreamWriteFunction.java

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.table.types.logical.RowType;
+
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.hash.BucketIndexUtil;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/MultiTableEventStreamWriteFunction.java

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
+
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/RowDataUtils.java

Lines changed: 80 additions & 3 deletions
@@ -262,9 +262,8 @@ public static HoodieFlinkInternalRow convertDataChangeEventToHoodieFlinkInternal
         // Extract record key from primary key fields
         String recordKey = extractRecordKeyFromDataChangeEvent(dataChangeEvent, schema);

-        // Default partition path - in real implementation this would be based on configured
-        // partition fields
-        String partitionPath = "default";
+        // Extract partition path from partition key fields
+        String partitionPath = extractPartitionPathFromDataChangeEvent(dataChangeEvent, schema);

         return convertDataChangeEventToHoodieFlinkInternalRow(
                 dataChangeEvent, schema, zoneId, recordKey, partitionPath, fileId, instantTime);
@@ -345,6 +344,84 @@ private static String extractRecordKeyFromDataChangeEvent(
         return String.join(",", recordKeyValues);
     }

+    /**
+     * Extract partition path from DataChangeEvent based on partition key fields in schema.
+     *
+     * <p>If the schema has partition keys defined:
+     *
+     * <ul>
+     *   <li>Extracts partition field values from the record data
+     *   <li>Formats them as "field1=value1/field2=value2" (Hive-style partitioning)
+     * </ul>
+     *
+     * <p>If no partition keys are defined, returns "default".
+     *
+     * @param dataChangeEvent The DataChangeEvent to extract partition from
+     * @param schema The table schema containing partition key definitions
+     * @return The partition path string
+     */
+    private static String extractPartitionPathFromDataChangeEvent(
+            DataChangeEvent dataChangeEvent, Schema schema) {
+        List<String> partitionKeys = schema.partitionKeys();
+        if (partitionKeys == null || partitionKeys.isEmpty()) {
+            return "default";
+        }
+
+        // Get the record data to extract from (after for INSERT/UPDATE/REPLACE, before for DELETE)
+        RecordData recordData;
+        switch (dataChangeEvent.op()) {
+            case INSERT:
+            case UPDATE:
+            case REPLACE:
+                recordData = dataChangeEvent.after();
+                break;
+            case DELETE:
+                recordData = dataChangeEvent.before();
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported operation: " + dataChangeEvent.op());
+        }
+
+        if (recordData == null) {
+            throw new IllegalStateException(
+                    "Cannot extract partition path: "
+                            + dataChangeEvent.op()
+                            + " event has null data");
+        }
+
+        // Extract partition values and build partition path
+        List<String> partitionParts = new ArrayList<>(partitionKeys.size());
+        for (String partitionKey : partitionKeys) {
+            int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
+            if (fieldIndex == -1) {
+                throw new IllegalStateException(
+                        "Partition key field '"
+                                + partitionKey
+                                + "' not found in schema for table "
+                                + dataChangeEvent.tableId());
+            }
+
+            // Get field value
+            Object fieldValue;
+            if (recordData.isNullAt(fieldIndex)) {
+                // Handle null partition values - use "__HIVE_DEFAULT_PARTITION__" as per Hive
+                // convention
+                fieldValue = "__HIVE_DEFAULT_PARTITION__";
+            } else {
+                // Get the field value based on the field type
+                DataType fieldType = schema.getColumns().get(fieldIndex).getType();
+                fieldValue = getFieldValue(recordData, fieldIndex, fieldType);
+            }
+
+            // Format as "key=value" (Hive-style partitioning)
+            partitionParts.add(partitionKey + "=" + fieldValue);
+        }
+
+        // Join partition parts with "/"
+        return String.join("/", partitionParts);
+    }
+
     /** Get field value from RecordData based on field type. */
     private static Object getFieldValue(RecordData recordData, int fieldIndex, DataType fieldType) {
         switch (fieldType.getTypeRoot()) {

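With this change, RowDataUtils no longer hard-codes the partition path to "default": each converted HoodieFlinkInternalRow carries a record key built from the primary-key columns (comma-joined values, as in extractRecordKeyFromDataChangeEvent) and a Hive-style partition path built from the partition-key columns. The short sketch below illustrates those two derived values with plain Java maps; the class and helper names are hypothetical and not part of the connector.

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch of the two per-record values derived here: the record key from
// primary-key columns and the partition path from partition-key columns. Plain Maps
// stand in for Flink CDC's RecordData/Schema.
public class HoodieRowInputsSketch {

    static String recordKey(List<String> primaryKeys, Map<String, Object> row) {
        // Primary-key values are comma-joined, mirroring extractRecordKeyFromDataChangeEvent.
        return primaryKeys.stream()
                .map(k -> String.valueOf(row.get(k)))
                .collect(Collectors.joining(","));
    }

    static String partitionPath(List<String> partitionKeys, Map<String, Object> row) {
        if (partitionKeys.isEmpty()) {
            return "default"; // no partition keys defined in the schema
        }
        return partitionKeys.stream()
                .map(k -> k + "=" + (row.get(k) == null ? "__HIVE_DEFAULT_PARTITION__" : row.get(k)))
                .collect(Collectors.joining("/"));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 42);
        row.put("dt", "2024-01-01");
        row.put("name", "alice");

        System.out.println(recordKey(Arrays.asList("id"), row));     // 42
        System.out.println(partitionPath(Arrays.asList("dt"), row)); // dt=2024-01-01
        System.out.println(partitionPath(Collections.emptyList(), row)); // default
    }
}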