diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index 8886f649..0c202e2b 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -67,6 +67,12 @@ const ( DCS_GENERAL_OP_TIMEOUT = 45 * time.Second DCS_TIME_FORMAT = "2006-01-02 15:04:05.000" TOPIC = topic.IntegratedService + topic.Separator + "dcs" + + DCS_RESULT_OK = "ok" + DCS_RESULT_TIMEOUT = "timeout" + DCS_RESULT_GRPC_TIMEOUT = "gRPC_timeout" + DCS_RESULT_GRPC_UNKNOWN = "gRPC_unknown" + DCS_RESULT_GRPC_ERROR = "gRPC_error" ) type Plugin struct { @@ -1469,6 +1475,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * ) (error, []byte) { metric := newMetric(runType, envId, "EOR") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + eor := "EOR" + durationsPerDetector := map[dcspb.Detector]time.Duration{} + start := time.Now() + + totalCallDurationMetric := newMetric(runType, envId, eor) + totalCallDurationMetric.AddTag("detector", "All") + defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)() var dcsEvent *dcspb.RunEvent var err error @@ -1488,6 +1501,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT) break } dcsEvent, err = stream.Recv() @@ -1495,6 +1509,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * log.Debug("DCS EOR event stream was closed from the DCS side (EOF)") err = nil + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK) break // no more data } if errors.Is(err, context.DeadlineExceeded) { @@ -1514,6 +1529,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT) break } if err != nil { // stream termination in case of unknown or gRPC error @@ -1535,6 +1551,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT) } else if got == codes.Unknown { // unknown error, likely not a gRPC code logMsg := "bad DCS EOR event received, any future DCS events are ignored" @@ -1551,6 +1568,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: logMsg, }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN) } else { // some other gRPC error code log.WithError(err). Debug("DCS EOR call error") @@ -1566,6 +1584,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR) } break @@ -1581,6 +1600,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * } detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() + durationsPerDetector[dcsEvent.GetDetector()] = time.Since(start) ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE { @@ -1692,6 +1712,9 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * }) } } + + convertAndSendDetectorDurationsAndStates(eor, detectorStatusMap, durationsPerDetector, envId, runType, &totalCallDurationMetric) + return err, payloadJsonForKafka } @@ -1699,8 +1722,13 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, callFailedStr string, payload map[string]interface{}, runType string, ) (error, []byte) { - metric := newMetric(runType, envId, "SOR") - defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + sor := "SOR" + detectorDurations := map[dcspb.Detector]time.Duration{} + start := time.Now() + + totalCallDurationMetric := newMetric(runType, envId, sor) + totalCallDurationMetric.AddTag("detector", "All") + defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)() var dcsEvent *dcspb.RunEvent var err error @@ -1720,6 +1748,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT) break } dcsEvent, err = stream.Recv() @@ -1727,6 +1756,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * log.Debug("DCS SOR event stream was closed from the DCS side (EOF)") err = nil + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK) break // no more data } if errors.Is(err, context.DeadlineExceeded) { @@ -1746,6 +1776,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT) break } if err != nil { // stream termination in case of unknown or gRPC error @@ -1768,6 +1799,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT) } else if got == codes.Unknown { // unknown error, likely not a gRPC code logMsg := "bad DCS SOR event received, any future DCS events are ignored" log.WithError(err). @@ -1783,6 +1815,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: logMsg, }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN) } else { // some other gRPC error code log.WithError(err). Debug("DCS SOR call error") @@ -1798,6 +1831,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR) } break @@ -1813,6 +1847,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * } detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() + detectorDurations[dcsEvent.GetDetector()] = time.Since(start) ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE { @@ -1961,15 +1996,45 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * }) } } + + convertAndSendDetectorDurationsAndStates(sor, detectorStatusMap, detectorDurations, envId, runType, &totalCallDurationMetric) + return err, payloadJsonForKafka } +func convertAndSendDetectorDurationsAndStates(method string, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, detectorDurations map[dcspb.Detector]time.Duration, envId, runType string, totalCallMetric *monitoring.Metric) { + resultsMap := make(map[dcspb.DetectorState]int) + for dcsDet, state := range detectorStatusMap { + metric := newMetric(runType, envId, method) + det := dcsToEcsDetector(dcsDet) + metric.AddTag("detector", det) + metric.AddTag("state", dcspb.DetectorState_name[int32(state)]) + resultsMap[state] += 1 + if duration, ok := detectorDurations[dcsDet]; ok { + metric.SetFieldInt64("execution_time_ms", duration.Milliseconds()) + monitoring.Send(&metric) + } + } + for detectorState, detectorCount := range resultsMap { + totalCallMetric.SetFieldInt64(dcspb.DetectorState_name[int32(detectorState)], int64(detectorCount)) + } +} + +func addFunctionResult(metric *monitoring.Metric, result string) { + metric.AddTag("result", result) +} + func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string, payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, callFailedStr string, payload map[string]interface{}, runType string, ) (error, []byte) { - metric := newMetric(runType, envId, "PFR") - defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + pfr := "PFR" + detectorDurations := map[dcspb.Detector]time.Duration{} + start := time.Now() + + totalCallDurationMetric := newMetric(runType, envId, pfr) + totalCallDurationMetric.AddTag("detector", "All") + defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)() var err error var dcsEvent *dcspb.RunEvent @@ -1989,6 +2054,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT) break } dcsEvent, err = stream.Recv() @@ -1996,6 +2062,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * log.Debug("DCS PFR event stream was closed from the DCS side (EOF)") err = nil + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK) break // no more data } if errors.Is(err, context.DeadlineExceeded) { @@ -2015,6 +2082,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT) break } if err != nil { // stream termination in case of unknown or gRPC error @@ -2036,6 +2104,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT) } else if got == codes.Unknown { // unknown error, likely not a gRPC code logMsg := "bad DCS PFR event received, any future DCS events are ignored" log.WithError(err). @@ -2052,6 +2121,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: logMsg, }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN) } else { // some other gRPC error code log.WithError(err). Error("DCS PFR call error") @@ -2067,6 +2137,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * Payload: string(payloadJsonForKafka[:]), Error: err.Error(), }) + addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR) } break @@ -2083,6 +2154,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * } detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() + detectorDurations[dcsEvent.GetDetector()] = time.Since(start) ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE { @@ -2230,6 +2302,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * }) } } + + convertAndSendDetectorDurationsAndStates(pfr, detectorStatusMap, detectorDurations, envId, runType, &totalCallDurationMetric) + return err, payloadJsonForKafka }