diff --git a/common/event/writer.go b/common/event/writer.go index a6444b10..d6477f59 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -97,91 +97,62 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time return } - go func() { - var ( - err error - wrappedEvent *pb.Event - key []byte = nil - ) - - switch e := e.(type) { - case *pb.Ev_MetaEvent_CoreStart: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e}, - } - case *pb.Ev_MetaEvent_MesosHeartbeat: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}, - } - case *pb.Ev_MetaEvent_FrameworkEvent: - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, - } - case *pb.Ev_TaskEvent: - key = []byte(e.Taskid) - if len(key) == 0 { - key = nil - } - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_TaskEvent{TaskEvent: e}, - } - case *pb.Ev_RoleEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RoleEvent{RoleEvent: e}, - } - case *pb.Ev_EnvironmentEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, - } - case *pb.Ev_CallEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_CallEvent{CallEvent: e}, - } - case *pb.Ev_IntegratedServiceEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, - } - case *pb.Ev_RunEvent: - key = extractAndConvertEnvID(e) - wrappedEvent = &pb.Event{ - Timestamp: timestamp.UnixMilli(), - TimestampNano: timestamp.UnixNano(), - Payload: &pb.Event_RunEvent{RunEvent: e}, - } + var ( + err error + wrappedEvent *pb.Event = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), } - - if wrappedEvent == nil { - err = fmt.Errorf("unsupported event type") - } else { - err = w.doWriteEvent(key, wrappedEvent) + key []byte = nil + ) + + switch e := e.(type) { + case *pb.Ev_MetaEvent_CoreStart: + wrappedEvent.Payload = &pb.Event_CoreStartEvent{CoreStartEvent: e} + case *pb.Ev_MetaEvent_MesosHeartbeat: + wrappedEvent.Payload = &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e} + case *pb.Ev_MetaEvent_FrameworkEvent: + wrappedEvent.Payload = &pb.Event_FrameworkEvent{FrameworkEvent: e} + case *pb.Ev_TaskEvent: + key = []byte(e.Taskid) + if len(key) == 0 { + key = nil } - - if err != nil { - log.WithField("event", e). - WithField("level", infologger.IL_Support). - Error(err.Error()) + wrappedEvent.Payload = &pb.Event_TaskEvent{TaskEvent: e} + case *pb.Ev_RoleEvent: + key = extractAndConvertEnvID(e) + wrappedEvent.Payload = &pb.Event_RoleEvent{RoleEvent: e} + case *pb.Ev_EnvironmentEvent: + key = extractAndConvertEnvID(e) + wrappedEvent.Payload = &pb.Event_EnvironmentEvent{EnvironmentEvent: e} + case *pb.Ev_CallEvent: + key = extractAndConvertEnvID(e) + wrappedEvent = &pb.Event{ + Timestamp: timestamp.UnixMilli(), + TimestampNano: timestamp.UnixNano(), + Payload: &pb.Event_CallEvent{CallEvent: e}, } - }() + case *pb.Ev_IntegratedServiceEvent: + key = extractAndConvertEnvID(e) + wrappedEvent.Payload = &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e} + case *pb.Ev_RunEvent: + key = extractAndConvertEnvID(e) + wrappedEvent.Payload = &pb.Event_RunEvent{RunEvent: e} + default: + wrappedEvent = nil + } + + if wrappedEvent == nil { + err = fmt.Errorf("unsupported event type") + } else { + err = w.doWriteEvent(key, wrappedEvent) + } + + if err != nil { + log.WithField("event", e). + WithField("level", infologger.IL_Support). + Error(err.Error()) + } } func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {