From debaae988b3da14333494680e80ea35b85bc2b4d Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 18 Sep 2025 16:39:40 +0200 Subject: [PATCH] OCTRL-678 Pass the error message to BKP in case of GO_ERROR An error message is now propagated to kafka events just before scheduling a GO_ERROR transition. --- core/environment/environment.go | 19 +++++++++++++++++ core/environment/manager.go | 12 +++++++++++ core/environment/utils.go | 38 +++++++++++++++++++++++++++++++++ core/server.go | 4 ++++ core/task/task.go | 7 ++++++ 5 files changed, 80 insertions(+) diff --git a/core/environment/environment.go b/core/environment/environment.go index 3c8bfe99..824f5168 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -1176,6 +1176,15 @@ func (env *Environment) QueryRoles(pathSpec string) (rs []workflow.Role) { return } +func (env *Environment) GetId() uid.ID { + if env == nil { + return "" + } + env.Mu.RLock() + defer env.Mu.RUnlock() + return env.id +} + func (env *Environment) GetPath() string { return "" } @@ -1222,7 +1231,12 @@ func (env *Environment) subscribeToWfState(taskman *task.Manager) { log.WithField("partition", env.id). WithField("level", infologger.IL_Ops). Error("one of the critical tasks went into ERROR state, transitioning the environment into ERROR") + + the.EventWriterWithTopic(topic.Environment).WriteEvent( + NewEnvGoErrorEvent(env, newCriticalTasksErrorMessage(env)), + ) err := env.TryTransition(NewGoErrorTransition(taskman)) + if err != nil { if env.Sm.Current() == "ERROR" { log.WithField("partition", env.id). @@ -1471,6 +1485,11 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t log.WithField("partition", env.id). WithField("run", env.currentRunNumber). Errorf("Scheduled auto stop transition failed: %s, Transitioning into ERROR", err.Error()) + + the.EventWriterWithTopic(topic.Environment).WriteEvent( + NewEnvGoErrorEvent(env, fmt.Sprintf("scheduled auto stop transition failed: %s", err.Error())), + ) + err = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman)) if err != nil { log.WithField("partition", env.id). diff --git a/core/environment/manager.go b/core/environment/manager.go index d6a81b01..5e7588f3 100644 --- a/core/environment/manager.go +++ b/core/environment/manager.go @@ -488,6 +488,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string] WithError(err). Warnf("auto-transitioning environment failed %s, cleanup in progress", op) + the.EventWriterWithTopic(topic.Environment).WriteEvent( + NewEnvGoErrorEvent(env, fmt.Sprintf("%s failed: %v", op, err)), + ) err := env.TryTransition(NewGoErrorTransition( envs.taskman), ) @@ -592,6 +595,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string] WithField("level", infologger.IL_Devel). Error("environment deployment and configuration error, cleanup in progress") + the.EventWriterWithTopic(topic.Environment).WriteEvent( + NewEnvGoErrorEvent(env, fmt.Sprintf("deployment or configuration failed: %v", err)), + ) errTxErr := env.TryTransition(NewGoErrorTransition( envs.taskman), ) @@ -1052,6 +1058,9 @@ func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEve } if env.CurrentState() != "ERROR" { + the.EventWriterWithTopic(topic.Environment).WriteEvent( + NewEnvGoErrorEvent(env, "ODC partition went to ERROR during RUNNING"), + ) err = env.TryTransition(NewGoErrorTransition(envs.taskman)) if err != nil { log.WithPrefix("scheduler"). @@ -1376,6 +1385,9 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str WithError(err). Warnf("auto-transitioning environment failed %s, cleanup in progress", op) + the.EventWriterWithTopic(topic.Environment).WriteEvent( + NewEnvGoErrorEvent(env, fmt.Sprintf("%s failed: %v", op, err)), + ) err := env.TryTransition(NewGoErrorTransition( envs.taskman), ) diff --git a/core/environment/utils.go b/core/environment/utils.go index 7cb236b7..229fde3e 100644 --- a/core/environment/utils.go +++ b/core/environment/utils.go @@ -29,6 +29,10 @@ import ( "encoding/json" "fmt" "github.com/AliceO2Group/Control/common/logger/infologger" + pb "github.com/AliceO2Group/Control/common/protos" + "github.com/AliceO2Group/Control/core/task" + "github.com/AliceO2Group/Control/core/task/sm" + "github.com/AliceO2Group/Control/core/workflow" "os" "sort" @@ -101,3 +105,37 @@ func sortMapToString(m map[string]string) string { } return b.String() } + +func NewEnvGoErrorEvent(env *Environment, err string) *pb.Ev_EnvironmentEvent { + return &pb.Ev_EnvironmentEvent{ + EnvironmentId: env.GetId().String(), + State: env.Sm.Current(), + RunNumber: env.GetCurrentRunNumber(), + Error: err, + Message: "a critical error occurred, GO_ERROR transition imminent", + LastRequestUser: env.GetLastRequestUser(), + WorkflowTemplateInfo: env.GetWorkflowInfo(), + } +} + +func newCriticalTasksErrorMessage(env *Environment) string { + criticalTasksInError := env.workflow.GetTasks().Filtered(func(t *task.Task) bool { + return t.GetTraits().Critical && t.GetState() == sm.ERROR + }) + + if len(criticalTasksInError) == 0 { + return "no critical tasks in ERROR" + } else if len(criticalTasksInError) == 1 { + t := criticalTasksInError[0] + name := t.GetName() + + // if available, we prefer role name, because it does not have a long hash for JIT-generated DPL tasks + role, ok := t.GetParentRole().(workflow.Role) + if ok { + name = role.GetName() + } + return fmt.Sprintf("critical task '%s' on host '%s' transitioned to ERROR", name, t.GetHostname()) + } else { + return fmt.Sprintf("%d critical tasks transitioned to ERROR, could not determine the first one to fail", len(criticalTasksInError)) + } +} diff --git a/core/server.go b/core/server.go index f14c1e45..86bacb28 100644 --- a/core/server.go +++ b/core/server.go @@ -28,6 +28,7 @@ package core import ( "encoding/json" + "fmt" "maps" "runtime" "sort" @@ -646,6 +647,9 @@ func (m *RpcServer) ControlEnvironment(cxt context.Context, req *pb.ControlEnvir WithField("level", infologger.IL_Ops). WithError(err). Errorf("transition '%s' failed, transitioning into ERROR.", req.GetType().String()) + the.EventWriterWithTopic(topic.Environment).WriteEvent( + environment.NewEnvGoErrorEvent(env, fmt.Sprintf("transition %s failed: %v", req.GetType().String(), err)), + ) err = env.TryTransition(environment.NewGoErrorTransition(m.state.taskman)) if err != nil { log.WithField("partition", env.Id()).Warnf("could not complete requested GO_ERROR transition, forcing move to ERROR: %s", err.Error()) diff --git a/core/task/task.go b/core/task/task.go index 9e379c31..6e704086 100644 --- a/core/task/task.go +++ b/core/task/task.go @@ -151,6 +151,13 @@ func (t *Task) SetSafeToStop(done bool) { t.safeToStop = done } +func (t *Task) GetState() sm.State { + t.mu.Lock() + defer t.mu.Unlock() + + return t.state +} + func (t *Task) GetParentRole() interface{} { t.mu.RLock() defer t.mu.RUnlock()