From 37f8aa3c7e7a48751ef219af148005ccec4963be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 21 Aug 2025 13:05:01 +0200 Subject: [PATCH 1/2] [monitoring] fixed racecondition on Run monitoring and Stop --- common/monitoring/monitoring.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go index ca21056b..26e336ad 100644 --- a/common/monitoring/monitoring.go +++ b/common/monitoring/monitoring.go @@ -30,6 +30,7 @@ import ( "context" "fmt" "net/http" + "sync/atomic" "time" "github.com/AliceO2Group/Control/common/logger" @@ -38,8 +39,8 @@ import ( ) var ( - // scraping endpoint implementation - server *http.Server + // atomic holder for the HTTP server instance + server atomic.Pointer[http.Server] // objects to store incoming metrics metricsInternal *MetricsAggregate metricsHistogramInternal *MetricsReservoirSampling @@ -154,33 +155,30 @@ func handleFunc(endpointName string) { // // If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged. func Run(port uint16, endpointName string) error { - if IsRunning() { + srv := &http.Server{Addr: fmt.Sprintf(":%d", port)} + // only one Run should initialize and serve + if !server.CompareAndSwap(nil, srv) { return nil } - initChannels() - go eventLoop() - - server = &http.Server{Addr: fmt.Sprintf(":%d", port)} handleFunc(endpointName) - return server.ListenAndServe() + // block until Shutdown is called + return srv.ListenAndServe() } func Stop() { - if !IsRunning() { + srv := server.Swap(nil) + if srv == nil { return } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - server.Shutdown(ctx) - + srv.Shutdown(ctx) endChannel <- struct{}{} <-endChannel - server = nil } func IsRunning() bool { - return server != nil + return server.Load() != nil } From 3494a46d566aa1c34855edb07e88fcd6d49c3dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 26 Aug 2025 15:44:24 +0200 Subject: [PATCH 2/2] fixup! [monitoring] fixed racecondition on Run monitoring and Stop --- common/monitoring/monitoring.go | 12 ++++++------ common/monitoring/monitoring_test.go | 12 ++---------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go index 26e336ad..91dbd417 100644 --- a/common/monitoring/monitoring.go +++ b/common/monitoring/monitoring.go @@ -155,26 +155,26 @@ func handleFunc(endpointName string) { // // If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged. func Run(port uint16, endpointName string) error { - srv := &http.Server{Addr: fmt.Sprintf(":%d", port)} + localServer := &http.Server{Addr: fmt.Sprintf(":%d", port)} // only one Run should initialize and serve - if !server.CompareAndSwap(nil, srv) { + if !server.CompareAndSwap(nil, localServer) { return nil } initChannels() go eventLoop() handleFunc(endpointName) // block until Shutdown is called - return srv.ListenAndServe() + return localServer.ListenAndServe() } func Stop() { - srv := server.Swap(nil) - if srv == nil { + localServer := server.Swap(nil) + if localServer == nil { return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - srv.Shutdown(ctx) + localServer.Shutdown(ctx) endChannel <- struct{}{} <-endChannel } diff --git a/common/monitoring/monitoring_test.go b/common/monitoring/monitoring_test.go index 67540d0f..b0da3843 100644 --- a/common/monitoring/monitoring_test.go +++ b/common/monitoring/monitoring_test.go @@ -81,20 +81,12 @@ func TestStartMultipleStop(t *testing.T) { Stop() } -func cleaningUpAfterTest() { - Stop() -} - -func initTest() { - go Run(12345, "notimportant") -} - // decorator function that properly inits and cleans after higher level test of Monitoring package func testFunction(t *testing.T, testToRun func(*testing.T)) { - initTest() + go Run(12345, "notimportant") isRunningWithTimeout(t, time.Second) testToRun(t) - cleaningUpAfterTest() + Stop() } func TestSendingSingleMetric(t *testing.T) {