Skip to content

Commit 8c9f1bc

Browse files
committed
feat: add filter for message processing
1 parent f9c117d commit 8c9f1bc

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

connector.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package cdc
22

33
import (
44
"context"
5+
"fmt"
6+
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
57

68
cdc "github.com/Trendyol/go-pq-cdc"
79
"github.com/Trendyol/go-pq-cdc-elasticsearch/config"
@@ -98,6 +100,10 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
98100
return
99101
}
100102

103+
if !c.processMessage(msg) {
104+
return
105+
}
106+
101107
actions := c.handler(msg)
102108
if len(actions) == 0 {
103109
if err := ctx.Ack(); err != nil {
@@ -117,3 +123,23 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
117123
c.bulk.AddActions(ctx, msg.EventTime, actions, msg.TableNamespace, msg.TableName, true)
118124
}
119125
}
126+
127+
func (c *connector) processMessage(msg Message) bool {
128+
if len(c.cfg.Elasticsearch.TableIndexMapping) == 0 {
129+
return true
130+
}
131+
132+
fullTableName := fmt.Sprintf("%s.%s", msg.TableNamespace, msg.TableName)
133+
134+
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullTableName]; exists {
135+
return true
136+
}
137+
138+
t, ok := timescaledb.HyperTables.Load(fullTableName)
139+
if !ok {
140+
return false
141+
}
142+
143+
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
144+
return exists
145+
}

0 commit comments

Comments
 (0)