File tree Expand file tree Collapse file tree 2 files changed +8
-6
lines changed
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink Expand file tree Collapse file tree 2 files changed +8
-6
lines changed Original file line number Diff line number Diff line change @@ -301,21 +301,22 @@ private String extractRecordKey(
301301 * <li>Formats them as "field1=value1/field2=value2" (Hive-style partitioning)
302302 * </ul>
303303 *
304- * <p>If no partition keys are defined, returns "default" .
304+ * <p>If no partition keys are defined, returns empty string (for unpartitioned tables) .
305305 *
306306 * @param event The DataChangeEvent to extract partition from
307307 * @param schema The table schema containing partition key definitions
308308 * @param fieldGetters Field getters for extracting values (not used currently, may be needed
309309 * for optimization)
310- * @return The partition path string
310+ * @return The partition path string (empty string for unpartitioned tables)
311311 */
312312 private String extractPartitionPath (
313313 DataChangeEvent event , Schema schema , List <RecordData .FieldGetter > fieldGetters ) {
314314
315315 // Check if schema has partition keys defined
316316 List <String > partitionKeys = schema .partitionKeys ();
317317 if (partitionKeys == null || partitionKeys .isEmpty ()) {
318- return "default" ;
318+ // Hudi convention: unpartitioned tables use empty string, not "default"
319+ return "" ;
319320 }
320321
321322 // Get the record data to extract from (after for INSERT/UPDATE/REPLACE, before for DELETE)
Original file line number Diff line number Diff line change @@ -355,17 +355,18 @@ private static String extractRecordKeyFromDataChangeEvent(
355355 * <li>Formats them as "field1=value1/field2=value2" (Hive-style partitioning)
356356 * </ul>
357357 *
358- * <p>If no partition keys are defined, returns "default" .
358+ * <p>If no partition keys are defined, returns empty string (for unpartitioned tables) .
359359 *
360360 * @param dataChangeEvent The DataChangeEvent to extract partition from
361361 * @param schema The table schema containing partition key definitions
362- * @return The partition path string
362+ * @return The partition path string (empty string for unpartitioned tables)
363363 */
364364 private static String extractPartitionPathFromDataChangeEvent (
365365 DataChangeEvent dataChangeEvent , Schema schema ) {
366366 List <String > partitionKeys = schema .partitionKeys ();
367367 if (partitionKeys == null || partitionKeys .isEmpty ()) {
368- return "default" ;
368+ // Hudi convention: unpartitioned tables use empty string, not "default"
369+ return "" ;
369370 }
370371
371372 // Get the record data to extract from (after for INSERT/UPDATE/REPLACE, before for DELETE)
You can’t perform that action at this time.
0 commit comments