3030import io .cdap .delta .plugin .mock .BlockingEventEmitter ;
3131import io .cdap .delta .plugin .mock .MockContext ;
3232import io .cdap .delta .plugin .mock .MockEventEmitter ;
33+ import org .junit .AfterClass ;
3334import org .junit .Assert ;
3435import org .junit .BeforeClass ;
3536import org .junit .Test ;
4142import java .sql .Date ;
4243import java .sql .DriverManager ;
4344import java .sql .PreparedStatement ;
45+ import java .sql .SQLException ;
4446import java .sql .Statement ;
4547import java .time .LocalDate ;
4648import java .util .Collections ;
5759 * are some classloading issues due to copied debezium classes.
5860 */
5961public class MySqlEventReaderIntegrationTest {
60- private static final String DB = "test" ;
62+ private static final int CONSUMER_ID = 13 ;
6163 private static final String CUSTOMERS_TABLE = "customers" ;
6264 private static final Schema CUSTOMERS_SCHEMA = Schema .recordOf (
6365 "customers" ,
@@ -69,9 +71,14 @@ public class MySqlEventReaderIntegrationTest {
6971 BINARYCOL_TABLE ,
7072 Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
7173 Schema .Field .of ("bincol" , Schema .of (Schema .Type .STRING )));
74+ private static final String HOST = "localhost" ;
75+ private static final String DB = "test" ;
76+ private static final String USER = "root" ;
7277
7378 private static String password ;
7479 private static int port ;
80+ private static Properties connProperties ;
81+ private static String connectionUrl ;
7582
7683 @ BeforeClass
7784 public static void setupClass () throws Exception {
@@ -83,10 +90,10 @@ public static void setupClass() throws Exception {
8390 }
8491 port = Integer .parseInt (properties .getProperty ("mysql.port" ));
8592
86- Properties connProperties = new Properties ();
87- connProperties .put ("user" , "root" );
93+ connProperties = new Properties ();
94+ connProperties .put ("user" , USER );
8895 connProperties .put ("password" , password );
89- String connectionUrl = String .format ("jdbc:mysql://localhost:%d" , port );
96+ connectionUrl = String .format ("jdbc:mysql://localhost:%d" , port );
9097 DriverManager .getDriver (connectionUrl );
9198
9299 // wait until a connection can be established
@@ -145,14 +152,24 @@ public static void setupClass() throws Exception {
145152 }
146153 }
147154
155+ @ AfterClass
156+ public static void tearDown () throws SQLException {
157+ // drop database
158+ try (Connection connection = DriverManager .getConnection (connectionUrl , connProperties )) {
159+ try (Statement statement = connection .createStatement ()) {
160+ statement .execute ("DROP DATABASE " + DB );
161+ }
162+ }
163+ }
164+
148165 @ Test
149166 public void test () throws InterruptedException {
150167 SourceTable sourceTable = new SourceTable (DB , CUSTOMERS_TABLE , null ,
151168 Collections .emptySet (), Collections .emptySet (), Collections .emptySet ());
152169
153170 DeltaSourceContext context = new MockContext (Driver .class );
154- MockEventEmitter eventEmitter = new MockEventEmitter (6 );
155- MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
171+ MockEventEmitter eventEmitter = new MockEventEmitter (7 );
172+ MySqlConfig config = new MySqlConfig (HOST , port , USER , password , CONSUMER_ID , DB ,
156173 TimeZone .getDefault ().getID ());
157174
158175 MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
@@ -163,7 +180,7 @@ public void test() throws InterruptedException {
163180 eventEmitter .waitForExpectedEvents (30 , TimeUnit .SECONDS );
164181
165182 Assert .assertEquals (4 , eventEmitter .getDdlEvents ().size ());
166- Assert .assertEquals (2 , eventEmitter .getDmlEvents ().size ());
183+ Assert .assertEquals (3 , eventEmitter .getDmlEvents ().size ());
167184
168185 DDLEvent ddlEvent = eventEmitter .getDdlEvents ().get (0 );
169186 Assert .assertEquals (DDLOperation .Type .DROP_TABLE , ddlEvent .getOperation ().getType ());
@@ -183,14 +200,20 @@ public void test() throws InterruptedException {
183200 Assert .assertEquals (DB , ddlEvent .getOperation ().getDatabaseName ());
184201 Assert .assertEquals (CUSTOMERS_TABLE , ddlEvent .getOperation ().getTableName ());
185202 Assert .assertEquals (Collections .singletonList ("id" ), ddlEvent .getPrimaryKey ());
203+
186204 Assert .assertEquals (CUSTOMERS_SCHEMA , ddlEvent .getSchema ());
187205
188206 DMLEvent dmlEvent = eventEmitter .getDmlEvents ().get (0 );
189207 Assert .assertEquals (DMLOperation .Type .INSERT , dmlEvent .getOperation ().getType ());
190208 Assert .assertEquals (DB , dmlEvent .getOperation ().getDatabaseName ());
191209 Assert .assertEquals (CUSTOMERS_TABLE , dmlEvent .getOperation ().getTableName ());
192210 StructuredRecord row = dmlEvent .getRow ();
193- StructuredRecord expected = StructuredRecord .builder (CUSTOMERS_SCHEMA )
211+
212+ // Take schema name from the row as it is generated by debezium
213+ // so it can be different from the one in our schema but does not impact data
214+ Schema expectedSchema = Schema .recordOf (row .getSchema ().getRecordName (), CUSTOMERS_SCHEMA .getFields ());
215+
216+ StructuredRecord expected = StructuredRecord .builder (expectedSchema )
194217 .set ("id" , 0 )
195218 .set ("name" , "alice" )
196219 .setDate ("bday" , LocalDate .ofEpochDay (0 ))
@@ -202,7 +225,7 @@ public void test() throws InterruptedException {
202225 Assert .assertEquals (DB , dmlEvent .getOperation ().getDatabaseName ());
203226 Assert .assertEquals (CUSTOMERS_TABLE , dmlEvent .getOperation ().getTableName ());
204227 row = dmlEvent .getRow ();
205- expected = StructuredRecord .builder (CUSTOMERS_SCHEMA )
228+ expected = StructuredRecord .builder (expectedSchema )
206229 .set ("id" , 1 )
207230 .set ("name" , "bob" )
208231 .setDate ("bday" , LocalDate .ofEpochDay (365 ))
@@ -214,7 +237,7 @@ public void test() throws InterruptedException {
214237 Assert .assertEquals (DB , dmlEvent .getOperation ().getDatabaseName ());
215238 Assert .assertEquals (CUSTOMERS_TABLE , dmlEvent .getOperation ().getTableName ());
216239 row = dmlEvent .getRow ();
217- expected = StructuredRecord .builder (CUSTOMERS_SCHEMA )
240+ expected = StructuredRecord .builder (expectedSchema )
218241 .set ("id" , 2 )
219242 .set ("name" , "tim" )
220243 .setDate ("bday" , null )
@@ -231,7 +254,7 @@ public void stopReaderTest() throws Exception {
231254 BlockingQueue <DDLEvent > ddlEvents = new ArrayBlockingQueue <>(1 );
232255 BlockingQueue <DMLEvent > dmlEvents = new ArrayBlockingQueue <>(1 );
233256 EventEmitter eventEmitter = new BlockingEventEmitter (ddlEvents , dmlEvents );
234- MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
257+ MySqlConfig config = new MySqlConfig (HOST , port , USER , password , CONSUMER_ID , DB ,
235258 TimeZone .getDefault ().getID ());
236259
237260 MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
@@ -263,7 +286,7 @@ public void testBinaryHandlingModebyDebezium() throws InterruptedException {
263286 context .addRuntimeArgument (MySqlEventReader .SOURCE_CONNECTOR_PREFIX + "binary.handling.mode" , "HEX" );
264287
265288 MockEventEmitter eventEmitter = new MockEventEmitter (4 );
266- MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
289+ MySqlConfig config = new MySqlConfig (HOST , port , USER , password , CONSUMER_ID , DB ,
267290 TimeZone .getDefault ().getID ());
268291
269292 MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
0 commit comments