diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
index 90a66978029..2073c812390 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java
@@ -25,14 +25,46 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import io.debezium.time.ZonedTimestamp;
 import org.apache.kafka.connect.data.Schema;

+import java.time.Instant;
+import java.util.Optional;
+
 /** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
 @Internal
 public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {

     private static final long serialVersionUID = 1L;

+    protected DataType inferString(Object value, Schema schema) {
+        // PostgreSQL TIMESTAMPTZ is encoded as a Debezium ZonedTimestamp, so it must be
+        // mapped to TIMESTAMP_TZ (ZonedTimestampType) rather than TIMESTAMP_LTZ.
+        if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
+            int nano =
+                    Optional.ofNullable((String) value)
+                            .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
+                            .map(Instant::getNano)
+                            .orElse(0);
+
+            int precision;
+            if (nano == 0) {
+                precision = 0;
+            } else if (nano % 1_000 > 0) {
+                precision = 9;
+            } else if (nano % 1_000_000 > 0) {
+                precision = 6;
+            } else if (nano % 1_000_000_000 > 0) {
+                precision = 3;
+            } else {
+                precision = 0;
+            }
+            // Map the finest non-zero fractional digits to TIMESTAMP_TZ(0), (3), (6) or (9).
+            return DataTypes.TIMESTAMP_TZ(precision);
+        }
+        return super.inferString(value, schema);
+    }
+
     protected DataType inferStruct(Object value, Schema schema) {
         // the Geometry datatype in PostgresSQL will be converted to
         // a String with Json format
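A minimal standalone sketch of the precision mapping the new inferString applies, assuming debezium-core is on the classpath; the class name PrecisionMappingSketch is illustrative only. Since Instant.getNano() is always below 1_000_000_000, the patch's final modulo test is equivalent to the plain else branch used here.

import io.debezium.time.ZonedTimestamp;

import java.time.Instant;

public class PrecisionMappingSketch {

    // Mirrors inferString: finest non-zero fractional digits -> precision 0, 3, 6 or 9.
    static int inferPrecision(String value) {
        int nano = ZonedTimestamp.FORMATTER.parse(value, Instant::from).getNano();
        if (nano == 0) {
            return 0;
        } else if (nano % 1_000 > 0) {
            return 9; // sub-microsecond digits present
        } else if (nano % 1_000_000 > 0) {
            return 6; // sub-millisecond digits present
        } else {
            return 3; // millisecond digits only
        }
    }

    public static void main(String[] args) {
        System.out.println(inferPrecision("2020-07-17T18:00:22+00:00"));           // 0
        System.out.println(inferPrecision("2020-07-17T18:00:22.123+00:00"));       // 3
        System.out.println(inferPrecision("2020-07-17T18:00:22.123456+00:00"));    // 6
        System.out.println(inferPrecision("2020-07-17T18:00:22.123456789+00:00")); // 9
    }
}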
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 0dac3c153e8..89613ce88f2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -23,10 +23,10 @@
 import org.apache.flink.cdc.common.data.ArrayData;
 import org.apache.flink.cdc.common.data.DateData;
 import org.apache.flink.cdc.common.data.DecimalData;
-import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryMapData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -68,6 +68,7 @@
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -298,7 +299,8 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
                     TimestampData.fromLocalDateTime(
                             LocalDateTime.parse("2020-07-17T18:00:22.123456")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
-                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                    ZonedTimestampData.fromOffsetDateTime(
+                            OffsetDateTime.parse("2020-07-17T10:00:22Z")),
                 };

         List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -354,7 +356,8 @@ public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
                     TimestampData.fromLocalDateTime(
                             LocalDateTime.parse("2020-07-17T18:00:22.123456")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
-                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                    ZonedTimestampData.fromOffsetDateTime(
+                            OffsetDateTime.parse("2020-07-17T10:00:22Z")),
                 };

         List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -409,7 +412,8 @@ public void testTimeTypesWithTemporalModeConnect() throws Exception {
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
-                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                    ZonedTimestampData.fromOffsetDateTime(
+                            OffsetDateTime.parse("2020-07-17T10:00:22Z")),
                 };

         List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -1042,7 +1046,7 @@ private Instant toInstant(String ts) {
                     DataTypes.TIMESTAMP(3),
                     DataTypes.TIMESTAMP(6),
                     DataTypes.TIMESTAMP(),
-                    DataTypes.TIMESTAMP_LTZ(0));
+                    DataTypes.TIMESTAMP_TZ(0));

     private static final RowType HSTORE_TYPES_WITH_ADAPTIVE =
             RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
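The expected values above change from a LocalZonedTimestampData at wall-clock 18:00:22 to a ZonedTimestampData normalized to 10:00:22Z. The eight-hour shift suggests the test session writes at UTC+8; that offset is an assumption inferred from the expected values, not something visible in these hunks. A plain-JDK check that the old and new expectations denote the same instant, with only the offset handling differing:

import java.time.Instant;
import java.time.OffsetDateTime;

public class SameInstantCheck {
    public static void main(String[] args) {
        // Old expectation: wall-clock 18:00:22 in an (assumed) UTC+8 session zone.
        Instant before = OffsetDateTime.parse("2020-07-17T18:00:22+08:00").toInstant();
        // New expectation: the same instant, carried with an explicit UTC offset.
        OffsetDateTime after = OffsetDateTime.parse("2020-07-17T10:00:22Z");

        System.out.println(before.equals(after.toInstant())); // true
        System.out.println(after.getOffset());                // Z: TIMESTAMP_TZ keeps the offset
    }
}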
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInferenceTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInferenceTest.java
new file mode 100644
index 00000000000..60ebfdc6c66
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInferenceTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import io.debezium.time.ZonedTimestamp;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test cases for {@link PostgresSchemaDataTypeInference}. */
+class PostgresSchemaDataTypeInferenceTest {
+
+    private final PostgresSchemaDataTypeInference inference = new PostgresSchemaDataTypeInference();
+
+    private DataType inferString(Object value, Schema schema) {
+        try {
+            Method method =
+                    PostgresSchemaDataTypeInference.class.getDeclaredMethod(
+                            "inferString", Object.class, Schema.class);
+            method.setAccessible(true);
+            return (DataType) method.invoke(inference, value, schema);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void testInferZonedTimestampWithZeroPrecision() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        DataType result = inferString("2020-07-17T18:00:22+00:00", zonedTimestampSchema);
+
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
+    }
+
+    @Test
+    void testInferZonedTimestampWithMillisecondPrecision() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        DataType result = inferString("2020-07-17T18:00:22.123+00:00", zonedTimestampSchema);
+
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(3));
+    }
+
+    @Test
+    void testInferZonedTimestampWithMicrosecondPrecision() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        DataType result = inferString("2020-07-17T18:00:22.123456+00:00", zonedTimestampSchema);
+
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(6));
+    }
+
+    @Test
+    void testInferZonedTimestampWithNanosecondPrecision() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        DataType result = inferString("2020-07-17T18:00:22.123456789+00:00", zonedTimestampSchema);
+
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(9));
+    }
+
+    @Test
+    void testInferZonedTimestampWithDifferentTimezones() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        DataType result1 = inferString("2020-07-17T18:00:22+05:30", zonedTimestampSchema);
+        DataType result2 = inferString("2020-07-17T18:00:22-08:00", zonedTimestampSchema);
+        DataType result3 = inferString("2020-07-17T18:00:22Z", zonedTimestampSchema);
+
+        assertThat(result1).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
+        assertThat(result2).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
+        assertThat(result3).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
+    }
+
+    @Test
+    void testInferZonedTimestampWithNullValue() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        DataType result = inferString(null, zonedTimestampSchema);
+
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
+    }
+
+    @Test
+    void testInferNonZonedTimestampString() {
+        Schema regularStringSchema =
+                SchemaBuilder.string().name("some.other.schema").optional().build();
+
+        DataType result = inferString("some string value", regularStringSchema);
+
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    @Test
+    void testInferZonedTimestampWithVariousPrecisions() {
+        Schema zonedTimestampSchema =
+                SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        assertThat(inferString("2020-07-17T18:00:22+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(0));
+
+        assertThat(inferString("2020-07-17T18:00:22.1+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(3));
+
+        assertThat(inferString("2020-07-17T18:00:22.12+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(3));
+
+        assertThat(inferString("2020-07-17T18:00:22.123+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(3));
+
+        assertThat(inferString("2020-07-17T18:00:22.1234+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(6));
+
+        assertThat(inferString("2020-07-17T18:00:22.123456+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(6));
+
+        assertThat(inferString("2020-07-17T18:00:22.1234567+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(9));
+
+        assertThat(inferString("2020-07-17T18:00:22.123456789+00:00", zonedTimestampSchema))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(9));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index c484709ff58..0bd1cb94929 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -27,6 +27,7 @@
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -54,6 +55,7 @@
 import io.debezium.time.NanoTime;
 import io.debezium.time.NanoTimestamp;
 import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTimestamp;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -65,6 +67,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
+import java.time.OffsetDateTime;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -196,6 +199,8 @@ protected DeserializationRuntimeConverter createNotNullConverter(DataType type)
                 return this::convertToTime;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 return this::convertToTimestamp;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                return this::convertToZonedTimestamp;
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return this::convertToLocalTimeZoneTimestamp;
             case FLOAT:
@@ -367,6 +372,35 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
                         + dbzObj.getClass().getName());
     }

+    protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
+        if (dbzObj instanceof String) {
+            String str = (String) dbzObj;
+            // The ZonedTimestamp type is encoded as a string in ISO-8601 format with a
+            // timezone offset (e.g. "2020-07-17T18:00:22+00:00"). Per the Debezium
+            // documentation, PostgreSQL TIMESTAMPTZ is always encoded as a string with
+            // ZonedTimestamp.SCHEMA_NAME, regardless of time.precision.mode.
+            if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
+                // Parse using Debezium's ZonedTimestamp formatter.
+                OffsetDateTime offsetDateTime =
+                        OffsetDateTime.parse(str, ZonedTimestamp.FORMATTER);
+                return ZonedTimestampData.fromOffsetDateTime(offsetDateTime);
+            } else {
+                // Fall back to standard ISO-8601 parsing.
+                OffsetDateTime offsetDateTime = OffsetDateTime.parse(str);
+                return ZonedTimestampData.fromOffsetDateTime(offsetDateTime);
+            }
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to TIMESTAMP WITH TIME ZONE from unexpected value '"
+                        + dbzObj
+                        + "' of type "
+                        + dbzObj.getClass().getName()
+                        + " with schema name '"
+                        + (schema != null ? schema.name() : "null")
+                        + "'. PostgreSQL TIMESTAMPTZ should always be encoded as String with "
+                        + ZonedTimestamp.SCHEMA_NAME);
+    }
+
     protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
         if (dbzObj instanceof String) {
             String str = (String) dbzObj;
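A short usage sketch of the conversion path the new converter wires in for TIMESTAMP_WITH_TIME_ZONE, assuming flink-cdc-common and debezium-core are on the classpath; the payload string and class name are illustrative. ZonedTimestampData.fromOffsetDateTime and getZonedDateTime are the accessors the new tests below rely on.

import org.apache.flink.cdc.common.data.ZonedTimestampData;

import io.debezium.time.ZonedTimestamp;

import java.time.OffsetDateTime;

public class ZonedTimestampConversionSketch {
    public static void main(String[] args) {
        // A TIMESTAMPTZ value as Debezium emits it: an ISO-8601 string with an offset.
        String payload = "2020-07-17T18:00:22.123456+05:30";

        OffsetDateTime parsed = OffsetDateTime.parse(payload, ZonedTimestamp.FORMATTER);
        ZonedTimestampData data = ZonedTimestampData.fromOffsetDateTime(parsed);

        // Unlike TIMESTAMP_LTZ, the original +05:30 offset survives the conversion.
        System.out.println(data.getZonedDateTime().toOffsetDateTime());
    }
}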
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/test/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchemaTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/test/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchemaTest.java
new file mode 100644
index 00000000000..57141f6d1cb
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/test/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchemaTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.debezium.event;
+
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import io.debezium.time.ZonedTimestamp;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test cases for {@link DebeziumEventDeserializationSchema}. */
+class DebeziumEventDeserializationSchemaTest {
+
+    private TestDebeziumEventDeserializationSchema deserializer;
+
+    @BeforeEach
+    void setUp() {
+        deserializer = new TestDebeziumEventDeserializationSchema();
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithZonedTimestampSchema() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22+00:00";
+        Object result = deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        assertThat(result).isInstanceOf(ZonedTimestampData.class);
+        ZonedTimestampData zonedTimestampData = (ZonedTimestampData) result;
+        OffsetDateTime expected = OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER);
+        assertThat(zonedTimestampData.getZonedDateTime().toOffsetDateTime()).isEqualTo(expected);
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithDifferentTimezones() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestamp1 = "2020-07-17T18:00:22+05:30";
+        String timestamp2 = "2020-07-17T18:00:22-08:00";
+        String timestamp3 = "2020-07-17T18:00:22Z";
+
+        ZonedTimestampData result1 =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestamp1, schema);
+        ZonedTimestampData result2 =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestamp2, schema);
+        ZonedTimestampData result3 =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestamp3, schema);
+
+        assertThat(result1.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestamp1, ZonedTimestamp.FORMATTER));
+        assertThat(result2.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestamp2, ZonedTimestamp.FORMATTER));
+        assertThat(result3.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestamp3, ZonedTimestamp.FORMATTER));
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithMillisecondPrecision() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22.123+00:00";
+        ZonedTimestampData result =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        assertThat(result.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER));
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithMicrosecondPrecision() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22.123456+00:00";
+        ZonedTimestampData result =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        assertThat(result.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER));
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithNanosecondPrecision() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22.123456789+00:00";
+        ZonedTimestampData result =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        assertThat(result.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER));
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithStandardIso8601Format() throws Exception {
+        Schema schema = SchemaBuilder.string().name("some.other.schema").optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22+00:00";
+        ZonedTimestampData result =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        assertThat(result.getZonedDateTime().toOffsetDateTime())
+                .isEqualTo(OffsetDateTime.parse(timestampStr));
+    }
+
+    @Test
+    void testConvertToZonedTimestampThrowsExceptionForNonString() {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        assertThatThrownBy(() -> deserializer.convertToZonedTimestamp(12345, schema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unable to convert to TIMESTAMP WITH TIME ZONE");
+    }
+
+    @Test
+    void testConvertToZonedTimestampPreservesTimezoneOffset() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22+05:30";
+        ZonedTimestampData result =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        OffsetDateTime expected = OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER);
+        assertThat(result.getZonedDateTime().toOffsetDateTime()).isEqualTo(expected);
+        assertThat(result.getZonedDateTime().toOffsetDateTime().getOffset())
+                .isEqualTo(ZoneOffset.of("+05:30"));
+    }
+
+    @Test
+    void testConvertToZonedTimestampWithUtcTimezone() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String timestampStr = "2020-07-17T18:00:22Z";
+        ZonedTimestampData result =
+                (ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
+
+        OffsetDateTime expected = OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER);
+        assertThat(result.getZonedDateTime().toOffsetDateTime()).isEqualTo(expected);
+        assertThat(result.getZonedDateTime().toOffsetDateTime().getOffset())
+                .isEqualTo(ZoneOffset.UTC);
+    }
+
+    @Test
+    void testConvertToZonedTimestampRoundTrip() throws Exception {
+        Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
+
+        String originalTimestamp = "2020-07-17T18:00:22.123456+05:30";
+        ZonedTimestampData result =
+                (ZonedTimestampData)
+                        deserializer.convertToZonedTimestamp(originalTimestamp, schema);
+
+        OffsetDateTime converted = result.getZonedDateTime().toOffsetDateTime();
+        String roundTrip = ZonedTimestamp.FORMATTER.format(converted);
+
+        assertThat(roundTrip).isEqualTo(originalTimestamp);
+    }
+
+    private static class TestDebeziumEventDeserializationSchema
+            extends DebeziumEventDeserializationSchema {
+
+        public TestDebeziumEventDeserializationSchema() {
+            super(
+                    new org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference(),
+                    DebeziumChangelogMode.ALL);
+        }
+
+        public Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
+            return super.convertToZonedTimestamp(dbzObj, schema);
+        }
+
+        @Override
+        protected boolean isDataChangeRecord(org.apache.kafka.connect.source.SourceRecord record) {
+            return false;
+        }
+
+        @Override
+        protected boolean isSchemaChangeRecord(
+                org.apache.kafka.connect.source.SourceRecord record) {
+            return false;
+        }
+
+        @Override
+        protected org.apache.flink.cdc.common.event.TableId getTableId(
+                org.apache.kafka.connect.source.SourceRecord record) {
+            return org.apache.flink.cdc.common.event.TableId.tableId("test", "test");
+        }
+
+        @Override
+        protected java.util.Map<String, String> getMetadata(
+                org.apache.kafka.connect.source.SourceRecord record) {
+            return java.util.Collections.emptyMap();
+        }
+
+        @Override
+        protected java.util.List<org.apache.flink.cdc.common.event.SchemaChangeEvent>
+                deserializeSchemaChangeRecord(org.apache.kafka.connect.source.SourceRecord record) {
+            return java.util.Collections.emptyList();
+        }
+    }
+}
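Finally, a plain-JDK illustration of what the switch from TIMESTAMP_LTZ to TIMESTAMP_TZ preserves: converting to an Instant (the LocalZonedTimestampData representation) discards the writer's offset, while an OffsetDateTime keeps both the instant and the offset. The class name is illustrative.

import java.time.Instant;
import java.time.OffsetDateTime;

public class WhyTimestampTz {
    public static void main(String[] args) {
        OffsetDateTime source = OffsetDateTime.parse("2020-07-17T18:00:22+05:30");

        // TIMESTAMP_LTZ semantics: only the instant survives; the offset collapses to UTC.
        Instant asLtz = source.toInstant();
        System.out.println(asLtz);  // 2020-07-17T12:30:22Z

        // TIMESTAMP_TZ semantics: instant and original offset both survive.
        System.out.println(source); // 2020-07-17T18:00:22+05:30
    }
}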