Skip to content

Commit 5d0a528

Browse files
committed
fix: Add support for PostgreSQL partitioned tables
1 parent 860e974 commit 5d0a528

File tree

1 file changed

+31
-4
lines changed

1 file changed

+31
-4
lines changed

connector.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cdc
33
import (
44
"context"
55
"fmt"
6+
"strings"
67

78
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
89

@@ -142,10 +143,36 @@ func (c *connector) processMessage(msg Message) bool {
142143
}
143144

144145
t, ok := timescaledb.HyperTables.Load(fullTableName)
145-
if !ok {
146-
return false
146+
if ok {
147+
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
148+
return exists
147149
}
148150

149-
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
150-
return exists
151+
parentTableName := c.findParentTable(msg.TableNamespace, msg.TableName)
152+
if parentTableName != "" {
153+
logger.Info("matched partition table to parent",
154+
"partition", fullTableName,
155+
"parent", parentTableName)
156+
return true
157+
}
158+
159+
return false
160+
}
161+
162+
func (c *connector) findParentTable(tableNamespace, tableName string) string {
163+
tableParts := strings.Split(tableName, "_")
164+
if len(tableParts) <= 1 {
165+
return ""
166+
}
167+
168+
for i := 1; i < len(tableParts); i++ {
169+
parentNameCandidate := strings.Join(tableParts[:i], "_")
170+
fullParentName := fmt.Sprintf("%s.%s", tableNamespace, parentNameCandidate)
171+
172+
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullParentName]; exists {
173+
return fullParentName
174+
}
175+
}
176+
177+
return ""
151178
}

0 commit comments

Comments
 (0)