Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions common/ecsmetrics/metric.go

This file was deleted.

28 changes: 0 additions & 28 deletions common/ecsmetrics/metrics_test.go

This file was deleted.

17 changes: 8 additions & 9 deletions common/event/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"sync"
"time"

"github.com/AliceO2Group/Control/common/ecsmetrics"
"github.com/AliceO2Group/Control/common/event/topic"
"github.com/AliceO2Group/Control/common/logger"
"github.com/AliceO2Group/Control/common/logger/infologger"
Expand Down Expand Up @@ -83,7 +82,7 @@ type KafkaWriter struct {
}

func (w *KafkaWriter) newMetric(name string) monitoring.Metric {
metric := ecsmetrics.NewMetric(name)
metric := monitoring.NewMetric(name)
metric.AddTag("topic", w.Topic)
return metric
}
Expand All @@ -103,9 +102,9 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
}

writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) {
defer ecsmetrics.TimerNS(metric)()
defer monitoring.TimerNS(metric)()
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
metric.AddValue("messages_failed", len(messages))
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err)
}
}
Expand Down Expand Up @@ -145,12 +144,12 @@ func (w *KafkaWriter) writingLoop() {
}

metric := w.newMetric(KAFKAWRITER)
metric.AddValue("messages_sent", len(messagesToSend))
metric.AddValue("messages_failed", 0)
metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend)))
metric.SetFieldUInt64("messages_failed", 0)

w.writeFunction(messagesToSend, &metric)

monitoring.Send(metric)
monitoring.Send(&metric)
}
}
}
Expand Down Expand Up @@ -243,7 +242,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
metric := w.newMetric(KAFKAPREPARE)

func() {
defer ecsmetrics.TimerNS(&metric)()
defer monitoring.TimerNS(&metric)()
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
if err != nil {
log.WithField("event", e).
Expand All @@ -262,5 +261,5 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
w.toBatchMessagesChan <- message
}()

monitoring.Send(metric)
monitoring.Send(&metric)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
package ecsmetrics
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package golangmetrics

import (
internalmetrics "runtime/metrics"
Expand Down Expand Up @@ -33,14 +57,14 @@ func gather() monitoring.Metric {

internalmetrics.Read(samples)

metric := NewMetric("golangruntimemetrics")
metric := monitoring.NewMetric("golangruntimemetrics")

for _, sample := range samples {
switch sample.Value.Kind() {
case internalmetrics.KindUint64:
metric.AddValue(sample.Name, sample.Value.Uint64())
metric.SetFieldUInt64(sample.Name, sample.Value.Uint64())
case internalmetrics.KindFloat64:
metric.AddValue(sample.Name, sample.Value.Float64())
metric.SetFieldFloat64(sample.Name, sample.Value.Float64())
case internalmetrics.KindFloat64Histogram:
log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
continue
Expand All @@ -52,7 +76,7 @@ func gather() monitoring.Metric {
return metric
}

func StartGolangMetrics(period time.Duration) {
func Start(period time.Duration) {
log.WithField("level", infologger.IL_Devel).Info("Starting golang metrics reporting")
go func() {
log.Debug("Starting golang metrics goroutine")
Expand All @@ -64,14 +88,15 @@ func StartGolangMetrics(period time.Duration) {
return
default:
log.Debug("sending golang metrics")
monitoring.Send(gather())
metric := gather()
monitoring.Send(&metric)
time.Sleep(period)
}
}
}()
}

func StopGolangMetrics() {
func Stop() {
endRequestChannel <- struct{}{}
<-endRequestChannel
}
50 changes: 50 additions & 0 deletions common/monitoring/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package monitoring

import (
"hash/maphash"
"time"
)

// key pairs a 64-bit hash with a timestamp.
// NOTE(review): presumably used as a map key to group metrics that share the
// same name, tags and timestamp; no use of it is visible here, so confirm.
type key struct {
// nameTagsHash is a 64-bit hash; presumably the value produced by
// metricNameTagsToHash followed by hashValueAndReset (confirm at call sites).
nameTagsHash uint64
// timestamp is the point in time this key refers to.
timestamp time.Time
}

// metricNameTagsToHash feeds the metric's name, followed by every tag name and
// tag value in the order the tags are stored, into hash. It does not reset
// hash and does not compute the sum; the caller does both.
//
// Every string is terminated with a zero byte so that neighbouring strings
// cannot run together: without a delimiter the tag ("ab", "c") and the tag
// ("a", "bc") would feed identical bytes to the hash and therefore collide.
// This assumes names and values never contain a NUL byte themselves.
//
// Tag order is significant: the same tags added in a different order produce
// a different hash.
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
	// maphash documents that its Write* methods never fail, so the returned
	// errors are intentionally not checked.
	writeTerminated := func(s string) {
		hash.WriteString(s)
		hash.WriteByte(0)
	}

	writeTerminated(metric.name)
	for _, tag := range metric.tags {
		writeTerminated(tag.name)
		writeTerminated(tag.value)
	}
}

// hashValueAndReset returns the 64-bit sum of everything written to hash so
// far and leaves hash reset, ready to be reused for the next value.
func hashValueAndReset(hash *maphash.Hash) uint64 {
	// The return expression is evaluated before the deferred Reset runs, so
	// the sum still reflects the bytes written by the caller.
	defer hash.Reset()
	return hash.Sum64()
}
132 changes: 118 additions & 14 deletions common/monitoring/metric.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,131 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package monitoring

import (
"fmt"
"io"
"time"

lp "github.com/influxdata/line-protocol/v2/lineprotocol"
)

type (
TagsType map[string]any
ValuesType map[string]any
Tag struct {
name string
value string
}

TagsType []Tag
FieldsType map[string]any
)

type Metric struct {
Name string `json:"name"`
Values ValuesType `json:"values"`
Tags TagsType `json:"tags,omitempty"`
Timestamp int64 `json:"timestamp"`
name string
fields FieldsType
tags TagsType
timestamp time.Time
}

func (metric *Metric) AddTag(tagName string, value any) {
if metric.Tags == nil {
metric.Tags = make(TagsType)
// Return empty metric, it is used right now mostly in string
// to report metrics correctly to influxdb use NewECSMetric
func NewDefaultMetric(name string, timestamp time.Time) Metric {
return Metric{
name: name, timestamp: timestamp,
}
metric.Tags[tagName] = value
}

func (metric *Metric) AddValue(valueName string, value any) {
if metric.Values == nil {
metric.Values = make(ValuesType)
// NewMetric returns a metric called name, stamped with the current time and
// carrying the tag subsystem=ECS. Telegraf uses that tag to route metrics
// coming from ECS to the correct bucket.
func NewMetric(name string) Metric {
	ecsMetric := NewDefaultMetric(name, time.Now())
	ecsMetric.AddTag("subsystem", "ECS")

	return ecsMetric
}

// AddTag appends the tag tagName=value to the metric. Tags are kept in the
// order they were added; an existing tag with the same name is not replaced,
// a second entry is appended instead.
func (metric *Metric) AddTag(tagName string, value string) {
	newTag := Tag{
		name:  tagName,
		value: value,
	}
	metric.tags = append(metric.tags, newTag)
}

func (metric *Metric) setField(fieldName string, field any) {
if metric.fields == nil {
metric.fields = make(FieldsType)
}
metric.Values[valueName] = value
metric.fields[fieldName] = field
}

// SetFieldInt64 stores field under fieldName as an int64 value. It delegates
// to setField, which assigns into the fields map, so an existing field with
// the same name is overwritten.
func (metric *Metric) SetFieldInt64(fieldName string, field int64) {
metric.setField(fieldName, field)
}

// SetFieldUInt64 stores field under fieldName as a uint64 value. It delegates
// to setField, which assigns into the fields map, so an existing field with
// the same name is overwritten.
func (metric *Metric) SetFieldUInt64(fieldName string, field uint64) {
metric.setField(fieldName, field)
}

// SetFieldFloat64 stores field under fieldName as a float64 value. It
// delegates to setField, which assigns into the fields map, so an existing
// field with the same name is overwritten.
func (metric *Metric) SetFieldFloat64(fieldName string, field float64) {
metric.setField(fieldName, field)
}

// MergeFields folds the fields of other into metric.
//
// A field that exists only in other is copied over as is. A field present in
// both metrics is replaced by the sum of the two values when both hold the
// same supported type (int64, uint64 or float64). When the two values have
// different types, or the type is not one of the supported ones, the value
// already stored in metric is kept unchanged.
//
// Name, tags and timestamp of metric are not touched, and other is never
// modified. A nil other, or one without fields, is a no-op.
func (metric *Metric) MergeFields(other *Metric) {
	if other == nil || len(other.fields) == 0 {
		return
	}

	// The fields map is created lazily by the setters, so a metric that has
	// never had a field set still holds a nil map; writing to it would panic.
	if metric.fields == nil {
		metric.fields = make(FieldsType, len(other.fields))
	}

	for fieldName, field := range other.fields {
		storedField, exists := metric.fields[fieldName]
		if !exists {
			metric.fields[fieldName] = field
			continue
		}

		// Checked assertions: a type mismatch between the two metrics must
		// not bring the process down, the stored value simply wins.
		switch v := field.(type) {
		case int64:
			if stored, ok := storedField.(int64); ok {
				metric.fields[fieldName] = stored + v
			}
		case uint64:
			if stored, ok := storedField.(uint64); ok {
				metric.fields[fieldName] = stored + v
			}
		case float64:
			if stored, ok := storedField.(float64); ok {
				metric.fields[fieldName] = stored + v
			}
		}
	}
}

// Format encodes metrics in InfluxDB line protocol and writes the encoded
// bytes to writer.
//
// Every metric becomes one line built from its name, its tags in the order
// they are stored, its fields and its timestamp. Fields are emitted in Go map
// iteration order, so their order within a line is unspecified.
//
// A field whose value the encoder cannot represent is left out of the line
// instead of panicking. The SetField* accessors restrict the stored types, but
// lineprotocol.NewValue also rejects some values of an accepted type (a
// non-finite float64, for example).
//
// If the encoder reports an error, that error is returned and nothing is
// written. Otherwise the result of the write is returned.
//
// NOTE(review): unless lax mode is enabled the encoder expects tag keys in
// lexical order; tags are passed here in insertion order, so confirm that
// callers add them sorted.
func Format(writer io.Writer, metrics []Metric) error {
	var enc lp.Encoder

	for _, metric := range metrics {
		enc.StartLine(metric.name)
		for _, tag := range metric.tags {
			enc.AddTag(tag.name, tag.value)
		}

		for fieldName, field := range metric.fields {
			value, ok := lp.NewValue(field)
			if !ok {
				// Not encodable; skip this field rather than crash the
				// process the way MustNewValue would.
				continue
			}
			enc.AddField(fieldName, value)
		}
		enc.EndLine(metric.timestamp)
	}

	if err := enc.Err(); err != nil {
		return err
	}

	_, err := fmt.Fprintf(writer, "%s", enc.Bytes())
	return err
}
Loading