From fb2e77e6ac07968a8a6fea0ed092aea6135e2f34 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Tue, 4 Nov 2025 15:50:37 +0000 Subject: [PATCH 1/4] Support for transaction boundaries in mysql connector --- .../source/reader/MySqlRecordEmitter.java | 3 + .../mysql/source/utils/RecordUtils.java | 14 ++ .../source/reader/MySqlRecordEmitterTest.java | 208 +++++++++++++++++- 3 files changed, 218 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 449e7f608f2..06801a659b7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -102,6 +102,9 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); + } else if (RecordUtils.isTransactionMetadataEvent(element)) { + updateStartingOffsetForSplit(splitState, element); + emitElement(element, output); } else { // unknown element LOG.info("Meet unknown element {}, just skip.", element); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index e6848f1c4c8..2a323cc51fa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -76,6 +76,8 @@ private RecordUtils() {} "io.debezium.connector.mysql.SchemaChangeKey"; public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME = "io.debezium.connector.common.Heartbeat"; + public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME = + "io.debezium.connector.common.TransactionMetadataKey"; private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader(); /** Converts a {@link ResultSet} row to an array of Objects. */ @@ -342,6 +344,18 @@ public static boolean isHeartbeatEvent(SourceRecord record) { && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name()); } + /** + * Check whether the given source record is a transaction metadata event (BEGIN or END). + * + *

Transaction events are emitted by Debezium to mark transaction boundaries when + * provide.transaction.metadata is enabled. + */ + public static boolean isTransactionMetadataEvent(SourceRecord record) { + Schema keySchema = record.keySchema(); + return keySchema != null + && SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + } + /** * Return the finished snapshot split information. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 5553b1ba8ce..4c530641426 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; +import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -36,11 +37,17 @@ import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC; import static io.debezium.connector.mysql.MySqlConnectorConfig.SERVER_NAME; @@ -105,14 +112,201 @@ public TypeInformation getProducedType() { false); } + @Test + void testTransactionBeginEventHandling() throws Exception { + // Create a transaction BEGIN event + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + + // Verify it's detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + // Create emitter and split state + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + // Emit the transaction BEGIN event + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + } + + @Test + void testTransactionEndEventHandling() throws Exception { + // Create a transaction END event + SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L); + + // Verify it's detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)) + .isTrue(); + + // Create emitter and split state + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + // Emit the transaction END event + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionEndEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionEndEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + } + + + @Test + void testNonTransactionEventNotDetected() { + // Create a regular data change event + Schema keySchema = SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .build(); + Schema valueSchema = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", 1); + Struct value = new Struct(valueSchema).put("op", "c"); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", 100L); + + SourceRecord dataRecord = new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "test.table", + keySchema, + key, + valueSchema, + value); + + // Verify it's NOT detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(dataRecord)).isFalse(); + } + + @Test + void testTransactionEventWithoutKeySchemaNotDetected() { + // Create a record without a key schema (should not be detected as transaction event) + Schema valueSchema = SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("status", Schema.STRING_SCHEMA) + .build(); + + Struct value = new Struct(valueSchema).put("status", "BEGIN"); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", 100L); + + SourceRecord record = new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + "transaction.topic", + null, // No key schema + null, + valueSchema, + value); + + // Verify it's NOT detected as a transaction metadata event + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse(); + } + private MySqlBinlogSplitState createBinlogSplitState() { return new MySqlBinlogSplitState( - new MySqlBinlogSplit( - "binlog-split", - BinlogOffset.ofEarliest(), - BinlogOffset.ofNonStopping(), - Collections.emptyList(), - Collections.emptyMap(), - 0)); + new MySqlBinlogSplit( + "binlog-split", + BinlogOffset.ofEarliest(), + BinlogOffset.ofNonStopping(), + Collections.emptyList(), + Collections.emptyMap(), + 0)); } + + /** + * Helper method to create a MySqlRecordEmitter that counts emitted records. + */ + private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger counter) { + return new MySqlRecordEmitter<>( + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + counter.incrementAndGet(); + out.collect("transaction-event"); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } + }, + new MySqlSourceReaderMetrics( + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), + false); + } + + private SourceRecord createTransactionMetadataEvent( + String status, String transactionId, long position) { + Schema keySchema = SchemaBuilder.struct() + .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) + .field("id", Schema.STRING_SCHEMA) + .build(); + + Schema valueSchema = SchemaBuilder.struct() + .name("io.debezium.connector.common.TransactionMetadataValue") + .field("status", Schema.STRING_SCHEMA) + .field("id", Schema.STRING_SCHEMA) + .field("event_count", Schema.OPTIONAL_INT64_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) + .build(); + + Struct key = new Struct(keySchema).put("id", transactionId); + + Struct value = new Struct(valueSchema) + .put("status", status) + .put("id", transactionId) + .put("ts_ms", System.currentTimeMillis()); + + if ("END".equals(status)) { + value.put("event_count", 5L); + } + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", position); + offset.put("transaction_id", transactionId); + + return new SourceRecord( + Collections.singletonMap("server", "mysql_binlog_source"), + offset, + "mysql_binlog_source.transaction", + keySchema, + key, + valueSchema, + value); + } + } From fbd294e263aa4cf3613e0f45fbc162a6a364c697 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Mon, 24 Nov 2025 21:14:32 +0000 Subject: [PATCH 2/4] add option to enable/disable metadata events --- .../source/reader/MySqlPipelineRecordEmitter.java | 3 ++- .../cdc/connectors/mysql/source/MySqlSource.java | 3 ++- .../connectors/mysql/source/MySqlSourceBuilder.java | 6 ++++++ .../mysql/source/config/MySqlSourceConfig.java | 7 +++++++ .../source/config/MySqlSourceConfigFactory.java | 10 ++++++++++ .../mysql/source/reader/MySqlRecordEmitter.java | 12 +++++++++--- .../mysql/source/reader/MySqlRecordEmitterTest.java | 4 +++- .../mysql/source/reader/MySqlSourceReaderTest.java | 5 +++-- 8 files changed, 42 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index c946b9e29ce..2bfee92ee8e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -92,7 +92,8 @@ public MySqlPipelineRecordEmitter( super( debeziumDeserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges()); + sourceConfig.isIncludeSchemaChanges(), + false); // Explicitly disable transaction metadata events this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index cb06cc45a6d..b5acaf95bf8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -131,7 +131,8 @@ public static MySqlSourceBuilder builder() { new MySqlRecordEmitter<>( deserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges())); + sourceConfig.isIncludeSchemaChanges(), + sourceConfig.isIncludeTransactionMetadataEvents())); } MySqlSource( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index caf316d1b4a..fbda06bccd9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -198,6 +198,12 @@ public MySqlSourceBuilder includeSchemaChanges(boolean includeSchemaChanges) return this; } + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ + public MySqlSourceBuilder includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents); + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 260a7cd2b5d..c0adf7eb214 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -62,6 +62,7 @@ public class MySqlSourceConfig implements Serializable { private final double distributionFactorUpper; private final double distributionFactorLower; private final boolean includeSchemaChanges; + private final boolean includeTransactionMetadataEvents; private final boolean scanNewlyAddedTableEnabled; private final boolean closeIdleReaders; private final Properties jdbcProperties; @@ -99,6 +100,7 @@ public class MySqlSourceConfig implements Serializable { double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean includeTransactionMetadataEvents, boolean scanNewlyAddedTableEnabled, boolean closeIdleReaders, Properties dbzProperties, @@ -128,6 +130,7 @@ public class MySqlSourceConfig implements Serializable { this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; this.closeIdleReaders = closeIdleReaders; this.dbzProperties = checkNotNull(dbzProperties); @@ -227,6 +230,10 @@ public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } + public boolean isIncludeTransactionMetadataEvents() { + return includeTransactionMetadataEvents; + } + public boolean isScanNewlyAddedTableEnabled() { return scanNewlyAddedTableEnabled; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..8715c8b6f61 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -63,6 +63,7 @@ public class MySqlSourceConfigFactory implements Serializable { private double distributionFactorLower = MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; + private boolean includeTransactionMetadataEvents = false; private boolean scanNewlyAddedTableEnabled = false; private boolean closeIdleReaders = false; private Properties jdbcProperties; @@ -235,6 +236,12 @@ public MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange return this; } + /** Whether the {@link MySqlSource} should output the transaction metadata events or not. */ + public MySqlSourceConfigFactory includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) { + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; @@ -359,6 +366,8 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { // Note: the includeSchemaChanges parameter is used to control emitting the schema record, // only DataStream API program need to emit the schema record, the Table API need not props.setProperty("include.schema.changes", String.valueOf(true)); + // enable transaction metadata if includeTransactionMetadataEvents is true + props.setProperty("provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); // disable tombstones @@ -412,6 +421,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + includeTransactionMetadataEvents, scanNewlyAddedTableEnabled, closeIdleReaders, props, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 06801a659b7..6979a4486b1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -53,15 +53,18 @@ public class MySqlRecordEmitter implements RecordEmitter debeziumDeserializationSchema; private final MySqlSourceReaderMetrics sourceReaderMetrics; private final boolean includeSchemaChanges; + private final boolean includeTransactionMetadataEvents; private final OutputCollector outputCollector; public MySqlRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, - boolean includeSchemaChanges) { + boolean includeSchemaChanges, + boolean includeTransactionMetadataEvents) { this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; + this.includeTransactionMetadataEvents = includeTransactionMetadataEvents; this.outputCollector = new OutputCollector<>(); } @@ -102,9 +105,12 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); + //emitElement(element, output); } else if (RecordUtils.isTransactionMetadataEvent(element)) { - updateStartingOffsetForSplit(splitState, element); - emitElement(element, output); + updateStartingOffsetForSplit(splitState, element); + if (includeTransactionMetadataEvents) { + emitElement(element, output); + } } else { // unknown element LOG.info("Meet unknown element {}, just skip.", element); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 4c530641426..07a0d35d532 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -109,6 +109,7 @@ public TypeInformation getProducedType() { }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), + false, false); } @@ -265,7 +266,8 @@ public TypeInformation getProducedType() { }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), - false); + false, + true); } private SourceRecord createTransactionMetadataEvent( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 7b8dfdcdbb2..509e45872a0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -573,7 +573,8 @@ private MySqlSourceReader createReader( : new MySqlRecordEmitter<>( new ForwardDeserializeSchema(), new MySqlSourceReaderMetrics(readerContext.metricGroup()), - configuration.isIncludeSchemaChanges()); + configuration.isIncludeSchemaChanges(), + configuration.isIncludeTransactionMetadataEvents()); final MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); return new MySqlSourceReader<>( @@ -740,7 +741,7 @@ public MysqlLimitedRecordEmitter( MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) { - super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges); + super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges, false); this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; From 46581dc111242b6b058a70957d7a8e66b7523484 Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Mon, 24 Nov 2025 21:32:29 +0000 Subject: [PATCH 3/4] test updates --- .../source/reader/MySqlRecordEmitterTest.java | 193 ++++++++++++++++-- 1 file changed, 172 insertions(+), 21 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java index 07a0d35d532..38b54c61f36 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java @@ -96,40 +96,98 @@ record -> { private MySqlRecordEmitter createRecordEmitter() { return new MySqlRecordEmitter<>( - new DebeziumDeserializationSchema() { - @Override - public void deserialize(SourceRecord record, Collector out) { - throw new UnsupportedOperationException(); - } + new DebeziumDeserializationSchema<>() { + @Override + public void deserialize(SourceRecord record, Collector out) { + throw new UnsupportedOperationException(); + } - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(Void.class); - } - }, + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(Void.class); + } + }, new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, false); } + @Test + void testTransactionMetadataEventsDisabledByDefault() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated (this should always happen) + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was NOT emitted (because includeTransactionMetadataEvents=false) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0); + Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); + } + + @Test + void testTransactionMetadataEventsEnabledExplicitly() throws Exception { + SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-456", 150L); + + Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) + .isTrue(); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, true); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionBeginEvent), + readerOutput, + splitState); + + // Verify the offset was updated + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(transactionBeginEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isNotEqualTo(offsetBeforeEmit) + .isEqualByComparingTo(expectedOffset); + + // Verify the event was emitted (because includeTransactionMetadataEvents=true) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1); + } + @Test void testTransactionBeginEventHandling() throws Exception { - // Create a transaction BEGIN event SourceRecord transactionBeginEvent = createTransactionMetadataEvent("BEGIN", "tx-123", 100L); - // Verify it's detected as a transaction metadata event Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionBeginEvent)) .isTrue(); - // Create emitter and split state AtomicInteger emittedRecordsCount = new AtomicInteger(0); MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); MySqlBinlogSplitState splitState = createBinlogSplitState(); BinlogOffset offsetBeforeEmit = splitState.getStartingOffset(); - // Emit the transaction BEGIN event TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( SourceRecords.fromSingleRecord(transactionBeginEvent), @@ -149,19 +207,15 @@ void testTransactionBeginEventHandling() throws Exception { @Test void testTransactionEndEventHandling() throws Exception { - // Create a transaction END event SourceRecord transactionEndEvent = createTransactionMetadataEvent("END", "tx-123", 200L); - // Verify it's detected as a transaction metadata event Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(transactionEndEvent)) .isTrue(); - // Create emitter and split state AtomicInteger emittedRecordsCount = new AtomicInteger(0); MySqlRecordEmitter recordEmitter = createRecordEmitterWithCounter(emittedRecordsCount); MySqlBinlogSplitState splitState = createBinlogSplitState(); - // Emit the transaction END event TestingReaderOutput readerOutput = new TestingReaderOutput<>(); recordEmitter.emitRecord( SourceRecords.fromSingleRecord(transactionEndEvent), @@ -181,7 +235,6 @@ void testTransactionEndEventHandling() throws Exception { @Test void testNonTransactionEventNotDetected() { - // Create a regular data change event Schema keySchema = SchemaBuilder.struct() .field("id", Schema.INT32_SCHEMA) .build(); @@ -211,7 +264,6 @@ void testNonTransactionEventNotDetected() { @Test void testTransactionEventWithoutKeySchemaNotDetected() { - // Create a record without a key schema (should not be detected as transaction event) Schema valueSchema = SchemaBuilder.struct() .name(RecordUtils.SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME) .field("status", Schema.STRING_SCHEMA) @@ -236,6 +288,64 @@ void testTransactionEventWithoutKeySchemaNotDetected() { Assertions.assertThat(RecordUtils.isTransactionMetadataEvent(record)).isFalse(); } + @Test + void testMultipleTransactionEventsWithDisabledConfig() throws Exception { + SourceRecord beginEvent = createTransactionMetadataEvent("BEGIN", "tx-789", 300L); + SourceRecord endEvent = createTransactionMetadataEvent("END", "tx-789", 400L); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(beginEvent), + readerOutput, + splitState); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(endEvent), + readerOutput, + splitState); + + // Verify offsets were updated but no events were emitted + BinlogOffset expectedOffset = RecordUtils.getBinlogPosition(endEvent); + Assertions.assertThat(splitState.getStartingOffset()) + .isNotNull() + .isEqualByComparingTo(expectedOffset); + + // Verify no events were emitted (because includeTransactionMetadataEvents=false) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(0); + Assertions.assertThat(readerOutput.getEmittedRecords()).isEmpty(); + } + + @Test + void testMixedEventsWithTransactionMetadataDisabled() throws Exception { + SourceRecord transactionEvent = createTransactionMetadataEvent("BEGIN", "tx-mixed", 500L); + SourceRecord dataEvent = createDataChangeEvent("test.table", 501L); + + AtomicInteger emittedRecordsCount = new AtomicInteger(0); + MySqlRecordEmitter recordEmitter = createRecordEmitterWithTransactionConfig(emittedRecordsCount, false); + MySqlBinlogSplitState splitState = createBinlogSplitState(); + + TestingReaderOutput readerOutput = new TestingReaderOutput<>(); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(transactionEvent), + readerOutput, + splitState); + + recordEmitter.emitRecord( + SourceRecords.fromSingleRecord(dataEvent), + readerOutput, + splitState); + + // Verify only data event was emitted (count=1, not 2) + Assertions.assertThat(emittedRecordsCount.get()).isEqualTo(1); + Assertions.assertThat(readerOutput.getEmittedRecords()).hasSize(1); + } + private MySqlBinlogSplitState createBinlogSplitState() { return new MySqlBinlogSplitState( new MySqlBinlogSplit( @@ -251,6 +361,13 @@ private MySqlBinlogSplitState createBinlogSplitState() { * Helper method to create a MySqlRecordEmitter that counts emitted records. */ private MySqlRecordEmitter createRecordEmitterWithCounter(AtomicInteger counter) { + return createRecordEmitterWithTransactionConfig(counter, true); + } + + /** + * Helper method to create a MySqlRecordEmitter with configurable transaction metadata events. + */ + private MySqlRecordEmitter createRecordEmitterWithTransactionConfig(AtomicInteger counter, boolean includeTransactionMetadataEvents) { return new MySqlRecordEmitter<>( new DebeziumDeserializationSchema<>() { @Override @@ -267,7 +384,7 @@ public TypeInformation getProducedType() { new MySqlSourceReaderMetrics( UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()), false, - true); + includeTransactionMetadataEvents); } private SourceRecord createTransactionMetadataEvent( @@ -311,4 +428,38 @@ private SourceRecord createTransactionMetadataEvent( value); } + private SourceRecord createDataChangeEvent(String topicName, long position) { + Schema keySchema = SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .build(); + Schema valueSchema = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .field("after", SchemaBuilder.struct() + .field("id", Schema.INT32_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .optional()) + .build(); + + Struct key = new Struct(keySchema).put("id", 1); + Struct after = new Struct(valueSchema.field("after").schema()) + .put("id", 1) + .put("name", "test"); + Struct value = new Struct(valueSchema) + .put("op", "c") + .put("after", after); + + Map offset = new HashMap<>(); + offset.put("file", "mysql-bin.000001"); + offset.put("pos", position); + + return new SourceRecord( + Collections.singletonMap("server", "mysql"), + offset, + topicName, + keySchema, + key, + valueSchema, + value); + } + } From c6ed5f3988ee268c8caf49ed58c973443a35b0fd Mon Sep 17 00:00:00 2001 From: Tejansh Rana Date: Mon, 24 Nov 2025 21:56:07 +0000 Subject: [PATCH 4/4] clean up --- .../cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 6979a4486b1..02d35380903 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -105,7 +105,6 @@ protected void processElement( emitElement(element, output); } else if (RecordUtils.isHeartbeatEvent(element)) { updateStartingOffsetForSplit(splitState, element); - //emitElement(element, output); } else if (RecordUtils.isTransactionMetadataEvent(element)) { updateStartingOffsetForSplit(splitState, element); if (includeTransactionMetadataEvents) {