Skip to content

Commit f54888d

Browse files
committed
fix: add cache for table name lookups with @3n0ugh
1 parent 5d0a528 commit f54888d

File tree

1 file changed

+33
-6
lines changed

1 file changed

+33
-6
lines changed

connector.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"strings"
7+
"sync"
78

89
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
910

@@ -34,6 +35,7 @@ type connector struct {
3435
esClient *es.Client
3536
bulk bulk.Indexer
3637
metrics []prometheus.Collector
38+
partitionCache sync.Map
3739
}
3840

3941
func NewConnector(ctx context.Context, cfg config.Config, handler Handler, options ...Option) (Connector, error) {
@@ -136,7 +138,7 @@ func (c *connector) processMessage(msg Message) bool {
136138
return true
137139
}
138140

139-
fullTableName := fmt.Sprintf("%s.%s", msg.TableNamespace, msg.TableName)
141+
fullTableName := c.getFullTableName(msg.TableNamespace, msg.TableName)
140142

141143
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullTableName]; exists {
142144
return true
@@ -148,15 +150,40 @@ func (c *connector) processMessage(msg Message) bool {
148150
return exists
149151
}
150152

151-
parentTableName := c.findParentTable(msg.TableNamespace, msg.TableName)
153+
parentTableName := c.getParentTableName(fullTableName, msg.TableNamespace, msg.TableName)
154+
return parentTableName != ""
155+
}
156+
157+
func (c *connector) getParentTableName(fullTableName, tableNamespace, tableName string) string {
158+
if cachedValue, found := c.partitionCache.Load(fullTableName); found {
159+
parentName, ok := cachedValue.(string)
160+
if !ok {
161+
logger.Error("invalid cache value type for table", "table", fullTableName)
162+
return ""
163+
}
164+
165+
if parentName != "" {
166+
logger.Debug("matched partition table to parent from cache",
167+
"partition", fullTableName,
168+
"parent", parentName)
169+
}
170+
return parentName
171+
}
172+
173+
parentTableName := c.findParentTable(tableNamespace, tableName)
174+
c.partitionCache.Store(fullTableName, parentTableName)
175+
152176
if parentTableName != "" {
153-
logger.Info("matched partition table to parent",
177+
logger.Debug("matched partition table to parent",
154178
"partition", fullTableName,
155179
"parent", parentTableName)
156-
return true
157180
}
158181

159-
return false
182+
return parentTableName
183+
}
184+
185+
func (c *connector) getFullTableName(tableNamespace, tableName string) string {
186+
return fmt.Sprintf("%s.%s", tableNamespace, tableName)
160187
}
161188

162189
func (c *connector) findParentTable(tableNamespace, tableName string) string {
@@ -167,7 +194,7 @@ func (c *connector) findParentTable(tableNamespace, tableName string) string {
167194

168195
for i := 1; i < len(tableParts); i++ {
169196
parentNameCandidate := strings.Join(tableParts[:i], "_")
170-
fullParentName := fmt.Sprintf("%s.%s", tableNamespace, parentNameCandidate)
197+
fullParentName := c.getFullTableName(tableNamespace, parentNameCandidate)
171198

172199
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullParentName]; exists {
173200
return fullParentName

0 commit comments

Comments
 (0)