Skip to content

Commit 36efee6

Browse files
authored
Merge pull request #261 from data-integrations/cherry_deb_upgrade
[cherry-pick][6.10][CDAP-20829] Upgrade debezium version to latest java 8 flavour
2 parents 0139f3d + 14f5930 commit 36efee6

File tree

11 files changed

+2388
-1571
lines changed

11 files changed

+2388
-1571
lines changed

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@
2727
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
2828
import io.cdap.delta.plugin.common.RuntimeArguments;
2929
import io.debezium.DebeziumException;
30-
import io.debezium.config.CommonConnectorConfig;
3130
import io.debezium.config.Configuration;
3231
import io.debezium.connector.mysql.MySqlConnector;
3332
import io.debezium.connector.mysql.MySqlConnectorConfig;
34-
import io.debezium.connector.mysql.MySqlJdbcContext;
3533
import io.debezium.connector.mysql.MySqlValueConverters;
3634
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
35+
import io.debezium.connector.mysql.legacy.MySqlJdbcContext;
3736
import io.debezium.embedded.EmbeddedEngine;
3837
import io.debezium.jdbc.JdbcConnection;
3938
import io.debezium.jdbc.JdbcValueConverters;
@@ -93,6 +92,7 @@ public void start(Offset offset) {
9392
jdbcDriverClass.getName(),
9493
jdbcDriverClass.getClassLoader());
9594
MySqlValueConverters.jdbcClassLoader = jdbcDriverClass.getClassLoader();
95+
MySqlAntlrDdlParser.jdbcClassLoader = jdbcDriverClass.getClassLoader();
9696

9797
// For MySQL, the unique table identifier in debezium is 'databaseName.tableName'
9898
Map<String, SourceTable> sourceTableMap = sourceTables.stream().collect(
@@ -124,6 +124,7 @@ public void start(Offset offset) {
124124
.with("database.history.store.only.monitored.tables.ddl", true)
125125
.with("table.include.list", String.join(",", sourceTableMap.keySet()))
126126
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only")
127+
.with("internal.implementation", "legacy")
127128
.with(MySqlConstantOffsetBackingStore.REPLICATION_CONNECTOR_NAME, replicationConnectorName);
128129

129130
if (config.getConsumerID() != null) {

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
169169

170170
DMLOperation.Type op;
171171
String opStr = val.get("op");
172-
if ("c".equals(opStr)) {
172+
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-snapshot-events
173+
// Snapshot events are considered as "r"
174+
if ("c".equals(opStr) || "r".equals(opStr)) {
173175
op = DMLOperation.Type.INSERT;
174176
} else if ("u".equals(opStr)) {
175177
op = DMLOperation.Type.UPDATE;

mysql-delta-plugins/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

Lines changed: 1145 additions & 1034 deletions
Large diffs are not rendered by default.

mysql-delta-plugins/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java

Lines changed: 269 additions & 221 deletions
Large diffs are not rendered by default.

mysql-delta-plugins/src/main/java/io/debezium/connector/mysql/antlr/MySqlAntlrDdlParser.java

Lines changed: 460 additions & 0 deletions
Large diffs are not rendered by default.

mysql-delta-plugins/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java renamed to mysql-delta-plugins/src/main/java/io/debezium/connector/mysql/legacy/MySqlJdbcContext.java

Lines changed: 120 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
*
44
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
55
*/
6-
package io.debezium.connector.mysql;
6+
package io.debezium.connector.mysql.legacy;
77

8+
import io.debezium.DebeziumException;
89
import io.debezium.config.CommonConnectorConfig;
910
import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
1011
import io.debezium.config.Configuration;
1112
import io.debezium.config.Configuration.Builder;
1213
import io.debezium.config.Field;
14+
import io.debezium.connector.mysql.GtidSet;
15+
import io.debezium.connector.mysql.MySqlConnectorConfig;
1316
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
17+
import io.debezium.connector.mysql.MySqlValueConverters;
1418
import io.debezium.jdbc.JdbcConfiguration;
1519
import io.debezium.jdbc.JdbcConnection;
1620
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
@@ -42,22 +46,19 @@ public class MySqlJdbcContext implements AutoCloseable {
4246
* source.
4347
*/
4448
public static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&" +
45-
"nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&" +
49+
"nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&" +
4650
"characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
4751
protected static final String JDBC_PROPERTY_LEGACY_DATETIME = "useLegacyDatetimeCode";
48-
52+
protected static final Logger LOGGER = LoggerFactory.getLogger(MySqlJdbcContext.class);
4953
private static final String SQL_SHOW_SYSTEM_VARIABLES = "SHOW VARIABLES";
5054
private static final String SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET = "SHOW VARIABLES WHERE Variable_name IN " +
5155
"('character_set_server','collation_server')";
5256
private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION = "SHOW SESSION STATUS LIKE 'Ssl_version'";
53-
5457
/**
5558
* ===================== This is a diff from the original file ===========================
5659
* We intentionally make the 'connectionFactor' a public static variable, so that we can set the value in app source.
5760
*/
5861
public static ConnectionFactory connectionFactory;
59-
60-
protected final Logger logger = LoggerFactory.getLogger(getClass());
6162
protected final Configuration config;
6263
protected final JdbcConnection jdbc;
6364
private final Map<String, String> originalSystemProperties = new HashMap<>();
@@ -70,29 +71,47 @@ public MySqlJdbcContext(MySqlConnectorConfig config) {
7071
// per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html
7172
boolean useSSL = sslModeEnabled();
7273
Configuration jdbcConfig = this.config
73-
.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) ||
74-
x.equals(MySqlConnectorConfig.DATABASE_HISTORY.name())))
74+
.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING)
75+
|| x.equals(MySqlConnectorConfig.DATABASE_HISTORY.name())))
7576
.edit()
7677
.withDefault(MySqlConnectorConfig.PORT, MySqlConnectorConfig.PORT.defaultValue())
78+
.withDefault("database.useCursorFetch", config.useCursorFetch())
7779
.build()
7880
.subset("database.", true);
7981

8082
Builder jdbcConfigBuilder = jdbcConfig
8183
.edit()
8284
.with("connectTimeout", Long.toString(config.getConnectionTimeout().toMillis()))
83-
.with("useSSL", Boolean.toString(useSSL));
85+
.with("sslMode", sslMode().getValue());
86+
87+
if (useSSL) {
88+
if (!Strings.isNullOrBlank(sslTrustStore())) {
89+
jdbcConfigBuilder.with("trustCertificateKeyStoreUrl", "file:" + sslTrustStore());
90+
}
91+
if (sslTrustStorePassword() != null) {
92+
jdbcConfigBuilder.with("trustCertificateKeyStorePassword", String.valueOf(sslTrustStorePassword()));
93+
}
94+
if (!Strings.isNullOrBlank(sslKeyStore())) {
95+
jdbcConfigBuilder.with("clientCertificateKeyStoreUrl", "file:" + sslKeyStore());
96+
}
97+
if (sslKeyStorePassword() != null) {
98+
jdbcConfigBuilder.with("clientCertificateKeyStorePassword", String.valueOf(sslKeyStorePassword()));
99+
}
100+
}
84101

85102
final String legacyDateTime = jdbcConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME);
86103
if (legacyDateTime == null) {
87104
jdbcConfigBuilder.with(JDBC_PROPERTY_LEGACY_DATETIME, "false");
88105
} else if ("true".equals(legacyDateTime)) {
89-
logger.warn("'{}' is set to 'true'. This setting is not recommended and can result in timezone issues.",
106+
LOGGER.warn("'{}' is set to 'true'. This setting is not recommended and can result in timezone issues.",
90107
JDBC_PROPERTY_LEGACY_DATETIME);
91108
}
92109

93110
jdbcConfig = jdbcConfigBuilder.build();
94111
String driverClassName = jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER);
95-
this.jdbc = new JdbcConnection(jdbcConfig, connectionFactory);
112+
this.jdbc = new JdbcConnection(JdbcConfiguration.adapt(jdbcConfig), connectionFactory,
113+
() -> MySqlValueConverters.jdbcClassLoader,
114+
"`", "`");
96115
}
97116

98117
public Configuration config() {
@@ -104,7 +123,7 @@ public JdbcConnection jdbc() {
104123
}
105124

106125
public Logger logger() {
107-
return logger;
126+
return LOGGER;
108127
}
109128

110129
public String username() {
@@ -132,6 +151,24 @@ public boolean sslModeEnabled() {
132151
return sslMode() != SecureConnectionMode.DISABLED;
133152
}
134153

154+
public String sslKeyStore() {
155+
return config.getString(MySqlConnectorConfig.SSL_KEYSTORE);
156+
}
157+
158+
public char[] sslKeyStorePassword() {
159+
String password = config.getString(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
160+
return Strings.isNullOrBlank(password) ? null : password.toCharArray();
161+
}
162+
163+
public String sslTrustStore() {
164+
return config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE);
165+
}
166+
167+
public char[] sslTrustStorePassword() {
168+
String password = config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
169+
return Strings.isNullOrBlank(password) ? null : password.toCharArray();
170+
}
171+
135172
public EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() {
136173
String mode = config.getString(CommonConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE);
137174
if (mode == null) {
@@ -145,22 +182,11 @@ public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() {
145182
return EventProcessingFailureHandlingMode.parse(mode);
146183
}
147184

148-
public void start() {
149-
if (sslModeEnabled()) {
150-
originalSystemProperties.clear();
151-
// Set the System properties for SSL for the MySQL driver ...
152-
setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true);
153-
setSystemProperty("javax.net.ssl.keyStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false);
154-
setSystemProperty("javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true);
155-
setSystemProperty("javax.net.ssl.trustStorePassword", MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, false);
156-
}
157-
}
158-
159185
public void shutdown() {
160186
try {
161187
jdbc.close();
162188
} catch (SQLException e) {
163-
logger.error("Unexpected error shutting down the database connection", e);
189+
LOGGER.error("Unexpected error shutting down the database connection", e);
164190
} finally {
165191
// Reset the system properties to their original value ...
166192
originalSystemProperties.forEach((name, value) -> {
@@ -201,8 +227,8 @@ public boolean isGtidModeEnabled() {
201227
/**
202228
* Determine the executed GTID set for MySQL.
203229
*
204-
* @return the string representation of MySQL's GTID sets; never null but an empty string if the server does not use
205-
* GTIDs
230+
* @return the string representation of MySQL's GTID sets; never null but an empty string if the server does not
231+
* use GTIDs
206232
*/
207233
public String knownGtidSet() {
208234
AtomicReference<String> gtidSetStr = new AtomicReference<String>();
@@ -281,7 +307,7 @@ public boolean userHasPrivileges(String grantName) {
281307
jdbc.query("SHOW GRANTS FOR CURRENT_USER", rs -> {
282308
while (rs.next()) {
283309
String grants = rs.getString(1);
284-
logger.debug(grants);
310+
LOGGER.debug(grants);
285311
if (grants == null) {
286312
return;
287313
}
@@ -292,13 +318,13 @@ public boolean userHasPrivileges(String grantName) {
292318
}
293319
});
294320
} catch (SQLException e) {
295-
throw new ConnectException("Unexpected error while connecting to MySQL and looking at privileges for current " +
296-
"user: ", e);
321+
throw new ConnectException("Unexpected error while connecting to MySQL and looking at privileges for " +
322+
"current user: ", e);
297323
}
298324
return result.get();
299325
}
300326

301-
protected String connectionString() {
327+
public String connectionString() {
302328
return jdbc.connectionString(MYSQL_CONNECTION_URL);
303329
}
304330

@@ -309,7 +335,7 @@ protected String connectionString() {
309335
*/
310336
protected Map<String, String> readMySqlCharsetSystemVariables() {
311337
// Read the system variables from the MySQL instance and get the current database name ...
312-
logger.debug("Reading MySQL charset-related system variables before parsing DDL history.");
338+
LOGGER.debug("Reading MySQL charset-related system variables before parsing DDL history.");
313339
return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET);
314340
}
315341

@@ -318,23 +344,22 @@ protected Map<String, String> readMySqlCharsetSystemVariables() {
318344
*
319345
* @return the system variables that are related to server character sets; never null
320346
*/
321-
protected Map<String, String> readMySqlSystemVariables() {
347+
public Map<String, String> readMySqlSystemVariables() {
322348
// Read the system variables from the MySQL instance and get the current database name ...
323-
logger.debug("Reading MySQL system variables");
349+
LOGGER.debug("Reading MySQL system variables");
324350
return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES);
325351
}
326352

327353
private Map<String, String> querySystemVariables(String statement) {
328354
Map<String, String> variables = new HashMap<>();
329355
try {
330-
start();
331356
jdbc.connect().query(statement, rs -> {
332357
while (rs.next()) {
333358
String varName = rs.getString(1);
334359
String value = rs.getString(2);
335360
if (varName != null && value != null) {
336361
variables.put(varName, value);
337-
logger.debug("\t{} = {}",
362+
LOGGER.debug("\t{} = {}",
338363
Strings.pad(varName, 45, ' '),
339364
Strings.pad(value, 45, ' '));
340365
}
@@ -347,6 +372,36 @@ private Map<String, String> querySystemVariables(String statement) {
347372
return variables;
348373
}
349374

375+
/**
376+
* Read the MySQL default character sets for exisiting databases.
377+
*
378+
* @return the map of database names with their default character sets; never null
379+
*/
380+
protected Map<String, DatabaseLocales> readDatabaseCollations() {
381+
LOGGER.debug("Reading default database charsets");
382+
try {
383+
return jdbc.connect().queryAndMap("SELECT schema_name, default_character_set_name, " +
384+
"default_collation_name FROM information_schema.schemata", rs -> {
385+
final Map<String, DatabaseLocales> charsets = new HashMap<>();
386+
while (rs.next()) {
387+
String dbName = rs.getString(1);
388+
String charset = rs.getString(2);
389+
String collation = rs.getString(3);
390+
if (dbName != null && (charset != null || collation != null)) {
391+
charsets.put(dbName, new DatabaseLocales(charset, collation));
392+
LOGGER.debug("\t{} = {}, {}",
393+
Strings.pad(dbName, 45, ' '),
394+
Strings.pad(charset, 45, ' '),
395+
Strings.pad(collation, 45, ' '));
396+
}
397+
}
398+
return charsets;
399+
});
400+
} catch (SQLException e) {
401+
throw new DebeziumException("Error reading default database charsets: " + e.getMessage(), e);
402+
}
403+
}
404+
350405
protected String setStatementFor(Map<String, String> variables) {
351406
StringBuilder sb = new StringBuilder("SET ");
352407
boolean first = true;
@@ -403,13 +458,41 @@ protected void setSystemProperty(String property, Field field, boolean showValue
403458
*
404459
* @return the session variables that are related to sessions ssl version
405460
*/
406-
protected String getSessionVariableForSslVersion() {
461+
public String getSessionVariableForSslVersion() {
407462
final String sslVersion = "Ssl_version";
408-
logger.debug("Reading MySQL Session variable for Ssl Version");
463+
LOGGER.debug("Reading MySQL Session variable for Ssl Version");
409464
Map<String, String> sessionVariables = querySystemVariables(SQL_SHOW_SESSION_VARIABLE_SSL_VERSION);
410465
if (!sessionVariables.isEmpty() && sessionVariables.containsKey(sslVersion)) {
411466
return sessionVariables.get(sslVersion);
412467
}
413468
return null;
414469
}
470+
471+
/**
472+
* A static class entity for handling database's locale
473+
*/
474+
public static class DatabaseLocales {
475+
private final String charset;
476+
private final String collation;
477+
478+
public DatabaseLocales(String charset, String collation) {
479+
this.charset = charset;
480+
this.collation = collation;
481+
}
482+
483+
public void appendToDdlStatement(String dbName, StringBuilder ddl) {
484+
if (charset != null) {
485+
LOGGER.debug("Setting default charset '{}' for database '{}'", charset, dbName);
486+
ddl.append(" CHARSET ").append(charset);
487+
} else {
488+
LOGGER.info("Default database charset for '{}' not found", dbName);
489+
}
490+
if (collation != null) {
491+
LOGGER.debug("Setting default collation '{}' for database '{}'", collation, dbName);
492+
ddl.append(" COLLATE ").append(collation);
493+
} else {
494+
LOGGER.info("Default database collation for '{}' not found", dbName);
495+
}
496+
}
497+
}
415498
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060

6161
<properties>
6262
<delta.version>0.9.0</delta.version>
63-
<debezium.version>1.3.1.Final</debezium.version>
63+
<debezium.version>1.9.7.Final</debezium.version>
6464
<slf4j.version>1.7.25</slf4j.version>
6565
<guava.version>30.0-jre</guava.version>
6666
<jacoco.version>0.8.8</jacoco.version>

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import io.debezium.connector.sqlserver.SqlServerConnector;
3333
import io.debezium.connector.sqlserver.SqlServerErrorHandler;
3434
import io.debezium.embedded.EmbeddedEngine;
35-
import io.debezium.jdbc.JdbcConfiguration;
36-
import io.debezium.jdbc.JdbcConnection;
3735
import io.debezium.util.Strings;
3836
import org.slf4j.Logger;
3937
import org.slf4j.LoggerFactory;
@@ -86,13 +84,10 @@ public void start(Offset offset) {
8684
// load sql server jdbc driver into class loader and use this loaded jdbc class to set the static factory
8785
// variable in SqlServerConnection for instantiation purpose later on.
8886
Class<? extends Driver> jdbcDriverClass = context.loadPluginClass(config.getJDBCPluginId());
89-
String urlPattern = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${" +
90-
JdbcConfiguration.PORT + "};databaseName=${" + JdbcConfiguration.DATABASE + "}";
91-
SqlServerConnection.factory = JdbcConnection.patternBasedFactory(urlPattern,
92-
jdbcDriverClass.getName(),
93-
jdbcDriverClass.getClassLoader());
94-
9587
SqlServerErrorHandler.driverClassLoader = jdbcDriverClass.getClassLoader();
88+
SqlServerConnection.jdbcDriverClass = jdbcDriverClass;
89+
90+
9691
// this is needed since sql server does not return the database information in the record
9792
String databaseName = config.getDatabase();
9893

0 commit comments

Comments
 (0)