Commit 6e7c6ab

Merge pull request #75 from data-integrations/bugfix_release/CDAP-16735-fix-ddl-errors-during-snapshot
Bugfix release/cdap 16735 fix ddl errors during snapshot
2 parents (eadc283 + db550b4), commit 6e7c6ab

File tree

9 files changed (+354 additions, -119 deletions)


delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 5 additions & 1 deletion
@@ -40,6 +40,10 @@ public class DBSchemaHistory extends AbstractDatabaseHistory {
   private final DocumentWriter writer = DocumentWriter.defaultWriter();
   private final DocumentReader reader = DocumentReader.defaultReader();
 
+  public static void wipeHistory() throws IOException {
+    deltaRuntimeContext.putState(KEY, new byte[] { });
+  }
+
   @Override
   protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
     List<HistoryRecord> history = getHistory();
@@ -79,7 +83,7 @@ private List<HistoryRecord> getHistory() {
     List<HistoryRecord> history = new ArrayList<>();
     try {
       byte[] historyBytes = deltaRuntimeContext.getState(KEY);
-      if (historyBytes == null) {
+      if (historyBytes == null || historyBytes.length == 0) {
        return history;
      }
      String historyStr = Bytes.toString(historyBytes);
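The two hunks above work together: wipeHistory() does not delete the state key, it overwrites it with an empty byte array, so the read path must treat a zero-length value exactly like a missing one. A minimal, self-contained sketch of that convention follows; a plain HashMap stands in for the CDAP state store and all names are hypothetical, not the actual API.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the state store behind DBSchemaHistory:
// "wiping" writes an empty byte[] rather than removing the key, so readers
// must treat null and zero-length values identically.
public class SchemaHistoryStateSketch {
  private static final String KEY = "dbSchemaHistory";
  private final Map<String, byte[]> state = new HashMap<>();

  void store(byte[] historyBytes) {
    state.put(KEY, historyBytes);
  }

  void wipe() {
    // mirrors wipeHistory(): put an empty value, do not delete the key
    state.put(KEY, new byte[] { });
  }

  byte[] read() {
    byte[] historyBytes = state.get(KEY);
    // mirrors the getHistory() guard: empty means "no history", same as absent
    if (historyBytes == null || historyBytes.length == 0) {
      return new byte[] { };
    }
    return historyBytes;
  }

  public static void main(String[] args) {
    SchemaHistoryStateSketch sketch = new SchemaHistoryStateSketch();
    sketch.store(new byte[] {1, 2, 3});
    sketch.wipe();
    System.out.println("history length after wipe: " + sketch.read().length); // prints 0
  }
}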
BlockingEventEmitter.java (new file, package io.cdap.delta.plugin.mock)

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.delta.plugin.mock;
+
+import io.cdap.delta.api.DDLEvent;
+import io.cdap.delta.api.DMLEvent;
+import io.cdap.delta.api.EventEmitter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * EventEmitter for tests that can block with emitting events.
+ */
+public class BlockingEventEmitter implements EventEmitter {
+  private final BlockingQueue<DDLEvent> ddlQueue;
+  private final BlockingQueue<DMLEvent> dmlQueue;
+
+  public BlockingEventEmitter(BlockingQueue<DDLEvent> ddlQueue, BlockingQueue<DMLEvent> dmlQueue) {
+    this.ddlQueue = ddlQueue;
+    this.dmlQueue = dmlQueue;
+  }
+
+  @Override
+  public void emit(DDLEvent ddlEvent) throws InterruptedException {
+    ddlQueue.put(ddlEvent);
+  }
+
+  @Override
+  public void emit(DMLEvent dmlEvent) throws InterruptedException {
+    dmlQueue.put(dmlEvent);
+  }
+}
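This emitter blocks because BlockingQueue.put() waits for free space when the queue is full; a test can pass bounded queues so the reader thread parks inside emit() until the test drains an event. The sketch below only illustrates that JDK behaviour under assumptions: a hypothetical producer thread, a capacity-1 ArrayBlockingQueue, and String standing in for DDLEvent/DMLEvent.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrates why BlockingEventEmitter can block: put() on a full bounded queue
// waits until a consumer takes an element. String stands in for DDLEvent/DMLEvent.
public class BlockingEmitSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> ddlQueue = new ArrayBlockingQueue<>(1);

    Thread producer = new Thread(() -> {
      try {
        ddlQueue.put("CREATE_TABLE");   // fills the single slot
        ddlQueue.put("ALTER_TABLE");    // blocks here until the queue is drained
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // give the producer time to block on the second put()
    TimeUnit.MILLISECONDS.sleep(200);
    System.out.println("producer state: " + producer.getState()); // typically WAITING

    System.out.println("drained: " + ddlQueue.take());  // frees the slot, unblocks the producer
    producer.join();
    System.out.println("remaining: " + ddlQueue.take());
  }
}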

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 28 additions & 3 deletions
@@ -16,6 +16,8 @@
 
 package io.cdap.delta.mysql;
 
+import com.google.common.annotations.VisibleForTesting;
+import io.cdap.delta.api.DeltaFailureException;
 import io.cdap.delta.api.DeltaSourceContext;
 import io.cdap.delta.api.EventEmitter;
 import io.cdap.delta.api.EventReader;
@@ -37,6 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.sql.Driver;
 import java.time.temporal.ChronoField;
 import java.time.temporal.ChronoUnit;
@@ -59,6 +62,7 @@ public class MySqlEventReader implements EventReader {
   private final DeltaSourceContext context;
   private final Set<SourceTable> sourceTables;
   private EmbeddedEngine engine;
+  private volatile boolean failedToStop;
 
   public MySqlEventReader(Set<SourceTable> sourceTables, MySqlConfig config,
                           DeltaSourceContext context, EventEmitter emitter) {
@@ -67,6 +71,7 @@ public MySqlEventReader(Set<SourceTable> sourceTables, MySqlConfig config,
     this.context = context;
     this.emitter = emitter;
     this.executorService = Executors.newSingleThreadScheduledExecutor();
+    this.failedToStop = false;
   }
 
   @Override
@@ -111,6 +116,20 @@ public void start(Offset offset) {
       .build();
     MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
     DBSchemaHistory.deltaRuntimeContext = context;
+    /*
+       this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
+       but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
+       those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
+       happen during the initial snapshot.
+       TODO: (CDAP-16735) fix this more comprehensively
+     */
+    if (offset.get().isEmpty()) {
+      try {
+        DBSchemaHistory.wipeHistory();
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to wipe schema history at start of replication.", e);
+      }
+    }
 
     MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
     DdlParser ddlParser = mysqlConf.getDdlParsingMode().getNewParserInstance(mySqlValueConverters, tableId -> true);
@@ -132,10 +151,16 @@ public void start(Offset offset) {
   }
 
   public void stop() throws InterruptedException {
-    if (engine != null && engine.stop()) {
-      engine.await(1, TimeUnit.MINUTES);
+    executorService.shutdownNow();
+    if (!executorService.awaitTermination(2, TimeUnit.MINUTES)) {
+      LOG.warn("Unable to cleanly shutdown reader within the timeout.");
+      failedToStop = true;
     }
-    executorService.shutdown();
+  }
+
+  @VisibleForTesting
+  boolean failedToStop() {
+    return failedToStop;
   }
 
   private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {
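The new stop() relies on executorService.shutdownNow() interrupting the thread running the embedded engine, with awaitTermination() bounding how long the reader waits; failedToStop only records that the timeout was exceeded. Below is a self-contained sketch of that shutdown pattern under assumptions: a sleeping task stands in for the Debezium engine loop, and a 2-second timeout replaces the 2-minute one for quick demonstration.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the shutdownNow()/awaitTermination() stop pattern used above.
// The sleeping task is a stand-in for the embedded engine run loop.
public class StopPatternSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.submit(() -> {
      try {
        Thread.sleep(Long.MAX_VALUE);           // "engine" blocks until interrupted
      } catch (InterruptedException e) {
        System.out.println("engine thread interrupted, shutting down");
      }
    });

    boolean failedToStop = false;
    executorService.shutdownNow();              // interrupts the running task
    if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
      System.out.println("Unable to cleanly shutdown reader within the timeout.");
      failedToStop = true;
    }
    System.out.println("failedToStop = " + failedToStop);
  }
}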

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 124 additions & 80 deletions
@@ -27,6 +27,7 @@
 import io.cdap.delta.api.SourceTable;
 import io.cdap.delta.plugin.common.Records;
 import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.embedded.StopConnectorException;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
@@ -40,6 +41,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 /**
@@ -128,88 +130,28 @@ public void accept(SourceRecord sourceRecord) {
     // If the map is empty, we should read all DDL/DML events and columns of all tables
     boolean readAllTables = sourceTableMap.isEmpty();
 
-    if (ddl != null) {
-      ddlParser.getDdlChanges().reset();
-      ddlParser.parse(ddl, tables);
-      ddlParser.getDdlChanges().groupEventsByDatabase((database, events) -> {
-        for (DdlParserListener.Event event : events) {
-          DDLEvent.Builder builder = DDLEvent.builder()
-            .setDatabase(database)
-            .setOffset(recordOffset)
-            .setSnapshot(isSnapshot);
-          // since current ddl blacklist implementation is bind with table level, we will only do the ddl blacklist
-          // checking only for table change related ddl event, includes: ALTER_TABLE, RENAME_TABLE, DROP_TABLE,
-          // CREATE_TABLE and TRUNCATE_TABLE.
-          switch (event.type()) {
-            case ALTER_TABLE:
-              DdlParserListener.TableAlteredEvent alteredEvent = (DdlParserListener.TableAlteredEvent) event;
-              TableId tableId = alteredEvent.tableId();
-              Table table = tables.forTable(tableId);
-              SourceTable sourceTable = getSourceTable(database, tableId.table());
-              DDLOperation ddlOp;
-              if (alteredEvent.previousTableId() != null) {
-                ddlOp = DDLOperation.RENAME_TABLE;
-                builder.setPrevTable(alteredEvent.previousTableId().table());
-              } else {
-                ddlOp = DDLOperation.ALTER_TABLE;
-              }
-
-              if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, ddlOp)) {
-                emitter.emit(builder.setOperation(ddlOp)
-                  .setTable(tableId.table())
-                  .setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
-                    Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
-                  .setPrimaryKey(table.primaryKeyColumnNames())
-                  .build());
-              }
-              break;
-            case DROP_TABLE:
-              DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
-              sourceTable = getSourceTable(database, droppedEvent.tableId().table());
-              if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.DROP_TABLE)) {
-                emitter.emit(builder.setOperation(DDLOperation.DROP_TABLE)
-                  .setTable(droppedEvent.tableId().table())
-                  .build());
-              }
-              break;
-            case CREATE_TABLE:
-              DdlParserListener.TableCreatedEvent createdEvent = (DdlParserListener.TableCreatedEvent) event;
-              tableId = createdEvent.tableId();
-              table = tables.forTable(tableId);
-              sourceTable = getSourceTable(database, tableId.table());
-              if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.CREATE_TABLE)) {
-                emitter.emit(builder.setOperation(DDLOperation.CREATE_TABLE)
-                  .setTable(tableId.table())
-                  .setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
-                    Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
-                  .setPrimaryKey(table.primaryKeyColumnNames())
-                  .build());
-              }
-              break;
-            case DROP_DATABASE:
-              emitter.emit(builder.setOperation(DDLOperation.DROP_DATABASE).build());
-              break;
-            case CREATE_DATABASE:
-              emitter.emit(builder.setOperation(DDLOperation.CREATE_DATABASE).build());
-              break;
-            case TRUNCATE_TABLE:
-              DdlParserListener.TableTruncatedEvent truncatedEvent =
-                (DdlParserListener.TableTruncatedEvent) event;
-              sourceTable = getSourceTable(database, truncatedEvent.tableId().table());
-              if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.TRUNCATE_TABLE)) {
-                emitter.emit(builder.setOperation(DDLOperation.TRUNCATE_TABLE)
-                  .setTable(truncatedEvent.tableId().table())
-                  .build());
-              }
-              break;
-            default:
-              return;
-          }
-        }
-      });
-      return;
+    try {
+      if (ddl != null) {
+        handleDDL(ddl, recordOffset, isSnapshot, readAllTables);
+        return;
+      }
+
+      String databaseName = source.get("db");
+      String tableName = source.get("table");
+      SourceTable sourceTable = getSourceTable(databaseName, tableName);
+      if (sourceTableNotValid(readAllTables, sourceTable)) {
+        return;
+      }
+
+      handleDML(source, val, recordOffset, isSnapshot, readAllTables);
+    } catch (InterruptedException e) {
+      // happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
+      throw new StopConnectorException("Interrupted while emitting event.");
     }
+  }
 
+  private void handleDML(StructuredRecord source, StructuredRecord val, Offset recordOffset,
+                         boolean isSnapshot, boolean readAllTables) throws InterruptedException {
     String databaseName = source.get("db");
     String tableName = source.get("table");
     SourceTable sourceTable = getSourceTable(databaseName, tableName);
@@ -274,6 +216,108 @@ public void accept(SourceRecord sourceRecord) {
       }
     }
 
+  private void handleDDL(String ddlStatement, Offset recordOffset,
+                         boolean isSnapshot, boolean readAllTables) throws InterruptedException {
+    ddlParser.getDdlChanges().reset();
+    ddlParser.parse(ddlStatement, tables);
+    AtomicReference<InterruptedException> interrupted = new AtomicReference<>();
+    ddlParser.getDdlChanges().groupEventsByDatabase((database, events) -> {
+      if (interrupted.get() != null) {
+        return;
+      }
+      for (DdlParserListener.Event event : events) {
+        DDLEvent.Builder builder = DDLEvent.builder()
+          .setOffset(recordOffset)
+          .setDatabase(database)
+          .setSnapshot(isSnapshot);
+        DDLEvent ddlEvent = null;
+        // since current ddl blacklist implementation is bind with table level, we will only do the ddl blacklist
+        // checking only for table change related ddl event, includes: ALTER_TABLE, RENAME_TABLE, DROP_TABLE,
+        // CREATE_TABLE and TRUNCATE_TABLE.
+        switch (event.type()) {
+          case ALTER_TABLE:
+            DdlParserListener.TableAlteredEvent alteredEvent = (DdlParserListener.TableAlteredEvent) event;
+            TableId tableId = alteredEvent.tableId();
+            Table table = tables.forTable(tableId);
+            SourceTable sourceTable = getSourceTable(database, tableId.table());
+            DDLOperation ddlOp;
+            if (alteredEvent.previousTableId() != null) {
+              ddlOp = DDLOperation.RENAME_TABLE;
+              builder.setPrevTable(alteredEvent.previousTableId().table());
+            } else {
+              ddlOp = DDLOperation.ALTER_TABLE;
+            }
+
+            if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, ddlOp)) {
+              ddlEvent = builder.setOperation(ddlOp)
+                .setTable(tableId.table())
+                .setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
+                  Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
+                .setPrimaryKey(table.primaryKeyColumnNames())
+                .build();
+            }
+            break;
+          case DROP_TABLE:
+            DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
+            sourceTable = getSourceTable(database, droppedEvent.tableId().table());
+            if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.DROP_TABLE)) {
+              ddlEvent = builder.setOperation(DDLOperation.DROP_TABLE)
+                .setTable(droppedEvent.tableId().table())
+                .build();
+            }
+            break;
+          case CREATE_TABLE:
+            DdlParserListener.TableCreatedEvent createdEvent = (DdlParserListener.TableCreatedEvent) event;
+            tableId = createdEvent.tableId();
+            table = tables.forTable(tableId);
+            sourceTable = getSourceTable(database, tableId.table());
+            if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.CREATE_TABLE)) {
+              ddlEvent = builder.setOperation(DDLOperation.CREATE_TABLE)
+                .setTable(tableId.table())
+                .setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
+                  Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
+                .setPrimaryKey(table.primaryKeyColumnNames())
+                .build();
+            }
+            break;
+          case DROP_DATABASE:
+            ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
+            break;
+          case CREATE_DATABASE:
+            // due to a bug in io.debezium.relational.ddl.AbstractDdlParser#signalDropDatabase
+            // a DROP_DATABASE event will be mistakenly categorized as a CREATE_DATABASE event.
+            // TODO: check if this is fixed in a newer debezium version
+            if (event.statement() != null && event.statement().startsWith("DROP DATABASE")) {
+              ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
+            } else {
+              ddlEvent = builder.setOperation(DDLOperation.CREATE_DATABASE).build();
+            }
+            break;
+          case TRUNCATE_TABLE:
+            DdlParserListener.TableTruncatedEvent truncatedEvent =
+              (DdlParserListener.TableTruncatedEvent) event;
+            sourceTable = getSourceTable(database, truncatedEvent.tableId().table());
+            if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.TRUNCATE_TABLE)) {
+              ddlEvent = builder.setOperation(DDLOperation.TRUNCATE_TABLE)
+                .setTable(truncatedEvent.tableId().table())
+                .build();
+            }
+            break;
+        }
+        if (ddlEvent != null) {
+          try {
+            emitter.emit(ddlEvent);
+          } catch (InterruptedException e) {
+            interrupted.set(e);
+          }
+        }
+      }
+    });
+    if (interrupted.get() != null) {
+      throw interrupted.get();
+    }
+  }
+
   private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation op) {
     return (!sourceTableNotValid(readAllTables, sourceTable)) &&
       (!isDDLOperationBlacklisted(readAllTables, sourceTable, op));
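The groupEventsByDatabase callback is a lambda, so the checked InterruptedException thrown by emit() cannot propagate out of it directly; handleDDL therefore parks the exception in an AtomicReference and rethrows it once the callback returns. A minimal, self-contained sketch of that pattern follows; forEachEvent and emit are hypothetical stand-ins for the Debezium callback and the CDAP emitter, not the real APIs.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Sketch of carrying a checked InterruptedException out of a lambda
// via AtomicReference, as handleDDL does around emitter.emit().
public class InterruptPropagationSketch {

  // hypothetical stand-in for ddlParser.getDdlChanges().groupEventsByDatabase(...)
  static void forEachEvent(List<String> events, Consumer<String> callback) {
    events.forEach(callback);
  }

  // hypothetical stand-in for emitter.emit(ddlEvent)
  static void emit(String event) throws InterruptedException {
    if (Thread.currentThread().isInterrupted()) {
      throw new InterruptedException("interrupted while emitting " + event);
    }
    System.out.println("emitted " + event);
  }

  static void handle(List<String> events) throws InterruptedException {
    AtomicReference<InterruptedException> interrupted = new AtomicReference<>();
    forEachEvent(events, event -> {
      if (interrupted.get() != null) {
        return;                       // stop doing work once an interrupt was seen
      }
      try {
        emit(event);
      } catch (InterruptedException e) {
        interrupted.set(e);           // lambdas can't throw checked exceptions; park it
      }
    });
    if (interrupted.get() != null) {
      throw interrupted.get();        // rethrow after the callback completes
    }
  }

  public static void main(String[] args) throws InterruptedException {
    handle(Arrays.asList("CREATE_DATABASE", "CREATE_TABLE"));
  }
}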
