Skip to content

Commit ad53a3d

Browse files
committed
feat: Auto-detect Elasticsearch version and improve partitioned table handling
1 parent a2afb1c commit ad53a3d

File tree

4 files changed

+48
-13
lines changed

4 files changed

+48
-13
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ This setup ensures continuous data synchronization and minimal downtime in captu
176176
| `elasticsearch.compressionEnabled` | bool | no | false | Enable compression for large messages | Useful if message sizes are large, but may increase CPU usage. |
177177
| `elasticsearch.disableDiscoverNodesOnStart` | bool | no | false | Disable node discovery on client initialization | Skips node discovery when the client starts. |
178178
| `elasticsearch.discoverNodesInterval` | time.Duration | no | 5 min | Periodic node discovery interval | Specify in a human-readable format, e.g., `5m` for 5 minutes. |
179-
| `elasticsearch.version` | string | no | 6.8.0 | Elasticsearch version to determine compatibility features | Used to handle version-specific behaviors, such as `_type` parameter support (removed in ES 8.0+). |
179+
| `elasticsearch.version` | string | no | - | Elasticsearch version to determine compatibility features | Used to handle version-specific behaviors, such as `_type` parameter support (removed in ES 8.0+). If not specified, version is automatically detected from the cluster. |
180180

181181
### API
182182

@@ -216,7 +216,7 @@ The connector supports different versions of Elasticsearch through the `elastics
216216
| Below 8.0 | `_type` parameter is included in the index requests |
217217
| 8.0 and above | `_type` parameter is automatically omitted |
218218

219-
If no version is specified, the connector defaults to "7.0.0" behavior (including `_type` parameter).
219+
If no version is specified, the connector will automatically detect the Elasticsearch cluster version by querying the Info API after connection. This eliminates the need to manually configure the version.
220220

221221
### Breaking Changes
222222

config/config.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,4 @@ func (c *Config) SetDefault() {
5555
duration := 5 * time.Minute
5656
c.Elasticsearch.DiscoverNodesInterval = &duration
5757
}
58-
59-
if c.Elasticsearch.Version == "" {
60-
c.Elasticsearch.Version = "7.0.0"
61-
}
6258
}

connector.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,23 +139,23 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
139139
func (c *connector) resolveTableToIndexName(fullTableName, tableNamespace, tableName string) string {
140140
tableIndexMapping := c.cfg.Elasticsearch.TableIndexMapping
141141
if len(tableIndexMapping) == 0 {
142-
return fullTableName
142+
return ""
143143
}
144144

145145
if indexName, exists := tableIndexMapping[fullTableName]; exists {
146146
return indexName
147147
}
148148

149-
parentTableName := c.getParentTableName(fullTableName, tableNamespace, tableName)
150-
if parentTableName != "" {
151-
if indexName, exists := tableIndexMapping[parentTableName]; exists {
149+
if t, ok := timescaledb.HyperTables.Load(fullTableName); ok {
150+
parentName := t.(string)
151+
if indexName, exists := tableIndexMapping[parentName]; exists {
152152
return indexName
153153
}
154154
}
155155

156-
if t, ok := timescaledb.HyperTables.Load(fullTableName); ok {
157-
parentName := t.(string)
158-
if indexName, exists := tableIndexMapping[parentName]; exists {
156+
parentTableName := c.getParentTableName(fullTableName, tableNamespace, tableName)
157+
if parentTableName != "" {
158+
if indexName, exists := tableIndexMapping[parentTableName]; exists {
159159
return indexName
160160
}
161161
}

elasticsearch/client/client.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package client
22

33
import (
4+
"encoding/json"
5+
46
"github.com/Trendyol/go-pq-cdc-elasticsearch/config"
7+
"github.com/Trendyol/go-pq-cdc/logger"
58
"github.com/elastic/go-elasticsearch/v7"
69
"github.com/go-playground/errors"
710
)
@@ -31,5 +34,41 @@ func NewClient(config *config.Config) (*elasticsearch.Client, error) {
3134
return nil, errors.New("unauthorized")
3235
}
3336

37+
if config.Elasticsearch.Version == "" {
38+
version, err := detectElasticsearchVersion(client)
39+
if err != nil {
40+
logger.Warn("elasticsearch version detection failed", "error", err, "fallback_version", "7.0.0", "hint", "specify 'elasticsearch.version' in config to set manually")
41+
config.Elasticsearch.Version = "7.0.0"
42+
} else {
43+
logger.Info("elasticsearch version detected", "version", version)
44+
config.Elasticsearch.Version = version
45+
}
46+
}
47+
3448
return client, nil
3549
}
50+
51+
func detectElasticsearchVersion(client *elasticsearch.Client) (string, error) {
52+
info, err := client.Info()
53+
if err != nil {
54+
return "", err
55+
}
56+
57+
var response map[string]interface{}
58+
if err := json.NewDecoder(info.Body).Decode(&response); err != nil {
59+
return "", err
60+
}
61+
defer info.Body.Close()
62+
63+
version, ok := response["version"].(map[string]interface{})
64+
if !ok {
65+
return "", errors.New("version info not found in Elasticsearch response")
66+
}
67+
68+
number, ok := version["number"].(string)
69+
if !ok {
70+
return "", errors.New("version number not found in Elasticsearch response")
71+
}
72+
73+
return number, nil
74+
}

0 commit comments

Comments
 (0)