Skip to content

Commit db550b4

Browse files
committed
CDAP-16737 fix stop logic
Fixed stop logic for the event readers. Event consumers need to throw a special StopConnectorException to tell Debezium to stop immediately instead of continuing. Fixed the consumers to do this if they are interrupted while writing to the event queue, which happens when the reader is being stopped. Also added integration tests for stopping logic and included fixes to mysql integration test
1 parent a006f5e commit db550b4

File tree

6 files changed

+83
-41
lines changed

6 files changed

+83
-41
lines changed

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.delta.mysql;
1818

19+
import com.google.common.annotations.VisibleForTesting;
1920
import io.cdap.delta.api.DeltaFailureException;
2021
import io.cdap.delta.api.DeltaSourceContext;
2122
import io.cdap.delta.api.EventEmitter;
@@ -61,6 +62,7 @@ public class MySqlEventReader implements EventReader {
6162
private final DeltaSourceContext context;
6263
private final Set<SourceTable> sourceTables;
6364
private EmbeddedEngine engine;
65+
private volatile boolean failedToStop;
6466

6567
public MySqlEventReader(Set<SourceTable> sourceTables, MySqlConfig config,
6668
DeltaSourceContext context, EventEmitter emitter) {
@@ -69,6 +71,7 @@ public MySqlEventReader(Set<SourceTable> sourceTables, MySqlConfig config,
6971
this.context = context;
7072
this.emitter = emitter;
7173
this.executorService = Executors.newSingleThreadScheduledExecutor();
74+
this.failedToStop = false;
7275
}
7376

7477
@Override
@@ -151,9 +154,15 @@ public void stop() throws InterruptedException {
151154
executorService.shutdownNow();
152155
if (!executorService.awaitTermination(2, TimeUnit.MINUTES)) {
153156
LOG.warn("Unable to cleanly shutdown reader within the timeout.");
157+
failedToStop = true;
154158
}
155159
}
156160

161+
@VisibleForTesting
162+
boolean failedToStop() {
163+
return failedToStop;
164+
}
165+
157166
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {
158167
// Use MySQL-specific converters and schemas for values ...
159168

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.cdap.delta.api.SourceTable;
2828
import io.cdap.delta.plugin.common.Records;
2929
import io.debezium.connector.mysql.MySqlValueConverters;
30+
import io.debezium.embedded.StopConnectorException;
3031
import io.debezium.relational.Table;
3132
import io.debezium.relational.TableId;
3233
import io.debezium.relational.Tables;
@@ -144,9 +145,8 @@ public void accept(SourceRecord sourceRecord) {
144145

145146
handleDML(source, val, recordOffset, isSnapshot, readAllTables);
146147
} catch (InterruptedException e) {
147-
LOG.debug("Interrupted while emitting change event.", e);
148-
// reset the interrupted flag so that caller knows to interrupt
149-
Thread.currentThread().interrupt();
148+
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
149+
throw new StopConnectorException("Interrupted while emitting event.");
150150
}
151151
}
152152

@@ -284,7 +284,14 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
284284
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
285285
break;
286286
case CREATE_DATABASE:
287-
ddlEvent = builder.setOperation(DDLOperation.CREATE_DATABASE).build();
287+
// due to a bug in io.debezium.relational.ddl.AbstractDdlParser#signalDropDatabase
288+
// a DROP_DATABASE event will be mistakenly categorized as a CREATE_DATABASE event.
289+
// TODO: check if this is fixed in a newer debezium version
290+
if (event.statement() != null && event.statement().startsWith("DROP DATABASE")) {
291+
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
292+
} else {
293+
ddlEvent = builder.setOperation(DDLOperation.CREATE_DATABASE).build();
294+
}
288295
break;
289296
case TRUNCATE_TABLE:
290297
DdlParserListener.TableTruncatedEvent truncatedEvent =

mysql-delta-plugins/src/test/java/io/cdap/delta/mysql/MySqlEventReaderIntegrationTest.java

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import io.cdap.delta.api.DMLEvent;
2525
import io.cdap.delta.api.DMLOperation;
2626
import io.cdap.delta.api.DeltaSourceContext;
27+
import io.cdap.delta.api.EventEmitter;
2728
import io.cdap.delta.api.Offset;
2829
import io.cdap.delta.api.SourceTable;
30+
import io.cdap.delta.plugin.mock.BlockingEventEmitter;
2931
import io.cdap.delta.plugin.mock.MockContext;
3032
import io.cdap.delta.plugin.mock.MockEventEmitter;
3133
import org.junit.Assert;
@@ -44,6 +46,8 @@
4446
import java.util.Collections;
4547
import java.util.Properties;
4648
import java.util.TimeZone;
49+
import java.util.concurrent.ArrayBlockingQueue;
50+
import java.util.concurrent.BlockingQueue;
4751
import java.util.concurrent.TimeUnit;
4852

4953
/**
@@ -100,8 +104,9 @@ public static void setupClass() throws Exception {
100104
try (Connection connection = DriverManager.getConnection(connectionUrl, connProperties)) {
101105
// create table
102106
try (Statement statement = connection.createStatement()) {
103-
statement.execute(String.format("CREATE TABLE %s (id int PRIMARY KEY, name varchar(50), bday date null)",
104-
CUSTOMERS_TABLE));
107+
statement.execute(
108+
String.format("CREATE TABLE %s (id int PRIMARY KEY, name varchar(50) not null, bday date null)",
109+
CUSTOMERS_TABLE));
105110
}
106111

107112
// insert sample data
@@ -142,30 +147,25 @@ public void test() throws InterruptedException {
142147
Assert.assertEquals(4, eventEmitter.getDdlEvents().size());
143148
Assert.assertEquals(2, eventEmitter.getDmlEvents().size());
144149

145-
/*
146-
This currently would fails.
147-
Need to investigate why it's DROP_TABLE, CREATE_DATABASE, CREATE_DATABASE, CREATE_TABLE
148150
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(0);
149-
Assert.assertEquals(DDLOperation.DROP_DATABASE, ddlEvent.getOperation());
151+
Assert.assertEquals(DDLOperation.DROP_TABLE, ddlEvent.getOperation());
150152
Assert.assertEquals(DB, ddlEvent.getDatabase());
153+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
151154

152155
ddlEvent = eventEmitter.getDdlEvents().get(1);
153-
Assert.assertEquals(DDLOperation.CREATE_DATABASE, ddlEvent.getOperation());
156+
Assert.assertEquals(DDLOperation.DROP_DATABASE, ddlEvent.getOperation());
154157
Assert.assertEquals(DB, ddlEvent.getDatabase());
155158

156159
ddlEvent = eventEmitter.getDdlEvents().get(2);
157-
Assert.assertEquals(DDLOperation.DROP_TABLE, ddlEvent.getOperation());
160+
Assert.assertEquals(DDLOperation.CREATE_DATABASE, ddlEvent.getOperation());
158161
Assert.assertEquals(DB, ddlEvent.getDatabase());
159-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
160-
*/
161162

162-
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(3);
163+
ddlEvent = eventEmitter.getDdlEvents().get(3);
163164
Assert.assertEquals(DDLOperation.CREATE_TABLE, ddlEvent.getOperation());
164165
Assert.assertEquals(DB, ddlEvent.getDatabase());
165166
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
166167
Assert.assertEquals(Collections.singletonList("id"), ddlEvent.getPrimaryKey());
167-
// TODO: this would currently fail, seems to incorrectly return the schema as non-nullable
168-
//Assert.assertEquals(CUSTOMERS_SCHEMA, ddlEvent.getSchema());
168+
Assert.assertEquals(CUSTOMERS_SCHEMA, ddlEvent.getSchema());
169169

170170
DMLEvent dmlEvent = eventEmitter.getDmlEvents().get(0);
171171
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
@@ -177,8 +177,7 @@ public void test() throws InterruptedException {
177177
.set("name", "alice")
178178
.setDate("bday", LocalDate.ofEpochDay(0))
179179
.build();
180-
// this fails with schemas that are different
181-
// Assert.assertEquals(expected, row);
180+
Assert.assertEquals(expected, row);
182181

183182
dmlEvent = eventEmitter.getDmlEvents().get(1);
184183
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
@@ -190,7 +189,37 @@ public void test() throws InterruptedException {
190189
.set("name", "bob")
191190
.setDate("bday", LocalDate.ofEpochDay(365))
192191
.build();
193-
// this fails with schemas that are different
194-
// Assert.assertEquals(expected, row);
192+
Assert.assertEquals(expected, row);
193+
}
194+
195+
@Test
196+
public void stopReaderTest() throws Exception {
197+
SourceTable sourceTable = new SourceTable(DB, CUSTOMERS_TABLE, null,
198+
Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
199+
200+
DeltaSourceContext context = new MockContext(Driver.class);
201+
BlockingQueue<DDLEvent> ddlEvents = new ArrayBlockingQueue<>(1);
202+
BlockingQueue<DMLEvent> dmlEvents = new ArrayBlockingQueue<>(1);
203+
EventEmitter eventEmitter = new BlockingEventEmitter(ddlEvents, dmlEvents);
204+
MySqlConfig config = new MySqlConfig("localhost", port, "root", password, 13, DB,
205+
TimeZone.getDefault().getID());
206+
207+
MySqlEventReader eventReader = new MySqlEventReader(Collections.singleton(sourceTable), config,
208+
context, eventEmitter);
209+
210+
eventReader.start(new Offset());
211+
212+
int count = 0;
213+
while (ddlEvents.size() < 1 && count < 100) {
214+
TimeUnit.MILLISECONDS.sleep(50);
215+
count++;
216+
}
217+
218+
if (count >= 100) {
219+
Assert.fail("Reader never emitted any events.");
220+
}
221+
222+
eventReader.stop();
223+
Assert.assertFalse(eventReader.failedToStop());
195224
}
196225
}

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerDeltaSource.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public void configure(Configurer configurer) {
5959

6060
@Override
6161
public EventReader createReader(EventReaderDefinition definition, DeltaSourceContext context, EventEmitter emitter) {
62-
LOG.info("creating new event reader");
6362
return new SqlServerEventReader(definition.getTables(), config, context, emitter);
6463
}
6564

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.cdap.delta.api.SourceTable;
2929
import io.cdap.delta.plugin.common.Records;
3030
import io.debezium.connector.sqlserver.SourceInfo;
31+
import io.debezium.embedded.StopConnectorException;
3132
import org.apache.kafka.connect.data.Struct;
3233
import org.apache.kafka.connect.source.SourceRecord;
3334
import org.slf4j.Logger;
@@ -63,7 +64,6 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
6364
this.databaseName = databaseName;
6465
this.trackingTables = new HashSet<>();
6566
this.sourceTableMap = sourceTableMap;
66-
LOG.info("created new record consumer.");
6767
}
6868

6969
@Override
@@ -80,14 +80,8 @@ public void accept(SourceRecord sourceRecord) {
8080
Map<String, String> deltaOffset = SqlServerConstantOffsetBackingStore.serializeOffsets(sourceRecord);
8181
Offset recordOffset = new Offset(deltaOffset);
8282
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
83-
Object snapshotVal = sourceRecord.sourceOffset().get(SourceInfo.SNAPSHOT_KEY);
84-
LOG.info("snapshot val = {}", snapshotVal);
8583

8684
boolean isSnapshot = Boolean.TRUE.equals(sourceRecord.sourceOffset().get(SourceInfo.SNAPSHOT_KEY));
87-
LOG.info("{}, offset:", this);
88-
for (Map.Entry<String, String> entry : deltaOffset.entrySet()) {
89-
LOG.info(" {} = {}", entry.getKey(), entry.getValue());
90-
}
9185

9286
DMLOperation op;
9387
String opStr = val.get("op");
@@ -146,8 +140,6 @@ public void accept(SourceRecord sourceRecord) {
146140
.setSnapshot(isSnapshot);
147141

148142
Schema schema = value.getSchema();
149-
LOG.info("isSnapshot = {}", isSnapshot);
150-
LOG.info("tracking tables has {} elements, = {}", trackingTables.size(), trackingTables);
151143
// send the ddl event if the first see the table and the it is in snapshot.
152144
// Note: the delta app itself have prevented adding CREATE_TABLE operation into DDL blacklist for all the tables.
153145
if (!trackingTables.contains(trackingTable) && isSnapshot) {
@@ -172,9 +164,8 @@ public void accept(SourceRecord sourceRecord) {
172164
.setPrimaryKey(primaryFields)
173165
.build());
174166
} catch (InterruptedException e) {
175-
LOG.debug("Interrupted while emitting change event.", e);
176-
Thread.currentThread().interrupt();
177-
return;
167+
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
168+
throw new StopConnectorException("Interrupted while emitting an event.");
178169
}
179170
trackingTables.add(trackingTable);
180171
}
@@ -203,8 +194,8 @@ public void accept(SourceRecord sourceRecord) {
203194
try {
204195
emitter.emit(dmlBuilder.build());
205196
} catch (InterruptedException e) {
206-
LOG.debug("Interrupted while emitting change event.", e);
207-
Thread.currentThread().interrupt();
197+
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
198+
throw new StopConnectorException("Interrupted while emitting an event.");
208199
}
209200
}
210201
}

sqlserver-delta-plugins/src/test/java/io.cdap.delta.sqlserver/SqlServerEventReaderIntegrationTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.cdap.delta.plugin.mock.BlockingEventEmitter;
3131
import io.cdap.delta.plugin.mock.MockContext;
3232
import io.cdap.delta.plugin.mock.MockEventEmitter;
33-
import org.eclipse.jetty.util.BlockingArrayQueue;
3433
import org.junit.Assert;
3534
import org.junit.BeforeClass;
3635
import org.junit.Test;
@@ -46,6 +45,7 @@
4645
import java.time.LocalDate;
4746
import java.util.Collections;
4847
import java.util.Properties;
48+
import java.util.concurrent.ArrayBlockingQueue;
4949
import java.util.concurrent.BlockingQueue;
5050
import java.util.concurrent.TimeUnit;
5151

@@ -202,8 +202,8 @@ public void testEventReaderStop() throws Exception {
202202
Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
203203

204204
DeltaSourceContext context = new MockContext(SQLServerDriver.class);
205-
BlockingQueue<DDLEvent> ddlEvents = new BlockingArrayQueue<>(1);
206-
BlockingQueue<DMLEvent> dmlEvents = new BlockingArrayQueue<>(1);
205+
BlockingQueue<DDLEvent> ddlEvents = new ArrayBlockingQueue<>(1);
206+
BlockingQueue<DMLEvent> dmlEvents = new ArrayBlockingQueue<>(1);
207207
EventEmitter eventEmitter = new BlockingEventEmitter(ddlEvents, dmlEvents);
208208
SqlServerConfig config = new SqlServerConfig("localhost", port, "sa", password,
209209
DB, null, "mssql");
@@ -213,9 +213,16 @@ public void testEventReaderStop() throws Exception {
213213

214214
eventReader.start(new Offset());
215215

216-
while (ddlEvents.size() < 1) {
217-
TimeUnit.MILLISECONDS.sleep(10);
216+
int count = 0;
217+
while (ddlEvents.size() < 1 && count < 100) {
218+
TimeUnit.MILLISECONDS.sleep(50);
219+
count++;
218220
}
221+
222+
if (count >= 100) {
223+
Assert.fail("Reader never emitted any events.");
224+
}
225+
219226
eventReader.stop();
220227
Assert.assertFalse(eventReader.failedToStop());
221228
}

0 commit comments

Comments
 (0)