Skip to content

Commit 581496d

Browse files
authored
CDAP-16554 check replication permissions during assessment for MySQL. (#69)
* check replication permissions during assessmetn for MySQL. * address comments. * address comments. * add comments for null check.
1 parent 585fb60 commit 581496d

File tree

5 files changed

+108
-29
lines changed

5 files changed

+108
-29
lines changed

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.cdap.cdap.api.annotation.Macro;
2121
import io.cdap.cdap.api.plugin.PluginConfig;
2222

23+
import java.util.Properties;
2324
import javax.annotation.Nullable;
2425

2526
/**
@@ -98,4 +99,16 @@ public String getJdbcPluginName() {
9899
public String getJDBCPluginId() {
99100
return String.format("%s.%s.%s", "mysqlsource", "jbdc", jdbcPluginName);
100101
}
102+
103+
public String getJdbcURL() {
104+
return String.format("jdbc:mysql://%s:%d/%s", host, port, database);
105+
}
106+
107+
public Properties getConnectionProperties() {
108+
Properties properties = new Properties();
109+
properties.put("user", user);
110+
properties.put("password", password);
111+
properties.put("serverTimezone", getServerTimezone());
112+
return properties;
113+
}
101114
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlDeltaSource.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.cdap.delta.plugin.common.DriverCleanup;
3333

3434
import java.sql.Driver;
35+
import java.util.UUID;
3536

3637
/**
3738
* Mysql origin.
@@ -61,24 +62,24 @@ public EventReader createReader(EventReaderDefinition definition, DeltaSourceCon
6162
}
6263

6364
@Override
64-
public TableRegistry createTableRegistry(Configurer configurer) {
65+
public TableRegistry createTableRegistry(Configurer configurer) throws Exception {
66+
return new MySqlTableRegistry(conf, getDriverCleanup(configurer));
67+
}
68+
69+
@Override
70+
public TableAssessor<TableDetail> createTableAssessor(Configurer configurer) throws Exception {
71+
return new MySqlTableAssessor(conf, getDriverCleanup(configurer));
72+
}
73+
74+
private DriverCleanup getDriverCleanup(Configurer configurer) throws Exception {
6575
Class<? extends Driver> jdbcDriverClass = configurer.usePluginClass("jdbc", conf.getJdbcPluginName(),
66-
conf.getJDBCPluginId(),
76+
conf.getJDBCPluginId() + "." +
77+
UUID.randomUUID().toString(),
6778
PluginProperties.builder().build());
6879
if (jdbcDriverClass == null) {
6980
throw new IllegalArgumentException("JDBC plugin " + conf.getJdbcPluginName() + " not found.");
7081
}
71-
try {
72-
DriverCleanup cleanup = DriverCleanup.ensureJDBCDriverIsAvailable(
73-
jdbcDriverClass, String.format("jdbc:mysql://%s:%d/%s", conf.getHost(), conf.getPort(), conf.getDatabase()));
74-
return new MySqlTableRegistry(conf, cleanup);
75-
} catch (Exception e) {
76-
throw new RuntimeException("Unable to instantiate JDBC driver", e);
77-
}
78-
}
7982

80-
@Override
81-
public TableAssessor<TableDetail> createTableAssessor(Configurer configurer) throws Exception {
82-
return new MySqlTableAssessor();
83+
return DriverCleanup.ensureJDBCDriverIsAvailable(jdbcDriverClass, conf.getJdbcURL());
8384
}
8485
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlTableAssessor.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,23 @@
1717
package io.cdap.delta.mysql;
1818

1919
import io.cdap.cdap.api.data.schema.Schema;
20+
import io.cdap.delta.api.assessment.Assessment;
2021
import io.cdap.delta.api.assessment.ColumnAssessment;
2122
import io.cdap.delta.api.assessment.ColumnDetail;
2223
import io.cdap.delta.api.assessment.ColumnSuggestion;
2324
import io.cdap.delta.api.assessment.ColumnSupport;
25+
import io.cdap.delta.api.assessment.Problem;
2426
import io.cdap.delta.api.assessment.TableAssessment;
2527
import io.cdap.delta.api.assessment.TableAssessor;
2628
import io.cdap.delta.api.assessment.TableDetail;
2729
import io.cdap.delta.plugin.common.ColumnEvaluation;
30+
import io.cdap.delta.plugin.common.DriverCleanup;
2831

32+
import java.io.IOException;
33+
import java.sql.Connection;
34+
import java.sql.DriverManager;
35+
import java.sql.ResultSet;
36+
import java.sql.Statement;
2937
import java.sql.Types;
3038
import java.util.ArrayList;
3139
import java.util.Collections;
@@ -39,6 +47,29 @@ public class MySqlTableAssessor implements TableAssessor<TableDetail> {
3947
static final String COLUMN_LENGTH = "COLUMN_LENGTH";
4048
static final String SCALE = "SCALE";
4149

50+
private final MySqlConfig conf;
51+
private final DriverCleanup driverCleanup;
52+
53+
MySqlTableAssessor(MySqlConfig conf, DriverCleanup driverCleanup) {
54+
this.conf = conf;
55+
this.driverCleanup = driverCleanup;
56+
}
57+
58+
@Override
59+
public Assessment assess() {
60+
List<Problem> featureProblems = new ArrayList<>();
61+
List<String> permissions = new ArrayList<>();
62+
permissions.add("REPLICATION SLAVE");
63+
permissions.add("REPLICATION CLIENT");
64+
checkReplicationPermissions(featureProblems, permissions);
65+
return new Assessment(featureProblems, Collections.emptyList());
66+
}
67+
68+
@Override
69+
public void close() throws IOException {
70+
driverCleanup.close();
71+
}
72+
4273
@Override
4374
public TableAssessment assess(TableDetail tableDetail) {
4475
List<ColumnAssessment> columnAssessments = new ArrayList<>();
@@ -126,4 +157,46 @@ static ColumnEvaluation evaluateColumn(ColumnDetail detail) throws IllegalArgume
126157
.build();
127158
return new ColumnEvaluation(field, assessment);
128159
}
160+
161+
private void checkReplicationPermissions(List<Problem> featureProblems, List<String> permissions) {
162+
String query = String.format("SHOW GRANTS FOR '%s'@'%s'", conf.getUser(), conf.getHost());
163+
Problem permissionUnknownProblem =
164+
new Problem("Table Replication Permission Unknown",
165+
String.format("Unable to check if '%s' permissions were granted for user '%s'@'%s' or not",
166+
String.join(",", permissions), conf.getUser(), conf.getHost()),
167+
"Check database connectivity and user permissions",
168+
"Change events might fail to be read after the snapshot phase");
169+
try (Connection connection = DriverManager.getConnection(conf.getJdbcURL(), conf.getConnectionProperties());
170+
Statement statement = connection.createStatement();
171+
ResultSet rs = statement.executeQuery(query)) {
172+
if (rs.next()) {
173+
String grants = rs.getString(1);
174+
175+
// Avoid potential NPE in case the connection was accidentally closed by database before the value is read,
176+
// though it will happen very rarely
177+
if (grants == null) {
178+
featureProblems.add(permissionUnknownProblem);
179+
return;
180+
}
181+
182+
if (grants.contains("ALL")) {
183+
return;
184+
}
185+
186+
for (String permission : permissions) {
187+
if (!grants.contains(permission)) {
188+
featureProblems.add(
189+
new Problem("Table Replication Permission Not Granted",
190+
String.format("The '%s' permission is not granted for user '%s'@'%s'", permission,
191+
conf.getUser(), conf.getHost()),
192+
String.format("Grant '%s' permission to user '%s'@'%s'", permission,
193+
conf.getUser(), conf.getHost()),
194+
"Will not be able to read replication events after the initial snapshot completes."));
195+
}
196+
}
197+
}
198+
} catch (Exception e) {
199+
featureProblems.add(permissionUnknownProblem);
200+
}
201+
}
129202
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlTableRegistry.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,29 +40,24 @@
4040
import java.util.List;
4141
import java.util.Map;
4242
import java.util.Optional;
43-
import java.util.Properties;
4443

4544
/**
4645
* Lists and describes tables.
4746
*/
4847
public class MySqlTableRegistry implements TableRegistry {
4948
private final MySqlConfig conf;
5049
private final DriverCleanup driverCleanup;
51-
private final Properties properties;
5250

5351
public MySqlTableRegistry(MySqlConfig conf, DriverCleanup driverCleanup) {
5452
this.conf = conf;
5553
this.driverCleanup = driverCleanup;
56-
this.properties = new Properties();
57-
properties.put("user", conf.getUser());
58-
properties.put("password", conf.getPassword());
59-
properties.put("serverTimezone", conf.getServerTimezone());
6054
}
6155

6256
@Override
6357
public TableList listTables() throws IOException {
6458
List<TableSummary> tables = new ArrayList<>();
65-
try (Connection connection = DriverManager.getConnection(getConnectionString(conf.getDatabase()), properties)) {
59+
try (Connection connection = DriverManager.getConnection(getConnectionString(conf.getDatabase()),
60+
conf.getConnectionProperties())) {
6661
DatabaseMetaData dbMeta = connection.getMetaData();
6762
try (ResultSet tableResults = dbMeta.getTables(null, null, null, null)) {
6863
while (tableResults.next()) {
@@ -85,7 +80,7 @@ public TableList listTables() throws IOException {
8580

8681
@Override
8782
public TableDetail describeTable(String db, String table) throws TableNotFoundException, IOException {
88-
try (Connection connection = DriverManager.getConnection(getConnectionString(db), properties)) {
83+
try (Connection connection = DriverManager.getConnection(getConnectionString(db), conf.getConnectionProperties())) {
8984
DatabaseMetaData dbMeta = connection.getMetaData();
9085
TableDetail.Builder builder = getTableDetailBuilder(dbMeta, db, table)
9186
.orElseThrow(() -> new TableNotFoundException(db, table, ""));

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerDeltaSource.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,17 @@ public EventReader createReader(EventReaderDefinition definition, DeltaSourceCon
6060
}
6161

6262
@Override
63-
public TableRegistry createTableRegistry(Configurer configurer) {
63+
public TableRegistry createTableRegistry(Configurer configurer) throws Exception {
6464
Class<? extends Driver> jdbcDriverClass = configurer.usePluginClass("jdbc", config.getJdbcPluginName(),
6565
config.getJDBCPluginId(),
6666
PluginProperties.builder().build());
6767
if (jdbcDriverClass == null) {
6868
throw new IllegalArgumentException("JDBC plugin " + config.getJdbcPluginName() + " not found.");
6969
}
70-
try {
71-
DriverCleanup cleanup = DriverCleanup.ensureJDBCDriverIsAvailable(
72-
jdbcDriverClass, String.format("jdbc:sqlserver://%s:%d", config.getHost(), config.getPort()));
73-
return new SqlServerTableRegistry(config, cleanup);
74-
} catch (Exception e) {
75-
throw new RuntimeException("Unable to instantiate JDBC driver", e);
76-
}
70+
71+
DriverCleanup cleanup = DriverCleanup.ensureJDBCDriverIsAvailable(
72+
jdbcDriverClass, String.format("jdbc:sqlserver://%s:%d", config.getHost(), config.getPort()));
73+
return new SqlServerTableRegistry(config, cleanup);
7774
}
7875

7976
@Override

0 commit comments

Comments
 (0)