Skip to content

Commit d6e0d96

Browse files
authored
Merge pull request #10 from mertbilgic/fix/elastic-mapping-for-partioned-table
fix: Add support for PostgreSQL partitioned tables by @mertbilgic
2 parents 860e974 + 6bc376c commit d6e0d96

File tree

1 file changed

+60
-6
lines changed

1 file changed

+60
-6
lines changed

connector.go

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package cdc
33
import (
44
"context"
55
"fmt"
6+
"strings"
7+
"sync"
68

79
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
810

@@ -26,11 +28,12 @@ type Connector interface {
2628
}
2729

2830
type connector struct {
31+
partitionCache sync.Map
2932
handler Handler
3033
responseHandler elasticsearch.ResponseHandler
3134
cfg *config.Config
32-
cdc cdc.Connector
3335
esClient *es.Client
36+
cdc cdc.Connector
3437
bulk bulk.Indexer
3538
metrics []prometheus.Collector
3639
}
@@ -135,17 +138,68 @@ func (c *connector) processMessage(msg Message) bool {
135138
return true
136139
}
137140

138-
fullTableName := fmt.Sprintf("%s.%s", msg.TableNamespace, msg.TableName)
141+
fullTableName := c.getFullTableName(msg.TableNamespace, msg.TableName)
139142

140143
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullTableName]; exists {
141144
return true
142145
}
143146

144147
t, ok := timescaledb.HyperTables.Load(fullTableName)
145-
if !ok {
146-
return false
148+
if ok {
149+
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
150+
return exists
151+
}
152+
153+
parentTableName := c.getParentTableName(fullTableName, msg.TableNamespace, msg.TableName)
154+
return parentTableName != ""
155+
}
156+
157+
func (c *connector) getParentTableName(fullTableName, tableNamespace, tableName string) string {
158+
if cachedValue, found := c.partitionCache.Load(fullTableName); found {
159+
parentName, ok := cachedValue.(string)
160+
if !ok {
161+
logger.Error("invalid cache value type for table", "table", fullTableName)
162+
return ""
163+
}
164+
165+
if parentName != "" {
166+
logger.Debug("matched partition table to parent from cache",
167+
"partition", fullTableName,
168+
"parent", parentName)
169+
}
170+
return parentName
171+
}
172+
173+
parentTableName := c.findParentTable(tableNamespace, tableName)
174+
c.partitionCache.Store(fullTableName, parentTableName)
175+
176+
if parentTableName != "" {
177+
logger.Debug("matched partition table to parent",
178+
"partition", fullTableName,
179+
"parent", parentTableName)
180+
}
181+
182+
return parentTableName
183+
}
184+
185+
func (c *connector) getFullTableName(tableNamespace, tableName string) string {
186+
return fmt.Sprintf("%s.%s", tableNamespace, tableName)
187+
}
188+
189+
func (c *connector) findParentTable(tableNamespace, tableName string) string {
190+
tableParts := strings.Split(tableName, "_")
191+
if len(tableParts) <= 1 {
192+
return ""
193+
}
194+
195+
for i := 1; i < len(tableParts); i++ {
196+
parentNameCandidate := strings.Join(tableParts[:i], "_")
197+
fullParentName := c.getFullTableName(tableNamespace, parentNameCandidate)
198+
199+
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullParentName]; exists {
200+
return fullParentName
201+
}
147202
}
148203

149-
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
150-
return exists
204+
return ""
151205
}

0 commit comments

Comments
 (0)