Skip to content

Commit d8efcb6

Browse files
MehmetFiratKomurcufirat.komurcu
andauthored
feat: add painless script update action
* #31 add painless script feat. * #12 add painless script feat. * 12 add painless script feat. * #12 refactor: reorder fields in Action and Script structs for consistency * create go.sum file --------- Co-authored-by: firat.komurcu <firat.komurcu@trendyol.com>
1 parent 56f93e4 commit d8efcb6

File tree

7 files changed

+554
-6
lines changed

7 files changed

+554
-6
lines changed

elasticsearch/action.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
11
package elasticsearch
22

3+
import "encoding/json"
4+
35
type ActionType string
46

57
const (
6-
Index ActionType = "Index"
7-
Delete ActionType = "Delete"
8+
Index ActionType = "Index"
9+
Delete ActionType = "Delete"
10+
ScriptUpdate ActionType = "ScriptUpdate"
811
)
912

1013
type Action struct {
1114
Routing *string
12-
Type ActionType
1315
IndexName string
16+
Type ActionType
1417
Source []byte
1518
ID []byte
1619
}
1720

21+
type Script struct {
22+
Params map[string]interface{} `json:"params,omitempty"`
23+
Source string `json:"source"`
24+
}
25+
1826
func NewDeleteAction(key []byte, routing *string) Action {
1927
return Action{
2028
ID: key,
@@ -31,3 +39,13 @@ func NewIndexAction(key []byte, source []byte, routing *string) Action {
3139
Type: Index,
3240
}
3341
}
42+
43+
func NewScriptUpdateAction(id []byte, script Script, routing *string) Action {
44+
scriptBytes, _ := json.Marshal(script)
45+
return Action{
46+
ID: id,
47+
Type: ScriptUpdate,
48+
Source: scriptBytes,
49+
Routing: routing,
50+
}
51+
}

elasticsearch/bulk/bulk.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,13 @@ func (b *Bulk) AddActions(
171171
var (
172172
indexPrefix = []byte(`{"index":{"_index":"`)
173173
deletePrefix = []byte(`{"delete":{"_index":"`)
174+
updatePrefix = []byte(`{"update":{"_index":"`)
175+
scriptPrefix = []byte(`{"script":`)
174176
idPrefix = []byte(`","_id":"`)
175177
typePrefix = []byte(`","_type":"`)
176178
routingPrefix = []byte(`","routing":"`)
177179
postFix = []byte(`"}}`)
180+
scriptPostfix = []byte(`,"scripted_upsert":true}`)
178181
)
179182

180183
var metaPool = sync.Pool{
@@ -200,11 +203,15 @@ func isTypeSupported(version string) bool {
200203
func getEsActionJSON(docID []byte, action elasticsearch2.ActionType, indexName string, routing *string, source []byte, typeName []byte, esVersion string) []byte {
201204
meta := metaPool.Get().([]byte)[:0]
202205

203-
if action == elasticsearch2.Index {
206+
switch action {
207+
case elasticsearch2.Index:
204208
meta = append(meta, indexPrefix...)
205-
} else {
209+
case elasticsearch2.Delete:
206210
meta = append(meta, deletePrefix...)
211+
case elasticsearch2.ScriptUpdate:
212+
meta = append(meta, updatePrefix...)
207213
}
214+
208215
meta = append(meta, []byte(indexName)...)
209216
meta = append(meta, idPrefix...)
210217
meta = append(meta, bytes.EscapePredefinedBytes(docID)...)
@@ -217,9 +224,16 @@ func getEsActionJSON(docID []byte, action elasticsearch2.ActionType, indexName s
217224
meta = append(meta, typeName...)
218225
}
219226
meta = append(meta, postFix...)
220-
if action == elasticsearch2.Index {
227+
228+
switch action {
229+
case elasticsearch2.Index:
230+
meta = append(meta, '\n')
231+
meta = append(meta, source...)
232+
case elasticsearch2.ScriptUpdate:
221233
meta = append(meta, '\n')
234+
meta = append(meta, scriptPrefix...)
222235
meta = append(meta, source...)
236+
meta = append(meta, scriptPostfix...)
223237
}
224238
meta = append(meta, '\n')
225239
return meta

example/script-update/README.md

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Script Update Example
2+
3+
This example demonstrates how to use painless scripts and partial updates with go-pq-cdc-elasticsearch.
4+
5+
## Features Demonstrated
6+
7+
1. Script Updates
8+
- Simple field updates
9+
- Conditional updates
10+
- Using script parameters
11+
12+
2. Doc Updates
13+
- Partial document updates
14+
- Field-specific updates
15+
16+
## Prerequisites
17+
18+
- Docker and Docker Compose
19+
- Go 1.20 or later
20+
21+
## Setup
22+
23+
1. Start the required services:
24+
25+
```bash
26+
export STACK_VERSION=7.17.11
27+
docker-compose up -d
28+
```
29+
30+
2. Wait for all services to be healthy (check with `docker-compose ps`)
31+
32+
3. Create the database table and insert sample data:
33+
34+
```sql
35+
psql "postgres://script_cdc_user:script_cdc_pass@127.0.0.1/script_cdc_db?replication=database"
36+
37+
CREATE TABLE products (
38+
id serial PRIMARY KEY,
39+
name text NOT NULL,
40+
price decimal(10,2),
41+
stock integer,
42+
last_updated timestamptz
43+
);
44+
45+
-- Insert sample data
46+
INSERT INTO products (name, price, stock, last_updated)
47+
VALUES
48+
('Product 1', 99.99, 100, NOW()),
49+
('Product 2', 149.99, 50, NOW()),
50+
('Product 3', 199.99, 25, NOW());
51+
```
52+
53+
## Running the Example
54+
55+
1. Build and run the example:
56+
57+
```bash
58+
go mod tidy
59+
go run main.go
60+
```
61+
62+
2. Test different update scenarios:
63+
64+
```sql
65+
-- Update stock using script
66+
UPDATE products SET stock = stock + 10 WHERE id = 1;
67+
68+
-- Update price conditionally
69+
UPDATE products SET price = 89.99 WHERE id = 2;
70+
71+
-- Update multiple fields
72+
UPDATE products SET stock = stock - 5, last_updated = NOW() WHERE id = 3;
73+
```
74+
75+
## Understanding the Code
76+
77+
The example demonstrates three types of updates:
78+
79+
1. **Script Updates for Stock Changes**:
80+
81+
```go
82+
script := map[string]interface{}{
83+
"source": "ctx._source.stock = params.new_stock",
84+
"params": map[string]interface{}{
85+
"new_stock": newStock,
86+
},
87+
}
88+
```
89+
90+
2. **Conditional Script Updates for Price Changes**:
91+
92+
```go
93+
script := map[string]interface{}{
94+
"source": "if (ctx._source.price != params.new_price) { ctx._source.price = params.new_price }",
95+
"params": map[string]interface{}{
96+
"new_price": newPrice,
97+
},
98+
}
99+
```
100+
101+
3. **Doc Updates for Other Fields**:
102+
103+
```go
104+
updateData := map[string]interface{}{
105+
"name": msg.NewData["name"],
106+
"last_updated": msg.NewData["last_updated"],
107+
}
108+
```
109+
110+
## Monitoring
111+
112+
- Check Elasticsearch indices: http://localhost:9200/products/_search
113+
- View logs: `docker-compose logs -f`
114+
- Monitor metrics: http://localhost:8081/metrics
115+
116+
## Cleanup
117+
118+
```bash
119+
docker-compose down -v
120+
```
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
version: "3"
2+
services:
3+
postgres:
4+
image: postgres:16.2
5+
restart: always
6+
command: ["-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10"]
7+
environment:
8+
POSTGRES_USER: "script_cdc_user"
9+
POSTGRES_PASSWORD: "script_cdc_pass"
10+
POSTGRES_DB: "script_cdc_db"
11+
POSTGRES_HOST_AUTH_METHOD: trust
12+
ports:
13+
- 5432:5432
14+
15+
es01:
16+
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.11
17+
labels:
18+
co.elastic.logs/module: elasticsearch
19+
volumes:
20+
- es01_data:/usr/share/elasticsearch/data
21+
networks:
22+
- cdc_net
23+
ports:
24+
- 9200:9200
25+
environment:
26+
- node.name=es01
27+
- cluster.name=script_cdc_cluster
28+
- discovery.type=single-node
29+
- ELASTIC_PASSWORD=script_cdc_es_pass
30+
- bootstrap.memory_lock=true
31+
- xpack.security.enabled=true
32+
- xpack.security.http.ssl.enabled=false
33+
- xpack.security.transport.ssl.enabled=false
34+
ulimits:
35+
memlock:
36+
soft: -1
37+
hard: -1
38+
healthcheck:
39+
test:
40+
[
41+
"CMD-SHELL",
42+
"curl -u elastic:script_cdc_es_pass -I -o /dev/null http://localhost:9200 -w '%{http_code}' | grep -q '200'",
43+
]
44+
interval: 10s
45+
timeout: 10s
46+
retries: 120
47+
48+
kibana:
49+
depends_on:
50+
es01:
51+
condition: service_healthy
52+
image: docker.elastic.co/kibana/kibana:7.17.11
53+
networks:
54+
- cdc_net
55+
labels:
56+
co.elastic.logs/module: kibana
57+
volumes:
58+
- kibana1_data:/usr/share/kibana/data
59+
ports:
60+
- "5601:5601"
61+
environment:
62+
- SERVERNAME=kibana
63+
- ELASTICSEARCH_HOSTS=http://es01:9200
64+
- ELASTICSEARCH_USERNAME=elastic
65+
- ELASTICSEARCH_PASSWORD=script_cdc_es_pass
66+
healthcheck:
67+
test:
68+
[
69+
"CMD-SHELL",
70+
"curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
71+
]
72+
interval: 10s
73+
timeout: 10s
74+
retries: 120
75+
76+
volumes:
77+
postgres_data: null
78+
kibana1_data: null
79+
es01_data: null
80+
81+
networks:
82+
cdc_net:
83+
driver: bridge

example/script-update/go.mod

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
module script-update
2+
3+
go 1.22.5
4+
5+
toolchain go1.23.4
6+
7+
require (
8+
github.com/Trendyol/go-pq-cdc v0.0.12
9+
github.com/Trendyol/go-pq-cdc-elasticsearch v0.0.0
10+
)
11+
12+
require (
13+
github.com/andybalholm/brotli v1.1.0 // indirect
14+
github.com/avast/retry-go/v4 v4.6.0 // indirect
15+
github.com/beorn7/perks v1.0.1 // indirect
16+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
17+
github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect
18+
github.com/go-playground/errors v3.3.0+incompatible // indirect
19+
github.com/jackc/pgpassfile v1.0.0 // indirect
20+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
21+
github.com/jackc/pgx/v5 v5.6.0 // indirect
22+
github.com/json-iterator/go v1.1.12 // indirect
23+
github.com/klauspost/compress v1.17.9 // indirect
24+
github.com/lib/pq v1.10.9 // indirect
25+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
26+
github.com/modern-go/reflect2 v1.0.2 // indirect
27+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
28+
github.com/prometheus/client_golang v1.19.1 // indirect
29+
github.com/prometheus/client_model v0.6.1 // indirect
30+
github.com/prometheus/common v0.55.0 // indirect
31+
github.com/prometheus/procfs v0.15.1 // indirect
32+
github.com/valyala/bytebufferpool v1.0.0 // indirect
33+
github.com/valyala/fasthttp v1.55.0 // indirect
34+
golang.org/x/crypto v0.24.0 // indirect
35+
golang.org/x/sync v0.7.0 // indirect
36+
golang.org/x/sys v0.21.0 // indirect
37+
golang.org/x/text v0.16.0 // indirect
38+
google.golang.org/protobuf v1.34.2 // indirect
39+
gopkg.in/yaml.v2 v2.4.0 // indirect
40+
)
41+
42+
replace github.com/Trendyol/go-pq-cdc-elasticsearch => ../../

0 commit comments

Comments
 (0)