Skip to content

Commit a006f5e

Browse files
committed
CDAP-16735 fix error handling on snapshot ddl failures
Fixed the plugins so that they correctly stop and reset their state if there are issues during the database snapshot.
1 parent eadc283 commit a006f5e

File tree

8 files changed

+295
-102
lines changed

8 files changed

+295
-102
lines changed

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public class DBSchemaHistory extends AbstractDatabaseHistory {
4040
private final DocumentWriter writer = DocumentWriter.defaultWriter();
4141
private final DocumentReader reader = DocumentReader.defaultReader();
4242

43+
public static void wipeHistory() throws IOException {
44+
deltaRuntimeContext.putState(KEY, new byte[] { });
45+
}
46+
4347
@Override
4448
protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
4549
List<HistoryRecord> history = getHistory();
@@ -79,7 +83,7 @@ private List<HistoryRecord> getHistory() {
7983
List<HistoryRecord> history = new ArrayList<>();
8084
try {
8185
byte[] historyBytes = deltaRuntimeContext.getState(KEY);
82-
if (historyBytes == null) {
86+
if (historyBytes == null || historyBytes.length == 0) {
8387
return history;
8488
}
8589
String historyStr = Bytes.toString(historyBytes);
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.delta.plugin.mock;
18+
19+
import io.cdap.delta.api.DDLEvent;
20+
import io.cdap.delta.api.DMLEvent;
21+
import io.cdap.delta.api.EventEmitter;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
29+
/**
30+
* EventEmitter for tests that can block with emitting events.
31+
*/
32+
public class BlockingEventEmitter implements EventEmitter {
33+
private final BlockingQueue<DDLEvent> ddlQueue;
34+
private final BlockingQueue<DMLEvent> dmlQueue;
35+
36+
public BlockingEventEmitter(BlockingQueue<DDLEvent> ddlQueue, BlockingQueue<DMLEvent> dmlQueue) {
37+
this.ddlQueue = ddlQueue;
38+
this.dmlQueue = dmlQueue;
39+
}
40+
41+
@Override
42+
public void emit(DDLEvent ddlEvent) throws InterruptedException {
43+
ddlQueue.put(ddlEvent);
44+
}
45+
46+
@Override
47+
public void emit(DMLEvent dmlEvent) throws InterruptedException {
48+
dmlQueue.put(dmlEvent);
49+
}
50+
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.delta.mysql;
1818

19+
import io.cdap.delta.api.DeltaFailureException;
1920
import io.cdap.delta.api.DeltaSourceContext;
2021
import io.cdap.delta.api.EventEmitter;
2122
import io.cdap.delta.api.EventReader;
@@ -37,6 +38,7 @@
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

41+
import java.io.IOException;
4042
import java.sql.Driver;
4143
import java.time.temporal.ChronoField;
4244
import java.time.temporal.ChronoUnit;
@@ -111,6 +113,20 @@ public void start(Offset offset) {
111113
.build();
112114
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
113115
DBSchemaHistory.deltaRuntimeContext = context;
116+
/*
117+
Wiping the history is required when the source is able to emit the starting DDL events during snapshotting,
118+
but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
119+
those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
120+
happen during the initial snapshot.
121+
TODO: (CDAP-16735) fix this more comprehensively
122+
*/
123+
if (offset.get().isEmpty()) {
124+
try {
125+
DBSchemaHistory.wipeHistory();
126+
} catch (IOException e) {
127+
throw new RuntimeException("Unable to wipe schema history at start of replication.", e);
128+
}
129+
}
114130

115131
MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
116132
DdlParser ddlParser = mysqlConf.getDdlParsingMode().getNewParserInstance(mySqlValueConverters, tableId -> true);
@@ -132,10 +148,10 @@ public void start(Offset offset) {
132148
}
133149

134150
public void stop() throws InterruptedException {
135-
if (engine != null && engine.stop()) {
136-
engine.await(1, TimeUnit.MINUTES);
151+
executorService.shutdownNow();
152+
if (!executorService.awaitTermination(2, TimeUnit.MINUTES)) {
153+
LOG.warn("Unable to cleanly shutdown reader within the timeout.");
137154
}
138-
executorService.shutdown();
139155
}
140156

141157
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 117 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.io.IOException;
4141
import java.util.HashMap;
4242
import java.util.Map;
43+
import java.util.concurrent.atomic.AtomicReference;
4344
import java.util.function.Consumer;
4445

4546
/**
@@ -128,88 +129,29 @@ public void accept(SourceRecord sourceRecord) {
128129
// If the map is empty, we should read all DDL/DML events and columns of all tables
129130
boolean readAllTables = sourceTableMap.isEmpty();
130131

131-
if (ddl != null) {
132-
ddlParser.getDdlChanges().reset();
133-
ddlParser.parse(ddl, tables);
134-
ddlParser.getDdlChanges().groupEventsByDatabase((database, events) -> {
135-
for (DdlParserListener.Event event : events) {
136-
DDLEvent.Builder builder = DDLEvent.builder()
137-
.setDatabase(database)
138-
.setOffset(recordOffset)
139-
.setSnapshot(isSnapshot);
140-
// since current ddl blacklist implementation is bind with table level, we will only do the ddl blacklist
141-
// checking only for table change related ddl event, includes: ALTER_TABLE, RENAME_TABLE, DROP_TABLE,
142-
// CREATE_TABLE and TRUNCATE_TABLE.
143-
switch (event.type()) {
144-
case ALTER_TABLE:
145-
DdlParserListener.TableAlteredEvent alteredEvent = (DdlParserListener.TableAlteredEvent) event;
146-
TableId tableId = alteredEvent.tableId();
147-
Table table = tables.forTable(tableId);
148-
SourceTable sourceTable = getSourceTable(database, tableId.table());
149-
DDLOperation ddlOp;
150-
if (alteredEvent.previousTableId() != null) {
151-
ddlOp = DDLOperation.RENAME_TABLE;
152-
builder.setPrevTable(alteredEvent.previousTableId().table());
153-
} else {
154-
ddlOp = DDLOperation.ALTER_TABLE;
155-
}
156-
157-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, ddlOp)) {
158-
emitter.emit(builder.setOperation(ddlOp)
159-
.setTable(tableId.table())
160-
.setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
161-
Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
162-
.setPrimaryKey(table.primaryKeyColumnNames())
163-
.build());
164-
}
165-
break;
166-
case DROP_TABLE:
167-
DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
168-
sourceTable = getSourceTable(database, droppedEvent.tableId().table());
169-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.DROP_TABLE)) {
170-
emitter.emit(builder.setOperation(DDLOperation.DROP_TABLE)
171-
.setTable(droppedEvent.tableId().table())
172-
.build());
173-
}
174-
break;
175-
case CREATE_TABLE:
176-
DdlParserListener.TableCreatedEvent createdEvent = (DdlParserListener.TableCreatedEvent) event;
177-
tableId = createdEvent.tableId();
178-
table = tables.forTable(tableId);
179-
sourceTable = getSourceTable(database, tableId.table());
180-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.CREATE_TABLE)) {
181-
emitter.emit(builder.setOperation(DDLOperation.CREATE_TABLE)
182-
.setTable(tableId.table())
183-
.setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
184-
Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
185-
.setPrimaryKey(table.primaryKeyColumnNames())
186-
.build());
187-
}
188-
break;
189-
case DROP_DATABASE:
190-
emitter.emit(builder.setOperation(DDLOperation.DROP_DATABASE).build());
191-
break;
192-
case CREATE_DATABASE:
193-
emitter.emit(builder.setOperation(DDLOperation.CREATE_DATABASE).build());
194-
break;
195-
case TRUNCATE_TABLE:
196-
DdlParserListener.TableTruncatedEvent truncatedEvent =
197-
(DdlParserListener.TableTruncatedEvent) event;
198-
sourceTable = getSourceTable(database, truncatedEvent.tableId().table());
199-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.TRUNCATE_TABLE)) {
200-
emitter.emit(builder.setOperation(DDLOperation.TRUNCATE_TABLE)
201-
.setTable(truncatedEvent.tableId().table())
202-
.build());
203-
}
204-
break;
205-
default:
206-
return;
207-
}
208-
}
209-
});
210-
return;
132+
try {
133+
if (ddl != null) {
134+
handleDDL(ddl, recordOffset, isSnapshot, readAllTables);
135+
return;
136+
}
137+
138+
String databaseName = source.get("db");
139+
String tableName = source.get("table");
140+
SourceTable sourceTable = getSourceTable(databaseName, tableName);
141+
if (sourceTableNotValid(readAllTables, sourceTable)) {
142+
return;
143+
}
144+
145+
handleDML(source, val, recordOffset, isSnapshot, readAllTables);
146+
} catch (InterruptedException e) {
147+
LOG.debug("Interrupted while emitting change event.", e);
148+
// reset the interrupted flag so that caller knows to interrupt
149+
Thread.currentThread().interrupt();
211150
}
151+
}
212152

153+
private void handleDML(StructuredRecord source, StructuredRecord val, Offset recordOffset,
154+
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
213155
String databaseName = source.get("db");
214156
String tableName = source.get("table");
215157
SourceTable sourceTable = getSourceTable(databaseName, tableName);
@@ -274,6 +216,101 @@ public void accept(SourceRecord sourceRecord) {
274216
}
275217
}
276218

219+
private void handleDDL(String ddlStatement, Offset recordOffset,
220+
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
221+
ddlParser.getDdlChanges().reset();
222+
ddlParser.parse(ddlStatement, tables);
223+
AtomicReference<InterruptedException> interrupted = new AtomicReference<>();
224+
ddlParser.getDdlChanges().groupEventsByDatabase((database, events) -> {
225+
if (interrupted.get() != null) {
226+
return;
227+
}
228+
for (DdlParserListener.Event event : events) {
229+
DDLEvent.Builder builder = DDLEvent.builder()
230+
.setOffset(recordOffset)
231+
.setDatabase(database)
232+
.setSnapshot(isSnapshot);
233+
DDLEvent ddlEvent = null;
234+
// since current ddl blacklist implementation is bind with table level, we will only do the ddl blacklist
235+
// checking only for table change related ddl event, includes: ALTER_TABLE, RENAME_TABLE, DROP_TABLE,
236+
// CREATE_TABLE and TRUNCATE_TABLE.
237+
switch (event.type()) {
238+
case ALTER_TABLE:
239+
DdlParserListener.TableAlteredEvent alteredEvent = (DdlParserListener.TableAlteredEvent) event;
240+
TableId tableId = alteredEvent.tableId();
241+
Table table = tables.forTable(tableId);
242+
SourceTable sourceTable = getSourceTable(database, tableId.table());
243+
DDLOperation ddlOp;
244+
if (alteredEvent.previousTableId() != null) {
245+
ddlOp = DDLOperation.RENAME_TABLE;
246+
builder.setPrevTable(alteredEvent.previousTableId().table());
247+
} else {
248+
ddlOp = DDLOperation.ALTER_TABLE;
249+
}
250+
251+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, ddlOp)) {
252+
ddlEvent = builder.setOperation(ddlOp)
253+
.setTable(tableId.table())
254+
.setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
255+
Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
256+
.setPrimaryKey(table.primaryKeyColumnNames())
257+
.build();
258+
}
259+
break;
260+
case DROP_TABLE:
261+
DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
262+
sourceTable = getSourceTable(database, droppedEvent.tableId().table());
263+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.DROP_TABLE)) {
264+
ddlEvent = builder.setOperation(DDLOperation.DROP_TABLE)
265+
.setTable(droppedEvent.tableId().table())
266+
.build();
267+
}
268+
break;
269+
case CREATE_TABLE:
270+
DdlParserListener.TableCreatedEvent createdEvent = (DdlParserListener.TableCreatedEvent) event;
271+
tableId = createdEvent.tableId();
272+
table = tables.forTable(tableId);
273+
sourceTable = getSourceTable(database, tableId.table());
274+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.CREATE_TABLE)) {
275+
ddlEvent = builder.setOperation(DDLOperation.CREATE_TABLE)
276+
.setTable(tableId.table())
277+
.setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
278+
Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
279+
.setPrimaryKey(table.primaryKeyColumnNames())
280+
.build();
281+
}
282+
break;
283+
case DROP_DATABASE:
284+
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
285+
break;
286+
case CREATE_DATABASE:
287+
ddlEvent = builder.setOperation(DDLOperation.CREATE_DATABASE).build();
288+
break;
289+
case TRUNCATE_TABLE:
290+
DdlParserListener.TableTruncatedEvent truncatedEvent =
291+
(DdlParserListener.TableTruncatedEvent) event;
292+
sourceTable = getSourceTable(database, truncatedEvent.tableId().table());
293+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.TRUNCATE_TABLE)) {
294+
ddlEvent = builder.setOperation(DDLOperation.TRUNCATE_TABLE)
295+
.setTable(truncatedEvent.tableId().table())
296+
.build();
297+
}
298+
break;
299+
}
300+
if (ddlEvent != null) {
301+
try {
302+
emitter.emit(ddlEvent);
303+
} catch (InterruptedException e) {
304+
interrupted.set(e);
305+
}
306+
}
307+
}
308+
});
309+
if (interrupted.get() != null) {
310+
throw interrupted.get();
311+
}
312+
}
313+
277314
private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation op) {
278315
return (!sourceTableNotValid(readAllTables, sourceTable)) &&
279316
(!isDDLOperationBlacklisted(readAllTables, sourceTable, op));

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerDeltaSource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.cdap.delta.api.assessment.TableDetail;
3131
import io.cdap.delta.api.assessment.TableRegistry;
3232
import io.cdap.delta.plugin.common.DriverCleanup;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3335

3436
import java.sql.Driver;
3537

@@ -40,6 +42,7 @@
4042
@Name(SqlServerDeltaSource.NAME)
4143
@Description("Delta source for SqlServer.")
4244
public class SqlServerDeltaSource implements DeltaSource {
45+
private static final Logger LOG = LoggerFactory.getLogger(SqlServerDeltaSource.class);
4346
public static final String NAME = "sqlserver";
4447
private final SqlServerConfig config;
4548

@@ -56,6 +59,7 @@ public void configure(Configurer configurer) {
5659

5760
@Override
5861
public EventReader createReader(EventReaderDefinition definition, DeltaSourceContext context, EventEmitter emitter) {
62+
LOG.info("creating new event reader");
5963
return new SqlServerEventReader(definition.getTables(), config, context, emitter);
6064
}
6165

0 commit comments

Comments
 (0)