30 changes: 0 additions & 30 deletions collector/data_stream.go
@@ -46,9 +46,6 @@ type DataStream struct {
 	logger log.Logger
 	client *http.Client
 	url    *url.URL
 
-	up                              prometheus.Gauge
-	totalScrapes, jsonParseFailures prometheus.Counter
-
 	dataStreamMetrics []*dataStreamMetric
 }

@@ -59,18 +56,6 @@ func NewDataStream(logger log.Logger, client *http.Client, url *url.URL) *DataStream {
 		client: client,
 		url:    url,
 
-		up: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: prometheus.BuildFQName(namespace, "data_stream_stats", "up"),
-			Help: "Was the last scrape of the ElasticSearch Data Stream stats endpoint successful.",
-		}),
-		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: prometheus.BuildFQName(namespace, "data_stream_stats", "total_scrapes"),
-			Help: "Current total ElasticSearch Data STream scrapes.",
-		}),
-		jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: prometheus.BuildFQName(namespace, "data_stream_stats", "json_parse_failures"),
-			Help: "Number of errors while parsing JSON.",
-		}),
 		dataStreamMetrics: []*dataStreamMetric{
 			{
 				Type: prometheus.CounterValue,
@@ -105,10 +90,6 @@ func (ds *DataStream) Describe(ch chan<- *prometheus.Desc) {
 	for _, metric := range ds.dataStreamMetrics {
 		ch <- metric.Desc
 	}
-
-	ch <- ds.up.Desc()
-	ch <- ds.totalScrapes.Desc()
-	ch <- ds.jsonParseFailures.Desc()
 }
 
 func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
@@ -138,12 +119,10 @@ func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
 
 	bts, err := io.ReadAll(res.Body)
 	if err != nil {
-		ds.jsonParseFailures.Inc()
 		return dsr, err
 	}
 
 	if err := json.Unmarshal(bts, &dsr); err != nil {
-		ds.jsonParseFailures.Inc()
 		return dsr, err
 	}
 
@@ -152,25 +131,16 @@ func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse, error) {
 
 // Collect gets DataStream metric values
 func (ds *DataStream) Collect(ch chan<- prometheus.Metric) {
-	ds.totalScrapes.Inc()
-	defer func() {
-		ch <- ds.up
-		ch <- ds.totalScrapes
-		ch <- ds.jsonParseFailures
-	}()
-
 	dataStreamStatsResp, err := ds.fetchAndDecodeDataStreamStats()
 	if err != nil {
-		ds.up.Set(0)
 		level.Warn(ds.logger).Log(
 			"msg", "failed to fetch and decode data stream stats",
 			"err", err,
 		)
 		return
 	}
 
-	ds.up.Set(1)
 
 	for _, metric := range ds.dataStreamMetrics {
 		for _, dataStream := range dataStreamStatsResp.DataStreamStats {
 			fmt.Printf("Metric: %+v", dataStream)
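Taken together, the data_stream.go changes strip all self-telemetry out of the DataStream collector: Describe and Collect now deal only in the data-stream metrics themselves, and a failed scrape is logged and skipped instead of flipping an `up` gauge. Below is a minimal, self-contained sketch of the resulting collector shape; the `fetchStats` stub, metric name, and values are invustrative stand-ins for `fetchAndDecodeDataStreamStats` and the exporter's real metric table, not the exporter's actual code.

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
)

// One Desc per metric family, labeled by data stream, mirroring the
// dataStreamMetrics table in the diff above.
var backingIndicesDesc = prometheus.NewDesc(
	"example_data_stream_backing_indices_total",
	"Number of backing indices",
	[]string{"data_stream"}, nil,
)

type dataStreamCollector struct{}

// fetchStats stands in for fetchAndDecodeDataStreamStats.
func (c *dataStreamCollector) fetchStats() (map[string]float64, error) {
	return map[string]float64{"foo": 5, "bar": 2}, nil
}

func (c *dataStreamCollector) Describe(ch chan<- *prometheus.Desc) {
	// No up/totalScrapes/jsonParseFailures descriptors anymore.
	ch <- backingIndicesDesc
}

func (c *dataStreamCollector) Collect(ch chan<- prometheus.Metric) {
	stats, err := c.fetchStats()
	if err != nil {
		// On failure: log and emit nothing, rather than setting up=0.
		log.Printf("failed to fetch data stream stats: %v", err)
		return
	}
	for name, v := range stats {
		ch <- prometheus.MustNewConstMetric(
			backingIndicesDesc, prometheus.CounterValue, v, name)
	}
}

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(&dataStreamCollector{})
	mfs, err := reg.Gather()
	if err != nil {
		log.Fatal(err)
	}
	for _, mf := range mfs {
		log.Println(mf)
	}
}
```

Because Collect now emits nothing but the data-stream metrics, its output is fully determined by the stubbed (or HTTP-served) stats, which is exactly what the fixture-driven test below exploits.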
79 changes: 50 additions & 29 deletions collector/data_stream_test.go
@@ -14,44 +14,65 @@
 // limitations under the License.
 package collector
 
 import (
-	"fmt"
+	"io"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"os"
+	"strings"
 	"testing"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 )
 
 func TestDataStream(t *testing.T) {
-	tcs := map[string]string{
-		"7.15.0": `{"_shards":{"total":30,"successful":30,"failed":0},"data_stream_count":2,"backing_indices":7,"total_store_size_bytes":1103028116,"data_streams":[{"data_stream":"foo","backing_indices":5,"store_size_bytes":429205396,"maximum_timestamp":1656079894000},{"data_stream":"bar","backing_indices":2,"store_size_bytes":673822720,"maximum_timestamp":1656028796000}]}`,
-	}
-	for ver, out := range tcs {
-		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			fmt.Fprintln(w, out)
-		}))
-		defer ts.Close()
-
-		u, err := url.Parse(ts.URL)
-		if err != nil {
-			t.Fatalf("Failed to parse URL: %s", err)
-		}
-		s := NewDataStream(log.NewNopLogger(), http.DefaultClient, u)
-		stats, err := s.fetchAndDecodeDataStreamStats()
-		if err != nil {
-			t.Fatalf("Failed to fetch or decode data stream stats: %s", err)
-		}
-		t.Logf("[%s] Data Stream Response: %+v", ver, stats)
-		dataStreamStats := stats.DataStreamStats[0]
-
-		if dataStreamStats.BackingIndices != 5 {
-			t.Errorf("Bad number of backing indices")
-		}
-
-		if dataStreamStats.StoreSizeBytes != 429205396 {
-			t.Errorf("Bad store size bytes valuee")
-		}
+	tests := []struct {
+		name string
+		file string
+		want string
+	}{
+		{
+			name: "7.15.0",
+			file: "../fixtures/datastream/7.15.0.json",
+			want: `# HELP elasticsearch_data_stream_backing_indices_total Number of backing indices
+# TYPE elasticsearch_data_stream_backing_indices_total counter
+elasticsearch_data_stream_backing_indices_total{data_stream="bar"} 2
+elasticsearch_data_stream_backing_indices_total{data_stream="foo"} 5
+# HELP elasticsearch_data_stream_store_size_bytes Store size of data stream
+# TYPE elasticsearch_data_stream_store_size_bytes counter
+elasticsearch_data_stream_store_size_bytes{data_stream="bar"} 6.7382272e+08
+elasticsearch_data_stream_store_size_bytes{data_stream="foo"} 4.29205396e+08
+`,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f, err := os.Open(tt.file)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer f.Close()
+
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				io.Copy(w, f)
+			}))
+			defer ts.Close()
+
+			u, err := url.Parse(ts.URL)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			c := NewDataStream(log.NewNopLogger(), http.DefaultClient, u)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if err := testutil.CollectAndCompare(c, strings.NewReader(tt.want)); err != nil {
+				t.Fatal(err)
+			}
+		})
 	}
 }
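The rewritten test hinges on `testutil.CollectAndCompare`, which gathers everything a collector emits and diffs it against the expected text-format exposition; this only became practical once the collector stopped emitting `up` and `total_scrapes`, whose values vary between runs. A standalone sketch of the same pattern against a toy collector (the metric name and `GaugeFunc` here are invented for illustration):

```go
package collector

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCollectAndCompareSketch(t *testing.T) {
	// A GaugeFunc is itself a prometheus.Collector, so it can be compared
	// directly, just like the DataStream collector above.
	g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_backing_indices",
		Help: "Number of backing indices",
	}, func() float64 { return 5 })

	want := `# HELP example_backing_indices Number of backing indices
# TYPE example_backing_indices gauge
example_backing_indices 5
`
	// Trailing metric names, if given, restrict the comparison to those
	// families; omitted here, so all collected output must match.
	if err := testutil.CollectAndCompare(g, strings.NewReader(want)); err != nil {
		t.Fatal(err)
	}
}
```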
24 changes: 24 additions & 0 deletions fixtures/datastream/7.15.0.json
@@ -0,0 +1,24 @@
+{
+  "_shards": {
+    "total": 30,
+    "successful": 30,
+    "failed": 0
+  },
+  "data_stream_count": 2,
+  "backing_indices": 7,
+  "total_store_size_bytes": 1103028116,
+  "data_streams": [
+    {
+      "data_stream": "foo",
+      "backing_indices": 5,
+      "store_size_bytes": 429205396,
+      "maximum_timestamp": 1656079894000
+    },
+    {
+      "data_stream": "bar",
+      "backing_indices": 2,
+      "store_size_bytes": 673822720,
+      "maximum_timestamp": 1656028796000
+    }
+  ]
+}
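The fixture mirrors the response shape of Elasticsearch's data stream stats API (`GET /_data_stream/_stats` in 7.x). For reference, a cut-down sketch of Go types that decode it, consistent with the `DataStreamStats`, `BackingIndices`, and `StoreSizeBytes` fields the old test asserted on; the exporter's real structs in collector/data_stream.go may use different type names and carry more fields.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// Sketch of the top-level stats response; field names follow the JSON keys
// in the fixture above.
type DataStreamStatsResponse struct {
	DataStreamCount     int64            `json:"data_stream_count"`
	BackingIndices      int64            `json:"backing_indices"`
	TotalStoreSizeBytes int64            `json:"total_store_size_bytes"`
	DataStreamStats     []DataStreamStat `json:"data_streams"`
}

// Sketch of one per-stream entry.
type DataStreamStat struct {
	DataStream       string `json:"data_stream"`
	BackingIndices   int64  `json:"backing_indices"`
	StoreSizeBytes   int64  `json:"store_size_bytes"`
	MaximumTimestamp int64  `json:"maximum_timestamp"`
}

func main() {
	bts, err := os.ReadFile("fixtures/datastream/7.15.0.json")
	if err != nil {
		panic(err)
	}
	var dsr DataStreamStatsResponse
	if err := json.Unmarshal(bts, &dsr); err != nil {
		panic(err)
	}
	// Prints: foo backing_indices=5 store_size_bytes=429205396, then bar.
	for _, ds := range dsr.DataStreamStats {
		fmt.Printf("%s backing_indices=%d store_size_bytes=%d\n",
			ds.DataStream, ds.BackingIndices, ds.StoreSizeBytes)
	}
}
```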