
Commit 5581640

Checkpoint 57 - Remove manual embedded timeline server management
1 parent 57133bf commit 5581640

3 files changed: +28 additions, -119 deletions


flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java

Lines changed: 1 addition & 35 deletions
@@ -39,12 +39,10 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
@@ -275,12 +273,6 @@ boolean shouldTriggerCompaction() {
      */
     private transient SubtaskGateway[] gateways;
 
-    /**
-     * A dedicated write client whose only job is to run the embedded timeline server. This ensures
-     * there is only one timeline server for the entire job.
-     */
-    private transient HoodieFlinkWriteClient<?> timelineServerClient;
-
     /** A single-thread executor to handle instant time requests, mimicking the parent behavior. */
     private transient NonThrownExecutor instantRequestExecutor;

@@ -328,20 +320,6 @@ public void start() throws Exception {
         // Initialize the gateways array to avoid NullPointerException when subtasks are ready.
         this.gateways = new SubtaskGateway[context.currentParallelism()];
 
-        // Initialize a single write client for the coordinator path.
-        // Its primary role is to start and manage the embedded timeline server.
-        try {
-            // The baseConfig points to the dummy coordinator path.
-            // A .hoodie directory is required for the timeline server to start.
-            StreamerUtil.initTableIfNotExists(this.baseConfig);
-            this.timelineServerClient = FlinkWriteClients.createWriteClient(this.baseConfig);
-            LOG.info("Successfully started timeline server on coordinator.");
-        } catch (Exception e) {
-            LOG.error("Failed to start timeline server on coordinator.", e);
-            context.failJob(e);
-            return;
-        }
-
         // Re-initialize transient fields after deserialization from a Flink checkpoint.
         // When the coordinator is restored, the `tableContexts` map is deserialized, but all
         // `writeClient` fields within it will be null because they are transient.
@@ -903,9 +881,6 @@ private void scheduleTableServicesIfNeeded(TableId tableId, TableContext tableCo
 
     @Override
     public void close() throws Exception {
-        if (timelineServerClient != null) {
-            timelineServerClient.close();
-        }
         if (instantRequestExecutor != null) {
             instantRequestExecutor.close();
         }
@@ -948,9 +923,7 @@ private static void createHudiTablePath(Configuration config) throws IOException
 
     private Configuration createTableSpecificConfig(TableId tableId) {
         Configuration tableConfig = new Configuration(baseConfig);
-        String coordinatorPath = baseConfig.getString(FlinkOptions.PATH);
-        // Use the same logic as MultiTableEventStreamWriteFunction to strip "/coordinator"
-        String rootPath = coordinatorPath.split("/coordinator")[0];
+        String rootPath = baseConfig.getString(FlinkOptions.PATH);
         String tablePath =
                 String.format(
                         "%s/%s/%s", rootPath, tableId.getSchemaName(), tableId.getTableName());
@@ -974,13 +947,6 @@ private Configuration createTableSpecificConfig(TableId tableId) {
                     tableId);
         }
 
-        // Disable both embedded timeline server and metadata table for per-table clients.
-        // The central coordinator manages the only timeline server.
-        tableConfig.setBoolean(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), false);
-
-        // Use memory-based file system view since each client is lightweight.
-        tableConfig.setString(FileSystemViewStorageConfig.VIEW_TYPE.key(), "MEMORY");
-
         return tableConfig;
     }
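With the "/coordinator" stripping gone, path derivation in the coordinator is a plain concatenation of the configured root with the schema and table names. A minimal sketch of the resulting layout (the root path and table names below are made-up examples, not taken from this commit):

    public class TablePathSketch {
        // Mirrors the String.format("%s/%s/%s", ...) call kept in createTableSpecificConfig.
        static String tablePath(String rootPath, String schemaName, String tableName) {
            return String.format("%s/%s/%s", rootPath, schemaName, tableName);
        }

        public static void main(String[] args) {
            // FlinkOptions.PATH now acts as the warehouse root; each table lands directly beneath it.
            System.out.println(tablePath("s3://warehouse/hudi", "inventory", "orders"));
            // -> s3://warehouse/hudi/inventory/orders
        }
    }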

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/MultiTableEventStreamWriteFunction.java

Lines changed: 26 additions & 34 deletions
@@ -52,12 +52,10 @@
 import org.apache.flink.util.OutputTag;
 
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.util.AvroSchemaConverter;
-import org.apache.hudi.util.ViewStorageProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -107,7 +105,12 @@ public MultiTableEventStreamWriteFunction(Configuration config) {
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        super.initializeState(context);
+        // NOTE: Do NOT call super.initializeState(context) here.
+        // The parent class (AbstractStreamWriteFunction) expects to manage a single Hudi table
+        // and tries to create a HoodieTableMetaClient during initialization.
+        // MultiTableEventStreamWriteFunction is a dispatcher that manages multiple tables
+        // dynamically, so it doesn't have a single table path. Each child function
+        // (ExtendedBucketStreamWriteFunction) handles its own state initialization.
         this.functionInitializationContext = context;
 
         // Initialize schema map before restoring state
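A minimal, generic sketch of the lifecycle pattern the new comment describes, using only Flink's public CheckpointedFunction interface (this is not the connector's class hierarchy; AbstractStreamWriteFunction and ExtendedBucketStreamWriteFunction are only referenced above, not reproduced here):

    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Dispatcher shape: keep the context, register no single-table state of its own.
    abstract class DispatcherStateSketch implements CheckpointedFunction {

        // Retained so lazily created per-table child functions can initialize their own
        // state against the same context when a table is first seen.
        protected transient FunctionInitializationContext functionInitializationContext;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Intentionally no call to any single-table parent logic here.
            this.functionInitializationContext = context;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Each per-table child function snapshots its own write state.
        }
    }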
@@ -523,28 +526,13 @@ private Configuration createTableSpecificConfig(TableId tableId) {
                             AvroSchemaConverter.convertToSchema(rowType).toString();
                     localTableConfig.set(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema);
 
-                    String rootPath = this.config.get(FlinkOptions.PATH).split("/coordinator")[0];
-                    if (rootPath != null) {
-                        String tableBasePath =
-                                String.format(
-                                        "%s/%s/%s",
-                                        rootPath, tableId.getSchemaName(), tableId.getTableName());
-                        localTableConfig.set(FlinkOptions.PATH, tableBasePath);
-                    }
+                    String rootPath = this.config.get(FlinkOptions.PATH);
+                    String tableBasePath =
+                            String.format(
+                                    "%s/%s/%s",
+                                    rootPath, tableId.getSchemaName(), tableId.getTableName());
+                    localTableConfig.set(FlinkOptions.PATH, tableBasePath);
 
-                    // Modify ViewStorageProperties to point to coordinator table
-                    FileSystemViewStorageConfig viewStorageConfig =
-                            ViewStorageProperties.loadFromProperties(
-                                    this.config.get(FlinkOptions.PATH), localTableConfig);
-                    localTableConfig.setString(
-                            FileSystemViewStorageConfig.VIEW_TYPE.key(),
-                            viewStorageConfig.getStorageType().name());
-                    localTableConfig.setString(
-                            FileSystemViewStorageConfig.REMOTE_HOST_NAME.key(),
-                            viewStorageConfig.getRemoteViewServerHost());
-                    localTableConfig.setString(
-                            FileSystemViewStorageConfig.REMOTE_PORT_NUM.key(),
-                            viewStorageConfig.getRemoteViewServerPort() + "");
                     return localTableConfig;
                 });
     }
@@ -634,23 +622,27 @@ protected void flushRemaining(boolean endInput) {
 
     @Override
     public void close() throws Exception {
-        for (ExtendedBucketStreamWriteFunction func : tableFunctions.values()) {
-            try {
-                func.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close table function", e);
+        if (tableFunctions != null) {
+            for (ExtendedBucketStreamWriteFunction func : tableFunctions.values()) {
+                try {
+                    func.close();
+                } catch (Exception e) {
+                    LOG.error("Failed to close table function", e);
+                }
             }
         }
         super.close();
     }
 
     public void endInput() {
         super.endInput();
-        for (ExtendedBucketStreamWriteFunction func : tableFunctions.values()) {
-            try {
-                func.endInput();
-            } catch (Exception e) {
-                LOG.error("Failed to complete endInput for table function", e);
+        if (tableFunctions != null) {
+            for (ExtendedBucketStreamWriteFunction func : tableFunctions.values()) {
+                try {
+                    func.endInput();
+                } catch (Exception e) {
+                    LOG.error("Failed to complete endInput for table function", e);
+                }
             }
         }
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/operator/MultiTableWriteOperator.java

Lines changed: 1 addition & 50 deletions
@@ -25,7 +25,6 @@
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 
-import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.common.AbstractWriteOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,56 +104,8 @@ public void open() throws Exception {
      */
     public static MultiTableWriteOperatorFactory<Event> getFactory(
             Configuration conf, String schemaOperatorUid) {
-        // Create coordinator-specific configuration with dummy table settings
-        // This satisfies the coordinator's requirement for table initialization
-        Configuration coordinatorConfig = createCoordinatorConfig(conf);
-
         LOG.info("Creating multi-table write operator factory with extended coordinator support");
         return MultiTableWriteOperatorFactory.instance(
-                coordinatorConfig,
-                new MultiTableWriteOperator(coordinatorConfig, schemaOperatorUid));
-    }
-
-    /**
-     * Creates a coordinator-specific configuration with dummy table settings. This satisfies the
-     * coordinator's requirement for table initialization while actual table routing happens
-     * dynamically based on incoming events. Uses deterministic naming to allow reuse of existing
-     * coordinator tables.
-     */
-    private static Configuration createCoordinatorConfig(Configuration originalConfig) {
-        Configuration coordinatorConfig = new Configuration();
-        coordinatorConfig.addAll(originalConfig);
-
-        // Create deterministic dummy table name based on base path hash for reusability
-        String originalPath = coordinatorConfig.get(FlinkOptions.PATH, "default");
-        String pathHash = String.valueOf(Math.abs(originalPath.hashCode()));
-        String dummyTableName = "coordinator_" + pathHash;
-
-        coordinatorConfig.set(FlinkOptions.TABLE_NAME, dummyTableName);
-        coordinatorConfig.set(FlinkOptions.DATABASE_NAME, "coordinator_db");
-
-        // Set deterministic path for coordinator table (allows reuse)
-        String coordinatorPath = originalPath + "/coordinator/" + dummyTableName;
-        coordinatorConfig.set(FlinkOptions.PATH, coordinatorPath);
-
-        // Set dummy Avro schema with a simple structure (id: int)
-        String dummyAvroSchema =
-                "{\n"
-                        + " \"type\": \"record\",\n"
-                        + " \"name\": \"coordinator_record\",\n"
-                        + " \"fields\": [\n"
-                        + " {\n"
-                        + " \"name\": \"id\",\n"
-                        + " \"type\": \"int\"\n"
-                        + " }\n"
-                        + " ]\n"
-                        + "}";
-        coordinatorConfig.set(FlinkOptions.SOURCE_AVRO_SCHEMA, dummyAvroSchema);
-
-        LOG.info(
-                "Created coordinator config with reusable dummy table: coordinator_db.{} at path: {}",
-                dummyTableName,
-                coordinatorPath);
-        return coordinatorConfig;
+                conf, new MultiTableWriteOperator(conf, schemaOperatorUid));
     }
 }
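A minimal usage sketch of the simplified factory method (the configuration values and the uid string are illustrative assumptions; the factory type's package is not shown in this commit, so the returned factory is simply ignored here):

    import org.apache.flink.cdc.connectors.hudi.sink.operator.MultiTableWriteOperator;
    import org.apache.flink.configuration.Configuration;

    import org.apache.hudi.configuration.FlinkOptions;

    public class FactoryUsageSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // The configured path is now passed through unchanged -- no generated
            // coordinator_db dummy table and no "/coordinator/..." suffix.
            conf.set(FlinkOptions.PATH, "s3://warehouse/hudi");

            String schemaOperatorUid = "schema-operator"; // made-up uid for illustration
            MultiTableWriteOperator.getFactory(conf, schemaOperatorUid);
        }
    }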
