From 5f84cefdb2096f1762737ead914a94245f24bd8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 2 Apr 2025 16:37:48 +0200 Subject: [PATCH 1/2] [core] remove goroutine in writer.go This goroutine can cause race condition that makes kafka messages in wrong order --- common/event/writer.go | 162 ++++++++++++++++++++--------------------- 1 file changed, 80 insertions(+), 82 deletions(-) diff --git a/common/event/writer.go b/common/event/writer.go index a6444b10..dd021c89 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -97,91 +97,89 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time return } - go func() { - var ( - err error - wrappedEvent *pb.Event - key []byte = nil - ) - - switch e := e.(type) { - case *pb.Ev_MetaEvent_CoreStart: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, - } - case *pb.Ev_MetaEvent_MesosHeartbeat: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, - } - case *pb.Ev_MetaEvent_FrameworkEvent: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, - } - case *pb.Ev_TaskEvent: - key = []byte(e.Taskid) - if len(key) == 0 { - key = nil - } - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_TaskEvent{TaskEvent: e}, - } - case *pb.Ev_RoleEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RoleEvent{RoleEvent: e}, - } - case *pb.Ev_EnvironmentEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, - } - case *pb.Ev_CallEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CallEvent{CallEvent: e}, - } - case *pb.Ev_IntegratedServiceEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, - } - case *pb.Ev_RunEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RunEvent{RunEvent: e}, - } + var ( + err error + wrappedEvent *pb.Event + key []byte = nil + ) + + switch e := e.(type) { + case *pb.Ev_MetaEvent_CoreStart: + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, } - - if wrappedEvent == nil { - err = fmt.Errorf("unsupported event type") - } else { - err = w.doWriteEvent(key, wrappedEvent) + case *pb.Ev_MetaEvent_MesosHeartbeat: + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, } - - if err != nil { - log.WithField("event", e). - WithField("level", infologger.IL_Support). - Error(err.Error()) + case *pb.Ev_MetaEvent_FrameworkEvent: + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, + } + case *pb.Ev_TaskEvent: + key = []byte(e.Taskid) + if len(key) == 0 { + key = nil + } + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_TaskEvent{TaskEvent: e}, + } + case *pb.Ev_RoleEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_RoleEvent{RoleEvent: e}, + } + case *pb.Ev_EnvironmentEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, + } + case *pb.Ev_CallEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_CallEvent{CallEvent: e}, + } + case *pb.Ev_IntegratedServiceEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, } - }() + case *pb.Ev_RunEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_RunEvent{RunEvent: e}, + } + } + + if wrappedEvent == nil { + err = fmt.Errorf("unsupported event type") + } else { + err = w.doWriteEvent(key, wrappedEvent) + } + + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Error(err.Error()) + } } func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error { From b24aa48037ff86ebfad0525998d5d1fa86f349c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 3 Apr 2025 10:52:37 +0200 Subject: [PATCH 2/2] [core] async kafka writing with error reporting --- common/event/writer.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/common/event/writer.go b/common/event/writer.go index dd021c89..134a28d9 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -55,22 +55,28 @@ func (*DummyWriter) Close() {} type KafkaWriter struct { *kafka.Writer + toWriteChan chan kafka.Message } func NewWriterWithTopic(topic topic.Topic) *KafkaWriter { - return &KafkaWriter{ + writer := &KafkaWriter{ Writer: &kafka.Writer{ Addr: kafka.TCP(viper.GetStringSlice("kafkaEndpoints")...), Topic: string(topic), Balancer: &kafka.Hash{}, AllowAutoTopicCreation: true, }, + toWriteChan: make(chan kafka.Message, 1000), } + + go writer.writingLoop() + return writer } func (w *KafkaWriter) Close() { if w != nil { - w.Close() + close(w.toWriteChan) + w.Writer.Close() } } @@ -80,6 +86,16 @@ func (w *KafkaWriter) WriteEvent(e interface{}) { } } +// TODO: we can optimise this to write multiple message at once +func (w *KafkaWriter) writingLoop() { + for message := range w.toWriteChan { + err := w.WriteMessages(context.Background(), message) + if err != nil { + log.Errorf("failed to write async kafka message: %w", err) + } + } +} + type HasEnvID interface { GetEnvironmentId() string } @@ -200,9 +216,11 @@ func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error { message.Key = key } - err = w.WriteMessages(context.Background(), message) - if err != nil { - return fmt.Errorf("failed to write event: %w", err) + select { + case w.toWriteChan <- message: + default: + log.Warnf("Writer of kafka topic [%s] cannot write because channel is full, discarding a message", w.Writer.Topic) } + return nil }