From bf3807f21d45e35f95c427090429adf6a0976caf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 13 May 2025 10:41:55 +0200 Subject: [PATCH] [core] better kafka metrics --- common/event/writer.go | 73 +++++++++++++++++++------------------ common/event/writer_test.go | 3 +- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/common/event/writer.go b/common/event/writer.go index 64f9a98f..7d4c9e51 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -42,9 +42,10 @@ import ( ) var ( - log = logger.New(logrus.StandardLogger(), "event") - KAFKAWRITER = "kafka_writer" - KAFKAPREPARE = "kafka_prepare" + log = logger.New(logrus.StandardLogger(), "event") + KAFKAWRITER = "kafka_writer" + KAFKAPREPARE = "kafka_prepare" + writerBatchsize = 100 ) type Writer interface { @@ -76,7 +77,7 @@ type KafkaWriter struct { toBatchMessagesChan chan kafka.Message messageBuffer FifoBuffer[kafka.Message] // NOTE: we use settable callback in order to be able to test writing of messages via KafkaWriter, without necessity of setting up cluster - writeFunction func([]kafka.Message, *monitoring.Metric) + writeFunction func([]kafka.Message) runningWorkers sync.WaitGroup batchingLoopDoneCh chan struct{} } @@ -94,19 +95,30 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter { Topic: string(topic), Balancer: &kafka.Hash{}, AllowAutoTopicCreation: true, + BatchTimeout: time.Millisecond, // we are batching the messages ourselves and writing sync into writer, so long timeout is not necessary + BatchSize: writerBatchsize, }, - toBatchMessagesChan: make(chan kafka.Message, 10000), + toBatchMessagesChan: make(chan kafka.Message, 100000), messageBuffer: NewFifoBuffer[kafka.Message](), runningWorkers: sync.WaitGroup{}, batchingLoopDoneCh: make(chan struct{}, 1), } - writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) { - defer monitoring.TimerNS(metric)() + writer.writeFunction = func(messages []kafka.Message) { + metric := writer.newMetric(KAFKAWRITER) + metric.SetFieldUInt64("messages_sent", uint64(len(messages))) + metric.SetFieldUInt64("messages_failed", 0) + + metricDuration := writer.newMetric(KAFKAWRITER) + defer monitoring.SendHistogrammable(&metricDuration) + defer monitoring.TimerNS(&metricDuration)() + if err := writer.WriteMessages(context.Background(), messages...); err != nil { metric.SetFieldUInt64("messages_failed", uint64(len(messages))) log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err) } + + monitoring.Send(&metric) } go writer.writingLoop() @@ -138,18 +150,11 @@ func (w *KafkaWriter) writingLoop() { w.runningWorkers.Done() return default: - messagesToSend := w.messageBuffer.PopMultiple(100) + messagesToSend := w.messageBuffer.PopMultiple(uint(writerBatchsize)) if len(messagesToSend) == 0 { continue } - - metric := w.newMetric(KAFKAWRITER) - metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend))) - metric.SetFieldUInt64("messages_failed", 0) - - w.writeFunction(messagesToSend, &metric) - - monitoring.Send(&metric) + w.writeFunction(messagesToSend) } } } @@ -241,25 +246,23 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time metric := w.newMetric(KAFKAPREPARE) - func() { - defer monitoring.TimerNS(&metric)() - wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp) - if err != nil { - log.WithField("event", e). - WithField("level", infologger.IL_Support). - Errorf("Failed to convert event to kafka event: %s", err.Error()) - return - } + defer monitoring.SendHistogrammable(&metric) + defer monitoring.TimerNS(&metric)() - message, err := kafkaEventToKafkaMessage(wrappedEvent, key) - if err != nil { - log.WithField("event", e). - WithField("level", infologger.IL_Support). - Errorf("Failed to convert kafka event to message: %s", err.Error()) - return - } - w.toBatchMessagesChan <- message - }() + wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp) + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Errorf("Failed to convert event to kafka event: %s", err.Error()) + return + } - monitoring.Send(&metric) + message, err := kafkaEventToKafkaMessage(wrappedEvent, key) + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Errorf("Failed to convert kafka event to message: %s", err.Error()) + return + } + w.toBatchMessagesChan <- message } diff --git a/common/event/writer_test.go b/common/event/writer_test.go index fc5be690..48b9fd8b 100644 --- a/common/event/writer_test.go +++ b/common/event/writer_test.go @@ -27,7 +27,6 @@ package event import ( "sync" - "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -47,7 +46,7 @@ var _ = Describe("Writer", func() { writer.runningWorkers = sync.WaitGroup{} writer.batchingLoopDoneCh = make(chan struct{}, 1) - writer.writeFunction = func(messages []kafka.Message, _ *monitoring.Metric) { + writer.writeFunction = func(messages []kafka.Message) { Expect(len(messages)).To(Equal(1)) event := &pb.Event{} proto.Unmarshal(messages[0].Value, event)