Skip to content

Conversation

@watsonjo737
Copy link

https://issues.apache.org/jira/browse/FLINK-38522

What is the purpose of the change

When using StartupOptions.timestamp(), the MySQL CDC connector calls DebeziumUtils.findBinlogOffset() to locate the appropriate binlog position. This method creates a short-lived BinaryLogClient to scan binlog files, but does not configure SSL mode on the client.

If the MySQL server requires SSL connections (e.g., require_secure_transport=ON or SSL mode configured in connection parameters), the connection fails because the BinaryLogClient attempts an unencrypted connection.

This Pull Request uses the ssl mode provided as part of the connection config for the BinaryLogClient as well to prevent this issue in cases where mysql is configured for require_secure_transport.

Changes

Set SSLMode to match what was provided in the connectionConfig for BinaryLogClient
Add unit test to ensure SSLMode is converted correctly.

Verifying this change

  1. Configure MySQL with require_secure_transport=ON or SSL mode requirements
  2. Create a MySQL CDC source with StartupOptions.timestamp(timestampMs)
  3. Start the Flink job
  4. Ensure that the source is able to read records from the specified timestamp.

…earching for binlog offset for starting mysql cdc from timestamp
… test, throw runtime exception when we have invalid ssl mode
@lvyanquan lvyanquan added this to the V3.6.0 milestone Dec 5, 2025
public static BinlogOffset findBinlogOffset(
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
Copy link
Contributor

@lvyanquan lvyanquan Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can centralize the SSL configuration in the DebeziumUtils#createBinaryClient method,


and based on the implementation of MySqlStreamingChangeEventSource, we also need to add the following code to set SslSocketFactory:
client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
if (connectorConfig.sslModeEnabled()) {
SSLSocketFactory sslSocketFactory =
getBinlogSslSocketFactory(connectorConfig, connection);
if (sslSocketFactory != null) {
client.setSslSocketFactory(sslSocketFactory);
}
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review and recommendation @lvyanquan !
I have now updated this PR:

  • ssl config now centralised under DebeziumUtils#createBinaryClient
  • updated BinlogSplitReader and SnapshotSplitReader to use the new version of createBinaryClient
  • added sslSocketFactory config, similar to what is used in flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
  • updated iTest to include testing sslSocketFactory.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants