Skip to content

Commit 5e80df3

Browse files
authored
fix: add support for elasticsearch v8 indexing
2 parents d6e0d96 + ad53a3d commit 5e80df3

File tree

5 files changed

+98
-37
lines changed

5 files changed

+98
-37
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ This setup ensures continuous data synchronization and minimal downtime in captu
176176
| `elasticsearch.compressionEnabled` | bool | no | false | Enable compression for large messages | Useful if message sizes are large, but may increase CPU usage. |
177177
| `elasticsearch.disableDiscoverNodesOnStart` | bool | no | false | Disable node discovery on client initialization | Skips node discovery when the client starts. |
178178
| `elasticsearch.discoverNodesInterval` | time.Duration | no | 5 min | Periodic node discovery interval | Specify in a human-readable format, e.g., `5m` for 5 minutes. |
179+
| `elasticsearch.version` | string | no | - | Elasticsearch version to determine compatibility features | Used to handle version-specific behaviors, such as `_type` parameter support (removed in ES 8.0+). If not specified, version is automatically detected from the cluster. |
179180

180181
### API
181182

@@ -206,6 +207,17 @@ All cdc related metrics are automatically injected. It means you don't need to d
206207
|-------------------|-----------------------------------|
207208
| 0.0.2 or higher | 14 |
208209

210+
### Elasticsearch Version Compatibility
211+
212+
The connector supports different versions of Elasticsearch through the `elasticsearch.version` configuration parameter:
213+
214+
| Elasticsearch Version | Type Parameter Behavior |
215+
|-----------------------|--------------------------------------------------------|
216+
| Below 8.0 | `_type` parameter is included in the index requests |
217+
| 8.0 and above | `_type` parameter is automatically omitted |
218+
219+
If no version is specified, the connector will automatically detect the Elasticsearch cluster version by querying the Info API after connection. This eliminates the need to manually configure the version.
220+
209221
### Breaking Changes
210222

211223
| Date taking effect | Version | Change | How to check |

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Elasticsearch struct {
1515
MaxIdleConnDuration *time.Duration `yaml:"maxIdleConnDuration"`
1616
DiscoverNodesInterval *time.Duration `yaml:"discoverNodesInterval"`
1717
TypeName string `yaml:"typeName"`
18+
Version string `yaml:"version"`
1819
URLs []string `yaml:"urls"`
1920
BatchSizeLimit int `yaml:"batchSizeLimit"`
2021
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`

connector.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,10 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
106106
return
107107
}
108108

109-
if !c.processMessage(msg) {
109+
fullTableName := c.getFullTableName(msg.TableNamespace, msg.TableName)
110+
111+
indexName := c.resolveTableToIndexName(fullTableName, msg.TableNamespace, msg.TableName)
112+
if indexName == "" {
110113
if err := ctx.Ack(); err != nil {
111114
logger.Error("ack", "error", err)
112115
}
@@ -126,32 +129,38 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
126129
chunks := slices.ChunkWithSize[elasticsearch.Action](actions, batchSizeLimit)
127130
lastChunkIndex := len(chunks) - 1
128131
for idx, chunk := range chunks {
129-
c.bulk.AddActions(ctx, msg.EventTime, chunk, msg.TableNamespace, msg.TableName, idx == lastChunkIndex)
132+
c.bulk.AddActions(ctx, msg.EventTime, chunk, indexName, idx == lastChunkIndex)
130133
}
131134
} else {
132-
c.bulk.AddActions(ctx, msg.EventTime, actions, msg.TableNamespace, msg.TableName, true)
135+
c.bulk.AddActions(ctx, msg.EventTime, actions, indexName, true)
133136
}
134137
}
135138

136-
func (c *connector) processMessage(msg Message) bool {
137-
if len(c.cfg.Elasticsearch.TableIndexMapping) == 0 {
138-
return true
139+
func (c *connector) resolveTableToIndexName(fullTableName, tableNamespace, tableName string) string {
140+
tableIndexMapping := c.cfg.Elasticsearch.TableIndexMapping
141+
if len(tableIndexMapping) == 0 {
142+
return ""
139143
}
140144

141-
fullTableName := c.getFullTableName(msg.TableNamespace, msg.TableName)
145+
if indexName, exists := tableIndexMapping[fullTableName]; exists {
146+
return indexName
147+
}
142148

143-
if _, exists := c.cfg.Elasticsearch.TableIndexMapping[fullTableName]; exists {
144-
return true
149+
if t, ok := timescaledb.HyperTables.Load(fullTableName); ok {
150+
parentName := t.(string)
151+
if indexName, exists := tableIndexMapping[parentName]; exists {
152+
return indexName
153+
}
145154
}
146155

147-
t, ok := timescaledb.HyperTables.Load(fullTableName)
148-
if ok {
149-
_, exists := c.cfg.Elasticsearch.TableIndexMapping[t.(string)]
150-
return exists
156+
parentTableName := c.getParentTableName(fullTableName, tableNamespace, tableName)
157+
if parentTableName != "" {
158+
if indexName, exists := tableIndexMapping[parentTableName]; exists {
159+
return indexName
160+
}
151161
}
152162

153-
parentTableName := c.getParentTableName(fullTableName, msg.TableNamespace, msg.TableName)
154-
return parentTableName != ""
163+
return ""
155164
}
156165

157166
func (c *connector) getParentTableName(fullTableName, tableNamespace, tableName string) string {

elasticsearch/bulk/bulk.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"time"
1010

1111
cdc "github.com/Trendyol/go-pq-cdc"
12-
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
1312
"github.com/elastic/go-elasticsearch/v7"
1413

1514
"github.com/Trendyol/go-pq-cdc-elasticsearch/config"
@@ -33,7 +32,7 @@ type Indexer interface {
3332
ctx *replication.ListenerContext,
3433
eventTime time.Time,
3534
actions []elasticsearch2.Action,
36-
tableNamespace, tableName string,
35+
indexName string,
3736
isLastChunk bool,
3837
)
3938
GetMetric() Metric
@@ -43,7 +42,6 @@ type Indexer interface {
4342
type Bulk struct {
4443
metric Metric
4544
responseHandler elasticsearch2.ResponseHandler
46-
indexMapping map[string]string
4745
config *config.Config
4846
batchKeys map[string]int
4947
batchTicker *time.Ticker
@@ -91,7 +89,6 @@ func NewBulk(
9189
isClosed: make(chan bool, 1),
9290
esClient: esClient,
9391
metric: NewMetric(pqCDC, config.CDC.Slot.Name),
94-
indexMapping: config.Elasticsearch.TableIndexMapping,
9592
config: config,
9693
typeName: []byte(config.Elasticsearch.TypeName),
9794
readers: readers,
@@ -118,12 +115,12 @@ func (b *Bulk) AddActions(
118115
ctx *replication.ListenerContext,
119116
eventTime time.Time,
120117
actions []elasticsearch2.Action,
121-
tableNamespace, tableName string,
118+
indexName string,
122119
isLastChunk bool,
123120
) {
124121
b.flushLock.Lock()
125122
for i, action := range actions {
126-
indexName := b.getIndexName(tableNamespace, tableName, action.IndexName)
123+
indexName := b.getIndexName(indexName, action.IndexName)
127124
actions[i].IndexName = indexName
128125
value := getEsActionJSON(
129126
action.ID,
@@ -132,6 +129,7 @@ func (b *Bulk) AddActions(
132129
action.Routing,
133130
action.Source,
134131
b.typeName,
132+
b.config.Elasticsearch.Version,
135133
)
136134

137135
b.metric.incrementOp(action.Type, indexName)
@@ -185,7 +183,21 @@ var metaPool = sync.Pool{
185183
},
186184
}
187185

188-
func getEsActionJSON(docID []byte, action elasticsearch2.ActionType, indexName string, routing *string, source []byte, typeName []byte) []byte {
186+
func isTypeSupported(version string) bool {
187+
if version == "" {
188+
return true
189+
}
190+
191+
parts := strings.Split(version, ".")
192+
if len(parts) == 0 {
193+
return true
194+
}
195+
196+
majorVersion := parts[0]
197+
return majorVersion < "8"
198+
}
199+
200+
func getEsActionJSON(docID []byte, action elasticsearch2.ActionType, indexName string, routing *string, source []byte, typeName []byte, esVersion string) []byte {
189201
meta := metaPool.Get().([]byte)[:0]
190202

191203
if action == elasticsearch2.Index {
@@ -200,7 +212,7 @@ func getEsActionJSON(docID []byte, action elasticsearch2.ActionType, indexName s
200212
meta = append(meta, routingPrefix...)
201213
meta = append(meta, []byte(*routing)...)
202214
}
203-
if typeName != nil {
215+
if typeName != nil && isTypeSupported(esVersion) {
204216
meta = append(meta, typePrefix...)
205217
meta = append(meta, typeName...)
206218
}
@@ -340,25 +352,13 @@ func joinErrors(body map[string]any) (map[string]string, error) {
340352
return ivd, fmt.Errorf(sb.String())
341353
}
342354

343-
func (b *Bulk) getIndexName(tableNamespace, tableName, actionIndexName string) string {
355+
func (b *Bulk) getIndexName(indexName, actionIndexName string) string {
344356
if actionIndexName != "" {
345357
return actionIndexName
346358
}
347359

348-
fullTableName := fmt.Sprintf("%s.%s", tableNamespace, tableName)
349-
350-
indexName := b.indexMapping[fullTableName]
351-
if indexName != "" {
352-
return indexName
353-
}
354-
355-
t, ok := timescaledb.HyperTables.Load(fullTableName)
356-
if ok {
357-
indexName = b.indexMapping[t.(string)]
358-
}
359-
360360
if indexName == "" {
361-
panic(fmt.Sprintf("there is no index mapping for table: %s.%s on your configuration", tableNamespace, tableName))
361+
panic(fmt.Sprintf("there is no index mapping for table: %s on your configuration", indexName))
362362
}
363363

364364
return indexName

elasticsearch/client/client.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package client
22

33
import (
4+
"encoding/json"
5+
46
"github.com/Trendyol/go-pq-cdc-elasticsearch/config"
7+
"github.com/Trendyol/go-pq-cdc/logger"
58
"github.com/elastic/go-elasticsearch/v7"
69
"github.com/go-playground/errors"
710
)
@@ -31,5 +34,41 @@ func NewClient(config *config.Config) (*elasticsearch.Client, error) {
3134
return nil, errors.New("unauthorized")
3235
}
3336

37+
if config.Elasticsearch.Version == "" {
38+
version, err := detectElasticsearchVersion(client)
39+
if err != nil {
40+
logger.Warn("elasticsearch version detection failed", "error", err, "fallback_version", "7.0.0", "hint", "specify 'elasticsearch.version' in config to set manually")
41+
config.Elasticsearch.Version = "7.0.0"
42+
} else {
43+
logger.Info("elasticsearch version detected", "version", version)
44+
config.Elasticsearch.Version = version
45+
}
46+
}
47+
3448
return client, nil
3549
}
50+
51+
func detectElasticsearchVersion(client *elasticsearch.Client) (string, error) {
52+
info, err := client.Info()
53+
if err != nil {
54+
return "", err
55+
}
56+
57+
var response map[string]interface{}
58+
if err := json.NewDecoder(info.Body).Decode(&response); err != nil {
59+
return "", err
60+
}
61+
defer info.Body.Close()
62+
63+
version, ok := response["version"].(map[string]interface{})
64+
if !ok {
65+
return "", errors.New("version info not found in Elasticsearch response")
66+
}
67+
68+
number, ok := version["number"].(string)
69+
if !ok {
70+
return "", errors.New("version number not found in Elasticsearch response")
71+
}
72+
73+
return number, nil
74+
}

0 commit comments

Comments
 (0)