From 5f84cefdb2096f1762737ead914a94245f24bd8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 2 Apr 2025 16:37:48 +0200 Subject: [PATCH 1/2] [core] remove goroutine in writer.go This goroutine can cause race condition that makes kafka messages in wrong order --- common/event/writer.go | 162 ++++++++++++++++++++--------------------- 1 file changed, 80 insertions(+), 82 deletions(-) diff --git a/common/event/writer.go b/common/event/writer.go index a6444b10..dd021c89 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -97,91 +97,89 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time return } - go func() { - var ( - err error - wrappedEvent *pb.Event - key []byte = nil - ) - - switch e := e.(type) { - case *pb.Ev_MetaEvent_CoreStart: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, - } - case *pb.Ev_MetaEvent_MesosHeartbeat: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, - } - case *pb.Ev_MetaEvent_FrameworkEvent: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, - } - case *pb.Ev_TaskEvent: - key = []byte(e.Taskid) - if len(key) == 0 { - key = nil - } - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_TaskEvent{TaskEvent: e}, - } - case *pb.Ev_RoleEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RoleEvent{RoleEvent: e}, - } - case *pb.Ev_EnvironmentEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, - } - case *pb.Ev_CallEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CallEvent{CallEvent: e}, - } - case *pb.Ev_IntegratedServiceEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, - } - case *pb.Ev_RunEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RunEvent{RunEvent: e}, - } + var ( + err error + wrappedEvent *pb.Event + key []byte = nil + ) + + switch e := e.(type) { + case *pb.Ev_MetaEvent_CoreStart: + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, } - - if wrappedEvent == nil { - err = fmt.Errorf("unsupported event type") - } else { - err = w.doWriteEvent(key, wrappedEvent) + case *pb.Ev_MetaEvent_MesosHeartbeat: + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, } - - if err != nil { - log.WithField("event", e). - WithField("level", infologger.IL_Support). - Error(err.Error()) + case *pb.Ev_MetaEvent_FrameworkEvent: + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, + } + case *pb.Ev_TaskEvent: + key = []byte(e.Taskid) + if len(key) == 0 { + key = nil + } + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_TaskEvent{TaskEvent: e}, + } + case *pb.Ev_RoleEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_RoleEvent{RoleEvent: e}, + } + case *pb.Ev_EnvironmentEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, + } + case *pb.Ev_CallEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_CallEvent{CallEvent: e}, + } + case *pb.Ev_IntegratedServiceEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, } - }() + case *pb.Ev_RunEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_RunEvent{RunEvent: e}, + } + } + + if wrappedEvent == nil { + err = fmt.Errorf("unsupported event type") + } else { + err = w.doWriteEvent(key, wrappedEvent) + } + + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Error(err.Error()) + } } func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error { From fdad4c2b53224bca2720945e7593a0059f343ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 2 Apr 2025 16:56:27 +0200 Subject: [PATCH 2/2] [core] refactored WriteEventWithTimestamp --- common/event/writer.go | 57 +++++++++++------------------------------- 1 file changed, 15 insertions(+), 42 deletions(-) diff --git a/common/event/writer.go b/common/event/writer.go index dd021c89..d6477f59 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -99,53 +99,32 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time var ( err error - wrappedEvent *pb.Event - key []byte = nil + wrappedEvent *pb.Event = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + } + key []byte = nil ) switch e := e.(type) { case *pb.Ev_MetaEvent_CoreStart: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, - } + wrappedEvent.Payload = &pb.Event_CoreStartEvent{CoreStartEvent: e} case *pb.Ev_MetaEvent_MesosHeartbeat: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, - } + wrappedEvent.Payload = &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e} case *pb.Ev_MetaEvent_FrameworkEvent: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, - } + wrappedEvent.Payload = &pb.Event_FrameworkEvent{FrameworkEvent: e} case *pb.Ev_TaskEvent: key = []byte(e.Taskid) if len(key) == 0 { key = nil } - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_TaskEvent{TaskEvent: e}, - } + wrappedEvent.Payload = &pb.Event_TaskEvent{TaskEvent: e} case *pb.Ev_RoleEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RoleEvent{RoleEvent: e}, - } + wrappedEvent.Payload = &pb.Event_RoleEvent{RoleEvent: e} case *pb.Ev_EnvironmentEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, - } + wrappedEvent.Payload = &pb.Event_EnvironmentEvent{EnvironmentEvent: e} case *pb.Ev_CallEvent: key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ @@ -155,18 +134,12 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time } case *pb.Ev_IntegratedServiceEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, - } + wrappedEvent.Payload = &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e} case *pb.Ev_RunEvent: key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RunEvent{RunEvent: e}, - } + wrappedEvent.Payload = &pb.Event_RunEvent{RunEvent: e} + default: + wrappedEvent = nil } if wrappedEvent == nil {