diff --git a/common/ecsmetrics/metric.go b/common/ecsmetrics/metric.go index fda30f87..a35af762 100644 --- a/common/ecsmetrics/metric.go +++ b/common/ecsmetrics/metric.go @@ -12,3 +12,19 @@ func NewMetric(name string) monitoring.Metric { metric.AddTag("subsystem", "ECS") return metric } + +// Timer* functions are meant to be used with defer statement to measure runtime of given function: +// defer TimerNS(&metric)() +func TimerMS(metric *monitoring.Metric) func() { + start := time.Now() + return func() { + metric.AddValue("execution_time_ms", time.Since(start).Milliseconds()) + } +} + +func TimerNS(metric *monitoring.Metric) func() { + start := time.Now() + return func() { + metric.AddValue("execution_time_ns", time.Since(start).Nanoseconds()) + } +} diff --git a/common/ecsmetrics/metrics_test.go b/common/ecsmetrics/metrics_test.go new file mode 100644 index 00000000..4c46569c --- /dev/null +++ b/common/ecsmetrics/metrics_test.go @@ -0,0 +1,28 @@ +package ecsmetrics + +import ( + "fmt" + "testing" + "time" + + "github.com/AliceO2Group/Control/common/monitoring" +) + +func measureFunc(metric *monitoring.Metric) { + defer TimerMS(metric)() + defer TimerNS(metric)() + time.Sleep(100 * time.Millisecond) +} + +func TestSimpleStartStop(t *testing.T) { + metric := NewMetric("test") + measureFunc(&metric) + fmt.Println(metric.Values["execution_time_ms"]) + fmt.Println(metric.Values["execution_time_ns"]) + if metric.Values["execution_time_ms"].(int64) < 100 { + t.Error("wrong milliseconds") + } + if metric.Values["execution_time_ns"].(int64) < 100000000 { + t.Error("wrong nanoseconds") + } +} diff --git a/common/event/event_suite_test.go b/common/event/event_suite_test.go new file mode 100644 index 00000000..03af84ee --- /dev/null +++ b/common/event/event_suite_test.go @@ -0,0 +1,13 @@ +package event_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestEvent(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Event Suite") +} diff --git a/common/event/fifobuffer.go b/common/event/fifobuffer.go new file mode 100644 index 00000000..a49cf4f2 --- /dev/null +++ b/common/event/fifobuffer.go @@ -0,0 +1,87 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package event + +import ( + "math" + "sync" +) + +// This structure is meant to be used as a threadsafe FIFO with builtin waiting for new data +// in its Pop and PopMultiple functions. It is meant to be used with multiple goroutines, it is a +// waste of synchronization mechanisms if used synchronously. +type FifoBuffer[T any] struct { + lock sync.Mutex + cond sync.Cond + + buffer []T +} + +func NewFifoBuffer[T any]() (result FifoBuffer[T]) { + result = FifoBuffer[T]{ + lock: sync.Mutex{}, + } + result.cond = *sync.NewCond(&result.lock) + return +} + +func (this *FifoBuffer[T]) Push(value T) { + this.cond.L.Lock() + this.buffer = append(this.buffer, value) + this.cond.Signal() + this.cond.L.Unlock() +} + +// Blocks until it has some value in internal buffer +func (this *FifoBuffer[T]) PopMultiple(numberToPop uint) (result []T) { + this.cond.L.Lock() + defer this.cond.L.Unlock() + + for len(this.buffer) == 0 { + this.cond.Wait() + // this check is used when ReleaseGoroutines is called on waiting goroutine + if len(this.buffer) == 0 { + return + } + } + + result = make([]T, int(math.Min(float64(numberToPop), float64(len(this.buffer))))) + copy(result, this.buffer[0:len(result)]) + this.buffer = this.buffer[len(result):] + + return +} + +func (this *FifoBuffer[T]) Length() int { + this.cond.L.Lock() + defer this.cond.L.Unlock() + return len(this.buffer) +} + +func (this *FifoBuffer[T]) ReleaseGoroutines() { + this.cond.L.Lock() + this.cond.Broadcast() + this.cond.L.Unlock() +} diff --git a/common/event/fifobuffer_test.go b/common/event/fifobuffer_test.go new file mode 100644 index 00000000..fed31837 --- /dev/null +++ b/common/event/fifobuffer_test.go @@ -0,0 +1,129 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package event + +import ( + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("FifoBuffer", func() { + When("Poping lower amount of items than inside of a buffer", func() { + It("returns requested items", func() { + buffer := NewFifoBuffer[int]() + buffer.Push(1) + buffer.Push(2) + buffer.Push(3) + + Expect(buffer.Length()).To(Equal(3)) + + results := buffer.PopMultiple(2) + Expect(results).To(Equal([]int{1, 2})) + }) + }) + + When("Poping higher amount of items than inside of a buffer", func() { + It("returns only available items", func() { + buffer := NewFifoBuffer[int]() + buffer.Push(1) + + results := buffer.PopMultiple(2) + Expect(results).To(Equal([]int{1})) + }) + }) + + When("We use buffer with multiple goroutines pushing first (PopMultiple)", func() { + It("is synchronised properly", func() { + buffer := NewFifoBuffer[int]() + channel := make(chan struct{}) + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + buffer.Push(1) + channel <- struct{}{} + wg.Done() + }() + + go func() { + <-channel + result := buffer.PopMultiple(42) + Expect(result, 1) + wg.Done() + }() + + wg.Wait() + }) + }) + + When("We use buffer with multiple goroutines popping first", func() { + It("is synchronised properly", func() { + buffer := NewFifoBuffer[int]() + channel := make(chan struct{}) + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + // Pop is blocking is we have empty buffer, so we notify before + channel <- struct{}{} + result := buffer.PopMultiple(42) + Expect(result, 1) + wg.Done() + }() + + go func() { + <-channel + buffer.Push(1) + wg.Done() + }() + + wg.Wait() + }) + }) + + When("We block FifoBuffer without data and call Release", func() { + It("releases goroutines properly", func() { + buffer := NewFifoBuffer[int]() + everythingDone := sync.WaitGroup{} + channel := make(chan struct{}) + + everythingDone.Add(1) + go func() { + channel <- struct{}{} + buffer.PopMultiple(42) + everythingDone.Done() + }() + <-channel + time.Sleep(100 * time.Millisecond) + buffer.ReleaseGoroutines() + everythingDone.Wait() + }) + }) +}) diff --git a/common/event/writer.go b/common/event/writer.go index 134a28d9..fb3fc516 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -27,11 +27,14 @@ package event import ( "context" "fmt" + "sync" "time" + "github.com/AliceO2Group/Control/common/ecsmetrics" "github.com/AliceO2Group/Control/common/event/topic" "github.com/AliceO2Group/Control/common/logger" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" "github.com/segmentio/kafka-go" "github.com/sirupsen/logrus" @@ -39,7 +42,11 @@ import ( "google.golang.org/protobuf/proto" ) -var log = logger.New(logrus.StandardLogger(), "event") +var ( + log = logger.New(logrus.StandardLogger(), "event") + KAFKAWRITER = "kafka_writer" + KAFKAPREPARE = "kafka_prepare" +) type Writer interface { WriteEvent(e interface{}) @@ -53,9 +60,32 @@ func (*DummyWriter) WriteEvent(interface{}) {} func (*DummyWriter) WriteEventWithTimestamp(interface{}, time.Time) {} func (*DummyWriter) Close() {} +// Kafka writer is used to convert events from events.proto into kafka messages and to write them. +// it is built with 2 workers: +// +// #1 is gathering kafka.Message from any goroutine which sends message into buffered channel and puts them into FifoBuffer. +// #2 is poping any messages from FifoBuffer and sends them to Kafka +// +// The reason for this setup over setting Async: true in kafka.Writer is the ability to have some error handling +// of failed messages. Moreover if we used only one worker that gathers messages from channel and then sends them directly to Kafka, +// we would block whole core if we receive lot of messages at once. So we split functionality into two workers: one is +// putting all messages into the buffer, so if we have a lot of messages buffer just grows without blocking whole core and the +// second does all the sending. This setup allows us to gather messages from any amount of goroutines without blocking/losing messages. +// Another benefit is batching messages instead of writing them one by one. type KafkaWriter struct { *kafka.Writer - toWriteChan chan kafka.Message + toBatchMessagesChan chan kafka.Message + messageBuffer FifoBuffer[kafka.Message] + // NOTE: we use settable callback in order to be able to test writing of messages via KafkaWriter, without necessity of setting up cluster + writeFunction func([]kafka.Message, *monitoring.Metric) + runningWorkers sync.WaitGroup + batchingLoopDoneCh chan struct{} +} + +func (w *KafkaWriter) newMetric(name string) monitoring.Metric { + metric := ecsmetrics.NewMetric(name) + metric.AddTag("topic", w.Topic) + return metric } func NewWriterWithTopic(topic topic.Topic) *KafkaWriter { @@ -66,16 +96,32 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter { Balancer: &kafka.Hash{}, AllowAutoTopicCreation: true, }, - toWriteChan: make(chan kafka.Message, 1000), + toBatchMessagesChan: make(chan kafka.Message, 10000), + messageBuffer: NewFifoBuffer[kafka.Message](), + runningWorkers: sync.WaitGroup{}, + batchingLoopDoneCh: make(chan struct{}, 1), + } + + writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) { + defer ecsmetrics.TimerNS(metric)() + if err := writer.WriteMessages(context.Background(), messages...); err != nil { + metric.AddValue("messages_failed", len(messages)) + log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err) + } } go writer.writingLoop() + go writer.batchingLoop() + return writer } func (w *KafkaWriter) Close() { if w != nil { - close(w.toWriteChan) + // We are waiting until both loops (batching and writing) are done + w.runningWorkers.Add(2) + close(w.toBatchMessagesChan) + w.runningWorkers.Wait() w.Writer.Close() } } @@ -86,16 +132,38 @@ func (w *KafkaWriter) WriteEvent(e interface{}) { } } -// TODO: we can optimise this to write multiple message at once func (w *KafkaWriter) writingLoop() { - for message := range w.toWriteChan { - err := w.WriteMessages(context.Background(), message) - if err != nil { - log.Errorf("failed to write async kafka message: %w", err) + for { + select { + case <-w.batchingLoopDoneCh: + w.runningWorkers.Done() + return + default: + messagesToSend := w.messageBuffer.PopMultiple(100) + if len(messagesToSend) == 0 { + continue + } + + metric := w.newMetric(KAFKAWRITER) + metric.AddValue("messages_sent", len(messagesToSend)) + metric.AddValue("messages_failed", 0) + + w.writeFunction(messagesToSend, &metric) + + monitoring.Send(metric) } } } +func (w *KafkaWriter) batchingLoop() { + for message := range w.toBatchMessagesChan { + w.messageBuffer.Push(message) + } + w.batchingLoopDoneCh <- struct{}{} + w.messageBuffer.ReleaseGoroutines() + w.runningWorkers.Done() +} + type HasEnvID interface { GetEnvironmentId() string } @@ -108,104 +176,52 @@ func extractAndConvertEnvID[T HasEnvID](object T) []byte { return nil } -func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) { - if w == nil { - return +// TODO: there should be written test to verify converting all of these messages +func internalEventToKafkaEvent(internalEvent interface{}, timestamp time.Time) (kafkaEvent *pb.Event, key []byte, err error) { + kafkaEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), } - var ( - err error - wrappedEvent *pb.Event - key []byte = nil - ) - - switch e := e.(type) { + switch e := internalEvent.(type) { case *pb.Ev_MetaEvent_CoreStart: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, - } + kafkaEvent.Payload = &pb.Event_CoreStartEvent{CoreStartEvent: e} case *pb.Ev_MetaEvent_MesosHeartbeat: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, - } + kafkaEvent.Payload = &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e} case *pb.Ev_MetaEvent_FrameworkEvent: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, - } + kafkaEvent.Payload = &pb.Event_FrameworkEvent{FrameworkEvent: e} case *pb.Ev_TaskEvent: key = []byte(e.Taskid) if len(key) == 0 { key = nil } - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_TaskEvent{TaskEvent: e}, - } + kafkaEvent.Payload = &pb.Event_TaskEvent{TaskEvent: e} case *pb.Ev_RoleEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RoleEvent{RoleEvent: e}, - } + kafkaEvent.Payload = &pb.Event_RoleEvent{RoleEvent: e} case *pb.Ev_EnvironmentEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, - } + kafkaEvent.Payload = &pb.Event_EnvironmentEvent{EnvironmentEvent: e} case *pb.Ev_CallEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CallEvent{CallEvent: e}, - } + kafkaEvent.Payload = &pb.Event_CallEvent{CallEvent: e} case *pb.Ev_IntegratedServiceEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, - } + kafkaEvent.Payload = &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e} case *pb.Ev_RunEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RunEvent{RunEvent: e}, - } - } - - if wrappedEvent == nil { + kafkaEvent.Payload = &pb.Event_RunEvent{RunEvent: e} + default: err = fmt.Errorf("unsupported event type") - } else { - err = w.doWriteEvent(key, wrappedEvent) } - if err != nil { - log.WithField("event", e). - WithField("level", infologger.IL_Support). - Error(err.Error()) - } + return } -func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error { - if w == nil { - return nil - } - - data, err := proto.Marshal(e) +func kafkaEventToKafkaMessage(kafkaEvent *pb.Event, key []byte) (kafka.Message, error) { + data, err := proto.Marshal(kafkaEvent) if err != nil { - return fmt.Errorf("failed to marshal event: %w", err) + return kafka.Message{}, fmt.Errorf("failed to marshal event: %w", err) } message := kafka.Message{ @@ -216,11 +232,35 @@ func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error { message.Key = key } - select { - case w.toWriteChan <- message: - default: - log.Warnf("Writer of kafka topic [%s] cannot write because channel is full, discarding a message", w.Writer.Topic) + return message, nil +} + +func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) { + if w == nil { + return } - return nil + metric := w.newMetric(KAFKAPREPARE) + + func() { + defer ecsmetrics.TimerNS(&metric)() + wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp) + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Errorf("Failed to convert event to kafka event: %s", err.Error()) + return + } + + message, err := kafkaEventToKafkaMessage(wrappedEvent, key) + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Errorf("Failed to convert kafka event to message: %s", err.Error()) + return + } + w.toBatchMessagesChan <- message + }() + + monitoring.Send(metric) } diff --git a/common/event/writer_test.go b/common/event/writer_test.go new file mode 100644 index 00000000..fc5be690 --- /dev/null +++ b/common/event/writer_test.go @@ -0,0 +1,68 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package event + +import ( + "sync" + + "github.com/AliceO2Group/Control/common/monitoring" + pb "github.com/AliceO2Group/Control/common/protos" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/segmentio/kafka-go" + "google.golang.org/protobuf/proto" +) + +var _ = Describe("Writer", func() { + When("event is written into writer", func() { + It("transforms it to kafka message and sends it", func() { + channel := make(chan struct{}) + writer := KafkaWriter{} + writer.toBatchMessagesChan = make(chan kafka.Message, 100) + writer.messageBuffer = NewFifoBuffer[kafka.Message]() + writer.Writer = &kafka.Writer{} + writer.Topic = "testtopic" + writer.runningWorkers = sync.WaitGroup{} + writer.batchingLoopDoneCh = make(chan struct{}, 1) + + writer.writeFunction = func(messages []kafka.Message, _ *monitoring.Metric) { + Expect(len(messages)).To(Equal(1)) + event := &pb.Event{} + proto.Unmarshal(messages[0].Value, event) + Expect(event.GetCoreStartEvent().FrameworkId).To(Equal("FrameworkId")) + channel <- struct{}{} + } + + go writer.writingLoop() + go writer.batchingLoop() + + event := &pb.Ev_MetaEvent_CoreStart{FrameworkId: "FrameworkId"} + + writer.WriteEvent(event) + <-channel + writer.Close() + }) + }) +}) diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go index be788f7d..682f0bb8 100644 --- a/common/monitoring/monitoring.go +++ b/common/monitoring/monitoring.go @@ -36,7 +36,7 @@ func initChannels(messageBufferSize int) { metricsRequestedChannel = make(chan struct{}) // 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if // multiple goroutines want to send metrics without blocking each other - metricsChannel = make(chan Metric, 100) + metricsChannel = make(chan Metric, 10000) metricsExportedToRequest = make(chan []Metric) metricsLimit = messageBufferSize }