From 03053e92b88777081c00d3afb45a4a12b8914fc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 28 Aug 2025 18:13:42 +0200 Subject: [PATCH] [monitoring] OCTRL-1042: Add run type to metrics calls --- common/monitoring/grpcinterceptor.go | 25 +++++++++++++++++++ core/environment/environment.go | 1 + core/integration/ccdb/plugin.go | 8 +++--- core/integration/dcs/plugin.go | 37 ++++++++++++++++------------ core/integration/ddsched/plugin.go | 20 ++++++--------- core/integration/odc/plugin.go | 24 +++++++++--------- core/integration/plugin.go | 28 ++++++++++++++++++++- core/integration/trg/plugin.go | 25 ++++++++++--------- core/workflow/callable/call.go | 17 ++++++++++++- 9 files changed, 128 insertions(+), 57 deletions(-) diff --git a/common/monitoring/grpcinterceptor.go b/common/monitoring/grpcinterceptor.go index d23b1e3e..bc4d36ca 100644 --- a/common/monitoring/grpcinterceptor.go +++ b/common/monitoring/grpcinterceptor.go @@ -30,8 +30,20 @@ import ( "google.golang.org/grpc" ) +type ( + EnvIDKey struct{} + RunTypeKey struct{} +) + +func AddEnvAndRunType(ctx context.Context, envId, runType string) context.Context { + ctx = context.WithValue(ctx, EnvIDKey{}, envId) + ctx = context.WithValue(ctx, RunTypeKey{}, runType) + return ctx +} + type measuredClientStream struct { grpc.ClientStream + ctx context.Context method string metricName string } @@ -39,6 +51,12 @@ type measuredClientStream struct { func (t *measuredClientStream) RecvMsg(m interface{}) error { metric := NewMetric(t.metricName) metric.AddTag("method", t.method) + if env, ok := t.ctx.Value(EnvIDKey{}).(string); ok { + metric.AddTag("envId", env) + } + if rt, ok := t.ctx.Value(RunTypeKey{}).(string); ok { + metric.AddTag("runtype", rt) + } defer TimerSendSingle(&metric, Millisecond)() err := t.ClientStream.RecvMsg(m) @@ -63,6 +81,7 @@ func SetupStreamClientInterceptor(metricName string, convert NameConvertType) gr return &measuredClientStream{ ClientStream: clientStream, + ctx: ctx, method: convert(method), metricName: metricName, }, nil @@ -73,6 +92,12 @@ func SetupUnaryClientInterceptor(name string, convert NameConvertType) grpc.Unar return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { metric := NewMetric(name) metric.AddTag("method", convert(method)) + if env, ok := ctx.Value(EnvIDKey{}).(string); ok { + metric.AddTag("envId", env) + } + if rt, ok := ctx.Value(RunTypeKey{}).(string); ok { + metric.AddTag("runtype", rt) + } defer TimerSendSingle(&metric, Millisecond)() return invoker(ctx, method, req, reply, cc, opts...) } diff --git a/core/environment/environment.go b/core/environment/environment.go index 50bf356c..3c8bfe99 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -669,6 +669,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig metric := monitoring.NewMetric("hooks") metric.AddTag("trigger", trigger) metric.AddTag("envId", env.id.String()) + metric.AddTag("runtype", env.GetRunType().String()) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() allWeightsSet := make(callable.HooksMap) diff --git a/core/integration/ccdb/plugin.go b/core/integration/ccdb/plugin.go index b420e3be..763fd488 100644 --- a/core/integration/ccdb/plugin.go +++ b/core/integration/ccdb/plugin.go @@ -481,7 +481,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } p.existingRuns[grp.runNumber] = types.Nil{} - err := p.uploadCurrentGRP(grp, envId, true) + err := p.uploadCurrentGRP(grp, envId, true, varStack, "RunStart") if err != nil { log.WithField("call", "RunStop"). WithField("run", grp.runNumber). @@ -506,7 +506,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { _, runExists := p.existingRuns[grp.runNumber] if runExists { delete(p.existingRuns, grp.runNumber) - err := p.uploadCurrentGRP(grp, envId, false) + err := p.uploadCurrentGRP(grp, envId, false, varStack, "RunStop") if err != nil { log.WithField("call", "RunStop"). WithField("run", grp.runNumber). @@ -525,7 +525,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } -func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error { +func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool, varStack map[string]string, callName string) error { if grp == nil { return errors.New(fmt.Sprintf("Failed to create a GRP object")) } @@ -550,6 +550,8 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre metric := monitoring.NewMetric("ccdb") metric.AddTag("envId", envId) + metric.AddTag("runtype", integration.ExtractRunTypeOrUndefined(varStack)) + metric.AddTag("call", callName) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr) diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index 2982fcf5..8886f649 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -289,18 +289,18 @@ func (p *Plugin) Init(instanceId string) error { in := &dcspb.SubscriptionRequest{ InstanceId: instanceId, } - + // Always start the goroutine, even if initial subscription fails go func() { var evStream dcspb.Configurator_SubscribeClient var err error - + for { // Try to establish subscription if we don't have one if evStream == nil { log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). Debug("attempting to subscribe to DCS service") - + evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{}) if err != nil { log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")). @@ -699,7 +699,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var stream dcspb.Configurator_StartOfRunClient timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "PFR", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState) @@ -746,7 +746,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload) + err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, + detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack)) dcsFailedEcsDetectors := make([]string, 0) dcsopOk := true @@ -1064,7 +1065,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var stream dcspb.Configurator_StartOfRunClient timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState) @@ -1112,7 +1113,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } p.pendingEORs[envId] = runNumber64 // make sure the corresponding EOR runs sooner or later - err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload) + err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, + detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack)) dcsFailedEcsDetectors := make([]string, 0) dcsopOk := true @@ -1298,7 +1300,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var stream dcspb.Configurator_EndOfRunClient timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() payload := map[string]interface{}{ @@ -1356,7 +1358,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE } - err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload) + err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, + detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack)) dcsFailedEcsDetectors := make([]string, 0) dcsopOk := true @@ -1452,17 +1455,19 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } -func newMetric(method string) monitoring.Metric { +func newMetric(runType, envId, method string) monitoring.Metric { metric := monitoring.NewMetric("dcsecs") metric.AddTag("method", method) + metric.AddTag("envId", envId) + metric.AddTag("runtype", runType) return metric } func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string, payloadJsonForKafka []byte, stream dcspb.Configurator_EndOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, - callFailedStr string, payload map[string]interface{}, + callFailedStr string, payload map[string]interface{}, runType string, ) (error, []byte) { - metric := newMetric("EOR") + metric := newMetric(runType, envId, "EOR") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() var dcsEvent *dcspb.RunEvent @@ -1692,9 +1697,9 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string, payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, - callFailedStr string, payload map[string]interface{}, + callFailedStr string, payload map[string]interface{}, runType string, ) (error, []byte) { - metric := newMetric("SOR") + metric := newMetric(runType, envId, "SOR") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() var dcsEvent *dcspb.RunEvent @@ -1961,9 +1966,9 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call * func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string, payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, - callFailedStr string, payload map[string]interface{}, + callFailedStr string, payload map[string]interface{}, runType string, ) (error, []byte) { - metric := newMetric("PFR") + metric := newMetric(runType, envId, "PFR") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() var err error diff --git a/core/integration/ddsched/plugin.go b/core/integration/ddsched/plugin.go index 917b1a6e..c6d3e1dd 100644 --- a/core/integration/ddsched/plugin.go +++ b/core/integration/ddsched/plugin.go @@ -40,6 +40,7 @@ import ( "github.com/AliceO2Group/Control/common/event/topic" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/environment" @@ -163,6 +164,7 @@ func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]map[string]s EnvironmentId: envId.String(), } ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("ddschedStatusTimeout")) + ctx = monitoring.AddEnvAndRunType(ctx, envId.String(), "none") state, err := p.ddSchedClient.PartitionStatus(ctx, &in, grpc.EmptyCallOption{}) cancel() if err != nil { @@ -321,11 +323,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - var ( - response *ddpb.PartitionResponse - ) + var response *ddpb.PartitionResponse timeout := callable.AcquireTimeout(DDSCHED_INITIALIZE_TIMEOUT, varStack, "Initialize", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() payload := map[string]interface{}{ @@ -574,11 +574,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - var ( - response *ddpb.PartitionResponse - ) + var response *ddpb.PartitionResponse timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() payload := map[string]interface{}{ @@ -821,16 +819,14 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - var ( - response *ddpb.PartitionResponse - ) + var response *ddpb.PartitionResponse infoReq := ddpb.PartitionInfo{ EnvironmentId: envId, PartitionId: envId, } timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ diff --git a/core/integration/odc/plugin.go b/core/integration/odc/plugin.go index 94c093d7..11a1d9fa 100644 --- a/core/integration/odc/plugin.go +++ b/core/integration/odc/plugin.go @@ -183,7 +183,7 @@ func (p *Plugin) GetConnectionState() string { func (p *Plugin) queryPartitionStatus() { defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient")) - ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT) + ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT) defer cancel() statusRep := &odc.StatusReply{} @@ -238,7 +238,7 @@ func (p *Plugin) queryPartitionStatus() { go func(idx int, partId uid.ID) { defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT) + ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT) defer cancel() odcPartStateRep, err := p.odcClient.GetState(ctx, &odc.StateRequest{ @@ -1179,7 +1179,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{ @@ -1292,7 +1292,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } } - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err := handleConfigure(ctx, p.odcClient, arguments, paddingTimeout, envId, call) if err != nil { @@ -1314,7 +1314,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { callFailedStr := "EPN Reset call failed" - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, call.VarStack, timeout) defer cancel() err := handleReset(ctx, p.odcClient, nil, paddingTimeout, envId, call) if err != nil { @@ -1343,7 +1343,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { callFailedStr := "EPN PartitionTerminate call failed" - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err := handlePartitionTerminate(ctx, p.odcClient, nil, paddingTimeout, envId, call) if err != nil { @@ -1414,7 +1414,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { arguments["original_run_number"] = originalRunNumber } - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err = handleStart(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call) if err != nil { @@ -1462,7 +1462,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call) if err != nil { @@ -1486,7 +1486,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "EnsureStop", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() state, err := handleGetState(ctx, p.odcClient, envId) @@ -1551,7 +1551,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { callFailedStr := "EPN EnsureCleanup call failed" - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, envId, call) if err != nil { @@ -1572,7 +1572,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { callFailedStr := "EPN PreDeploymentCleanup call failed" - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, "", call) if err != nil { @@ -1593,7 +1593,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { callFailedStr := "EPN EnsureCleanupLegacy call failed" - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() err := handleCleanupLegacy(ctx, p.odcClient, nil, paddingTimeout, envId, call) if err != nil { diff --git a/core/integration/plugin.go b/core/integration/plugin.go index 12bd01fb..48e87626 100644 --- a/core/integration/plugin.go +++ b/core/integration/plugin.go @@ -27,9 +27,12 @@ package integration import ( + "context" "sync" + "time" "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/common/utils/uid" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -121,7 +124,7 @@ func (p Plugins) CallStack(data interface{}) (stack map[string]interface{}) { func (p Plugins) ObjectStack(varStack map[string]string, baseConfigStack map[string]string) (stack map[string]interface{}) { stack = make(map[string]interface{}) - //HACK: this is a dummy object+function to allow odc.GenerateEPNTopologyFullname in the root role + // HACK: this is a dummy object+function to allow odc.GenerateEPNTopologyFullname in the root role stack["odc"] = map[string]interface{}{ "GenerateEPNTopologyFullname": func() string { return "" @@ -221,3 +224,26 @@ func Reset() { loaderOnce = sync.Once{} pluginLoaders = make(map[string]func() Plugin) } + +func ExtractRunTypeOrUndefined(varStack map[string]string) string { + runType, ok := varStack["run_type"] + if !ok { + runType = "undefined" + } + return runType +} + +func NewContext(envId string, varStack map[string]string, timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout( + monitoring.AddEnvAndRunType(context.Background(), + envId, + ExtractRunTypeOrUndefined(varStack), + ), + timeout) +} + +func NewContextEmptyEnvIdRunType(timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout( + monitoring.AddEnvAndRunType(context.Background(), "none", "none"), + timeout) +} diff --git a/core/integration/trg/plugin.go b/core/integration/trg/plugin.go index 2ebdc84d..556706f9 100644 --- a/core/integration/trg/plugin.go +++ b/core/integration/trg/plugin.go @@ -40,6 +40,7 @@ import ( "github.com/AliceO2Group/Control/common/event/topic" "github.com/AliceO2Group/Control/common/logger/infologger" + "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/environment" @@ -124,7 +125,7 @@ func (p *Plugin) GetConnectionState() string { } func (p *Plugin) queryRunList() { - ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgPollingTimeout")) + ctx, cancel := integration.NewContextEmptyEnvIdRunType(viper.GetDuration("trgPollingTimeout")) defer cancel() runReply, err := p.trgClient.RunList(ctx, &trgpb.Empty{}, grpc.EmptyCallOption{}) @@ -324,7 +325,7 @@ func (p *Plugin) reconcile() { } }*/ - ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout")) + ctx, cancel := integration.NewContextEmptyEnvIdRunType(viper.GetDuration("trgReconciliationTimeout")) _, err := p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{}) cancel() @@ -349,7 +350,8 @@ func (p *Plugin) reconcile() { } } if trgRun.State == CTP_LOADED && trgRun.Cardinality == CTP_GLOBAL { - ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout")) + ctx, cancel := integration.NewContextEmptyEnvIdRunType(viper.GetDuration("trgReconciliationTimeout")) + monitoring.AddEnvAndRunType(ctx, "none", "none") _, err := p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{}) cancel() if err != nil { @@ -371,7 +373,6 @@ func (p *Plugin) reconcile() { } } p.cachedStatusMu.RUnlock() - } func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) { @@ -464,7 +465,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } timeout := callable.AcquireTimeout(TRG_PFR_TIMEOUT, varStack, "PrepareForRun", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() payload := map[string]interface{}{ @@ -665,7 +666,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } timeout := callable.AcquireTimeout(TRG_LOAD_TIMEOUT, varStack, "RunLoad", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() payload := map[string]interface{}{ @@ -843,7 +844,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } timeout := callable.AcquireTimeout(TRG_START_TIMEOUT, varStack, "RunStart", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() payload := map[string]interface{}{ @@ -1259,7 +1260,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Info("ALIECS EOR operation : performing TRG Run Stop ") timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "RunStop", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() return runStopFunc(ctx, runNumber64) @@ -1280,7 +1281,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Info("ALIECS EOR operation : performing TRG Run Unload ") timeout := callable.AcquireTimeout(TRG_UNLOAD_TIMEOUT, varStack, "RunUnload", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() return runUnloadFunc(ctx, runNumber64) @@ -1296,7 +1297,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } timeout := callable.AcquireTimeout(TRG_CLEANUP_TIMEOUT, varStack, "Cleanup", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() // runStop if found pending @@ -1354,7 +1355,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunStop", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() // runStop if found pending @@ -1396,7 +1397,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunUnload", envId) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := integration.NewContext(envId, varStack, timeout) defer cancel() // runUnload if found pending diff --git a/core/workflow/callable/call.go b/core/workflow/callable/call.go index f0a04e47..502688c1 100644 --- a/core/workflow/callable/call.go +++ b/core/workflow/callable/call.go @@ -47,7 +47,10 @@ import ( "github.com/sirupsen/logrus" ) -var log = logger.New(logrus.StandardLogger(), "callable") +var ( + log = logger.New(logrus.StandardLogger(), "callable") + none = "none" +) type Call struct { Func string @@ -226,6 +229,7 @@ func (c *Call) Call() error { func (c *Call) newMetric(name string) monitoring.Metric { metric := monitoring.NewMetric(name) + metric.AddTag("runtype", c.getRunTypeTag()) metric.AddTag("name", c.GetName()) metric.AddTag("trigger", c.GetTraits().Trigger) metric.AddTag("envId", c.parentRole.GetEnvironmentId().String()) @@ -266,6 +270,17 @@ func (c *Call) Cancel() bool { return false } +func (c *Call) getRunTypeTag() string { + varStack, err := c.parentRole.ConsolidatedVarStack() + if err != nil { + return none + } + if rt, ok := varStack["run_type"]; ok { + return rt + } + return none +} + func (c *Call) GetParentRole() interface{} { return c.parentRole }