Skip to content

Commit ccd12fc

Browse files
authored
Merge pull request #227 from data-integrations/fix-stop-reader
CDAP-20394: Implement correct event reader stop method
2 parents 07a40dc + 441859a commit ccd12fc

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.cdap.delta.api.EventReader;
2323
import io.cdap.delta.api.Offset;
2424
import io.cdap.delta.api.SourceTable;
25+
import io.cdap.delta.api.StopContext;
2526
import io.cdap.delta.plugin.common.DBSchemaHistory;
2627
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
2728
import io.cdap.delta.plugin.common.RuntimeArguments;
@@ -171,7 +172,10 @@ public void start(Offset offset) {
171172
}
172173
}
173174

174-
public void stop() throws InterruptedException {
175+
public void stop(StopContext stopContext) throws InterruptedException {
176+
LOG.info("Stopping debezium engine, reason: " + stopContext.getOrigin());
177+
// Debezium engine is implicitly stopped as part of this call
178+
// Refer EmbeddedEngine docs
175179
executorService.shutdownNow();
176180
if (!executorService.awaitTermination(2, TimeUnit.MINUTES)) {
177181
LOG.warn("Unable to cleanly shutdown reader within the timeout.");

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.cdap.delta.api.EventReader;
2323
import io.cdap.delta.api.Offset;
2424
import io.cdap.delta.api.SourceTable;
25+
import io.cdap.delta.api.StopContext;
2526
import io.cdap.delta.plugin.common.DBSchemaHistory;
2627
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
2728
import io.cdap.delta.plugin.common.RuntimeArguments;
@@ -176,7 +177,10 @@ public void start(Offset offset) {
176177
}
177178

178179
@Override
179-
public void stop() throws InterruptedException {
180+
public void stop(StopContext stopContext) throws InterruptedException {
181+
LOG.info("Stopping debezium engine, reason: " + stopContext.getOrigin());
182+
// Debezium engine is implicitly stopped as part of this call
183+
// Refer EmbeddedEngine docs
180184
executorService.shutdownNow();
181185
if (!executorService.awaitTermination(2, TimeUnit.MINUTES)) {
182186
failedStopping = true;

0 commit comments

Comments
 (0)