From 592ae8ab5e7bd2997bfe22ec77885413b100c125 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 30 May 2025 10:35:05 +0200 Subject: [PATCH 1/5] Handle a bunch of unhandled errors --- apricot/local/local_test.go | 17 +++++++++-- apricot/local/service.go | 5 +++- apricot/local/service_test.go | 6 ++++ apricot/local/service_test.yaml | 4 ++- apricot/local/servicehttp.go | 29 ++++++++++++++++--- common/event/writer.go | 6 +++- common/event/writer_test.go | 3 +- common/monitoring/common.go | 6 ++-- common/monitoring/metricsreservoirsampling.go | 2 +- common/monitoring/monitoring.go | 5 +++- .../cfgbackend/configuration_suite_test.go | 17 +++++++++-- configuration/template/template_test.go | 17 +++++++++-- core/environment/utils.go | 6 +++- core/task/channel/outbound.go | 6 +++- core/task/manager.go | 16 +++++++--- core/task/scheduler.go | 7 ++++- core/task/schedulerstate.go | 14 +++++++-- 17 files changed, 135 insertions(+), 31 deletions(-) diff --git a/apricot/local/local_test.go b/apricot/local/local_test.go index ea48c3a4..49fc1124 100644 --- a/apricot/local/local_test.go +++ b/apricot/local/local_test.go @@ -31,11 +31,21 @@ var _ = BeforeSuite(func() { for _, configFile := range configFiles { from, err := os.Open("./" + configFile) Expect(err).NotTo(HaveOccurred()) - defer from.Close() + defer func() { + err := from.Close() + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() to, err := os.OpenFile(*tmpDir+"/"+configFile, os.O_RDWR|os.O_CREATE, 0666) Expect(err).NotTo(HaveOccurred()) - defer to.Close() + defer func() { + err := to.Close() + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() _, err = io.Copy(to, from) Expect(err).NotTo(HaveOccurred()) @@ -45,5 +55,6 @@ var _ = BeforeSuite(func() { }) var _ = AfterSuite(func() { - os.RemoveAll(*tmpDir) + err := os.RemoveAll(*tmpDir) + Expect(err).NotTo(HaveOccurred()) }) diff --git a/apricot/local/service.go b/apricot/local/service.go index 0fe042dd..53e11152 100644 --- 
a/apricot/local/service.go +++ b/apricot/local/service.go @@ -508,7 +508,10 @@ func (s *Service) GetCRUCardsForHost(hostname string) ([]string, error) { if err != nil { return nil, err } - json.Unmarshal([]byte(cfgCards), &cards) + err = json.Unmarshal([]byte(cfgCards), &cards) + if err != nil { + return nil, err + } unique := make(map[string]bool) for _, card := range cards { if _, value := unique[card.Serial]; !value { diff --git a/apricot/local/service_test.go b/apricot/local/service_test.go index 4f611222..b3d8e1ed 100644 --- a/apricot/local/service_test.go +++ b/apricot/local/service_test.go @@ -461,6 +461,12 @@ var _ = Describe("local service", func() { Expect(err).To(HaveOccurred()) }) }) + When("retrieving the CRU cards for an FLP with invalid \"cards\" JSON", func() { + It("should produce an error", func() { + cards, err = svc.GetCRUCardsForHost("flp500") + Expect(err).To(HaveOccurred()) + }) + }) }) Describe("getting endpoints for a CRU card", func() { var ( diff --git a/apricot/local/service_test.yaml b/apricot/local/service_test.yaml index 07047525..4d15acac 100644 --- a/apricot/local/service_test.yaml +++ b/apricot/local/service_test.yaml @@ -192,4 +192,6 @@ o2: flp003: cards: "{ \"key\" : \"value\" }" flp100: - cards: "{ \"key\" : \"value\" }" \ No newline at end of file + cards: "{ \"key\" : \"value\" }" + flp500: + cards: "invalid json{" \ No newline at end of file diff --git a/apricot/local/servicehttp.go b/apricot/local/servicehttp.go index 50e66995..c7f3c9fe 100644 --- a/apricot/local/servicehttp.go +++ b/apricot/local/servicehttp.go @@ -615,7 +615,13 @@ func (httpsvc *HttpService) ApiPrintClusterInformation(w http.ResponseWriter, r log.WithError(err).Warn("Error, could not marshal inventory.") } } - fmt.Fprintln(w, string(result)) + _, err = fmt.Fprintln(w, string(result)) + if err != nil { + log.WithField("result", string(result)). + WithField(infologger.Level, infologger.IL_Support). + WithError(err). 
+ Warn("Error, could not write a part of HTTP response to response writer.") + } case "text": fallthrough default: @@ -623,13 +629,28 @@ func (httpsvc *HttpService) ApiPrintClusterInformation(w http.ResponseWriter, r w.WriteHeader(http.StatusOK) if hosts != nil { for _, hostname := range hosts { - fmt.Fprintf(w, "%s\n", hostname) + _, err := fmt.Fprintf(w, "%s\n", hostname) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Warn("Error, could not write a part of HTTP response to response writer.") + } } } else if inventory != nil { for detector, flps := range inventory { - fmt.Fprintf(w, "%s\n", detector) + _, err := fmt.Fprintf(w, "%s\n", detector) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Warn("Error, could not write a part of HTTP response to response writer.") + } for _, hostname := range flps { - fmt.Fprintf(w, "\t%s\n", hostname) + _, err = fmt.Fprintf(w, "\t%s\n", hostname) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Warn("Error, could not write a part of HTTP response to response writer.") + } } } } diff --git a/common/event/writer.go b/common/event/writer.go index 7d4c9e51..534c7731 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -133,7 +133,11 @@ func (w *KafkaWriter) Close() { w.runningWorkers.Add(2) close(w.toBatchMessagesChan) w.runningWorkers.Wait() - w.Writer.Close() + err := w.Writer.Close() + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel). 
+ Errorf("failed to close writer: %v", err) + } } } diff --git a/common/event/writer_test.go b/common/event/writer_test.go index 48b9fd8b..1a6363a8 100644 --- a/common/event/writer_test.go +++ b/common/event/writer_test.go @@ -49,7 +49,8 @@ var _ = Describe("Writer", func() { writer.writeFunction = func(messages []kafka.Message) { Expect(len(messages)).To(Equal(1)) event := &pb.Event{} - proto.Unmarshal(messages[0].Value, event) + err := proto.Unmarshal(messages[0].Value, event) + Expect(err).To(BeNil()) Expect(event.GetCoreStartEvent().FrameworkId).To(Equal("FrameworkId")) channel <- struct{}{} } diff --git a/common/monitoring/common.go b/common/monitoring/common.go index 9f21f856..e5c0b1a8 100644 --- a/common/monitoring/common.go +++ b/common/monitoring/common.go @@ -35,11 +35,11 @@ type key struct { } func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) { - hash.WriteString(metric.name) + _, _ = hash.WriteString(metric.name) for _, tag := range metric.tags { - hash.WriteString(tag.name) - hash.WriteString(tag.value) + _, _ = hash.WriteString(tag.name) + _, _ = hash.WriteString(tag.value) } } diff --git a/common/monitoring/metricsreservoirsampling.go b/common/monitoring/metricsreservoirsampling.go index 1e5cbe4d..f4b0eb4b 100644 --- a/common/monitoring/metricsreservoirsampling.go +++ b/common/monitoring/metricsreservoirsampling.go @@ -66,7 +66,7 @@ func metricFieldToFloat64(field any) float64 { func (this *MetricsReservoirSampling) AddMetric(metric *Metric) { for fieldName, field := range metric.fields { metricNameTagsToHash(&this.hash, metric) - this.hash.WriteString(fieldName) + _, _ = this.hash.WriteString(fieldName) k := key{nameTagsHash: hashValueAndReset(&this.hash), timestamp: time.Unix(metric.timestamp.Unix(), 0)} if storedMetric, ok := this.metricsBuckets[k]; !ok { newReservoir := newReservoirSampling(fieldName, reservoirSize) diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go index 54b48b1c..47cf9dc2 100644 --- 
a/common/monitoring/monitoring.go +++ b/common/monitoring/monitoring.go @@ -120,7 +120,10 @@ func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) { if metricsToConvert == nil { metricsToConvert = make([]Metric, 0) } - Format(w, metricsToConvert) + err := Format(w, metricsToConvert) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel).Errorf("Failed to export metrics: %v", err) + } } func Send(metric *Metric) { diff --git a/configuration/cfgbackend/configuration_suite_test.go b/configuration/cfgbackend/configuration_suite_test.go index 55bcc0ee..0c7e61fa 100644 --- a/configuration/cfgbackend/configuration_suite_test.go +++ b/configuration/cfgbackend/configuration_suite_test.go @@ -28,16 +28,27 @@ var _ = BeforeSuite(func() { // copy config file from, err := os.Open("./" + configFile) Expect(err).NotTo(HaveOccurred()) - defer from.Close() + defer func() { + err := from.Close() + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() to, err := os.OpenFile(*tmpDir+"/"+configFile, os.O_RDWR|os.O_CREATE, 0666) Expect(err).NotTo(HaveOccurred()) - defer to.Close() + defer func() { + err := to.Close() + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() _, err = io.Copy(to, from) Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() { - os.RemoveAll(*tmpDir) + err := os.RemoveAll(*tmpDir) + Expect(err).NotTo(HaveOccurred()) }) diff --git a/configuration/template/template_test.go b/configuration/template/template_test.go index 91b90c45..5c4d6d42 100644 --- a/configuration/template/template_test.go +++ b/configuration/template/template_test.go @@ -28,11 +28,21 @@ var _ = BeforeSuite(func() { for _, configFile := range configFiles { from, err := os.Open("./" + configFile) Expect(err).NotTo(HaveOccurred()) - defer from.Close() + defer func() { + err := from.Close() + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() to, err := os.OpenFile(*tmpDir+"/"+configFile, os.O_RDWR|os.O_CREATE, 0666) 
Expect(err).NotTo(HaveOccurred()) - defer to.Close() + defer func() { + err := to.Close() + if err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() _, err = io.Copy(to, from) Expect(err).NotTo(HaveOccurred()) @@ -40,5 +50,6 @@ var _ = BeforeSuite(func() { }) var _ = AfterSuite(func() { - os.RemoveAll(*tmpDir) + err := os.RemoveAll(*tmpDir) + Expect(err).NotTo(HaveOccurred()) }) diff --git a/core/environment/utils.go b/core/environment/utils.go index 3270f24c..7cb236b7 100644 --- a/core/environment/utils.go +++ b/core/environment/utils.go @@ -28,6 +28,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/AliceO2Group/Control/common/logger/infologger" "os" "sort" @@ -93,7 +94,10 @@ func sortMapToString(m map[string]string) string { b := new(bytes.Buffer) for _, k := range keys { - fmt.Fprintf(b, "%s=\"%s\"\n", k, m[k]) + _, err := fmt.Fprintf(b, "%s=\"%s\"\n", k, m[k]) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel).Errorf("Error formatting or buffering string for key %s: %v", k, err) + } } return b.String() } diff --git a/core/task/channel/outbound.go b/core/task/channel/outbound.go index a9855df0..040ca95d 100644 --- a/core/task/channel/outbound.go +++ b/core/task/channel/outbound.go @@ -26,6 +26,7 @@ package channel import ( "fmt" + "github.com/AliceO2Group/Control/common/logger/infologger" "strconv" "strings" @@ -173,7 +174,10 @@ func MergeOutbound(hp, lp []Outbound) (channels []Outbound) { updated := false for _, pCh := range channels { if v.Name == pCh.Name { - mergo.Merge(&pCh, v) + err := mergo.Merge(&pCh, v) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel).Errorf("error merging outbound channel '%s': %v", v.Name, err) + } updated = true break } diff --git a/core/task/manager.go b/core/task/manager.go index f9727ec7..4ac46fcd 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -666,7 +666,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e return } -func 
(m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error { +func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) { taskReleaseErrors := make(map[string]error) taskIdsReleased := make([]string, 0) @@ -685,8 +685,6 @@ func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error { } m.internalEventCh <- event.NewTasksReleasedEvent(envId, taskIdsReleased, taskReleaseErrors) - - return nil } func (m *Manager) releaseTask(envId uid.ID, task *Task) error { @@ -1276,7 +1274,17 @@ func (m *Manager) handleMessage(tm *TaskmanMessage) error { mesosState == mesos.TASK_KILLING || mesosState == mesos.TASK_UNKNOWN) { killCall := calls.Kill(mesosStatus.TaskID.GetValue(), mesosStatus.AgentID.GetValue()) - calls.CallNoData(context.TODO(), m.schedulerState.cli, killCall) + err := calls.CallNoData(context.TODO(), m.schedulerState.cli, killCall) + if err != nil { + log.WithPrefix("taskman"). + WithField("taskId", mesosStatus.GetTaskID().Value). + WithField("state", mesosState.String()). + WithField("source", mesosStatus.GetSource().String()). + WithField("message", mesosStatus.GetMessage()). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Errorf("could not kill task '%s' after reconciliation", mesosStatus.GetTaskID().Value) + } } else { // Enqueue task state update go m.updateTaskStatus(&mesosStatus) diff --git a/core/task/scheduler.go b/core/task/scheduler.go index c5088cc4..02a32c42 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -93,7 +93,12 @@ func runSchedulerController(ctx context.Context, switch { case receivedEvent == scheduler.Event_SUBSCRIBED: if state.sm.Is("INITIAL") { - state.sm.Event(context.Background(), "CONNECT") + err := state.sm.Event(context.Background(), "CONNECT") + if err != nil { + log.WithField(infologger.Level, infologger.IL_Support). + WithError(err). 
+ Error("scheduler state CONNECT event failed") + } } } } diff --git a/core/task/schedulerstate.go b/core/task/schedulerstate.go index 86970754..946ca741 100644 --- a/core/task/schedulerstate.go +++ b/core/task/schedulerstate.go @@ -208,10 +208,20 @@ func (state *schedulerState) Start(ctx context.Context) { if state.err != nil { err = state.err log.WithField("error", err.Error()).Debug("scheduler quit with error, main state machine GO_ERROR") - state.sm.Event(context.Background(), "GO_ERROR", err) // TODO: use error information in GO_ERROR + err = state.sm.Event(context.Background(), "GO_ERROR", err) + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Error("scheduler state GO_ERROR event failed") + } } else { log.Debug("scheduler quit, no errors") - state.sm.Event(context.Background(), "EXIT") + err := state.sm.Event(context.Background(), "EXIT") + if err != nil { + log.WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Error("scheduler state EXIT event failed") + } } }() } From d036b6850900e0f7622f7ef4cfd4a536f74a85ef Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 30 May 2025 11:58:38 +0200 Subject: [PATCH 2/5] fix reserved words used as variable names --- common/gera/map_test.go | 24 +++++++++---------- common/monitoring/metricsreservoirsampling.go | 6 ++--- common/monitoring/monitoring_test.go | 18 +++++++------- core/server.go | 12 +++++----- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/common/gera/map_test.go b/common/gera/map_test.go index 8434fa1b..187c7446 100644 --- a/common/gera/map_test.go +++ b/common/gera/map_test.go @@ -250,18 +250,18 @@ pdp_workflow_parameters: "" }) It("should correctly return a copy of the whole structure", func() { - copy := stringMap.Copy() - Expect(copy).NotTo(BeNil()) - Expect(copy == stringMap).NotTo(BeTrue()) - Expect(copy).To(Equal(stringMap)) - Expect(copy.Raw()).To(Equal(stringMap.Raw())) - 
Expect(copy.Len()).To(Equal(stringMap.Len())) - copy.Set("detector", "test") - Expect(copy).NotTo(Equal(stringMap)) - Expect(copy.Raw()).NotTo(Equal(stringMap.Raw())) - Expect(copy.Len()).To(Equal(stringMap.Len())) - copy.Set("detector2", "test2") - Expect(copy.Len()).To(Equal(stringMap.Len() + 1)) + mapCopy := stringMap.Copy() + Expect(mapCopy).NotTo(BeNil()) + Expect(mapCopy == stringMap).NotTo(BeTrue()) + Expect(mapCopy).To(Equal(stringMap)) + Expect(mapCopy.Raw()).To(Equal(stringMap.Raw())) + Expect(mapCopy.Len()).To(Equal(stringMap.Len())) + mapCopy.Set("detector", "test") + Expect(mapCopy).NotTo(Equal(stringMap)) + Expect(mapCopy.Raw()).NotTo(Equal(stringMap.Raw())) + Expect(mapCopy.Len()).To(Equal(stringMap.Len())) + mapCopy.Set("detector2", "test2") + Expect(mapCopy.Len()).To(Equal(stringMap.Len() + 1)) }) It("should correctly return a copy of the underlying map", func() { diff --git a/common/monitoring/metricsreservoirsampling.go b/common/monitoring/metricsreservoirsampling.go index f4b0eb4b..853715dd 100644 --- a/common/monitoring/metricsreservoirsampling.go +++ b/common/monitoring/metricsreservoirsampling.go @@ -88,16 +88,16 @@ func (this *MetricsReservoirSampling) GetMetrics() []Metric { for key, reservoirMetric := range this.metricsBuckets { m := Metric{name: reservoirMetric.metric.name, tags: reservoirMetric.metric.tags, timestamp: key.timestamp} - mean, median, min, p10, p30, p70, p90, max, count, poolSize := reservoirMetric.reservoir.GetStats() + mean, median, minimum, p10, p30, p70, p90, maximum, count, poolSize := reservoirMetric.reservoir.GetStats() m.SetFieldFloat64(reservoirMetric.reservoir.name+"_mean", mean) m.SetFieldFloat64(reservoirMetric.reservoir.name+"_median", median) - m.SetFieldFloat64(reservoirMetric.reservoir.name+"_min", min) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_min", minimum) m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p10", p10) m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p30", p30) 
m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p70", p70) m.SetFieldFloat64(reservoirMetric.reservoir.name+"_p90", p90) - m.SetFieldFloat64(reservoirMetric.reservoir.name+"_max", max) + m.SetFieldFloat64(reservoirMetric.reservoir.name+"_max", maximum) m.SetFieldUInt64(reservoirMetric.reservoir.name+"_count", count) m.SetFieldUInt64(reservoirMetric.reservoir.name+"_poolsize", poolSize) diff --git a/common/monitoring/monitoring_test.go b/common/monitoring/monitoring_test.go index ba5e719f..1f68a9bd 100644 --- a/common/monitoring/monitoring_test.go +++ b/common/monitoring/monitoring_test.go @@ -382,7 +382,7 @@ func TestApproximateHistogram(t *testing.T) { histo.AddPoint(10) } - mean, median, min, p10, p30, p70, p90, max, count, poolSize := histo.GetStats() + mean, median, minimum, p10, p30, p70, p90, maximum, count, poolSize := histo.GetStats() if count != 20 { t.Errorf("wrong count before reset, expected 20, got %d", count) @@ -392,7 +392,7 @@ func TestApproximateHistogram(t *testing.T) { t.Errorf("wrong poolSize, expected 20, got %d", poolSize) } - if mean != 10 || median != 10 || min != 10 || p10 != 10 || p30 != 10 || p70 != 10 || p90 != 10 || max != 10 { + if mean != 10 || median != 10 || minimum != 10 || p10 != 10 || p30 != 10 || p70 != 10 || p90 != 10 || maximum != 10 { t.Errorf("one of the values is not 10 when it should be 10, mean %v, median %v, 10p %v, 30p %v, 70p %v, 90p %v", mean, median, p10, p30, p70, p90) } @@ -410,7 +410,7 @@ func TestApproximateHistogram(t *testing.T) { histo.AddPoint(10) } - mean, median, min, p10, p30, p70, p90, max, count, poolSize = histo.GetStats() + mean, median, minimum, p10, p30, p70, p90, maximum, count, poolSize = histo.GetStats() if count != 2000 { t.Errorf("wrong count before reset, expected 2000, got %d", count) @@ -420,7 +420,7 @@ func TestApproximateHistogram(t *testing.T) { t.Errorf("wrong poolSize, expected 500, got %d", poolSize) } - if mean != 10 || median != 10 || min != 10 || p10 != 10 || p30 != 10 || p70 != 
10 || p90 != 10 || max != 10 { + if mean != 10 || median != 10 || minimum != 10 || p10 != 10 || p30 != 10 || p70 != 10 || p90 != 10 || maximum != 10 { t.Errorf("one of the values is not 10 when it should be 10, mean %v, median %v, 10p %v, 30p %v, 70p %v, 90p %v", mean, median, p10, p30, p70, p90) } @@ -438,7 +438,7 @@ func TestApproximateHistogram(t *testing.T) { histo.AddPoint((float64(rand.Int63n(100)))) } - mean, median, min, p10, p30, p70, p90, max, count, poolSize = histo.GetStats() + mean, median, minimum, p10, p30, p70, p90, maximum, count, poolSize = histo.GetStats() if count != 10000 { t.Errorf("wrong count before reset, expected 10000, got %d", count) @@ -456,8 +456,8 @@ func TestApproximateHistogram(t *testing.T) { t.Errorf("wrong median value, expected 50+-10 got %v", median) } - if float64(min) > 10 { - t.Errorf("wrong min value, expected 0+-10 got %v", min) + if float64(minimum) > 10 { + t.Errorf("wrong min value, expected 0+-10 got %v", minimum) } if math.Abs(float64(p10-10)) > 10 { @@ -476,8 +476,8 @@ func TestApproximateHistogram(t *testing.T) { t.Errorf("wrong 90p value, expected 90+-10 got %v", p90) } - if math.Abs(float64(max-100)) > 10 { - t.Errorf("wrong max value, expected 100+-10 got %v", max) + if math.Abs(float64(maximum-100)) > 10 { + t.Errorf("wrong max value, expected 100+-10 got %v", maximum) } } diff --git a/core/server.go b/core/server.go index 3d9ff63f..2067cd47 100644 --- a/core/server.go +++ b/core/server.go @@ -171,9 +171,9 @@ func (m *RpcServer) GetFrameworkInfo(context.Context, *pb.GetFrameworkInfoReques m.logMethod() defer m.logMethodHandled() - maj, _ := strconv.ParseInt(product.VERSION_MAJOR, 10, 32) - min, _ := strconv.ParseInt(product.VERSION_MINOR, 10, 32) - pat, _ := strconv.ParseInt(product.VERSION_PATCH, 10, 32) + major, _ := strconv.ParseInt(product.VERSION_MAJOR, 10, 32) + minor, _ := strconv.ParseInt(product.VERSION_MINOR, 10, 32) + patch, _ := strconv.ParseInt(product.VERSION_PATCH, 10, 32) availableDetectors, 
activeDetectors, allDetectors, err := m.listDetectors() if err != nil { @@ -190,9 +190,9 @@ func (m *RpcServer) GetFrameworkInfo(context.Context, *pb.GetFrameworkInfoReques HostsCount: int32(m.state.taskman.AgentCache.Count()), InstanceName: viper.GetString("instanceName"), Version: &pb.Version{ - Major: int32(maj), - Minor: int32(min), - Patch: int32(pat), + Major: int32(major), + Minor: int32(minor), + Patch: int32(patch), Build: product.BUILD, VersionStr: product.VERSION, ProductName: product.PRETTY_SHORTNAME, From bd8505168c1e23237ded43fdfb965b4c844f2d97 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 30 May 2025 12:18:03 +0200 Subject: [PATCH 3/5] fix missing 'case' statements for 'iota' consts in 'switch' --- configuration/template/fields.go | 1 + core/task/manager.go | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/configuration/template/fields.go b/configuration/template/fields.go index a10a99c3..71246376 100644 --- a/configuration/template/fields.go +++ b/configuration/template/fields.go @@ -218,6 +218,7 @@ func (vs *VarStack) consolidated(stage Stage) (consolidatedStack map[string]stri if err != nil { return } + case _STAGE_MAX: } consolidated := gera.MakeMapWithMap(vs.Locals).Wrap(gera.MakeMapWithMap(userVars).Wrap(gera.MakeMapWithMap(vars).Wrap(gera.MakeMapWithMap(defaults)))) diff --git a/core/task/manager.go b/core/task/manager.go index 4ac46fcd..7a92cf57 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -1293,6 +1293,21 @@ func (m *Manager) handleMessage(tm *TaskmanMessage) error { go m.updateTaskState(tm.taskId, tm.state) case taskop.ReleaseTasks: go m.releaseTasks(tm.GetEnvironmentId(), tm.GetTasks()) + case taskop.KillTasks: + log.WithPrefix("taskman"). + WithField("partition", tm.GetEnvironmentId().String()). + WithField("level", infologger.IL_Devel). + WithField("status", tm.status.String()). + WithField("source", tm.status.GetSource().String()). + WithField("message", tm.status.GetMessage()). 
+ Warn("unexpected KillTasks message received") + case taskop.Error: + log.WithPrefix("taskman"). + WithField("partition", tm.GetEnvironmentId().String()). + WithField("level", infologger.IL_Devel). + WithField("status", tm.status.String()). + WithField("source", tm.status.GetSource().String()). + Warn("taskman received error: %s", tm.GetError()) } return nil From 04b19a97a6f67ef8492d87ac00c1e12be151e55a Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 30 May 2025 14:42:18 +0200 Subject: [PATCH 4/5] fix wrong int to string conversion in BKP plugin --- core/integration/bookkeeping/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/integration/bookkeeping/plugin.go b/core/integration/bookkeeping/plugin.go index 3096ae73..3cd605f4 100644 --- a/core/integration/bookkeeping/plugin.go +++ b/core/integration/bookkeeping/plugin.go @@ -160,7 +160,7 @@ func (p *Plugin) getPendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string { for _, envId := range envIds { if runStop, ok := p.pendingRunStops[envId.String()]; ok { - out[envId] = string(runStop) + out[envId] = strconv.FormatInt(runStop, 10) } } return out From c8dbd02f785be91242a0fc77959910bf73e5ad00 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 30 May 2025 14:49:40 +0200 Subject: [PATCH 5/5] fix variable names colliding with imported package names --- configuration/template/fields.go | 4 ++-- core/server.go | 18 +++++++++--------- core/task/manager.go | 4 ++-- core/task/scheduler.go | 10 +++++----- core/workflow/callable/call.go | 12 ++++++------ executor/executable/task.go | 8 ++++---- executor/executor.go | 6 +++--- executor/executorcmd/client.go | 4 ++-- 8 files changed, 33 insertions(+), 33 deletions(-) diff --git a/configuration/template/fields.go b/configuration/template/fields.go index 71246376..3a0114fe 100644 --- a/configuration/template/fields.go +++ b/configuration/template/fields.go @@ -343,7 +343,7 @@ func (fields Fields) Execute(confSvc ConfigurationService, 
parentPath string, va func copyMap(src map[string]interface{}, dest map[string]interface{}) { for k, v := range src { - vm, ok := v.(map[string]interface{}) + vmap, ok := v.(map[string]interface{}) if ok { var destk map[string]interface{} if _, exists := dest[k]; exists { @@ -354,7 +354,7 @@ func copyMap(src map[string]interface{}, dest map[string]interface{}) { } else { destk = make(map[string]interface{}) } - copyMap(vm, destk) + copyMap(vmap, destk) dest[k] = destk } else { dest[k] = v diff --git a/core/server.go b/core/server.go index 2067cd47..d8d3a6a1 100644 --- a/core/server.go +++ b/core/server.go @@ -860,24 +860,24 @@ func (m *RpcServer) GetTask(cxt context.Context, req *pb.GetTaskRequest) (*pb.Ge m.logMethod() defer m.logMethodHandled() - task := m.state.taskman.GetTask(req.TaskId) - if task == nil { + requestedTask := m.state.taskman.GetTask(req.TaskId) + if requestedTask == nil { return &pb.GetTaskReply{Timestamp: currentUnixMilli()}, status.New(codes.NotFound, "task not found").Err() } - taskClass := task.GetTaskClass() - commandInfo := task.GetTaskCommandInfo() + taskClass := requestedTask.GetTaskClass() + commandInfo := requestedTask.GetTaskCommandInfo() var outbound []channel.Outbound var inbound []channel.Inbound taskPath := "" // TODO: probably not the nicest way to do this... 
the outbound assignments should be cached // in the Task - if task.IsLocked() { + if requestedTask.IsLocked() { type parentRole interface { CollectOutboundChannels() []channel.Outbound GetPath() string CollectInboundChannels() []channel.Inbound } - parent, ok := task.GetParentRole().(parentRole) + parent, ok := requestedTask.GetParentRole().(parentRole) if ok { outbound = channel.MergeOutbound(parent.CollectOutboundChannels(), taskClass.Connect) taskPath = parent.GetPath() @@ -891,13 +891,13 @@ func (m *RpcServer) GetTask(cxt context.Context, req *pb.GetTaskRequest) (*pb.Ge rep := &pb.GetTaskReply{ Task: &pb.TaskInfo{ - ShortInfo: taskToShortTaskInfo(task, m.state.taskman), + ShortInfo: taskToShortTaskInfo(requestedTask, m.state.taskman), InboundChannels: inboundChannelsToPbChannels(inbound), OutboundChannels: outboundChannelsToPbChannels(outbound), CommandInfo: commandInfoToPbCommandInfo(commandInfo), TaskPath: taskPath, - EnvId: task.GetEnvironmentId().String(), - Properties: task.GetProperties(), + EnvId: requestedTask.GetEnvironmentId().String(), + Properties: requestedTask.GetProperties(), }, Timestamp: currentUnixMilli(), } diff --git a/core/task/manager.go b/core/task/manager.go index 7a92cf57..28d1d0d2 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -744,7 +744,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error { Debug("generated inbound bindMap for environment configuration") src := sm.STANDBY.String() - event := "CONFIGURE" + evt := "CONFIGURE" dest := sm.CONFIGURED.String() args := make(controlcommands.PropertyMapsMap) args, err = tasks.BuildPropertyMaps(bindMap) @@ -755,7 +755,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error { WithField("partition", envId.String()). 
Debug("pushing configuration to tasks") - cmd := controlcommands.NewMesosCommand_Transition(envId, receivers, src, event, dest, args) + cmd := controlcommands.NewMesosCommand_Transition(envId, receivers, src, evt, dest, args) cmd.ResponseTimeout = 120 * time.Second // The default timeout is 90 seconds, but we need more time for the tasks to configure _ = m.cq.Enqueue(cmd, notify) diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 02a32c42..5e2d60fe 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -1516,12 +1516,12 @@ func makeTaskForMesosResources( WithField("inboundChannels", func() string { accu := make([]string, len(wants.InboundChannels)) for i := 0; i < len(wants.InboundChannels); i++ { - channel := wants.InboundChannels[i] - accu[i] = channel.Name - if len(channel.Global) > 0 { - accu[i] += fmt.Sprintf(" (global: %s)", channel.Global) + inboundChannel := wants.InboundChannels[i] + accu[i] = inboundChannel.Name + if len(inboundChannel.Global) > 0 { + accu[i] += fmt.Sprintf(" (global: %s)", inboundChannel.Global) } - if endpoint, ok := bindMap[channel.Name]; ok { + if endpoint, ok := bindMap[inboundChannel.Name]; ok { accu[i] += fmt.Sprintf(" -> %s", endpoint.GetAddress()) } } diff --git a/core/workflow/callable/call.go b/core/workflow/callable/call.go index 7b1282da..0af9946c 100644 --- a/core/workflow/callable/call.go +++ b/core/workflow/callable/call.go @@ -72,14 +72,14 @@ func NewCall(funcCall string, returnVar string, parent ParentRole) (call *Call) } func (s Calls) CallAll() map[*Call]error { - errors := make(map[*Call]error) + errs := make(map[*Call]error) for _, v := range s { err := v.Call() if err != nil { - errors[v] = err + errs[v] = err } } - return errors + return errs } func (s Calls) StartAll() { @@ -90,7 +90,7 @@ func (s Calls) StartAll() { func (s Calls) AwaitAll() map[*Call]error { // Since each Await call blocks, we call it in parallel and then collect - errors := make(map[*Call]error) + errs := 
make(map[*Call]error) wg := &sync.WaitGroup{} wg.Add(len(s)) for _, v := range s { @@ -98,12 +98,12 @@ func (s Calls) AwaitAll() map[*Call]error { defer wg.Done() err := v.Await() if err != nil { - errors[v] = err + errs[v] = err } }(v) } wg.Wait() - return errors + return errs } func (c *Call) Call() error { diff --git a/executor/executable/task.go b/executor/executable/task.go index 337be37f..0376fe8f 100644 --- a/executor/executable/task.go +++ b/executor/executable/task.go @@ -208,11 +208,11 @@ func prepareTaskCmd(commandInfo *common.TaskCommandInfo) (*exec.Cmd, error) { return nil, err } - uid, err := strconv.ParseUint(targetUser.Uid, 10, 32) + userid, err := strconv.ParseUint(targetUser.Uid, 10, 32) if err != nil { return nil, err } - gid, err := strconv.ParseUint(targetUser.Gid, 10, 32) + groupid, err := strconv.ParseUint(targetUser.Gid, 10, 32) if err != nil { return nil, err } @@ -220,8 +220,8 @@ func prepareTaskCmd(commandInfo *common.TaskCommandInfo) (*exec.Cmd, error) { gids, gidStrings := executorutil.GetGroupIDs(targetUser) credential := &syscall.Credential{ - Uid: uint32(uid), - Gid: uint32(gid), + Uid: uint32(userid), + Gid: uint32(groupid), Groups: gids, NoSetGroups: false, } diff --git a/executor/executor.go b/executor/executor.go index c570a95f..ba44333c 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -111,7 +111,7 @@ func Run(cfg config.Config) { Host: cfg.AgentEndpoint, Path: apiPath, } - http = httpcli.New( + client = httpcli.New( httpcli.Endpoint(apiURL.String()), httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]), httpcli.Do(httpcli.With( @@ -132,7 +132,7 @@ func Run(cfg config.Config) { state = &internalState{ // With this we inject the callOptions into all outgoing calls cli: calls.SenderWith( - httpexec.NewSender(http.Send), + httpexec.NewSender(client.Send), callOptions..., ), // The executor keeps lists of unacknowledged tasks and unacknowledged updates. 
In case of unexpected @@ -151,7 +151,7 @@ func Run(cfg config.Config) { } subscriber = calls.SenderWith( // Here too, callOptions for all outgoing subscriber calls - httpexec.NewSender(http.Send, httpcli.Close(true)), + httpexec.NewSender(client.Send, httpcli.Close(true)), callOptions..., ) diff --git a/executor/executorcmd/client.go b/executor/executorcmd/client.go index ae400c50..2fa55c0b 100644 --- a/executor/executorcmd/client.go +++ b/executor/executorcmd/client.go @@ -43,7 +43,7 @@ import ( "github.com/AliceO2Group/Control/executor/executorcmd/transitioner" pb "github.com/AliceO2Group/Control/executor/protos" "github.com/sirupsen/logrus" - "google.golang.org/grpc/status" + grpcstatus "google.golang.org/grpc/status" ) type ControlTransport uint32 @@ -164,7 +164,7 @@ func (r *RpcClient) doTransition(ei transitioner.EventInfo) (newState string, er }, grpc.EmptyCallOption{}) if err != nil { - status, ok := status.FromError(err) + status, ok := grpcstatus.FromError(err) if ok { r.Log.WithFields(logrus.Fields{ "code": status.Code().String(),