Skip to content

Commit fc9f42c

Browse files
authored
Refactor data stream collector (#983)
- Move metric DESC to vars to aid in unused linter checks - Use new Collector interface Signed-off-by: Joe Adams <github@joeadams.io>
1 parent 4ab0f07 commit fc9f42c

File tree

5 files changed

+75
-147
lines changed

5 files changed

+75
-147
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
## master / unreleased
22

3+
BREAKING CHANGES:
4+
5+
The flag `--es.data_stream` has been renamed to `--collector.data-stream`.
6+
7+
* [CHANGE] Rename --es.data_stream to --collector.data-stream #983
8+
39
## 1.9.0 / 2025-02-27
410

511
BREAKING CHANGES:

collector/data_stream.go

Lines changed: 67 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -14,141 +14,108 @@
1414
package collector
1515

1616
import (
17+
"context"
1718
"encoding/json"
1819
"fmt"
19-
"io"
2020
"log/slog"
2121
"net/http"
2222
"net/url"
23-
"path"
2423

2524
"github.com/prometheus/client_golang/prometheus"
2625
)
2726

28-
type dataStreamMetric struct {
29-
Type prometheus.ValueType
30-
Desc *prometheus.Desc
31-
Value func(dataStreamStats DataStreamStatsDataStream) float64
32-
Labels func(dataStreamStats DataStreamStatsDataStream) []string
33-
}
34-
3527
var (
36-
defaultDataStreamLabels = []string{"data_stream"}
37-
defaultDataStreamLabelValues = func(dataStreamStats DataStreamStatsDataStream) []string {
38-
return []string{dataStreamStats.DataStream}
39-
}
28+
dataStreamBackingIndicesTotal = prometheus.NewDesc(
29+
prometheus.BuildFQName(namespace, "data_stream", "backing_indices_total"),
30+
"Number of backing indices",
31+
[]string{"data_stream"},
32+
nil,
33+
)
34+
dataStreamStoreSizeBytes = prometheus.NewDesc(
35+
prometheus.BuildFQName(namespace, "data_stream", "store_size_bytes"),
36+
"Store size of data stream",
37+
[]string{"data_stream"},
38+
nil,
39+
)
4040
)
4141

42+
func init() {
43+
registerCollector("data-stream", defaultDisabled, NewDataStream)
44+
}
45+
4246
// DataStream collects data stream statistics from the Elasticsearch
// data stream stats endpoint.
type DataStream struct {
	logger *slog.Logger // structured logger for scrape diagnostics
	hc     *http.Client // HTTP client used to reach Elasticsearch
	u      *url.URL     // base URL of the Elasticsearch instance
}
5052

5153
// NewDataStream defines DataStream Prometheus metrics
52-
func NewDataStream(logger *slog.Logger, client *http.Client, url *url.URL) *DataStream {
54+
func NewDataStream(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, error) {
5355
return &DataStream{
5456
logger: logger,
55-
client: client,
56-
url: url,
57-
58-
dataStreamMetrics: []*dataStreamMetric{
59-
{
60-
Type: prometheus.CounterValue,
61-
Desc: prometheus.NewDesc(
62-
prometheus.BuildFQName(namespace, "data_stream", "backing_indices_total"),
63-
"Number of backing indices",
64-
defaultDataStreamLabels, nil,
65-
),
66-
Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
67-
return float64(dataStreamStats.BackingIndices)
68-
},
69-
Labels: defaultDataStreamLabelValues,
70-
},
71-
{
72-
Type: prometheus.CounterValue,
73-
Desc: prometheus.NewDesc(
74-
prometheus.BuildFQName(namespace, "data_stream", "store_size_bytes"),
75-
"Store size of data stream",
76-
defaultDataStreamLabels, nil,
77-
),
78-
Value: func(dataStreamStats DataStreamStatsDataStream) float64 {
79-
return float64(dataStreamStats.StoreSizeBytes)
80-
},
81-
Labels: defaultDataStreamLabelValues,
82-
},
83-
},
84-
}
57+
hc: hc,
58+
u: u,
59+
}, nil
8560
}
8661

87-
// Describe adds DataStream metrics descriptions
88-
func (ds *DataStream) Describe(ch chan<- *prometheus.Desc) {
89-
for _, metric := range ds.dataStreamMetrics {
90-
ch <- metric.Desc
91-
}
62+
// DataStreamStatsResponse is a representation of the Data Stream stats
63+
type DataStreamStatsResponse struct {
64+
Shards DataStreamStatsShards `json:"_shards"`
65+
DataStreamCount int64 `json:"data_stream_count"`
66+
BackingIndices int64 `json:"backing_indices"`
67+
TotalStoreSizeBytes int64 `json:"total_store_size_bytes"`
68+
DataStreamStats []DataStreamStatsDataStream `json:"data_streams"`
9269
}
9370

94-
func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
95-
var dsr DataStreamStatsResponse
71+
// DataStreamStatsShards defines data stream stats shards information structure
72+
type DataStreamStatsShards struct {
73+
Total int64 `json:"total"`
74+
Successful int64 `json:"successful"`
75+
Failed int64 `json:"failed"`
76+
}
9677

97-
u := *ds.url
98-
u.Path = path.Join(u.Path, "/_data_stream/*/_stats")
99-
res, err := ds.client.Get(u.String())
100-
if err != nil {
101-
return dsr, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
102-
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
103-
}
78+
// DataStreamStatsDataStream defines the structure of per data stream stats
79+
type DataStreamStatsDataStream struct {
80+
DataStream string `json:"data_stream"`
81+
BackingIndices int64 `json:"backing_indices"`
82+
StoreSizeBytes int64 `json:"store_size_bytes"`
83+
MaximumTimestamp int64 `json:"maximum_timestamp"`
84+
}
10485

105-
defer func() {
106-
err = res.Body.Close()
107-
if err != nil {
108-
ds.logger.Warn(
109-
"failed to close http.Client",
110-
"err", err,
111-
)
112-
}
113-
}()
114-
115-
if res.StatusCode != http.StatusOK {
116-
return dsr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
117-
}
86+
func (ds *DataStream) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
87+
var dsr DataStreamStatsResponse
88+
89+
u := ds.u.ResolveReference(&url.URL{Path: "/_data_stream/*/_stats"})
11890

119-
bts, err := io.ReadAll(res.Body)
91+
resp, err := getURL(ctx, ds.hc, ds.logger, u.String())
12092
if err != nil {
121-
return dsr, err
93+
return err
12294
}
12395

124-
if err := json.Unmarshal(bts, &dsr); err != nil {
125-
return dsr, err
96+
if err := json.Unmarshal(resp, &dsr); err != nil {
97+
return err
12698
}
12799

128-
return dsr, nil
129-
}
100+
for _, dataStream := range dsr.DataStreamStats {
101+
fmt.Printf("Metric: %+v", dataStream)
130102

131-
// Collect gets DataStream metric values
132-
func (ds *DataStream) Collect(ch chan<- prometheus.Metric) {
103+
ch <- prometheus.MustNewConstMetric(
104+
dataStreamBackingIndicesTotal,
105+
prometheus.CounterValue,
106+
float64(dataStream.BackingIndices),
107+
dataStream.DataStream,
108+
)
133109

134-
dataStreamStatsResp, err := ds.fetchAndDecodeDataStreamStats()
135-
if err != nil {
136-
ds.logger.Warn(
137-
"failed to fetch and decode data stream stats",
138-
"err", err,
110+
ch <- prometheus.MustNewConstMetric(
111+
dataStreamStoreSizeBytes,
112+
prometheus.CounterValue,
113+
float64(dataStream.StoreSizeBytes),
114+
dataStream.DataStream,
139115
)
140-
return
141-
}
142116

143-
for _, metric := range ds.dataStreamMetrics {
144-
for _, dataStream := range dataStreamStatsResp.DataStreamStats {
145-
fmt.Printf("Metric: %+v", dataStream)
146-
ch <- prometheus.MustNewConstMetric(
147-
metric.Desc,
148-
metric.Type,
149-
metric.Value(dataStream),
150-
metric.Labels(dataStream)...,
151-
)
152-
}
153117
}
118+
119+
return nil
120+
154121
}

collector/data_stream_response.go

Lines changed: 0 additions & 38 deletions
This file was deleted.

collector/data_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ func TestDataStream(t *testing.T) {
6565
t.Fatal(err)
6666
}
6767

68-
c := NewDataStream(promslog.NewNopLogger(), http.DefaultClient, u)
68+
c, err := NewDataStream(promslog.NewNopLogger(), u, http.DefaultClient)
6969
if err != nil {
7070
t.Fatal(err)
7171
}
7272

73-
if err := testutil.CollectAndCompare(c, strings.NewReader(tt.want)); err != nil {
73+
if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(tt.want)); err != nil {
7474
t.Fatal(err)
7575
}
7676
})

main.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,6 @@ func main() {
8787
esExportShards = kingpin.Flag("es.shards",
8888
"Export stats for shards in the cluster (implies --es.indices).").
8989
Default("false").Bool()
90-
esExportDataStream = kingpin.Flag("es.data_stream",
91-
"Export stats for Data Streams.").
92-
Default("false").Bool()
9390
esClusterInfoInterval = kingpin.Flag("es.clusterinfo.interval",
9491
"Cluster info update interval for the cluster label").
9592
Default("5m").Duration()
@@ -217,10 +214,6 @@ func main() {
217214
}
218215
}
219216

220-
if *esExportDataStream {
221-
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
222-
}
223-
224217
if *esExportIndicesSettings {
225218
prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
226219
}

0 commit comments

Comments
 (0)