Skip to content

Commit 5bbb47c

Browse files
committed
Improved Elasticsearch index resolution and bulk operations
1 parent 6794085 commit 5bbb47c

File tree

2 files changed

+30
-28
lines changed

2 files changed

+30
-28
lines changed

connector.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,8 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
108108

109109
fullTableName := c.getFullTableName(msg.TableNamespace, msg.TableName)
110110

111-
tableName, exists := c.cfg.Elasticsearch.TableIndexMapping[fullTableName]
112-
if !exists {
113-
parentTableName := c.getParentTableName(fullTableName, msg.TableNamespace, msg.TableName)
114-
if parentTableName != "" {
115-
tableName = c.cfg.Elasticsearch.TableIndexMapping[parentTableName]
116-
}
117-
}
118-
119-
if !c.isTableInMapping(tableName) {
111+
indexName := c.resolveTableToIndexName(fullTableName, msg.TableNamespace, msg.TableName)
112+
if indexName == "" {
120113
if err := ctx.Ack(); err != nil {
121114
logger.Error("ack", "error", err)
122115
}
@@ -136,29 +129,38 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
136129
chunks := slices.ChunkWithSize[elasticsearch.Action](actions, batchSizeLimit)
137130
lastChunkIndex := len(chunks) - 1
138131
for idx, chunk := range chunks {
139-
c.bulk.AddActions(ctx, msg.EventTime, chunk, fullTableName, idx == lastChunkIndex)
132+
c.bulk.AddActions(ctx, msg.EventTime, chunk, indexName, idx == lastChunkIndex)
140133
}
141134
} else {
142-
c.bulk.AddActions(ctx, msg.EventTime, actions, tableName, true)
135+
c.bulk.AddActions(ctx, msg.EventTime, actions, indexName, true)
143136
}
144137
}
145138

146-
func (c *connector) isTableInMapping(tableName string) bool {
147-
if len(c.cfg.Elasticsearch.TableIndexMapping) == 0 {
148-
return true
139+
func (c *connector) resolveTableToIndexName(fullTableName, tableNamespace, tableName string) string {
140+
tableIndexMapping := c.cfg.Elasticsearch.TableIndexMapping
141+
if len(tableIndexMapping) == 0 {
142+
return fullTableName
149143
}
150144

151-
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[tableName]; exists {
152-
return true
145+
if indexName, exists := tableIndexMapping[fullTableName]; exists {
146+
return indexName
153147
}
154148

155-
t, ok := timescaledb.HyperTables.Load(tableName)
156-
if ok {
157-
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
158-
return exists
149+
parentTableName := c.getParentTableName(fullTableName, tableNamespace, tableName)
150+
if parentTableName != "" {
151+
if indexName, exists := tableIndexMapping[parentTableName]; exists {
152+
return indexName
153+
}
159154
}
160155

161-
return tableName != ""
156+
if t, ok := timescaledb.HyperTables.Load(fullTableName); ok {
157+
parentName := t.(string)
158+
if indexName, exists := tableIndexMapping[parentName]; exists {
159+
return indexName
160+
}
161+
}
162+
163+
return ""
162164
}
163165

164166
func (c *connector) getParentTableName(fullTableName, tableNamespace, tableName string) string {

elasticsearch/bulk/bulk.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Indexer interface {
3232
ctx *replication.ListenerContext,
3333
eventTime time.Time,
3434
actions []elasticsearch2.Action,
35-
tableName string,
35+
indexName string,
3636
isLastChunk bool,
3737
)
3838
GetMetric() Metric
@@ -115,12 +115,12 @@ func (b *Bulk) AddActions(
115115
ctx *replication.ListenerContext,
116116
eventTime time.Time,
117117
actions []elasticsearch2.Action,
118-
tableName string,
118+
indexName string,
119119
isLastChunk bool,
120120
) {
121121
b.flushLock.Lock()
122122
for i, action := range actions {
123-
indexName := b.getIndexName(tableName, action.IndexName)
123+
indexName := b.getIndexName(indexName, action.IndexName)
124124
actions[i].IndexName = indexName
125125
value := getEsActionJSON(
126126
action.ID,
@@ -352,16 +352,16 @@ func joinErrors(body map[string]any) (map[string]string, error) {
352352
return ivd, fmt.Errorf(sb.String())
353353
}
354354

355-
func (b *Bulk) getIndexName(tableName, actionIndexName string) string {
355+
func (b *Bulk) getIndexName(indexName, actionIndexName string) string {
356356
if actionIndexName != "" {
357357
return actionIndexName
358358
}
359359

360-
if tableName == "" {
361-
panic(fmt.Sprintf("there is no index mapping for table: %s on your configuration", tableName))
360+
if indexName == "" {
361+
panic(fmt.Sprintf("there is no index mapping for table: %s on your configuration", indexName))
362362
}
363363

364-
return tableName
364+
return indexName
365365
}
366366

367367
func (b *Bulk) handleResponse(batchActions []*elasticsearch2.Action, errs map[string]string) {

0 commit comments

Comments
 (0)