Skip to content

Commit b72bf35

Browse files
support mysql timestamp data type correctly (#187)
support mysql timestamp data type correctly
1 parent 4d67842 commit b72bf35

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/Records.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.debezium.time.NanoTimestamp;
3131
import io.debezium.time.Time;
3232
import io.debezium.time.Timestamp;
33+
import io.debezium.time.ZonedTimestamp;
3334
import org.apache.kafka.connect.data.Decimal;
3435
import org.apache.kafka.connect.data.Field;
3536
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -204,7 +205,14 @@ public static StructuredRecord convert(Struct struct) {
204205
builder.setTimestamp(fieldName, getZonedDateTime((long) val, TimeUnit.MILLISECONDS));
205206
break;
206207
case TIMESTAMP_MICROS:
207-
builder.setTimestamp(fieldName, getZonedDateTime((long) val, TimeUnit.MICROSECONDS));
208+
if (val instanceof Long) {
209+
builder.setTimestamp(fieldName, getZonedDateTime((long) val, TimeUnit.MICROSECONDS));
210+
break;
211+
}
212+
if (val instanceof String) {
213+
builder.setTimestamp(fieldName, ZonedDateTime.parse((String)val));
214+
break;
215+
}
208216
break;
209217
case TIME_MILLIS:
210218
builder.setTime(fieldName, LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos((int) val)));
@@ -239,11 +247,16 @@ private static ZonedDateTime getZonedDateTime(long ts, TimeUnit unit) {
239247
long mod = unit.convert(1, TimeUnit.SECONDS);
240248
int fraction = (int) (ts % mod);
241249
long tsInSeconds = unit.toSeconds(ts);
250+
return getZonedDateTime(tsInSeconds, fraction, unit);
251+
}
252+
253+
private static ZonedDateTime getZonedDateTime(long epochSecond, long fraction, TimeUnit fractionUnit) {
242254
// create an Instant with time in seconds and fraction which will be stored as nano seconds.
243-
Instant instant = Instant.ofEpochSecond(tsInSeconds, unit.toNanos(fraction));
255+
Instant instant = Instant.ofEpochSecond(epochSecond, fractionUnit.toNanos(fraction));
244256
return ZonedDateTime.ofInstant(instant, ZoneOffset.UTC);
245257
}
246258

259+
247260
private static Object convert(org.apache.kafka.connect.data.Schema schema, Object val) {
248261
if (val == null) {
249262
return null;
@@ -302,7 +315,11 @@ public static Schema convert(org.apache.kafka.connect.data.Schema schema) {
302315
}
303316
break;
304317
case STRING:
305-
converted = Schema.of(Schema.Type.STRING);
318+
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
319+
converted = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
320+
} else {
321+
converted = Schema.of(Schema.Type.STRING);
322+
}
306323
break;
307324
case INT8:
308325
case INT16:

delta-plugins-common/src/test/java/io/cdap/delta/plugin/common/RecordsTest.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,30 @@
1717
package io.cdap.delta.plugin.common;
1818

1919
import io.cdap.cdap.api.data.format.StructuredRecord;
20-
import org.apache.kafka.connect.data.Schema;
20+
import io.cdap.cdap.api.data.schema.Schema;
21+
import io.debezium.time.ZonedTimestamp;
2122
import org.apache.kafka.connect.data.SchemaBuilder;
2223
import org.apache.kafka.connect.data.Struct;
2324
import org.junit.Assert;
2425
import org.junit.Test;
2526

27+
import java.time.ZoneId;
28+
import java.time.ZonedDateTime;
29+
2630
/**
2731
* Test case for {@link Records} class.
2832
*/
2933
public class RecordsTest {
3034

3135
@Test
32-
public void testConvert() {
36+
public void testConvertTinyInt() {
3337
String fieldName = "priority";
34-
Schema dataSchema = SchemaBuilder.struct().name("TinyIntSchema").field(fieldName, Schema.INT16_SCHEMA).build();
38+
org.apache.kafka.connect.data.Schema dataSchema = SchemaBuilder
39+
.struct()
40+
.name("TinyIntSchema")
41+
.field(fieldName,
42+
org.apache.kafka.connect.data.Schema.INT16_SCHEMA)
43+
.build();
3544
Struct struct = new Struct(dataSchema);
3645
Short val = 1;
3746
struct.put(fieldName, val);
@@ -42,4 +51,22 @@ public void testConvert() {
4251
io.cdap.cdap.api.data.schema.Schema.Type.INT));
4352
Assert.assertEquals(val.intValue(), (int) convert.get(fieldName));
4453
}
54+
55+
@Test
56+
public void testConvertTimeStamp() {
57+
String fieldName = "timeCreated";
58+
org.apache.kafka.connect.data.Schema dataSchema =
59+
SchemaBuilder.struct()
60+
.name(fieldName).field(fieldName, ZonedTimestamp.schema()).build();
61+
Struct struct = new Struct(dataSchema);
62+
String val = "2011-12-03T10:15:30.030431+01:00";
63+
struct.put(fieldName, val);
64+
StructuredRecord converted = Records.convert(struct);
65+
Assert.assertEquals(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS),
66+
converted.getSchema().getField(fieldName).getSchema());
67+
Assert.assertEquals(converted.getTimestamp(fieldName),
68+
ZonedDateTime.parse(val).withZoneSameInstant(ZoneId.of("UTC")));
69+
70+
71+
}
4572
}

0 commit comments

Comments
 (0)