Skip to content

Commit d089519

Browse files
committed
feat: add prometheus operations metrics by index name
1 parent eca38c5 commit d089519

File tree

4 files changed

+95
-12
lines changed

4 files changed

+95
-12
lines changed

connector.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cdc
33
import (
44
"context"
55
"fmt"
6+
67
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
78

89
cdc "github.com/Trendyol/go-pq-cdc"
@@ -60,7 +61,9 @@ func NewConnector(ctx context.Context, cfg config.Config, handler Handler, optio
6061
esConnector.bulk, err = bulk.NewBulk(
6162
esConnector.cfg,
6263
esClient,
63-
bulk.WithResponseHandler(esConnector.responseHandler))
64+
pqCDC,
65+
bulk.WithResponseHandler(esConnector.responseHandler),
66+
)
6467
if err != nil {
6568
return nil, errors.Wrap(err, "elasticsearch new bulk")
6669
}
@@ -101,6 +104,9 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
101104
}
102105

103106
if !c.processMessage(msg) {
107+
if err := ctx.Ack(); err != nil {
108+
logger.Error("ack", "error", err)
109+
}
104110
return
105111
}
106112

elasticsearch/bulk/bulk.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99
"time"
1010

11+
cdc "github.com/Trendyol/go-pq-cdc"
1112
"github.com/Trendyol/go-pq-cdc/pq/timescaledb"
1213
"github.com/elastic/go-elasticsearch/v7"
1314

@@ -59,6 +60,7 @@ type Bulk struct {
5960
batchByteSize int
6061
concurrentRequest int
6162
flushLock sync.Mutex
63+
pqCDC *cdc.Connector
6264
}
6365

6466
type BatchItem struct {
@@ -69,6 +71,7 @@ type BatchItem struct {
6971
func NewBulk(
7072
config *config.Config,
7173
esClient *elasticsearch.Client,
74+
pqCDC cdc.Connector,
7275
options ...Option,
7376
) (*Bulk, error) {
7477
readers := make([]*bytes.MultiDimensionReader, config.Elasticsearch.ConcurrentRequest)
@@ -88,7 +91,7 @@ func NewBulk(
8891
batchByteSizeLimit: int(batchByteSizeLimit),
8992
isClosed: make(chan bool, 1),
9093
esClient: esClient,
91-
metric: NewMetric(config.CDC.Slot.Name),
94+
metric: NewMetric(pqCDC, config.CDC.Slot.Name),
9295
indexMapping: config.Elasticsearch.TableIndexMapping,
9396
config: config,
9497
typeName: []byte(config.Elasticsearch.TypeName),
@@ -132,6 +135,8 @@ func (b *Bulk) AddActions(
132135
b.typeName,
133136
)
134137

138+
b.metric.incrementOp(action.Type, indexName)
139+
135140
key := getActionKey(actions[i])
136141
if batchIndex, ok := b.batchKeys[key]; ok {
137142
b.batchByteSize += len(value) - len(b.batch[batchIndex].Bytes)

elasticsearch/bulk/metric.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package bulk
33
import (
44
"os"
55

6+
cdc "github.com/Trendyol/go-pq-cdc"
7+
"github.com/Trendyol/go-pq-cdc-elasticsearch/elasticsearch"
68
"github.com/prometheus/client_golang/prometheus"
79
)
810

@@ -12,16 +14,26 @@ type Metric interface {
1214
SetProcessLatency(latency int64)
1315
SetBulkRequestProcessLatency(latency int64)
1416
PrometheusCollectors() []prometheus.Collector
17+
incrementOp(opType elasticsearch.ActionType, index string)
1518
}
1619

20+
var hostname, _ = os.Hostname()
21+
1722
type metric struct {
23+
slotName string
24+
pqCDC cdc.Connector
25+
1826
processLatencyMs prometheus.Gauge
1927
bulkRequestProcessLatencyMs prometheus.Gauge
28+
29+
totalIndex map[string]prometheus.Counter
30+
totalDelete map[string]prometheus.Counter
2031
}
2132

22-
func NewMetric(slotName string) Metric {
23-
hostname, _ := os.Hostname()
33+
func NewMetric(pqCDC cdc.Connector, slotName string) Metric {
2434
return &metric{
35+
slotName: slotName,
36+
pqCDC: pqCDC,
2537
processLatencyMs: prometheus.NewGauge(prometheus.GaugeOpts{
2638
Namespace: namespace,
2739
Subsystem: "process_latency",
@@ -42,6 +54,8 @@ func NewMetric(slotName string) Metric {
4254
"slot_name": slotName,
4355
},
4456
}),
57+
totalIndex: make(map[string]prometheus.Counter),
58+
totalDelete: make(map[string]prometheus.Counter),
4559
}
4660
}
4761

@@ -59,3 +73,42 @@ func (m *metric) SetProcessLatency(latency int64) {
5973
func (m *metric) SetBulkRequestProcessLatency(latency int64) {
6074
m.bulkRequestProcessLatencyMs.Set(float64(latency))
6175
}
76+
77+
func (m *metric) incrementOp(opType elasticsearch.ActionType, index string) {
78+
switch opType {
79+
case elasticsearch.Index:
80+
if _, exists := m.totalIndex[index]; !exists {
81+
m.totalIndex[index] = prometheus.NewCounter(prometheus.CounterOpts{
82+
Namespace: namespace,
83+
Subsystem: "index",
84+
Name: "total",
85+
Help: "total number of index operation in elasticsearch",
86+
ConstLabels: prometheus.Labels{
87+
"slot_name": m.slotName,
88+
"index_name": index,
89+
"host": hostname,
90+
},
91+
})
92+
m.pqCDC.SetMetricCollectors(m.totalIndex[index])
93+
}
94+
95+
m.totalIndex[index].Add(1)
96+
case elasticsearch.Delete:
97+
if _, exists := m.totalDelete[index]; !exists {
98+
m.totalDelete[index] = prometheus.NewCounter(prometheus.CounterOpts{
99+
Namespace: namespace,
100+
Subsystem: "delete",
101+
Name: "total",
102+
Help: "total number of delete operation in elasticsearch",
103+
ConstLabels: prometheus.Labels{
104+
"slot_name": m.slotName,
105+
"index_name": index,
106+
"host": hostname,
107+
},
108+
})
109+
m.pqCDC.SetMetricCollectors(m.totalDelete[index])
110+
}
111+
112+
m.totalDelete[index].Add(1)
113+
}
114+
}

example/simple/main.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"log/slog"
8+
"os"
9+
"strconv"
10+
"time"
11+
712
cdc "github.com/Trendyol/go-pq-cdc-elasticsearch"
813
"github.com/Trendyol/go-pq-cdc-elasticsearch/config"
914
"github.com/Trendyol/go-pq-cdc-elasticsearch/elasticsearch"
1015
cdcconfig "github.com/Trendyol/go-pq-cdc/config"
1116
"github.com/Trendyol/go-pq-cdc/pq/publication"
1217
"github.com/Trendyol/go-pq-cdc/pq/slot"
13-
"log/slog"
14-
"os"
15-
"strconv"
16-
"time"
1718
)
1819

1920
/*
@@ -25,10 +26,21 @@ import (
2526
created_on timestamptz
2627
);
2728
29+
CREATE TABLE books (
30+
id serial PRIMARY KEY,
31+
name text NOT NULL,
32+
created_on timestamptz
33+
);
34+
2835
INSERT INTO users (name)
2936
SELECT
3037
'Oyleli' || i
3138
FROM generate_series(1, 100) AS i;
39+
40+
INSERT INTO books (name)
41+
SELECT
42+
'Oyleli' || i
43+
FROM generate_series(1, 100) AS i;
3244
*/
3345

3446
func main() {
@@ -50,10 +62,16 @@ func main() {
5062
publication.OperationTruncate,
5163
publication.OperationUpdate,
5264
},
53-
Tables: publication.Tables{publication.Table{
54-
Name: "users",
55-
ReplicaIdentity: publication.ReplicaIdentityFull,
56-
}},
65+
Tables: publication.Tables{
66+
publication.Table{
67+
Name: "users",
68+
ReplicaIdentity: publication.ReplicaIdentityFull,
69+
},
70+
publication.Table{
71+
Name: "books",
72+
ReplicaIdentity: publication.ReplicaIdentityFull,
73+
},
74+
},
5775
},
5876
Slot: slot.Config{
5977
CreateIfNotExists: true,
@@ -69,6 +87,7 @@ func main() {
6987
BatchTickerDuration: time.Millisecond * 100,
7088
TableIndexMapping: map[string]string{
7189
"public.users": "users",
90+
"public.books": "books",
7291
},
7392
TypeName: "_doc",
7493
URLs: []string{"http://127.0.0.1:9200"},

0 commit comments

Comments
 (0)