OCTRL-1041: Report DCS call duration per detector, not only global (#749)
```diff
@@ -67,6 +67,12 @@ const (
 	DCS_GENERAL_OP_TIMEOUT = 45 * time.Second
 	DCS_TIME_FORMAT        = "2006-01-02 15:04:05.000"
 	TOPIC                  = topic.IntegratedService + topic.Separator + "dcs"
+
+	DCS_RESULT_OK           = "ok"
+	DCS_RESULT_TIMEOUT      = "timeout"
+	DCS_RESULT_GRPC_TIMEOUT = "gRPC_timeout"
+	DCS_RESULT_GRPC_UNKNOWN = "gRPC_unknown"
+	DCS_RESULT_GRPC_ERROR   = "gRPC_error"
 )

 type Plugin struct {
```
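These result strings become the values of a `result` tag attached to the total-call metric by the `addFunctionResult` helper introduced further down. As a rough illustration of how the loops below map stream errors onto these buckets, here is a minimal sketch; the `classifyStreamError` helper is hypothetical and not part of this PR:

```go
// Minimal, hypothetical sketch (not code from this PR) of how a stream
// error can be bucketed into the result strings defined above.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Mirrors the constants added in the hunk above.
const (
	DCS_RESULT_OK           = "ok"
	DCS_RESULT_TIMEOUT      = "timeout"
	DCS_RESULT_GRPC_TIMEOUT = "gRPC_timeout"
	DCS_RESULT_GRPC_UNKNOWN = "gRPC_unknown"
	DCS_RESULT_GRPC_ERROR   = "gRPC_error"
)

// classifyStreamError follows the same branching as the EOR/SOR/PFR
// loops: EOF is a clean stream end, a local context deadline is an
// ECS-side timeout, and everything else is bucketed by its gRPC code.
func classifyStreamError(err error) string {
	if err == nil || errors.Is(err, io.EOF) {
		return DCS_RESULT_OK
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return DCS_RESULT_TIMEOUT
	}
	switch status.Code(err) {
	case codes.DeadlineExceeded:
		return DCS_RESULT_GRPC_TIMEOUT
	case codes.Unknown:
		return DCS_RESULT_GRPC_UNKNOWN
	default:
		return DCS_RESULT_GRPC_ERROR
	}
}

func main() {
	fmt.Println(classifyStreamError(io.EOF))                               // ok
	fmt.Println(classifyStreamError(context.DeadlineExceeded))             // timeout
	fmt.Println(classifyStreamError(status.Error(codes.Internal, "boom"))) // gRPC_error
}
```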
```diff
@@ -1469,6 +1475,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 ) (error, []byte) {
 	metric := newMetric(runType, envId, "EOR")
 	defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
+	eor := "EOR"
+	durationsPerDetector := map[dcspb.Detector]time.Duration{}
+	start := time.Now()
+
+	totalCallDurationMetric := newMetric(runType, envId, eor)
+	totalCallDurationMetric.AddTag("detector", "All")
+	defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)()

 	var dcsEvent *dcspb.RunEvent
 	var err error
```
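Note the trailing `()` in the deferred calls: the idiom assumes `monitoring.TimerSendSingle` starts timing immediately and returns the function that actually records and sends the elapsed time when the surrounding call returns. A toy sketch of the idiom, with stand-in names instead of the real `monitoring` package:

```go
// Toy sketch of the deferred-timer idiom used above; "timer" is a
// stand-in, not the real monitoring.TimerSendSingle.
package main

import (
	"fmt"
	"time"
)

// timer starts measuring immediately and returns the function that the
// deferred call will run at function exit, reporting the elapsed time.
func timer(name string) func() {
	start := time.Now()
	return func() {
		fmt.Printf("%s took %d ms\n", name, time.Since(start).Milliseconds())
	}
}

func doCall() {
	defer timer("EOR")()              // timer() runs now; its result runs when doCall returns
	time.Sleep(50 * time.Millisecond) // stand-in for the gRPC streaming work
}

func main() {
	doCall() // prints something like "EOR took 50 ms"
}
```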
```diff
@@ -1488,13 +1501,15 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 				Error: err.Error(),
 			})

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
 			break
 		}
 		dcsEvent, err = stream.Recv()
 		if errors.Is(err, io.EOF) { // correct stream termination
 			log.Debug("DCS EOR event stream was closed from the DCS side (EOF)")
 			err = nil

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK)
 			break // no more data
 		}
 		if errors.Is(err, context.DeadlineExceeded) {
```
```diff
@@ -1514,6 +1529,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 				Error: err.Error(),
 			})

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
 			break
 		}
 		if err != nil { // stream termination in case of unknown or gRPC error
```
```diff
@@ -1535,6 +1551,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: err.Error(),
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
 			} else if got == codes.Unknown { // unknown error, likely not a gRPC code
 				logMsg := "bad DCS EOR event received, any future DCS events are ignored"
```
```diff
@@ -1551,6 +1568,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: logMsg,
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN)
 			} else { // some other gRPC error code
 				log.WithError(err).
 					Debug("DCS EOR call error")
```
```diff
@@ -1566,6 +1584,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: err.Error(),
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR)
 			}

 			break
```
```diff
@@ -1581,6 +1600,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 		}

 		detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
+		durationsPerDetector[dcsEvent.GetDetector()] = time.Since(start)
 		ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())

 		if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE {
```
@@ -1692,15 +1712,23 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * | |
| }) | ||
| } | ||
| } | ||
|
|
||
| convertAndSendDetectorDurationsAndStates(eor, detectorStatusMap, durationsPerDetector, envId, runType, &totalCallDurationMetric) | ||
|
|
||
| return err, payloadJsonForKafka | ||
| } | ||
|
|
||
| func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string, | ||
| payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, | ||
| callFailedStr string, payload map[string]interface{}, runType string, | ||
| ) (error, []byte) { | ||
| metric := newMetric(runType, envId, "SOR") | ||
| defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() | ||
| sor := "SOR" | ||
| detectorDurations := map[dcspb.Detector]time.Duration{} | ||
| start := time.Now() | ||
|
|
||
| totalCallDurationMetric := newMetric(runType, envId, sor) | ||
| totalCallDurationMetric.AddTag("detector", "All") | ||
| defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)() | ||
|
|
||
| var dcsEvent *dcspb.RunEvent | ||
| var err error | ||
|
|
```diff
@@ -1720,13 +1748,15 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 				Error: err.Error(),
 			})

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
 			break
 		}
 		dcsEvent, err = stream.Recv()
 		if errors.Is(err, io.EOF) { // correct stream termination
 			log.Debug("DCS SOR event stream was closed from the DCS side (EOF)")
 			err = nil

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK)
 			break // no more data
 		}
 		if errors.Is(err, context.DeadlineExceeded) {
```
```diff
@@ -1746,6 +1776,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 				Error: err.Error(),
 			})

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
 			break
 		}
 		if err != nil { // stream termination in case of unknown or gRPC error
```
```diff
@@ -1768,6 +1799,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Error: err.Error(),
 				})

+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
 			} else if got == codes.Unknown { // unknown error, likely not a gRPC code
 				logMsg := "bad DCS SOR event received, any future DCS events are ignored"
 				log.WithError(err).
```
```diff
@@ -1783,6 +1815,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: logMsg,
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN)
 			} else { // some other gRPC error code
 				log.WithError(err).
 					Debug("DCS SOR call error")
```
```diff
@@ -1798,6 +1831,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: err.Error(),
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR)
 			}

 			break
```
```diff
@@ -1813,6 +1847,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 		}

 		detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
+		detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
```
Review thread on `detectorDurations[dcsEvent.GetDetector()] = time.Since(start)`:

Collaborator:
hm, the duration will be incorrect if the detector times out or there is some weirder error, am I wrong? In such a case, the loop will break before reaching this line. Not sure what to propose for the timeout case... after all, some detectors might complete an operation and some not, so we would want to set the timeout value for those which did not complete, but we would have to do some guesswork instead of relying on

Author:
you are right, I overlooked the break.

Author:
As we both agree that we are not sure how to fix this, I tried to resolve the problem by adding

Collaborator:
Sounds good. We can then observe what we see in prod and adapt it if needed.
The hunk continues after the comment thread:

```diff
 		ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())

 		if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
```
```diff
@@ -1961,15 +1996,45 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 			})
 		}
 	}

+	convertAndSendDetectorDurationsAndStates(sor, detectorStatusMap, detectorDurations, envId, runType, &totalCallDurationMetric)
+
 	return err, payloadJsonForKafka
 }

+func convertAndSendDetectorDurationsAndStates(method string, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, detectorDurations map[dcspb.Detector]time.Duration, envId, runType string, totalCallMetric *monitoring.Metric) {
+	resultsMap := make(map[dcspb.DetectorState]int)
+	for dcsDet, state := range detectorStatusMap {
+		metric := newMetric(runType, envId, method)
+		det := dcsToEcsDetector(dcsDet)
+		metric.AddTag("detector", det)
+		metric.AddTag("state", dcspb.DetectorState_name[int32(state)])
+		resultsMap[state] += 1
+		if duration, ok := detectorDurations[dcsDet]; ok {
+			metric.SetFieldInt64("execution_time_ms", duration.Milliseconds())
+			monitoring.Send(&metric)
+		}
+	}
+	for detectorState, detectorCount := range resultsMap {
+		totalCallMetric.SetFieldInt64(dcspb.DetectorState_name[int32(detectorState)], int64(detectorCount))
+	}
+}
+
+func addFunctionResult(metric *monitoring.Metric, result string) {
+	metric.AddTag("result", result)
+}
+
 func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
 	payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
 	callFailedStr string, payload map[string]interface{}, runType string,
 ) (error, []byte) {
 	metric := newMetric(runType, envId, "PFR")
 	defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
+	pfr := "PFR"
+	detectorDurations := map[dcspb.Detector]time.Duration{}
+	start := time.Now()
+
+	totalCallDurationMetric := newMetric(runType, envId, pfr)
+	totalCallDurationMetric.AddTag("detector", "All")
+	defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)()

 	var err error
 	var dcsEvent *dcspb.RunEvent
```
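To make the behaviour of `convertAndSendDetectorDurationsAndStates` concrete, including the case discussed in the review comments above (a detector that times out before reporting never gets a duration entry), here is a toy re-run of the same logic with plain strings standing in for the `dcspb` and `monitoring` types; detector and state names are made up:

```go
// Toy sketch of the aggregation logic with stand-in types. A detector
// that never reported before the loop broke (here "MFT") gets no
// per-detector point, but its last known state is still counted into
// the summary fields of the "detector=All" total-call metric.
package main

import (
	"fmt"
	"time"
)

func main() {
	states := map[string]string{"TPC": "RUN_OK", "MFT": "TIMEOUT"}
	durations := map[string]time.Duration{"TPC": 1200 * time.Millisecond} // MFT timed out before reporting

	stateCounts := map[string]int{}
	for det, state := range states {
		stateCounts[state]++
		if d, ok := durations[det]; ok { // mirrors the ok-guard in the PR
			fmt.Printf("per-detector point: detector=%s state=%s execution_time_ms=%d\n",
				det, state, d.Milliseconds())
		}
	}
	// These counts end up as fields on the "detector=All" metric.
	fmt.Println("total-call fields:", stateCounts)
}
```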
```diff
@@ -1989,13 +2054,15 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 				Error: err.Error(),
 			})

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
 			break
 		}
 		dcsEvent, err = stream.Recv()
 		if errors.Is(err, io.EOF) { // correct stream termination
 			log.Debug("DCS PFR event stream was closed from the DCS side (EOF)")
 			err = nil

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK)
 			break // no more data
 		}
 		if errors.Is(err, context.DeadlineExceeded) {
```
```diff
@@ -2015,6 +2082,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 				Error: err.Error(),
 			})

+			addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
 			break
 		}
 		if err != nil { // stream termination in case of unknown or gRPC error
```
```diff
@@ -2036,6 +2104,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: err.Error(),
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
 			} else if got == codes.Unknown { // unknown error, likely not a gRPC code
 				logMsg := "bad DCS PFR event received, any future DCS events are ignored"
 				log.WithError(err).
```
```diff
@@ -2052,6 +2121,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: logMsg,
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN)
 			} else { // some other gRPC error code
 				log.WithError(err).
 					Error("DCS PFR call error")
```
```diff
@@ -2067,6 +2137,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 					Payload: string(payloadJsonForKafka[:]),
 					Error: err.Error(),
 				})
+				addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR)
 			}

 			break
```
```diff
@@ -2083,6 +2154,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 		}

 		detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
+		detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
 		ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())

 		if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
```
```diff
@@ -2230,6 +2302,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
 			})
 		}
 	}

+	convertAndSendDetectorDurationsAndStates(pfr, detectorStatusMap, detectorDurations, envId, runType, &totalCallDurationMetric)
+
 	return err, payloadJsonForKafka
 }
```
Review thread on the `method` argument of `convertAndSendDetectorDurationsAndStates`:

Collaborator:
It's nitpicking, but perhaps this would be better? After all, if I wanted to declare e.g. a constant defining a number of FLPs, we would not call it `twoHundred` but `numberOfFLPs`. The same comment applies to the other calls.

Author:
I would generally agree with you, but I prefer calling it `sor`, `eor`, `pfr` in this case. It looks more readable when passing it around, knowing its value and inherently its meaning. The reason is that while coding you can see hints inside the editor, so seeing something like `method: sor` makes it better than `method: callName`. I also don't think that your example is quite applicable, as `twoHundred` is really generic, while `sor` is quite a specific thing in our tech stack.

Collaborator:
It looks like it's a matter of personal preference and I would not judge one as better than the other in our case. It's OK.