
Commit 3692032

[CDAP-16743] Fixed the issue where the SQL Server plugin failed to replicate changes if the initial table is empty. (#76)
* Fixed the issue that sql server plugin failed to replicate changes if initial table is empty.
* Change comments
* address comments.
* refactor code.
* update offset store class comments.
* address comments.
* refactor the code.
* fixed indent.
1 parent 6e7c6ab commit 3692032

File tree

4 files changed: +158 −85 lines changed

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java
sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java
sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerOffset.java
sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java
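
In brief, reading from the diffs below: the record consumer previously emitted a table's CREATE_TABLE DDL event only while a snapshot was in progress, and tracked the tables it had already seen in memory only. An empty initial table produces no snapshot records, so its DDL event was presumably never emitted and later change events could not be replicated. The fix persists the set of already-handled tables inside the replication offset, under a new snapshot_tables key, and emits the DDL event the first time a table is seen, snapshot or not. The gating condition, copied from the SqlServerRecordConsumer diff:

    -    if (!trackingTables.contains(trackingTable) && isSnapshot) {
    +    if (!snapshotTables.contains(sourceTableId)) {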

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConstantOffsetBackingStore.java

Lines changed: 4 additions & 41 deletions

@@ -20,21 +20,21 @@
 import io.cdap.cdap.api.common.Bytes;
 import io.debezium.connector.sqlserver.SourceInfo;
 import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The offset store class for sql server. Sql server expects the offset which contains:
- * 1. change_lsn, 2. commmit_lsn, 3. snapshot, 4. snapshot_completed.
+ * The sql server constant offset store class is used by debezium only to retrieve the offset information.
+ * Debezium expects the offset which contains:
+ * 1. change_lsn, 2. commit_lsn, 3. snapshot, 4. snapshot_completed.
  */
 public class SqlServerConstantOffsetBackingStore extends MemoryOffsetBackingStore {
-  private static final String SNAPSHOT_COMPLETED = "snapshot_completed";
   private static final Gson GSON = new Gson();
   private static final String KEY = "{\"schema\":null,\"payload\":[\"delta\",{\"server\":\"dummy\"}]}";
+  static final String SNAPSHOT_COMPLETED = "snapshot_completed";
 
   @Override
   public void configure(WorkerConfig config) {
@@ -69,41 +69,4 @@ public void configure(WorkerConfig config) {
 
     data.put(ByteBuffer.wrap(Bytes.toBytes(KEY)), ByteBuffer.wrap(offsetBytes));
   }
-
-  static Map<String, String> deserializeOffsets(Map<String, String> offsets) {
-    Map<String, String> offsetConfig = new HashMap<>();
-    String changeLsn = offsets.getOrDefault(SourceInfo.CHANGE_LSN_KEY, null);
-    String commitLsn = offsets.getOrDefault(SourceInfo.COMMIT_LSN_KEY, null);
-    String snapshot = offsets.getOrDefault(SourceInfo.SNAPSHOT_KEY, null);
-    String snapshotCompleted = offsets.getOrDefault(SNAPSHOT_COMPLETED, null);
-    // this is prevent NPE since the configuration does not allow putting null value,
-    // also WorkerConfig.get() will throw Exception if a configuration is not found, so have to put some value in it
-    offsetConfig.put(SourceInfo.CHANGE_LSN_KEY, changeLsn == null ? "" : changeLsn);
-    offsetConfig.put(SourceInfo.COMMIT_LSN_KEY, commitLsn == null ? "" : commitLsn);
-    offsetConfig.put(SourceInfo.SNAPSHOT_KEY, snapshot == null ? "" : snapshot);
-    offsetConfig.put(SNAPSHOT_COMPLETED, snapshotCompleted == null ? "" : snapshotCompleted);
-    return offsetConfig;
-  }
-
-  static Map<String, String> serializeOffsets(SourceRecord sourceRecord) {
-    Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
-    String changLsn = (String) sourceOffset.get(SourceInfo.CHANGE_LSN_KEY);
-    String commitLsn = (String) sourceOffset.get(SourceInfo.COMMIT_LSN_KEY);
-    Boolean snapshot = (Boolean) sourceOffset.get(SourceInfo.SNAPSHOT_KEY);
-    Boolean snapshotCompleted = (Boolean) sourceOffset.get(SNAPSHOT_COMPLETED);
-    Map<String, String> deltaOffset = new HashMap<>(4);
-    if (changLsn != null) {
-      deltaOffset.put(SourceInfo.CHANGE_LSN_KEY, changLsn);
-    }
-    if (commitLsn != null) {
-      deltaOffset.put(SourceInfo.COMMIT_LSN_KEY, commitLsn);
-    }
-    if (snapshot != null) {
-      deltaOffset.put(SourceInfo.SNAPSHOT_KEY, String.valueOf(snapshot));
-    }
-    if (snapshotCompleted != null) {
-      deltaOffset.put(SNAPSHOT_COMPLETED, String.valueOf(snapshotCompleted));
-    }
-    return deltaOffset;
-  }
 }
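
The two removed helpers did not disappear: serialization now lives in the new SqlServerOffset class, and deserialization is replaced by binding the four offset values directly onto the Debezium Configuration in SqlServerEventReader (next diff). For orientation, a minimal sketch of what the configure(WorkerConfig) path implies; this is not the actual method body shown in the commit, and originalsStrings() as the accessor is an assumption:

    // Sketch only: rebuild the single offset entry that Debezium asks this
    // constant store for, from the properties bound in SqlServerEventReader.
    Map<String, String> raw = config.originalsStrings(); // assumed accessor
    Map<String, String> offset = new HashMap<>();
    offset.put(SourceInfo.CHANGE_LSN_KEY, raw.getOrDefault("change_lsn", ""));
    offset.put(SourceInfo.COMMIT_LSN_KEY, raw.getOrDefault("commit_lsn", ""));
    offset.put(SourceInfo.SNAPSHOT_KEY, raw.getOrDefault("snapshot", ""));
    offset.put(SNAPSHOT_COMPLETED, raw.getOrDefault(SNAPSHOT_COMPLETED, ""));
    byte[] offsetBytes = Bytes.toBytes(GSON.toJson(offset));
    data.put(ByteBuffer.wrap(Bytes.toBytes(KEY)), ByteBuffer.wrap(offsetBytes));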

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 33 additions & 22 deletions

@@ -25,16 +25,20 @@
 import io.cdap.delta.plugin.common.DBSchemaHistory;
 import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
 import io.debezium.config.Configuration;
+import io.debezium.connector.sqlserver.SourceInfo;
 import io.debezium.connector.sqlserver.SqlServerConnection;
 import io.debezium.connector.sqlserver.SqlServerConnector;
 import io.debezium.embedded.EmbeddedEngine;
 import io.debezium.jdbc.JdbcConfiguration;
 import io.debezium.jdbc.JdbcConnection;
+import io.debezium.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Driver;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -92,28 +96,36 @@ public void start(Offset offset) {
       return schema == null ? table : schema + "." + table;
     }, t -> t));
 
+    Map<String, String> state = offset.get(); // this will never be null
     // offset config
-    Configuration.Builder builder = Configuration.create()
-      .with("connector.class", SqlServerConnector.class.getName())
-      .with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName())
-      .with("offset.flush.interval.ms", 1000);
-    SqlServerConstantOffsetBackingStore.deserializeOffsets(offset.get()).forEach(builder::with);
-
-    Configuration debeziumConf =
-      builder
-        /* begin connector properties */
-        .with("name", "delta")
-        .with("database.hostname", config.getHost())
-        .with("database.port", config.getPort())
-        .with("database.user", config.getUser())
-        .with("database.password", config.getPassword())
-        .with("database.history", DBSchemaHistory.class.getName())
-        .with("database.dbname", databaseName)
-        .with("table.whitelist", String.join(",", sourceTableMap.keySet()))
-        .with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
-        .with("database.serverTimezone", config.getServerTimezone())
-        .build();
+    Configuration debeziumConf = Configuration.create()
+      .with("connector.class", SqlServerConnector.class.getName())
+      .with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName())
+      .with("offset.flush.interval.ms", 1000)
+      /* bind offset configs with debeizumConf */
+      .with("change_lsn", state.getOrDefault(SourceInfo.CHANGE_LSN_KEY, ""))
+      .with("commit_lsn", state.getOrDefault(SourceInfo.COMMIT_LSN_KEY, ""))
+      .with("snapshot", state.getOrDefault(SourceInfo.SNAPSHOT_KEY, ""))
+      .with("snapshot_completed", state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, ""))
+      /* begin connector properties */
+      .with("name", "delta")
+      .with("database.hostname", config.getHost())
+      .with("database.port", config.getPort())
+      .with("database.user", config.getUser())
+      .with("database.password", config.getPassword())
+      .with("database.history", DBSchemaHistory.class.getName())
+      .with("database.dbname", databaseName)
+      .with("table.whitelist", String.join(",", sourceTableMap.keySet()))
+      .with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
+      .with("database.serverTimezone", config.getServerTimezone())
+      .build();
+
     DBSchemaHistory.deltaRuntimeContext = context;
+
+    String snapshotTablesStr = state.get(SqlServerOffset.SNAPSHOT_TABLES);
+    Set<String> snapshotTables = Strings.isNullOrEmpty(snapshotTablesStr) ? new HashSet<>() :
+      new HashSet<>(Arrays.asList(snapshotTablesStr.split(SqlServerOffset.DELIMITER)));
+
     /*
      this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
      but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
@@ -136,15 +148,14 @@ public void start(Offset offset) {
       LOG.info("creating new EmbeddedEngine...");
       // Create the engine with this configuration ...
       engine = EmbeddedEngine.create()
-        .notifying(new SqlServerRecordConsumer(context, emitter, databaseName, sourceTableMap))
+        .notifying(new SqlServerRecordConsumer(context, emitter, databaseName, snapshotTables, sourceTableMap))
         .using(debeziumConf)
        .using(new NotifyingCompletionCallback(context))
        .build();
      executorService.submit(engine);
    } finally {
      Thread.currentThread().setContextClassLoader(oldCL);
    }
-
  }
 
  @Override
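
The snapshot table set rides along in the offset as one delimited string: SqlServerOffset joins it with DELIMITER when the offset is saved, and the code above splits it back apart on restart. A self-contained round-trip with hypothetical table names:

    Set<String> tables = new HashSet<>(Arrays.asList("dbo.customers", "dbo.orders"));
    // saved form, as written by SqlServerOffset.getAsOffset():
    String stored = String.join(SqlServerOffset.DELIMITER, tables); // "dbo.customers,dbo.orders"
    // restored form, as parsed in SqlServerEventReader.start():
    Set<String> restored = Strings.isNullOrEmpty(stored)
      ? new HashSet<>()
      : new HashSet<>(Arrays.asList(stored.split(SqlServerOffset.DELIMITER)));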

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerOffset.java

Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.delta.sqlserver;
+
+import io.cdap.delta.api.Offset;
+import io.debezium.connector.sqlserver.SourceInfo;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Record offset information for SqlServer.
+ */
+public class SqlServerOffset {
+  static final String DELIMITER = ",";
+  static final String SNAPSHOT_TABLES = "snapshot_tables";
+
+  private final String changeLsn;
+  private final String commitLsn;
+  private final Boolean isSnapshot;
+  private final Boolean isSnapshotCompleted;
+  private Set<String> snapshotTables;
+
+  SqlServerOffset(Map<String, ?> properties) {
+    this.changeLsn = (String) properties.get(SourceInfo.CHANGE_LSN_KEY);
+    this.commitLsn = (String) properties.get(SourceInfo.COMMIT_LSN_KEY);
+    this.isSnapshot = (Boolean) properties.get(SourceInfo.SNAPSHOT_KEY);
+    this.isSnapshotCompleted = (Boolean) properties.get(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED);
+    this.snapshotTables = new HashSet<>();
+  }
+
+  boolean isSnapshot() {
+    return Boolean.TRUE.equals(isSnapshot);
+  }
+
+  void setSnapshotTables(Set<String> snapshotTables) {
+    this.snapshotTables = new HashSet<>(snapshotTables);
+  }
+
+  void addSnapshotTable(String table) {
+    snapshotTables.add(table);
+  }
+
+  Offset getAsOffset() {
+    Map<String, String> deltaOffset = new HashMap<>();
+    if (changeLsn != null) {
+      deltaOffset.put(SourceInfo.CHANGE_LSN_KEY, changeLsn);
+    }
+    if (commitLsn != null) {
+      deltaOffset.put(SourceInfo.COMMIT_LSN_KEY, commitLsn);
+    }
+    if (isSnapshot != null) {
+      deltaOffset.put(SourceInfo.SNAPSHOT_KEY, String.valueOf(isSnapshot));
+    }
+    if (isSnapshotCompleted != null) {
+      deltaOffset.put(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, String.valueOf(isSnapshotCompleted));
+    }
+    if (snapshotTables != null && !snapshotTables.isEmpty()) {
+      deltaOffset.put(SNAPSHOT_TABLES, String.join(DELIMITER, snapshotTables));
+    }
+
+    return new Offset(deltaOffset);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SqlServerOffset that = (SqlServerOffset) o;
+    return Objects.equals(changeLsn, that.changeLsn)
+      && Objects.equals(commitLsn, that.commitLsn)
+      && Objects.equals(isSnapshot, that.isSnapshot)
+      && Objects.equals(isSnapshotCompleted, that.isSnapshotCompleted)
+      && Objects.equals(snapshotTables, that.snapshotTables);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(changeLsn, commitLsn, isSnapshot, isSnapshotCompleted, snapshotTables);
+  }
+}
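
A usage sketch of the new class, mirroring how SqlServerRecordConsumer (next diff) drives it; the LSN strings and table name are made up for illustration, and snapshotTables stands in for the set restored from the previous offset:

    Map<String, Object> sourceOffset = new HashMap<>();
    sourceOffset.put(SourceInfo.CHANGE_LSN_KEY, "00000025:00000448:0003"); // hypothetical LSN
    sourceOffset.put(SourceInfo.COMMIT_LSN_KEY, "00000025:00000448:0005"); // hypothetical LSN
    sourceOffset.put(SourceInfo.SNAPSHOT_KEY, Boolean.TRUE);

    SqlServerOffset sqlServerOffset = new SqlServerOffset(sourceOffset);
    sqlServerOffset.setSnapshotTables(snapshotTables); // tables already handled on a previous run
    sqlServerOffset.addSnapshotTable("dbo.customers"); // first sight of this table
    Offset recordOffset = sqlServerOffset.getAsOffset();
    // recordOffset now carries change_lsn, commit_lsn, snapshot, and a
    // snapshot_tables entry, so the seen-table set survives a restart.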

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 19 additions & 22 deletions

@@ -27,7 +27,6 @@
 import io.cdap.delta.api.Offset;
 import io.cdap.delta.api.SourceTable;
 import io.cdap.delta.plugin.common.Records;
-import io.debezium.connector.sqlserver.SourceInfo;
 import io.debezium.embedded.StopConnectorException;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -36,7 +35,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,16 +51,15 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
   private final EventEmitter emitter;
   // we need this since there is no way to get the db information from the source record
   private final String databaseName;
-  // this is hack to track the tables getting created or not
-  private final Set<SourceTable> trackingTables;
+  private final Set<String> snapshotTables;
   private final Map<String, SourceTable> sourceTableMap;
 
   SqlServerRecordConsumer(DeltaSourceContext context, EventEmitter emitter, String databaseName,
-                          Map<String, SourceTable> sourceTableMap) {
+                          Set<String> snapshotTables, Map<String, SourceTable> sourceTableMap) {
     this.context = context;
     this.emitter = emitter;
     this.databaseName = databaseName;
-    this.trackingTables = new HashSet<>();
+    this.snapshotTables = snapshotTables;
     this.sourceTableMap = sourceTableMap;
   }
 
@@ -77,12 +74,12 @@ public void accept(SourceRecord sourceRecord) {
       return;
     }
 
-    Map<String, String> deltaOffset = SqlServerConstantOffsetBackingStore.serializeOffsets(sourceRecord);
-    Offset recordOffset = new Offset(deltaOffset);
-    StructuredRecord val = Records.convert((Struct) sourceRecord.value());
-
-    boolean isSnapshot = Boolean.TRUE.equals(sourceRecord.sourceOffset().get(SourceInfo.SNAPSHOT_KEY));
+    SqlServerOffset sqlServerOffset = new SqlServerOffset(sourceRecord.sourceOffset());
+    sqlServerOffset.setSnapshotTables(snapshotTables);
+    boolean isSnapshot = sqlServerOffset.isSnapshot();
+    Offset recordOffset = sqlServerOffset.getAsOffset();
 
+    StructuredRecord val = Records.convert((Struct) sourceRecord.value());
     DMLOperation op;
     String opStr = val.get("op");
     if ("c".equals(opStr) || "r".equals(opStr)) {
@@ -96,13 +93,12 @@ public void accept(SourceRecord sourceRecord) {
       return;
     }
 
-    StructuredRecord key = Records.convert((Struct) sourceRecord.key());
-    String recordName = key.getSchema().getRecordName();
-    // the record name will always be like this: [db.server.name].[schema].[table].Envelope
-    if (recordName == null) {
+    String topicName = sourceRecord.topic();
+    // the topic name will always be like this: [db.server.name].[schema].[table]
+    if (topicName == null) {
       return; // safety check to avoid NPE
     }
-    String[] splits = recordName.split("\\.");
+    String[] splits = topicName.split("\\.");
     String schemaName = splits[1];
     String tableName = splits[2];
     String sourceTableId = schemaName + "." + tableName;
@@ -132,22 +128,23 @@ public void accept(SourceRecord sourceRecord) {
       return;
     }
 
-    // this is a hack to send DDL event if we first see this table, now all the stuff is in memory
-    SourceTable trackingTable = new SourceTable(databaseName, tableName);
     DDLEvent.Builder builder = DDLEvent.builder()
       .setDatabase(databaseName)
-      .setOffset(recordOffset)
       .setSnapshot(isSnapshot);
 
     Schema schema = value.getSchema();
-    // send the ddl event if the first see the table and the it is in snapshot.
+    // send the ddl events only if we see the table at the first time
    // Note: the delta app itself have prevented adding CREATE_TABLE operation into DDL blacklist for all the tables.
-    if (!trackingTables.contains(trackingTable) && isSnapshot) {
+    if (!snapshotTables.contains(sourceTableId)) {
+      StructuredRecord key = Records.convert((Struct) sourceRecord.key());
       List<Schema.Field> fields = key.getSchema().getFields();
       List<String> primaryFields = new ArrayList<>();
       if (fields != null && !fields.isEmpty()) {
         primaryFields = fields.stream().map(Schema.Field::getName).collect(Collectors.toList());
       }
+      sqlServerOffset.addSnapshotTable(sourceTableId);
+      recordOffset = sqlServerOffset.getAsOffset();
+      builder.setOffset(recordOffset);
 
       try {
         // try to always drop the table before snapshot the schema.
@@ -167,7 +164,7 @@ public void accept(SourceRecord sourceRecord) {
         // happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
         throw new StopConnectorException("Interrupted while emitting an event.");
       }
-      trackingTables.add(trackingTable);
+      snapshotTables.add(sourceTableId);
     }
 
     if (!readAllTables && sourceTable.getDmlBlacklist().contains(op)) {