diff --git a/common/ecsmetrics/metric.go b/common/ecsmetrics/metric.go deleted file mode 100644 index a35af762..00000000 --- a/common/ecsmetrics/metric.go +++ /dev/null @@ -1,30 +0,0 @@ -package ecsmetrics - -import ( - "time" - - "github.com/AliceO2Group/Control/common/monitoring" -) - -func NewMetric(name string) monitoring.Metric { - timestamp := time.Now() - metric := monitoring.Metric{Name: name, Timestamp: timestamp.UnixMilli()} - metric.AddTag("subsystem", "ECS") - return metric -} - -// Timer* functions are meant to be used with defer statement to measure runtime of given function: -// defer TimerNS(&metric)() -func TimerMS(metric *monitoring.Metric) func() { - start := time.Now() - return func() { - metric.AddValue("execution_time_ms", time.Since(start).Milliseconds()) - } -} - -func TimerNS(metric *monitoring.Metric) func() { - start := time.Now() - return func() { - metric.AddValue("execution_time_ns", time.Since(start).Nanoseconds()) - } -} diff --git a/common/ecsmetrics/metrics_test.go b/common/ecsmetrics/metrics_test.go deleted file mode 100644 index 4c46569c..00000000 --- a/common/ecsmetrics/metrics_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package ecsmetrics - -import ( - "fmt" - "testing" - "time" - - "github.com/AliceO2Group/Control/common/monitoring" -) - -func measureFunc(metric *monitoring.Metric) { - defer TimerMS(metric)() - defer TimerNS(metric)() - time.Sleep(100 * time.Millisecond) -} - -func TestSimpleStartStop(t *testing.T) { - metric := NewMetric("test") - measureFunc(&metric) - fmt.Println(metric.Values["execution_time_ms"]) - fmt.Println(metric.Values["execution_time_ns"]) - if metric.Values["execution_time_ms"].(int64) < 100 { - t.Error("wrong milliseconds") - } - if metric.Values["execution_time_ns"].(int64) < 100000000 { - t.Error("wrong nanoseconds") - } -} diff --git a/common/event/writer.go b/common/event/writer.go index fb3fc516..64f9a98f 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -30,7 +30,6 @@ import ( "sync" "time" - "github.com/AliceO2Group/Control/common/ecsmetrics" "github.com/AliceO2Group/Control/common/event/topic" "github.com/AliceO2Group/Control/common/logger" "github.com/AliceO2Group/Control/common/logger/infologger" @@ -83,7 +82,7 @@ type KafkaWriter struct { } func (w *KafkaWriter) newMetric(name string) monitoring.Metric { - metric := ecsmetrics.NewMetric(name) + metric := monitoring.NewMetric(name) metric.AddTag("topic", w.Topic) return metric } @@ -103,9 +102,9 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter { } writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) { - defer ecsmetrics.TimerNS(metric)() + defer monitoring.TimerNS(metric)() if err := writer.WriteMessages(context.Background(), messages...); err != nil { - metric.AddValue("messages_failed", len(messages)) + metric.SetFieldUInt64("messages_failed", uint64(len(messages))) log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err) } } @@ -145,12 +144,12 @@ func (w *KafkaWriter) writingLoop() { } metric := w.newMetric(KAFKAWRITER) - metric.AddValue("messages_sent", len(messagesToSend)) - metric.AddValue("messages_failed", 0) + metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend))) + metric.SetFieldUInt64("messages_failed", 0) w.writeFunction(messagesToSend, &metric) - monitoring.Send(metric) + monitoring.Send(&metric) } } } @@ -243,7 +242,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time metric := w.newMetric(KAFKAPREPARE) func() { - defer ecsmetrics.TimerNS(&metric)() + defer monitoring.TimerNS(&metric)() wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp) if err != nil { log.WithField("event", e). @@ -262,5 +261,5 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time w.toBatchMessagesChan <- message }() - monitoring.Send(metric) + monitoring.Send(&metric) } diff --git a/common/ecsmetrics/metrics.go b/common/golangmetrics/metrics.go similarity index 59% rename from common/ecsmetrics/metrics.go rename to common/golangmetrics/metrics.go index 066fdf0d..f1bcfabe 100644 --- a/common/ecsmetrics/metrics.go +++ b/common/golangmetrics/metrics.go @@ -1,4 +1,28 @@ -package ecsmetrics +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package golangmetrics import ( internalmetrics "runtime/metrics" @@ -33,14 +57,14 @@ func gather() monitoring.Metric { internalmetrics.Read(samples) - metric := NewMetric("golangruntimemetrics") + metric := monitoring.NewMetric("golangruntimemetrics") for _, sample := range samples { switch sample.Value.Kind() { case internalmetrics.KindUint64: - metric.AddValue(sample.Name, sample.Value.Uint64()) + metric.SetFieldUInt64(sample.Name, sample.Value.Uint64()) case internalmetrics.KindFloat64: - metric.AddValue(sample.Name, sample.Value.Float64()) + metric.SetFieldFloat64(sample.Name, sample.Value.Float64()) case internalmetrics.KindFloat64Histogram: log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name) continue @@ -52,7 +76,7 @@ func gather() monitoring.Metric { return metric } -func StartGolangMetrics(period time.Duration) { +func Start(period time.Duration) { log.WithField("level", infologger.IL_Devel).Info("Starting golang metrics reporting") go func() { log.Debug("Starting golang metrics goroutine") @@ -64,14 +88,15 @@ func StartGolangMetrics(period time.Duration) { return default: log.Debug("sending golang metrics") - monitoring.Send(gather()) + metric := gather() + monitoring.Send(&metric) time.Sleep(period) } } }() } -func StopGolangMetrics() { +func Stop() { endRequestChannel <- struct{}{} <-endRequestChannel } diff --git a/common/monitoring/common.go b/common/monitoring/common.go new file mode 100644 index 00000000..9f21f856 --- /dev/null +++ b/common/monitoring/common.go @@ -0,0 +1,50 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package monitoring + +import ( + "hash/maphash" + "time" +) + +type key struct { + nameTagsHash uint64 + timestamp time.Time +} + +func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) { + hash.WriteString(metric.name) + + for _, tag := range metric.tags { + hash.WriteString(tag.name) + hash.WriteString(tag.value) + } +} + +func hashValueAndReset(hash *maphash.Hash) uint64 { + hashValue := hash.Sum64() + hash.Reset() + return hashValue +} diff --git a/common/monitoring/metric.go b/common/monitoring/metric.go index 89173323..0bb26fa3 100644 --- a/common/monitoring/metric.go +++ b/common/monitoring/metric.go @@ -1,27 +1,131 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + package monitoring +import ( + "fmt" + "io" + "time" + + lp "github.com/influxdata/line-protocol/v2/lineprotocol" +) + type ( - TagsType map[string]any - ValuesType map[string]any + Tag struct { + name string + value string + } + + TagsType []Tag + FieldsType map[string]any ) type Metric struct { - Name string `json:"name"` - Values ValuesType `json:"values"` - Tags TagsType `json:"tags,omitempty"` - Timestamp int64 `json:"timestamp"` + name string + fields FieldsType + tags TagsType + timestamp time.Time } -func (metric *Metric) AddTag(tagName string, value any) { - if metric.Tags == nil { - metric.Tags = make(TagsType) +// Return empty metric, it is used right now mostly in string +// to report metrics correctly to influxdb use NewECSMetric +func NewDefaultMetric(name string, timestamp time.Time) Metric { + return Metric{ + name: name, timestamp: timestamp, } - metric.Tags[tagName] = value } -func (metric *Metric) AddValue(valueName string, value any) { - if metric.Values == nil { - metric.Values = make(ValuesType) +// creates empty metric with tag subsystem=ECS, which +// is used by Telegraf to send metrics from ECS to correct +// bucket +func NewMetric(name string) Metric { + metric := NewDefaultMetric(name, time.Now()) + metric.AddTag("subsystem", "ECS") + return metric +} + +func (metric *Metric) AddTag(tagName string, value string) { + metric.tags = append(metric.tags, Tag{name: tagName, value: value}) +} + +func (metric *Metric) setField(fieldName string, field any) { + if metric.fields == nil { + metric.fields = make(FieldsType) } - metric.Values[valueName] = value + metric.fields[fieldName] = field +} + +func (metric *Metric) SetFieldInt64(fieldName string, field int64) { + metric.setField(fieldName, field) +} + +func (metric *Metric) SetFieldUInt64(fieldName string, field uint64) { + metric.setField(fieldName, field) +} + +func (metric *Metric) SetFieldFloat64(fieldName string, field float64) { + metric.setField(fieldName, field) +} + +func (metric *Metric) MergeFields(other *Metric) { + for fieldName, field := range other.fields { + if storedField, ok := metric.fields[fieldName]; ok { + switch v := field.(type) { + case int64: + metric.fields[fieldName] = v + storedField.(int64) + case uint64: + metric.fields[fieldName] = v + storedField.(uint64) + case float64: + metric.fields[fieldName] = v + storedField.(float64) + } + } else { + metric.fields[fieldName] = field + } + } +} + +func Format(writer io.Writer, metrics []Metric) error { + var enc lp.Encoder + + for _, metric := range metrics { + enc.StartLine(metric.name) + for _, tag := range metric.tags { + enc.AddTag(tag.name, tag.value) + } + + for fieldName, field := range metric.fields { + // we cannot panic as we provide accessors only for allowed type with AddField* + enc.AddField(fieldName, lp.MustNewValue(field)) + } + enc.EndLine(metric.timestamp) + } + + if err := enc.Err(); err != nil { + return err + } + + _, err := fmt.Fprintf(writer, "%s", enc.Bytes()) + return err } diff --git a/common/monitoring/metricsaggregate.go b/common/monitoring/metricsaggregate.go new file mode 100644 index 00000000..7292db76 --- /dev/null +++ b/common/monitoring/metricsaggregate.go @@ -0,0 +1,69 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package monitoring + +import ( + "hash/maphash" + "time" +) + +type bucketsType map[key]*Metric + +type MetricsAggregate struct { + hash maphash.Hash + metricsBuckets bucketsType +} + +func NewMetricsAggregate() *MetricsAggregate { + metrics := &MetricsAggregate{} + metrics.metricsBuckets = make(bucketsType) + return metrics +} + +func (this *MetricsAggregate) AddMetric(metric *Metric) { + metricNameTagsToHash(&this.hash, metric) + hashKey := hashValueAndReset(&this.hash) + + k := key{nameTagsHash: hashKey, timestamp: time.Unix(metric.timestamp.Unix(), 0)} + if storedMetric, ok := this.metricsBuckets[k]; ok { + storedMetric.MergeFields(metric) + } else { + this.metricsBuckets[k] = metric + } +} + +func (this *MetricsAggregate) Clear() { + this.hash.Reset() + clear(this.metricsBuckets) +} + +func (this *MetricsAggregate) GetMetrics() []Metric { + var result []Metric + for key, metric := range this.metricsBuckets { + metric.timestamp = key.timestamp + result = append(result, *metric) + } + return result +} diff --git a/common/monitoring/metricsreservoirsampling.go b/common/monitoring/metricsreservoirsampling.go new file mode 100644 index 00000000..1e5cbe4d --- /dev/null +++ b/common/monitoring/metricsreservoirsampling.go @@ -0,0 +1,107 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package monitoring + +import ( + "hash/maphash" + "time" +) + +type bucketsReservoirSampleType map[key]*metricReservoirSample + +var reservoirSize uint64 = 1000 + +type metricReservoirSample struct { + metric Metric + reservoir reservoirSampling +} + +type MetricsReservoirSampling struct { + hash maphash.Hash + metricsBuckets bucketsReservoirSampleType +} + +func NewMetricsReservoirSampling() *MetricsReservoirSampling { + metrics := &MetricsReservoirSampling{} + metrics.metricsBuckets = make(bucketsReservoirSampleType) + metrics.hash.SetSeed(maphash.MakeSeed()) + return metrics +} + +func metricFieldToFloat64(field any) float64 { + var asserted float64 + switch v := field.(type) { + case int64: + asserted = float64(v) + case uint64: + asserted = float64(v) + case float64: + asserted = v + } + return asserted +} + +func (this *MetricsReservoirSampling) AddMetric(metric *Metric) { + for fieldName, field := range metric.fields { + metricNameTagsToHash(&this.hash, metric) + this.hash.WriteString(fieldName) + k := key{nameTagsHash: hashValueAndReset(&this.hash), timestamp: time.Unix(metric.timestamp.Unix(), 0)} + if storedMetric, ok := this.metricsBuckets[k]; !ok { + newReservoir := newReservoirSampling(fieldName, reservoirSize) + newReservoir.AddPoint(metricFieldToFloat64(field)) + this.metricsBuckets[k] = &metricReservoirSample{metric: *metric, reservoir: newReservoir} + } else { + storedMetric.reservoir.AddPoint(metricFieldToFloat64(field)) + } + } +} + +func (this *MetricsReservoirSampling) Clear() { + this.hash.Reset() + clear(this.metricsBuckets) +} + +func (this *MetricsReservoirSampling) GetMetrics() []Metric { + var result []Metric + for key, reservoirMetric := range this.metricsBuckets { + m := Metric{name: reservoirMetric.metric.name, tags: reservoirMetric.metric.tags, timestamp: key.timestamp} + + mean, median, min, p10, p30, p70, p90, max, count, poolSize := reservoirMetric.reservoir.GetStats() + + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_mean", mean) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_median", median) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_min", min) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p10", p10) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p30", p30) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p70", p70) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p90", p90) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_max", max) + m.SetFieldUInt64(reservoirMetric.reservoir.name+"_count", count) + m.SetFieldUInt64(reservoirMetric.reservoir.name+"_poolsize", poolSize) + + result = append(result, m) + } + return result +} diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go index e3ec257f..54b48b1c 100644 --- a/common/monitoring/monitoring.go +++ b/common/monitoring/monitoring.go @@ -1,20 +1,46 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + package monitoring import ( "context" - "encoding/json" "fmt" "net/http" "time" "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/sirupsen/logrus" ) var ( - server *http.Server - metricsLimit int = 1000000 - metrics []Metric + // scraping endpoint implementation + server *http.Server + // objects to store incoming metrics + metricsInternal *MetricsAggregate + metricsHistogramInternal *MetricsReservoirSampling // channel that is used to request end of metrics server, it sends notification when server ended. // It needs to be read!!! endChannel chan struct{} @@ -22,23 +48,28 @@ var ( // channel used to send metrics into the event loop metricsChannel chan Metric + // channel used to send metrics meant to be proceesed as histogram into the event loop + metricsHistosChannel chan Metric + // channel for sending requests to reset actual metrics slice and send it back to caller via metricsExportedToRequest metricsRequestedChannel chan struct{} // channel used to send metrics to be reported by http request from event loop metricsExportedToRequest chan []Metric - log = logger.New(logrus.StandardLogger(), "metrics") + log = logger.New(logrus.StandardLogger(), "metrics").WithField("level", infologger.IL_Devel) ) -func initChannels(messageBufferSize int) { +func initChannels() { endChannel = make(chan struct{}) metricsRequestedChannel = make(chan struct{}) // 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if // multiple goroutines want to send metrics without blocking each other - metricsChannel = make(chan Metric, 10000) + metricsChannel = make(chan Metric, 100000) + metricsHistosChannel = make(chan Metric, 100000) metricsExportedToRequest = make(chan []Metric) - metricsLimit = messageBufferSize + metricsInternal = NewMetricsAggregate() + metricsHistogramInternal = NewMetricsReservoirSampling() } func closeChannels() { @@ -49,24 +80,29 @@ func closeChannels() { } // this eventLoop is the main part that processes all metrics send to the package -// 3 events can happen: +// 4 events can happen: // 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice -// 2. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing +// 2. metricsHistosChannel receives message from Send() method. We just add the new metric to metrics slice +// 3. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing // metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice -// 3. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller +// 4. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller // that eventLoop stopped func eventLoop() { for { select { case <-metricsRequestedChannel: - shallowCopyMetrics := metrics - metrics = make([]Metric, 0) - metricsExportedToRequest <- shallowCopyMetrics + aggregatedMetrics := metricsInternal.GetMetrics() + aggregatedMetrics = append(aggregatedMetrics, metricsHistogramInternal.GetMetrics()...) + metricsInternal.Clear() + metricsHistogramInternal.Clear() + + metricsExportedToRequest <- aggregatedMetrics case metric := <-metricsChannel: - if len(metrics) < metricsLimit { - metrics = append(metrics, metric) - } + metricsInternal.AddMetric(&metric) + + case metric := <-metricsHistosChannel: + metricsHistogramInternal.AddMetric(&metric) case <-endChannel: defer func() { @@ -78,18 +114,24 @@ func eventLoop() { } func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Type", "text/plain") metricsRequestedChannel <- struct{}{} metricsToConvert := <-metricsExportedToRequest if metricsToConvert == nil { metricsToConvert = make([]Metric, 0) } - json.NewEncoder(w).Encode(metricsToConvert) + Format(w, metricsToConvert) +} + +func Send(metric *Metric) { + if IsRunning() { + metricsChannel <- *metric + } } -func Send(metric Metric) { +func SendHistogrammable(metric *Metric) { if IsRunning() { - metricsChannel <- metric + metricsHistosChannel <- *metric } } @@ -104,15 +146,14 @@ func handleFunc(endpointName string) { // \param port port where the scraping endpoint will be created // \param endpointName name of the endpoint, which must start with a slash eg. "/internalmetrics" -// \param messageBufferSize size of buffer for messages where messages are kept between scraping request. // // If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged. -func Run(port uint16, endpointName string, messageBufferSize int) error { +func Run(port uint16, endpointName string) error { if IsRunning() { return nil } - initChannels(messageBufferSize) + initChannels() go eventLoop() @@ -133,7 +174,6 @@ func Stop() { endChannel <- struct{}{} <-endChannel server = nil - metrics = nil } func IsRunning() bool { diff --git a/common/monitoring/monitoring_test.go b/common/monitoring/monitoring_test.go index 7b707c3e..ef43cbf9 100644 --- a/common/monitoring/monitoring_test.go +++ b/common/monitoring/monitoring_test.go @@ -1,9 +1,39 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + package monitoring import ( - "encoding/json" + "bytes" "fmt" + "io" + "math" + "math/rand" "net/http" + "os" + "runtime/pprof" + "strings" "testing" "time" ) @@ -26,10 +56,10 @@ func isRunningWithTimeout(t *testing.T, timeout time.Duration) { // block until either length of metrics is the same as \requiredMessages or timeout is triggered func hasNumberOfMetrics(t *testing.T, timeout time.Duration, requiredMessages int) { timeoutChan := time.After(timeout) - for len(metrics) != requiredMessages { + for len(metricsInternal.GetMetrics()) != requiredMessages { select { case <-timeoutChan: - t.Errorf("Timeout %v triggered when waiting for %v messages, got %v", timeout, requiredMessages, len(metrics)) + t.Errorf("Timeout %v triggered when waiting for %v messages, got %v", timeout, requiredMessages, len(metricsInternal.GetMetrics())) return default: @@ -39,13 +69,13 @@ func hasNumberOfMetrics(t *testing.T, timeout time.Duration, requiredMessages in } func TestSimpleStartStop(t *testing.T) { - go Run(1234, "/random", 100) + go Run(1234, "/random") isRunningWithTimeout(t, time.Second) Stop() } func TestStartMultipleStop(t *testing.T) { - go Run(1234, "/random", 100) + go Run(1234, "/random") isRunningWithTimeout(t, time.Second) Stop() Stop() @@ -56,7 +86,7 @@ func cleaningUpAfterTest() { } func initTest() { - go Run(12345, "notimportant", 100) + go Run(12345, "notimportant") } // decorator function that properly inits and cleans after higher level test of Monitoring package @@ -69,19 +99,21 @@ func testFunction(t *testing.T, testToRun func(*testing.T)) { func TestSendingSingleMetric(t *testing.T) { testFunction(t, func(t *testing.T) { - metric := Metric{Name: "test"} + metric := &Metric{name: "test"} Send(metric) hasNumberOfMetrics(t, time.Second, 1) - if metrics[0].Name != "test" { - t.Errorf("Got wrong name %s in stored metric", metrics[0].Name) + aggregatedMetrics := metricsInternal.GetMetrics() + + if aggregatedMetrics[0].name != "test" { + t.Errorf("Got wrong name %s in stored metric", aggregatedMetrics[0].name) } }) } func TestExportingMetrics(t *testing.T) { testFunction(t, func(t *testing.T) { - metric := Metric{Name: "test"} + metric := &Metric{name: "test"} Send(metric) hasNumberOfMetrics(t, time.Second, 1) @@ -92,52 +124,36 @@ func TestExportingMetrics(t *testing.T) { t.Errorf("Got wrong amount of metrics %d, expected 1", len(metricsToExport)) } - if metricsToExport[0].Name != "test" { - t.Errorf("Got wrong name of metric %s, expected test", metricsToExport[0].Name) - } - }) -} - -func TestBufferLimit(t *testing.T) { - testFunction(t, func(t *testing.T) { - metricsLimit = 1 - metric := Metric{Name: "test"} - metric.Timestamp = 10 - metric.AddTag("tag1", 42) - metric.AddValue("value1", 11) - - Send(metric) - hasNumberOfMetrics(t, time.Second, 1) - - Send(metric) - time.Sleep(100 * time.Millisecond) - - if len(metrics) != 1 { - t.Errorf("Metrics length is %d, but should be 1 after sending second metric", len(metrics)) + if metricsToExport[0].name != "test" { + t.Errorf("Got wrong name of metric %s, expected test", metricsToExport[0].name) } }) } func TestHttpRun(t *testing.T) { - go Run(9876, "/metrics", 10) + go Run(9876, "/metrics") defer Stop() isRunningWithTimeout(t, time.Second) - metric := Metric{Name: "test"} - metric.Timestamp = 10 - metric.AddTag("tag1", 42) - metric.AddValue("value1", 11) - Send(metric) + metric := Metric{name: "test"} + metric.timestamp = time.Unix(10, 0) + metric.AddTag("tag1", "42") + metric.SetFieldInt64("value1", 11) + Send(&metric) response, err := http.Get("http://localhost:9876/metrics") if err != nil { t.Fatalf("Failed to GET metrics at port 9876: %v", err) } - decoder := json.NewDecoder(response.Body) - var receivedMetrics []Metric - if err = decoder.Decode(&receivedMetrics); err != nil { - t.Fatalf("Failed to decoded Metric: %v", err) + message, err := io.ReadAll(response.Body) + if err != nil { + t.Errorf("Failed to read response Body: %v", err) + } + + receivedMetrics, err := parseMultipleLineProtocol(string(message)) + if err != nil { + t.Errorf("Failed to parse message: %v", string(message)) } receivedMetric := receivedMetrics[0] @@ -146,7 +162,7 @@ func TestHttpRun(t *testing.T) { t.Errorf("Got wrong name of metric %s, expected test", receivedMetric.Name) } - if receivedMetric.Timestamp != 10 { + if receivedMetric.Timestamp != time.Unix(10, 0).UnixNano() { t.Errorf("Got wrong timestamp of metric %d, expected 10", receivedMetric.Timestamp) } @@ -154,59 +170,441 @@ func TestHttpRun(t *testing.T) { t.Errorf("Got wrong number of tags %d, expected 1", len(receivedMetric.Tags)) } - if receivedMetric.Tags["tag1"].(float64) != 42 { - t.Error("Failed to retreive tags: tag1 with value 42") + if receivedMetric.Tags["tag1"] != "42" { + t.Errorf("Failed to retreive tags: tag1 with value 42, %+v", receivedMetric.Tags) } - if len(receivedMetric.Values) != 1 { - t.Errorf("Got wrong number of values %d, expected 1", len(receivedMetric.Values)) + if len(receivedMetric.Fields) != 1 { + t.Errorf("Got wrong number of values %d, expected 1", len(receivedMetric.Fields)) } - if receivedMetric.Values["value1"].(float64) != 11 { - t.Error("Failed to retreive tags: value1 with value 11") + if receivedMetric.Fields["value1"] != "11i" { + t.Errorf("Failed to retreive tags: value1 with value 11: %+v", receivedMetric.Fields) } } -// This benchmark cannot be run for too long as it will fill whole RAM even with -// results: -// goos: linux -// goarch: amd64 -// pkg: github.com/AliceO2Group/Control/common/monitoring -// cpu: 11th Gen Intel(R) Core(TM) i9-11900H @ 2.50GHz -// BenchmarkSendingMetrics-16 -// -// 123365481 192.6 ns/op -// PASS -// ok github.com/AliceO2Group/Control/common/monitoring 44.686s -func BenchmarkSendingMetrics(b *testing.B) { - Run(12345, "/metrics", 100) - - // this goroutine keeps clearing results so RAM does not exhausted - go func() { - for { - select { - case <-endChannel: - endChannel <- struct{}{} - break - default: - if len(metrics) >= 10000000 { - metricsRequestedChannel <- struct{}{} - <-metricsExportedToRequest +func parseMultipleLineProtocol(input string) ([]struct { + Name string + Tags map[string]string + Fields map[string]string + Timestamp int64 +}, error, +) { + lines := strings.Split(strings.TrimSpace(input), "\n") + parsed := []struct { + Name string + Tags map[string]string + Fields map[string]string + Timestamp int64 + }{} + + for _, line := range lines { + parts := strings.SplitN(line, " ", 3) + if len(parts) < 3 { + return nil, fmt.Errorf("invalid line protocol format: %s", line) + } + + // Parse measurement and tags + nameTags := strings.Split(parts[0], ",") + name := nameTags[0] + tags := map[string]string{} + for _, tag := range nameTags[1:] { + kv := strings.SplitN(tag, "=", 2) + if len(kv) == 2 { + tags[kv[0]] = kv[1] + } + } + + // Parse fields (leave as raw string for comparison) + fields := map[string]string{} + for _, field := range strings.Split(parts[1], ",") { + kv := strings.SplitN(field, "=", 2) + if len(kv) == 2 { + fields[kv[0]] = kv[1] + } + } + + // Parse timestamp + var ts int64 + _, err := fmt.Sscanf(parts[2], "%d", &ts) + if err != nil { + return nil, fmt.Errorf("invalid timestamp in line: %s", line) + } + + parsed = append(parsed, struct { + Name string + Tags map[string]string + Fields map[string]string + Timestamp int64 + }{ + Name: name, + Tags: tags, + Fields: fields, + Timestamp: ts, + }) + } + + return parsed, nil +} + +func MapsEqual[K comparable, V comparable](a, b map[K]V) bool { + if len(a) != len(b) { + return false + } + for key, valueA := range a { + if valueB, ok := b[key]; !ok || valueA != valueB { + return false + } + } + return true +} + +func TestMetricsFormat(t *testing.T) { + metrics := []Metric{} + metric1 := Metric{name: "test"} + metric1.timestamp = time.Unix(10, 0) + metric1.AddTag("tag1", "42") + metric1.SetFieldInt64("int64", 1) + metric1.SetFieldUInt64("uint64", 2) + metric1.SetFieldFloat64("float64", 3) + + metrics = append(metrics, metric1) + + metric2 := Metric{name: "test1"} + metric2.timestamp = time.Unix(10, 0) + metric2.AddTag("tag1", "43") + metric2.SetFieldInt64("int64", 2) + metric2.SetFieldUInt64("uint64", 3) + metric2.SetFieldFloat64("float64", 4) + + metrics = append(metrics, metric2) + + buf := bytes.Buffer{} + + Format(&buf, metrics) + + metricsParsed, err := parseMultipleLineProtocol(buf.String()) + if err != nil { + t.Error(err) + } + + for idx, metricParsed := range metricsParsed { + metricOrig := metrics[idx] + + if metricParsed.Name != metricOrig.name { + t.Errorf("failed to compare %v and %v ", metricOrig, metricParsed) + } + // if !MapsEqual(metricParsed.Tags, metricOrig.Tags) { + // t.Errorf("failed to compare %v and %v ", metricOrig.Tags, metricParsed.Tags) + // } + // if !MapsEqual(metricParsed.Fields, metricOrig.Values) { + // t.Errorf("failed to compare %v and %v ", metricOrig.Tags, metricParsed.Tags) + // } + if metricParsed.Timestamp != metricOrig.timestamp.UnixNano() { + t.Errorf("failed to compare %v and %v ", metricOrig.timestamp.UnixNano(), metricParsed.Timestamp) + } + } +} + +// generate unix timestamp with given seconds and random amount of nsecs +func generateTimestamp(seconds int64, r *rand.Rand) time.Time { + return time.Unix(seconds, r.Int63n(999999999)) +} + +func TestMetricsObject(t *testing.T) { + r := rand.New(rand.NewSource(42)) + tags1 := TagsType{Tag{"tag1", "1"}, Tag{"tag2", "2"}} + metrics := NewMetricsAggregate() + for i := 0; i != 1000; i++ { + metrics.AddMetric(&Metric{name: "test", tags: tags1, fields: FieldsType{"val1": int64(1)}, timestamp: generateTimestamp(10, r)}) + metrics.AddMetric(&Metric{name: "test", tags: tags1, fields: FieldsType{"val1": int64(1)}, timestamp: generateTimestamp(11, r)}) + metrics.AddMetric(&Metric{name: "test", tags: tags1, fields: FieldsType{"val2": int64(1)}, timestamp: generateTimestamp(11, r)}) + } + + if len(metrics.metricsBuckets) != 2 { + t.Errorf("there is diffent number of buckets, wanted 2 got %d", len(metrics.metricsBuckets)) + } + + var aggregatedVal1 int64 + var aggregatedVal2 int64 + for key, metric := range metrics.metricsBuckets { + if key.timestamp != time.Unix(10, 0) && key.timestamp != time.Unix(11, 0) { + t.Errorf("expected bucket timestamp as 10 or 11, got %ds, %dns", key.timestamp.Unix(), key.timestamp.UnixNano()) + } + aggregatedVal1 += metric.fields["val1"].(int64) + if _, ok := metric.fields["val2"]; ok { + aggregatedVal2 += metric.fields["val2"].(int64) + } + } + + if aggregatedVal1 != 2000 { + t.Errorf("all values \"val1\" in buckets inside the object should have been aggregated to 2000, but got %d", aggregatedVal1) + } + + if aggregatedVal2 != 1000 { + t.Errorf("all values \"val2\" in buckets inside the object should have been aggregated to 1000, but got %d", aggregatedVal2) + } + + aggregatedMetrics := metrics.GetMetrics() + if len(metrics.metricsBuckets) != 2 { + t.Errorf("there should be only 2 metrics buckets, but the number is %d", len(metrics.metricsBuckets)) + } + + aggregatedVal1 = 0 + aggregatedVal2 = 0 + for _, metric := range aggregatedMetrics { + if metric.timestamp != time.Unix(10, 0) && metric.timestamp != time.Unix(11, 0) { + t.Errorf("expected aggregated metric timestamp as 10 or 11, got %ds, %dns", metric.timestamp.Unix(), metric.timestamp.UnixNano()) + } + aggregatedVal1 += metric.fields["val1"].(int64) + if _, ok := metric.fields["val2"]; ok { + aggregatedVal2 += metric.fields["val2"].(int64) + } + } + if aggregatedVal1 != 2000 { + t.Errorf("all values \"val1\" of aggregated metrics should have been aggregated to 2000, but got %d", aggregatedVal1) + } + + if aggregatedVal2 != 1000 { + t.Errorf("all values \"val2\" of aggregated metrics should have been aggregated to 1000, but got %d", aggregatedVal2) + } + + metrics.Clear() + aggregatedMetrics = metrics.GetMetrics() + if len(aggregatedMetrics) != 0 { + t.Errorf("metrics object should be empty after clearing, but we got some metrics: %+v", aggregatedMetrics) + } +} + +func TestApproximateHistogram(t *testing.T) { + histo := newReservoirSampling("test", 500) + for i := 0; i != 20; i++ { + histo.AddPoint(10) + } + + mean, median, min, p10, p30, p70, p90, max, count, poolSize := histo.GetStats() + + if count != 20 { + t.Errorf("wrong count before reset, expected 20, got %d", count) + } + + if poolSize != 20 { + t.Errorf("wrong poolSize, expected 20, got %d", poolSize) + } + + if mean != 10 || median != 10 || min != 10 || p10 != 10 || p30 != 10 || p70 != 10 || p90 != 10 || max != 10 { + t.Errorf("one of the values is not 10 when it should be 10, mean %v, median %v, 10p %v, 30p %v, 70p %v, 90p %v", mean, median, p10, p30, p70, p90) + } + + histo.Reset() + _, _, _, _, _, _, _, _, count, poolSize = histo.GetStats() + if count != 0 { + t.Errorf("wrong count before reset, expected 0, got %d", count) + } + + if poolSize != 0 { + t.Errorf("wrong poolSize, expected 0, got %d", poolSize) + } + + for i := 0; i != 2000; i++ { + histo.AddPoint(10) + } + + mean, median, min, p10, p30, p70, p90, max, count, poolSize = histo.GetStats() + + if count != 2000 { + t.Errorf("wrong count before reset, expected 2000, got %d", count) + } + + if poolSize != 500 { + t.Errorf("wrong poolSize, expected 500, got %d", poolSize) + } + + if mean != 10 || median != 10 || min != 10 || p10 != 10 || p30 != 10 || p70 != 10 || p90 != 10 || max != 10 { + t.Errorf("one of the values is not 10 when it should be 10, mean %v, median %v, 10p %v, 30p %v, 70p %v, 90p %v", mean, median, p10, p30, p70, p90) + } + + histo.Reset() + _, _, _, _, _, _, _, _, count, poolSize = histo.GetStats() + if count != 0 { + t.Errorf("wrong count before reset, expected 0, got %d", count) + } + + if poolSize != 0 { + t.Errorf("wrong poolSize, expected 0, got %d", poolSize) + } + + for i := 0; i != 10000; i++ { + histo.AddPoint((float64(rand.Int63n(100)))) + } + + mean, median, min, p10, p30, p70, p90, max, count, poolSize = histo.GetStats() + + if count != 10000 { + t.Errorf("wrong count before reset, expected 10000, got %d", count) + } + + if poolSize != 500 { + t.Errorf("wrong poolSize, expected 500, got %d", poolSize) + } + + if math.Abs(mean-50) > 5 { + t.Errorf("wrong mean value, expected 50+-5 got %v", mean) + } + + if math.Abs(float64(median-50)) > 5 { + t.Errorf("wrong median value, expected 50+-5 got %v", median) + } + + if float64(min) > 5 { + t.Errorf("wrong min value, expected 0+-5 got %v", min) + } + + if math.Abs(float64(p10-10)) > 5 { + t.Errorf("wrong 10p value, expected 10+-5 got %v", p10) + } + + if math.Abs(float64(p30-30)) > 5 { + t.Errorf("wrong 30p value, expected 30+-5 got %v", p30) + } + + if math.Abs(float64(p70-70)) > 5 { + t.Errorf("wrong 50p value, expected 70+-5 got %v", p70) + } + + if math.Abs(float64(p90-90)) > 5 { + t.Errorf("wrong 90p value, expected 90+-5 got %v", p90) + } + + if math.Abs(float64(max-100)) > 5 { + t.Errorf("wrong max value, expected 100+-5 got %v", max) + } +} + +func TestMetricsHistogramObject(t *testing.T) { + metricsHisto := NewMetricsReservoirSampling() + r := rand.New(rand.NewSource(42)) + tags1 := TagsType{Tag{"tag1", "1"}, Tag{"tag2", "2"}} + for i := 0; i != 2000; i++ { + metricsHisto.AddMetric(&Metric{name: "test", tags: tags1, fields: FieldsType{"val1": int64(r.Int31n(100)), "val2": int64(r.Int31n(100))}, timestamp: generateTimestamp(10, r)}) + metricsHisto.AddMetric(&Metric{name: "test", tags: tags1, fields: FieldsType{"val1": int64(r.Int31n(100)), "val2": int64(r.Int31n(100))}, timestamp: generateTimestamp(15, r)}) + } + metrics := metricsHisto.GetMetrics() + if len(metrics) != 4 { + t.Errorf("received wrong number of histogram metrics, expected 4, got %d", len(metrics)) + } + + for _, metric := range metrics { + for valueName, value := range metric.fields { + if strings.Contains(valueName, "mean") { + if math.Abs(value.(float64)-50) > 5 { + t.Errorf("wrong mean value, expected 50+-5 got %v", value.(float64)) + } + } + if strings.Contains(valueName, "median") { + if math.Abs(value.(float64)-50) > 5 { + t.Errorf("wrong median value, expected 50+-5 got %v", value.(float64)) + } + } + if strings.Contains(valueName, "p10") { + if math.Abs(value.(float64)-10) > 5 { + t.Errorf("wrong p10, expected 10+-5 got %v", value.(float64)) + } + } + if strings.Contains(valueName, "p30") { + if math.Abs(value.(float64)-30) > 5 { + t.Errorf("wrong p30, expected 30+-5 got %v", value.(float64)) + } + } + if strings.Contains(valueName, "p70") { + if math.Abs(value.(float64)-70) > 5 { + t.Errorf("wrong p70, expected 70+-5 got %v", value.(float64)) + } + } + if strings.Contains(valueName, "p90") { + if math.Abs(value.(float64)-90) > 5 { + t.Errorf("wrong p90, expected 90+-5 got %v", value.(float64)) + } + } + if strings.Contains(valueName, "count") { + if value.(uint64) != 2000 { + t.Errorf("wrong number of count, expected 2000 got %v", value.(uint64)) + } + } + if strings.Contains(valueName, "poolsize") { + if value.(uint64) != 1000 { + t.Errorf("wrong number of poolsize, expected 1000 got %v", value.(uint64)) } } - time.Sleep(100 * time.Millisecond) } - }() + } - defer Stop() + metricsHisto.Clear() + if m := metricsHisto.GetMetrics(); len(m) != 0 { + t.Errorf("histogram metrics should be empty after clearing, but we got histogram metrics: %+v", m) + } +} + +func measureFunc(metric *Metric) { + defer TimerMS(metric)() + defer TimerNS(metric)() + time.Sleep(100 * time.Millisecond) +} - metric := Metric{Name: "testname", Timestamp: 12345} - metric.AddValue("value", 42) - metric.AddTag("tag", 40) +func TestTimers(t *testing.T) { + metric := NewMetric("test") + measureFunc(&metric) + fields := metric.fields + if fields["execution_time_ms"].(int64) < 100 { + t.Error("wrong milliseconds") + } + if fields["execution_time_ns"].(int64) < 100000000 { + t.Error("wrong nanoseconds") + } +} - for i := 0; i < b.N; i++ { - Send(metric) +func BenchmarkSimple(b *testing.B) { + cpuProfileFile, err := os.Create("cpu_profile.pprof") + if err != nil { + b.Fatalf("could not create CPU profile: %v", err) + } + defer cpuProfileFile.Close() + + pprof.StartCPUProfile(cpuProfileFile) + defer pprof.StopCPUProfile() + + metrics := NewMetricsAggregate() + + metricToAdd1Tag1Val := Metric{name: "metricToAdd1Tag1Val", tags: TagsType{Tag{"tag1", "tag1"}}, fields: FieldsType{"val1": int64(1)}, timestamp: time.Unix(10, 1111)} + metricToAdd2Tag2Val := Metric{name: "metricToAdd1Tag1Val", tags: TagsType{Tag{"tag1", "tag1"}, Tag{"tag2", "tag2"}}, fields: FieldsType{"val1": int64(1), "val2": int64(2)}, timestamp: time.Unix(10, 1223)} + metricToAdd3Tag3Val := Metric{name: "metricToAdd1Tag1Val", tags: TagsType{Tag{"tag1", "tag1"}, Tag{"tag2", "tag2"}, Tag{"tag3", "tag3"}}, fields: FieldsType{"val1": int64(10), "val2": int64(2), "val3": int64(3)}, timestamp: time.Unix(10, 2531)} + + for b.Loop() { + metrics.AddMetric(&metricToAdd1Tag1Val) + metrics.AddMetric(&metricToAdd2Tag2Val) + metrics.AddMetric(&metricToAdd3Tag3Val) } - fmt.Println("") + heapProfileFile, err := os.Create("heap_profile.pprof") + if err != nil { + log.Fatalf("could not create heap profile: %v", err) + } + defer heapProfileFile.Close() + pprof.WriteHeapProfile(heapProfileFile) +} + +func BenchmarkSendingMetrics(b *testing.B) { + go Run(12345, "/metrics") + + defer Stop() + + metric := Metric{name: "testname", timestamp: time.Unix(12345, 0)} + metric.SetFieldInt64("value1", 42) + metric.SetFieldInt64("value2", 42) + metric.AddTag("tag1", "40") + metric.AddTag("tag2", "40") + + for b.Loop() { + Send(&metric) + } } diff --git a/common/monitoring/reservoirsampling.go b/common/monitoring/reservoirsampling.go new file mode 100644 index 00000000..b954d731 --- /dev/null +++ b/common/monitoring/reservoirsampling.go @@ -0,0 +1,94 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package monitoring + +import ( + "math/rand" + "sort" +) + +// Documentation how does the calculation of percentiles work +// https://en.wikipedia.org/wiki/Reservoir_sampling + +type reservoirSampling struct { + samples []float64 + samplesLimit uint64 + name string + countSinceReset uint64 +} + +func newReservoirSampling(name string, limit uint64) reservoirSampling { + return reservoirSampling{ + samples: make([]float64, 0, limit), + samplesLimit: limit, + name: name, + countSinceReset: 0, + } +} + +func (this *reservoirSampling) AddPoint(val float64) { + this.countSinceReset += 1 + if len(this.samples) < int(this.samplesLimit) { + this.samples = append(this.samples, val) + } else { + if j := rand.Int63n(int64(this.countSinceReset)); j < int64(len(this.samples)) { + this.samples[j] = val + } + } +} + +func (this *reservoirSampling) indexForPercentile(percentile int) int { + return int(float64(len(this.samples)) * 0.01 * float64(percentile)) +} + +func (this *reservoirSampling) Reset() { + this.samples = this.samples[:0] + this.countSinceReset = 0 +} + +func (this *reservoirSampling) GetStats() (mean float64, median float64, min float64, percentile10 float64, percentile30 float64, percentile70 float64, percentile90 float64, max float64, count uint64, poolSize uint64) { + sort.Slice(this.samples, func(i, j int) bool { return this.samples[i] < this.samples[j] }) + + samplesCount := len(this.samples) + if samplesCount == 0 { + return 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + } + + var sum float64 + for _, val := range this.samples { + sum += float64(val) + } + + return sum / float64(samplesCount), + this.samples[this.indexForPercentile(50)], + this.samples[0], + this.samples[this.indexForPercentile(10)], + this.samples[this.indexForPercentile(30)], + this.samples[this.indexForPercentile(70)], + this.samples[this.indexForPercentile(90)], + this.samples[len(this.samples)-1], + this.countSinceReset, + uint64(len(this.samples)) +} diff --git a/common/monitoring/timer.go b/common/monitoring/timer.go new file mode 100644 index 00000000..722839a7 --- /dev/null +++ b/common/monitoring/timer.go @@ -0,0 +1,19 @@ +package monitoring + +import "time" + +// Timer* functions are meant to be used with defer statement to measure runtime of given function: +// defer TimerNS(&metric)() +func TimerMS(metric *Metric) func() { + start := time.Now() + return func() { + metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds()) + } +} + +func TimerNS(metric *Metric) func() { + start := time.Now() + return func() { + metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds()) + } +} diff --git a/core/config.go b/core/config.go index 9d7513f7..0f64cbac 100644 --- a/core/config.go +++ b/core/config.go @@ -129,7 +129,6 @@ func setDefaults() error { viper.SetDefault("enableKafka", true) viper.SetDefault("logAllIL", false) viper.SetDefault("metricsEndpoint", "8088/ecsmetrics") - viper.SetDefault("metricsBufferSize", 1000000) return nil } @@ -201,7 +200,6 @@ func setFlags() error { pflag.Bool("enableKafka", viper.GetBool("enableKafka"), "Turn on the kafka messaging") pflag.Bool("logAllIL", viper.GetBool("logAllIL"), "Send all the logs into IL, including Debug and Trace messages") pflag.String("metricsEndpoint", viper.GetString("metricsEndpoint"), "Http endpoint from which metrics can be scraped: [port/endpoint]") - pflag.Int("metricsBufferSize", viper.GetInt("metricsBufferSize"), "Limit for how many metrics can be stored in buffer in between scraping requests") pflag.Parse() return viper.BindPFlags(pflag.CommandLine) diff --git a/core/core.go b/core/core.go index ade41804..d6a2cebf 100644 --- a/core/core.go +++ b/core/core.go @@ -35,8 +35,8 @@ import ( "syscall" "time" - "github.com/AliceO2Group/Control/common/ecsmetrics" "github.com/AliceO2Group/Control/common/event/topic" + "github.com/AliceO2Group/Control/common/golangmetrics" "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" @@ -82,13 +82,13 @@ func runMetrics() { go func() { log.Infof("Starting to listen on endpoint %s:%d for metrics", endpoint, port) - if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed { - ecsmetrics.StopGolangMetrics() + if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint)); err != nil && err != http.ErrServerClosed { + golangmetrics.Stop() log.Errorf("failed to run metrics on port %d and endpoint: %s") } }() - ecsmetrics.StartGolangMetrics(10 * time.Second) + golangmetrics.Start(10 * time.Second) } // Run is the entry point for this scheduler. @@ -144,7 +144,7 @@ func Run() error { // Plugins need to start after taskman is running, because taskman provides the FID integration.PluginsInstance().InitAll(state.taskman.GetFrameworkID()) runMetrics() - defer ecsmetrics.StopGolangMetrics() + defer golangmetrics.Stop() defer monitoring.Stop() log.WithField("level", infologger.IL_Devel).Infof("Everything initiated and listening on control port: %d", viper.GetInt("controlPort")) diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 00000000..a925513f --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,246 @@ + + + + + + + + + + + + + + + + + + + + + + + + +# Metrics in ECS + +## Overview and simple usage + +ECS core implements metrics in folder common/monitoring. Metrics stack in ALICE +experiment is based on influxdb and Telegraf, where Telegraf scrapes metrics +from given http endpoint in arbitrary format and sends it into the influxdb database +instance. In order to expose endpoint with metrics we can set cli parameter `metricsEndpoint`, +which is in format `[port]/[endpoint]` (default: `8088/ecsmetrics`). +After running core with given parameter we can scrape this endpoint with eg. curl: + +``` +curl http://127.0.0.1:8088/ecsmetrics +``` + +Result of this command is for example: + +``` +kafka,subsystem=ECS,topic=aliecs.run send_bytes=1638400u,sent_messages=42u 1746457955000000000 +``` + +This format is called [influx line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/). +We will use this example to introduce influxdb metrics format. Every line is +one metric and every metric is composed from multiple parts separated by commas +and spaces: + +1) name of measurement - (required) string on the beginning of the line (`golangruntimemetrics`) +2) comma +3) tags - (optional) key-value list separated by commas. `subsystem=ECS,topic=aliecs.run` is +the example of key-value list, where subsystem and topic are the keys, +ECS and aliecs.run are values. +4) space - divides measurement and tags from the fields holding measurement data +5) fields - (required) actual values in same format as tags. We support +`int64`, `uint64` and `float64` values (`/sched/goroutines:goroutines=42u`) +6) space - divides fields and timestamp +7) timestamp - (optional) int64 value of unix timestamp in ns + +In order to provide support for this format we introduced Metric structure in [common/monitoring/metric.go](../common/monitoring/metric.go). +Following code shows how to create a Metric with `measurement` as measurement name, +one tag `tag1=val1` and field `field1=42u`: + +```go +m := monitoring.NewMetric("measurement", time.Now()) +m.AddTag("tag1", "val1") +m.SetFieldUInt64("field1", 42) +``` + +However we also need to be able to store metrics, so these can be scraped correctly. +This mechanism is implemented in [common/monitoring/monitoring.go](../common/monitoring/monitoring.go). +Metrics endpoint is run by calling `Run(port, endpointName)`. As this method is blocking +it is advised to call it from `goroutine`. After this method is called we can than send +metrics via methods `Send` and `SendHistogrammable`. If you want to send simple metrics +(eg. counter of messages sent) you are advised to use simple `Send`. However, if you are +interested into having percentiles reported from metrics you should use +`SendHistogrammable`. + +```go +go monigoring.Run(8088, "/metrics") +m := monitoring.NewMetric("measurement", time.Now()) +m.AddTag("tag1", "val1") +m.SetFieldUInt64("field1", 42) +monigoring.Send(&m) +monigoring.SendHistogrammable(&m) +monigoring.Stop() +``` + +Example for this use-case is duration of some function, +eg. measure sending batch of messages. If we want the best coverage of metrics possible +we can combine both of these to measure amount of messages send per batch and +also measurement duration of the send. For example in code you can take a look actual +actual code in [writer.go](../common/event/writer.go) where we are sending multiple +fields per metric and demonstrate full potential of these metrics. + +Previous code example will result in following metrics to be reported: + +``` +measurement,tag1=val1 field1=1 [timestamp] +measurement,tag1=val1 field1_mean=1,field1_median=1,field1_min=1,field1_p10=1,field1_p30=1,field1_p70=1,field1_p90=1,field1_max=1,field1_count=1,field1_poolsize=1 [timestamp] +``` + +In following text we will talk about aggregating over time interval +which is always 1 second. + +First metric is self explanatory, but we can see that Histogrammable metric +reports multiple percentiles, mean, min and max. These values can be the same if +we don't receive send values during 1 second. Moreover it reports `count` +and `poolsize`, where `count` describes number of times this metric was sent +and `poolsize` is internal metric, which will be described later. + +## Types and aggregation of metrics + +### Metric types + +We mentioned in previous part that there are two ways how to send metrics in ECS +resulting in two different outcomes, based on these outcomes we talk about two +metric types: + +1) Counter - `Send` +2) Histogrammable - `SendHistogrammable` + +Creation of both metrics is done by `NewMetric(measurement, timestamp)`, +so both use same object `Metric`. The distinction is done by Send methods: +`Send` and `SendHistogrammable`. + +Simple `Send(*Metric)`is used to just store metric with given tags and fields without +creating any other information. This metric is just aggregated +(more about it later). + +Sending metric through `SendHistogrammable(*Metric)` +will result in metric being added into the collection of metrics with same +measurement and tags. This collection is used to compute percentiles +(10, 30, 50, 70, 90), min, max and count (count of elements added into +the collection since last reset). These values are reported as fields +by appending strings to the name of original field: + +| Meaning | Appended field | +| --------|-------------| +| 10-percentile | field_10p | +| 30-percentile | field_30p | +| 70-percentile | field_70p | +| 90-percentile | field_90p | +| median (50-percentile) | field_median | +| mean | field_mean | +| min | field_min | +| max | field_max | +| count | field_count | + +### Aggregation + +To reduce network bandwidth and RAM usage, we aggregate all metrics. +This is done by grouping them into one-second intervals based on: + +- Timestamps rounded down to the nearest second +- Measurement name +- Tags + +This aggregation method applies to all metric types. + +If multiple metrics share the same measurement name and tags but have timestamps +less than one second apart, they will be grouped into a single bucket, using the +timestamp rounded down to the nearest second. + +However, if the measurement name or tags even slightly differ the metrics +will not be grouped and will remain separate. Also, if a metric is missing +a tag present in the others, it won't be aggregated with them. + +``` +notaggregated,tag1=val1 fields1=1i 1000000123 +aggregated,tag1=val1 fields1=1i 1000000001 +aggregated,tag1=val1 fields1=1i 1000000021 +aggregated,tag1=val1 fields1=1i,fields2=1i 1000000021 +aggregated,tag1=val1,tag2=val2 fields1=1i 1000030021 +aggregated,tag1=val1 fields1=2i 2000000021 +``` + +If all of these metrics are send and thus aggregated we will result in following: + +``` +notaggregated,tag1=val1 fields1=1i 1000000000 +aggregated,tag1=val1 fields1=3i,fields2=1i 1000000000 +aggregated,tag1=val1,tag2=val2 fields1=1i 1000000000 +aggregated,tag1=val1 fields1=2i 2000000000 +``` + +Explanation: + +- `notaggregated` is unique measurement. +- `aggregated` with one tag will have value of `fields1` equal to 3 +as three metrics fell into the same timestamp bucket and `fields2` was aggregated into the same metric +as tags and measurement were the same. +- `aggregated` with either multiple tags or timestamp which is over 2s +cannot be aggregated anywhere and are held as unique values. + +The same would happen Histogrammables, except that the aggregation would not be addition +if different points, but creating statistical report as mentioned in previous part. + +## Implementation details + +### Event loop + +In order to send metrics from unlimited amount of goroutines, we need to have +robust and thread-safe mechanism. It is implemented in [common/monitoring/monitoring.go](../common/monitoring/monitoring.go) +as event loop (`eventLoop`) that reads data from two buffered channels (`metricsChannel` and `metricsHistosChannel`) +with one goroutine. Apart from reading messages from these two channels event loop also handles +scraping requests from `http.Server` endpoint. As the http endpoint is called by a +different goroutine than the one processing event loop, we use another two channels: +`metricsRequestedChannel` which is used by the endpoint to request current metrics. +Transformed metrics are sent via `metricsExportedToRequest` back to the endpoint. + +Methods `Send` and `SendHistogrammable` write to the corresponding channels, +which are consumed by event loop. + +### Hashing to aggregate + +In order to correctly implement behaviour described in the part about Aggregation +we use the same implementation in two container aggregating objects +`MetricsAggregate`, `MetricsReservoirSampling` implemented in files +[common/monitoring/metricsaggregate.go](../common/monitoring/metricsaggregate.go) +and [metricsreservoirsampling.go](../common/monitoring/metricsreservoirsampling.go) +in the same directory. The implementation is done as different buckets +in map with distinct keys (`metricsBuckets`). These keys need to be unique +according to the timestamp and tags. We use struct `key` composed +from `time.Time` and `maphash.Hash`. Hash was chosen so we don't have +to compare arbitrary amount of tags, where keys and it's values +must be compared piece by piece. If we are inserting new bucket into `metricsBuckets` +we create new key by rounding down timestamp `time.Unix(metric.timestamp.Unix(), 0)` +and hashing all tags and their values. This will result to unique buckets +distinguished by timestamp and tags collection. + +However there is a potential problem: +We are storing tags as unsorted slice in `Metric`, so it is possible to create +two metrics with same tags, but in different order. These will result +in different hashes as maphash is order-dependent. + +### Sampling reservoir + +We are computing percentiles from all metrics sent via `SendHistogrammable` +method, but we are computing these from streaming data with unknown +limits, so we cannot easily create histogram. However there exist simple +solution called sampling reservoir and is discussed in [this wiki article](https://en.wikipedia.org/wiki/Reservoir_sampling). +It uses easy principle where every value from streaming data must have +the same probability of staying inside fixed buffer called reservoir. diff --git a/go.mod b/go.mod index 3610a085..7c605f97 100644 --- a/go.mod +++ b/go.mod @@ -118,6 +118,7 @@ require ( github.com/huandu/xstrings v1.4.0 // indirect github.com/imdario/mergo v0.3.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/influxdata/line-protocol/v2 v2.2.1 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect diff --git a/go.sum b/go.sum index 0bae094a..6dfc979d 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.17 h1:QeVUsEDNrLBW4tMgZHvxy18sKtr6VI492kBhUfhDJNI= github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cyphar/filepath-securejoin v0.2.5 h1:6iR5tXJ/e6tJZzzdMc1km3Sa7RRIVBKAK32O2s7AYfo= @@ -75,6 +76,9 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/flosch/pongo2/v6 v6.0.0 h1:lsGru8IAzHgIAw6H2m4PCyleO58I40ow6apih0WprMU= github.com/flosch/pongo2/v6 v6.0.0/go.mod h1:CuDpFm47R0uGGE7z13/tTlt1Y6zdxvr2RLT5LJhsHEU= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -129,6 +133,8 @@ github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -197,6 +203,12 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= +github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE= +github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= @@ -223,6 +235,7 @@ github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -288,6 +301,7 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= @@ -576,6 +590,7 @@ google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= @@ -590,6 +605,7 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=