Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 38 additions & 35 deletions common/event/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ import (
)

var (
log = logger.New(logrus.StandardLogger(), "event")
KAFKAWRITER = "kafka_writer"
KAFKAPREPARE = "kafka_prepare"
log = logger.New(logrus.StandardLogger(), "event")
KAFKAWRITER = "kafka_writer"
KAFKAPREPARE = "kafka_prepare"
writerBatchsize = 100
)

type Writer interface {
Expand Down Expand Up @@ -76,7 +77,7 @@ type KafkaWriter struct {
toBatchMessagesChan chan kafka.Message
messageBuffer FifoBuffer[kafka.Message]
// NOTE: we use settable callback in order to be able to test writing of messages via KafkaWriter, without necessity of setting up cluster
writeFunction func([]kafka.Message, *monitoring.Metric)
writeFunction func([]kafka.Message)
runningWorkers sync.WaitGroup
batchingLoopDoneCh chan struct{}
}
Expand All @@ -94,19 +95,30 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
Topic: string(topic),
Balancer: &kafka.Hash{},
AllowAutoTopicCreation: true,
BatchTimeout: time.Millisecond, // we are batching the messages ourselves and writing sync into writer, so long timeout is not necessary
BatchSize: writerBatchsize,
},
toBatchMessagesChan: make(chan kafka.Message, 10000),
toBatchMessagesChan: make(chan kafka.Message, 100000),
messageBuffer: NewFifoBuffer[kafka.Message](),
runningWorkers: sync.WaitGroup{},
batchingLoopDoneCh: make(chan struct{}, 1),
}

writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) {
defer monitoring.TimerNS(metric)()
writer.writeFunction = func(messages []kafka.Message) {
metric := writer.newMetric(KAFKAWRITER)
metric.SetFieldUInt64("messages_sent", uint64(len(messages)))
metric.SetFieldUInt64("messages_failed", 0)

metricDuration := writer.newMetric(KAFKAWRITER)
defer monitoring.SendHistogrammable(&metricDuration)
defer monitoring.TimerNS(&metricDuration)()

if err := writer.WriteMessages(context.Background(), messages...); err != nil {
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err)
}

monitoring.Send(&metric)
}

go writer.writingLoop()
Expand Down Expand Up @@ -138,18 +150,11 @@ func (w *KafkaWriter) writingLoop() {
w.runningWorkers.Done()
return
default:
messagesToSend := w.messageBuffer.PopMultiple(100)
messagesToSend := w.messageBuffer.PopMultiple(uint(writerBatchsize))
if len(messagesToSend) == 0 {
continue
}

metric := w.newMetric(KAFKAWRITER)
metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend)))
metric.SetFieldUInt64("messages_failed", 0)

w.writeFunction(messagesToSend, &metric)

monitoring.Send(&metric)
w.writeFunction(messagesToSend)
}
}
}
Expand Down Expand Up @@ -241,25 +246,23 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time

metric := w.newMetric(KAFKAPREPARE)

func() {
defer monitoring.TimerNS(&metric)()
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Errorf("Failed to convert event to kafka event: %s", err.Error())
return
}
defer monitoring.SendHistogrammable(&metric)
defer monitoring.TimerNS(&metric)()

message, err := kafkaEventToKafkaMessage(wrappedEvent, key)
if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Errorf("Failed to convert kafka event to message: %s", err.Error())
return
}
w.toBatchMessagesChan <- message
}()
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Errorf("Failed to convert event to kafka event: %s", err.Error())
return
}

monitoring.Send(&metric)
message, err := kafkaEventToKafkaMessage(wrappedEvent, key)
if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Errorf("Failed to convert kafka event to message: %s", err.Error())
return
}
w.toBatchMessagesChan <- message
}
3 changes: 1 addition & 2 deletions common/event/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package event
import (
"sync"

"github.com/AliceO2Group/Control/common/monitoring"
pb "github.com/AliceO2Group/Control/common/protos"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -47,7 +46,7 @@ var _ = Describe("Writer", func() {
writer.runningWorkers = sync.WaitGroup{}
writer.batchingLoopDoneCh = make(chan struct{}, 1)

writer.writeFunction = func(messages []kafka.Message, _ *monitoring.Metric) {
writer.writeFunction = func(messages []kafka.Message) {
Expect(len(messages)).To(Equal(1))
event := &pb.Event{}
proto.Unmarshal(messages[0].Value, event)
Expand Down