Skip to content

Commit bd6ce74

Browse files
support schema for SQL Server Table Registry (#171)
support schema for SQL Server Table Registry
1 parent 376344f commit bd6ce74

File tree

1 file changed

+32
-17
lines changed

1 file changed

+32
-17
lines changed

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerTableRegistry.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.cdap.delta.api.assessment.TableSummary;
2929
import io.cdap.delta.plugin.common.ColumnEvaluation;
3030
import io.cdap.delta.plugin.common.DriverCleanup;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
import java.io.IOException;
3335
import java.sql.Connection;
@@ -44,11 +46,13 @@
4446
import java.util.Map;
4547
import java.util.Optional;
4648
import java.util.Set;
49+
import javax.annotation.Nullable;
4750

4851
/**
4952
* Sql Server table registry
5053
*/
5154
public class SqlServerTableRegistry implements TableRegistry {
55+
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerTableRegistry.class);
5256
private final String jdbcUrl;
5357
private final SqlServerConfig config;
5458
private final DriverCleanup driverCleanup;
@@ -93,30 +97,38 @@ public TableList listTables() throws IOException {
9397
}
9498
}
9599

100+
96101
@Override
97-
public TableDetail describeTable(String database, String schema, String table)
98-
throws TableNotFoundException, IOException {
99-
// TODO CDAP-17477 Ignore schema currently since its fetch from metadata in sql server
100-
return describeTable(database, table);
102+
public TableDetail describeTable(String db, String table) throws TableNotFoundException, IOException {
103+
return describeTable(db, null, table);
101104
}
102105

103106
@Override
104-
public TableDetail describeTable(String db, String table) throws TableNotFoundException, IOException {
107+
public TableDetail describeTable(String db, @Nullable String schema, String table)
108+
throws TableNotFoundException, IOException {
105109
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
106110
List<Problem> missingFeatures = new ArrayList<>();
107111
DatabaseMetaData dbMeta = connection.getMetaData();
108-
TableDetail.Builder builder = getTableDetailBuilder(dbMeta, db, table)
112+
TableDetail.Builder builder = getTableDetailBuilder(dbMeta, db, schema, table)
109113
.orElseThrow(() -> new TableNotFoundException(db, table, ""));
110114

111-
String query = String.format("SELECT [name], is_tracked_by_cdc FROM sys.tables where name = '%s'", table);
115+
116+
String query =
117+
schema == null ? String.format("SELECT is_tracked_by_cdc FROM sys.tables where name = '%s'", table)
118+
: String.format("SELECT is_tracked_by_cdc FROM " +
119+
"(select is_tracked_by_cdc, schema_id from sys.tables where name = '%s') as t "
120+
+ "join (select schema_id from sys.schemas where name ='%s') as s "
121+
+ "on t.schema_id = s.schema_id", table, schema);
112122
try (Statement statement = connection.createStatement();
113123
ResultSet rs = statement.executeQuery(query)) {
124+
// if schema is null, we will check any table matching the table name with any schema
114125
if (rs.next()) {
115126
// if cdc is enabled, then the column 'is_tracked_by_cdc' should be 1
116127
if (rs.getInt("is_tracked_by_cdc") != 1) {
117128
missingFeatures.add(
118129
new Problem("Table CDC Feature Not Enabled",
119-
String.format("The CDC feature for table '%s' in database '%s' was not enabled.", table, db),
130+
String.format("The CDC feature for table '%s' in database '%s' was not enabled.",
131+
schema == null ? table : schema + "." + table, db),
120132
"Check the table CDC settings",
121133
"Not able to replicate table changes"));
122134
}
@@ -125,7 +137,7 @@ public TableDetail describeTable(String db, String table) throws TableNotFoundEx
125137
missingFeatures.add(
126138
new Problem("Unable To Check If CDC Was Enabled",
127139
String.format("Unable to check if CDC feature for table '%s' in database '%s' was enabled or not",
128-
table, db),
140+
schema == null ? table : schema + "." + table, db),
129141
"Check database connectivity and table information",
130142
null));
131143
}
@@ -155,21 +167,21 @@ public void close() throws IOException {
155167
driverCleanup.close();
156168
}
157169

158-
private Optional<TableDetail.Builder> getTableDetailBuilder(DatabaseMetaData dbMeta, String db, String table)
159-
throws SQLException {
170+
private Optional<TableDetail.Builder> getTableDetailBuilder(DatabaseMetaData dbMeta, String db,
171+
@Nullable String schema, String table) throws SQLException {
160172
List<ColumnDetail> columns = new ArrayList<>();
161173
// this schema name is needed to construct the full table name, e.g, dbo.test for debizium to fetch records from
162174
// sql server. The table name is constructed using [schemaName].[tableName]. However, the dbMeta is not able
163175
// to retrieve any result back if we pass in the full table name, the schema name has to be passed separately
164176
// in the second parameter.
165-
String schemaName = null;
166-
try (ResultSet columnResults = dbMeta.getColumns(db, null, table, null)) {
177+
// if schema name is null, then any table matching the table name with any schema name
178+
try (ResultSet columnResults = dbMeta.getColumns(db, schema, table, null)) {
167179
while (columnResults.next()) {
168180
Map<String, String> properties = new HashMap<>();
169181
properties.put(SqlServerTableAssessor.COLUMN_LENGTH, columnResults.getString("COLUMN_SIZE"));
170182
properties.put(SqlServerTableAssessor.SCALE, columnResults.getString("DECIMAL_DIGITS"));
171183
properties.put(SqlServerTableAssessor.TYPE_NAME, columnResults.getString("TYPE_NAME"));
172-
schemaName = columnResults.getString("TABLE_SCHEM");
184+
schema = columnResults.getString("TABLE_SCHEM");
173185
columns.add(new ColumnDetail(columnResults.getString("COLUMN_NAME"),
174186
JDBCType.valueOf(columnResults.getInt("DATA_TYPE")),
175187
columnResults.getBoolean("NULLABLE"),
@@ -180,13 +192,16 @@ private Optional<TableDetail.Builder> getTableDetailBuilder(DatabaseMetaData dbM
180192
return Optional.empty();
181193
}
182194
List<String> primaryKey = new ArrayList<>();
183-
try (ResultSet keyResults = dbMeta.getPrimaryKeys(db, schemaName, table)) {
195+
LOGGER.debug("Query primary key for {}.{}.{}", db, schema, table);
196+
try (ResultSet keyResults = dbMeta.getPrimaryKeys(db, schema, table)) {
184197
while (keyResults.next()) {
185-
primaryKey.add(keyResults.getString("COLUMN_NAME"));
198+
String pk = keyResults.getString("COLUMN_NAME");
199+
LOGGER.debug("Found primary key for {}.{}.{} : {}", db, schema, table, pk);
200+
primaryKey.add(pk);
186201
}
187202
}
188203

189-
return Optional.of(TableDetail.builder(db, table, schemaName)
204+
return Optional.of(TableDetail.builder(db, table, schema)
190205
.setPrimaryKey(primaryKey)
191206
.setColumns(columns));
192207
}

0 commit comments

Comments
 (0)