@@ -64,6 +64,12 @@ public class MySqlEventReaderIntegrationTest {
6464 Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
6565 Schema .Field .of ("name" , Schema .of (Schema .Type .STRING )),
6666 Schema .Field .of ("bday" , Schema .nullableOf (Schema .of (Schema .LogicalType .DATE ))));
67+ private static final String BINARYCOL_TABLE = "binarycoltable" ;
68+ private static final Schema EXPECTED_BINARYCOL_SCHEMA = Schema .recordOf (
69+ BINARYCOL_TABLE ,
70+ Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
71+ Schema .Field .of ("bincol" , Schema .of (Schema .Type .STRING )));
72+
6773 private static String password ;
6874 private static int port ;
6975
@@ -109,6 +115,13 @@ public static void setupClass() throws Exception {
109115 CUSTOMERS_TABLE ));
110116 }
111117
118+ // create table with Binary col
119+ try (Statement statement = connection .createStatement ()) {
120+ statement .execute (
121+ String .format ("CREATE TABLE %s (id int PRIMARY KEY, bincol BINARY(16) not null)" ,
122+ BINARYCOL_TABLE ));
123+ }
124+
112125 // insert sample data
113126 try (PreparedStatement ps = connection .prepareStatement (String .format ("INSERT INTO %s VALUES (?, ?, ?)" ,
114127 CUSTOMERS_TABLE ))) {
@@ -239,4 +252,30 @@ public void stopReaderTest() throws Exception {
239252 eventReader .stop ();
240253 Assert .assertFalse (eventReader .failedToStop ());
241254 }
242- }
255+
256+ @ Test
257+ public void testBinaryHandlingModebyDebezium () throws InterruptedException {
258+ //https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-binary-handling-mode
259+ SourceTable sourceTable = new SourceTable (DB , BINARYCOL_TABLE , null ,
260+ Collections .emptySet (), Collections .emptySet (), Collections .emptySet ());
261+
262+ MockContext context = new MockContext (Driver .class );
263+ context .addRuntimeArgument (MySqlEventReader .SOURCE_CONNECTOR_PREFIX + "binary.handling.mode" , "HEX" );
264+
265+ MockEventEmitter eventEmitter = new MockEventEmitter (4 );
266+ MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
267+ TimeZone .getDefault ().getID ());
268+
269+ MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
270+ context , eventEmitter );
271+ eventReader .start (new Offset ());
272+
273+ eventEmitter .waitForExpectedEvents (30 , TimeUnit .SECONDS );
274+
275+ DDLEvent ddlEvent = eventEmitter .getDdlEvents ().get (3 );
276+ Assert .assertEquals (DDLOperation .Type .CREATE_TABLE , ddlEvent .getOperation ().getType ());
277+ Assert .assertEquals (DB , ddlEvent .getOperation ().getDatabaseName ());
278+ Assert .assertEquals (BINARYCOL_TABLE , ddlEvent .getOperation ().getTableName ());
279+ Assert .assertEquals (EXPECTED_BINARYCOL_SCHEMA , ddlEvent .getSchema ());
280+ }
281+ }
0 commit comments