Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,46 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Schema;

import java.time.Instant;
import java.util.Optional;

/** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
@Internal
public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {

private static final long serialVersionUID = 1L;

protected DataType inferString(Object value, Schema schema) {
// PostgreSQL TIMESTAMPTZ is encoded as ZonedTimestamp in Debezium
// We need to return TIMESTAMP_TZ (ZonedTimestampType) instead of TIMESTAMP_LTZ
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
int nano =
Optional.ofNullable((String) value)
.map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
.map(Instant::getNano)
.orElse(0);

int precision;
if (nano == 0) {
precision = 0;
} else if (nano % 1000 > 0) {
precision = 9;
} else if (nano % 1000_000 > 0) {
precision = 6;
} else if (nano % 1000_000_000 > 0) {
precision = 3;
} else {
precision = 0;
}
// Return TIMESTAMP_TZ (ZonedTimestampType) for PostgreSQL TIMESTAMPTZ
return DataTypes.TIMESTAMP_TZ(precision);
}
return super.inferString(value, schema);
}

protected DataType inferStruct(Object value, Schema schema) {
// the Geometry datatype in PostgresSQL will be converted to
// a String with Json format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
Expand Down Expand Up @@ -68,6 +68,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -298,7 +299,8 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
TimestampData.fromLocalDateTime(
LocalDateTime.parse("2020-07-17T18:00:22.123456")),
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
ZonedTimestampData.fromOffsetDateTime(
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
};

List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
Expand Down Expand Up @@ -354,7 +356,8 @@ public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
TimestampData.fromLocalDateTime(
LocalDateTime.parse("2020-07-17T18:00:22.123456")),
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
ZonedTimestampData.fromOffsetDateTime(
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
};

List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
Expand Down Expand Up @@ -409,7 +412,8 @@ public void testTimeTypesWithTemporalModeConnect() throws Exception {
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
ZonedTimestampData.fromOffsetDateTime(
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
};

List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
Expand Down Expand Up @@ -1042,7 +1046,7 @@ private Instant toInstant(String ts) {
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP(),
DataTypes.TIMESTAMP_LTZ(0));
DataTypes.TIMESTAMP_TZ(0));

private static final RowType HSTORE_TYPES_WITH_ADAPTIVE =
RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.source;

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Method;

import static org.assertj.core.api.Assertions.assertThat;

/** Test cases for {@link PostgresSchemaDataTypeInference}. */
class PostgresSchemaDataTypeInferenceTest {

private final PostgresSchemaDataTypeInference inference = new PostgresSchemaDataTypeInference();

private DataType inferString(Object value, Schema schema) {
try {
Method method =
PostgresSchemaDataTypeInference.class.getDeclaredMethod(
"inferString", Object.class, Schema.class);
method.setAccessible(true);
return (DataType) method.invoke(inference, value, schema);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Test
void testInferZonedTimestampWithZeroPrecision() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

DataType result = inferString("2020-07-17T18:00:22+00:00", zonedTimestampSchema);

assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
}

@Test
void testInferZonedTimestampWithMillisecondPrecision() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

DataType result = inferString("2020-07-17T18:00:22.123+00:00", zonedTimestampSchema);

assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(3));
}

@Test
void testInferZonedTimestampWithMicrosecondPrecision() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

DataType result = inferString("2020-07-17T18:00:22.123456+00:00", zonedTimestampSchema);

assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(6));
}

@Test
void testInferZonedTimestampWithNanosecondPrecision() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

DataType result = inferString("2020-07-17T18:00:22.123456789+00:00", zonedTimestampSchema);

assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(9));
}

@Test
void testInferZonedTimestampWithDifferentTimezones() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

DataType result1 = inferString("2020-07-17T18:00:22+05:30", zonedTimestampSchema);
DataType result2 = inferString("2020-07-17T18:00:22-08:00", zonedTimestampSchema);
DataType result3 = inferString("2020-07-17T18:00:22Z", zonedTimestampSchema);

assertThat(result1).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
assertThat(result2).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
assertThat(result3).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
}

@Test
void testInferZonedTimestampWithNullValue() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

DataType result = inferString(null, zonedTimestampSchema);

assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
}

@Test
void testInferNonZonedTimestampString() {
Schema regularStringSchema =
SchemaBuilder.string().name("some.other.schema").optional().build();

DataType result = inferString("some string value", regularStringSchema);

assertThat(result).isEqualTo(DataTypes.STRING());
}

@Test
void testInferZonedTimestampWithVariousPrecisions() {
Schema zonedTimestampSchema =
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();

assertThat(inferString("2020-07-17T18:00:22+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(0));

assertThat(inferString("2020-07-17T18:00:22.1+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));

assertThat(inferString("2020-07-17T18:00:22.12+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));

assertThat(inferString("2020-07-17T18:00:22.123+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));

assertThat(inferString("2020-07-17T18:00:22.1234+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(6));

assertThat(inferString("2020-07-17T18:00:22.123456+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(6));

assertThat(inferString("2020-07-17T18:00:22.1234567+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(9));

assertThat(inferString("2020-07-17T18:00:22.123456789+00:00", zonedTimestampSchema))
.isEqualTo(DataTypes.TIMESTAMP_TZ(9));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
Expand Down Expand Up @@ -54,6 +55,7 @@
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -65,6 +67,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -196,6 +199,8 @@ protected DeserializationRuntimeConverter createNotNullConverter(DataType type)
return this::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return this::convertToTimestamp;
case TIMESTAMP_WITH_TIME_ZONE:
return this::convertToZonedTimestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return this::convertToLocalTimeZoneTimestamp;
case FLOAT:
Expand Down Expand Up @@ -367,6 +372,35 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
+ dbzObj.getClass().getName());
}

protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// ZonedTimestamp type is encoded in string type with timezone offset
// Format: ISO-8601 with timezone offset (e.g., "2020-07-17T18:00:22+00:00")
// According to Debezium documentation, PostgreSQL TIMESTAMPTZ is ALWAYS encoded as
// String
// with ZonedTimestamp.SCHEMA_NAME, regardless of time.precision.mode
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
// Parse using Debezium's ZonedTimestamp formatter
OffsetDateTime offsetDateTime = OffsetDateTime.parse(str, ZonedTimestamp.FORMATTER);
return ZonedTimestampData.fromOffsetDateTime(offsetDateTime);
} else {
// Fallback to standard ISO-8601 parsing
OffsetDateTime offsetDateTime = OffsetDateTime.parse(str);
return ZonedTimestampData.fromOffsetDateTime(offsetDateTime);
}
}
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP WITH TIME ZONE from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName()
+ " with schema name '"
+ (schema != null ? schema.name() : "null")
+ "'. PostgreSQL TIMESTAMPTZ should always be encoded as String with "
+ ZonedTimestamp.SCHEMA_NAME);
}

protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
Expand Down
Loading