From 5b7e431e5f8488d68ed2d4755d8407c984579732 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 1 Dec 2025 16:54:02 -0800 Subject: [PATCH] Add CPU and mem metrics --- contrib/resourcetuner/resourcetuner.go | 14 +++++- contrib/resourcetuner/resourcetuner_test.go | 50 +++++++++++++++++++-- internal/common/metrics/constants.go | 2 + 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/contrib/resourcetuner/resourcetuner.go b/contrib/resourcetuner/resourcetuner.go index f2be32150..e52fb0621 100644 --- a/contrib/resourcetuner/resourcetuner.go +++ b/contrib/resourcetuner/resourcetuner.go @@ -3,6 +3,7 @@ package resourcetuner import ( "context" "errors" + "go.temporal.io/sdk/internal/common/metrics" "runtime" "sync" "time" @@ -157,7 +158,7 @@ func (r *ResourceBasedSlotSupplier) TryReserveSlot(info worker.SlotReservationIn numIssued := info.NumIssuedSlots() if numIssued < r.options.MinSlots || (numIssued < r.options.MaxSlots && time.Since(r.lastSlotIssuedAt) > r.options.RampThrottle) { - decision, err := r.controller.pidDecision(info.Logger()) + decision, err := r.controller.pidDecision(info.Logger(), info.MetricsHandler()) if err != nil { info.Logger().Error("Error calculating resource usage", "error", err) return nil @@ -276,7 +277,7 @@ func NewResourceController(options ResourceControllerOptions) *ResourceControlle } } -func (rc *ResourceController) pidDecision(logger log.Logger) (bool, error) { +func (rc *ResourceController) pidDecision(logger log.Logger, metricsHandler metrics.Handler) (bool, error) { rc.mu.Lock() defer rc.mu.Unlock() @@ -288,6 +289,7 @@ func (rc *ResourceController) pidDecision(logger log.Logger) (bool, error) { if err != nil { return false, err } + rc.publishResourceMetrics(metricsHandler, memUsage, cpuUsage) if memUsage >= rc.options.MemTargetPercent { // Never allow going over the memory target return false, nil @@ -314,6 +316,14 @@ func (rc *ResourceController) pidDecision(logger log.Logger) (bool, error) { rc.cpuPid.State.ControlSignal > rc.options.CpuOutputThreshold, nil } +func (rc *ResourceController) publishResourceMetrics(metricsHandler metrics.Handler, memUsage, cpuUsage float64) { + if metricsHandler == nil { + return + } + metricsHandler.Gauge(metrics.ResourceSlotsMemUsage).Update(memUsage * 100) + metricsHandler.Gauge(metrics.ResourceSlotsCPUUsage).Update(cpuUsage * 100) +} + type psUtilSystemInfoSupplier struct { logger log.Logger mu sync.Mutex diff --git a/contrib/resourcetuner/resourcetuner_test.go b/contrib/resourcetuner/resourcetuner_test.go index d993f7b36..0ffcfac96 100644 --- a/contrib/resourcetuner/resourcetuner_test.go +++ b/contrib/resourcetuner/resourcetuner_test.go @@ -1,6 +1,7 @@ package resourcetuner import ( + "go.temporal.io/sdk/internal/common/metrics" "testing" "github.com/stretchr/testify/assert" @@ -22,6 +23,7 @@ func (f FakeSystemInfoSupplier) GetCpuUsage(_ *SystemInfoContext) (float64, erro func TestPidDecisions(t *testing.T) { logger := &log.NoopLogger{} + metricsHandler := metrics.NopHandler fakeSupplier := &FakeSystemInfoSupplier{memUse: 0.5, cpuUse: 0.5} rcOpts := DefaultResourceControllerOptions() rcOpts.MemTargetPercent = 0.8 @@ -30,7 +32,7 @@ func TestPidDecisions(t *testing.T) { rc := NewResourceController(rcOpts) for i := 0; i < 10; i++ { - decision, err := rc.pidDecision(logger) + decision, err := rc.pidDecision(logger, metricsHandler) assert.NoError(t, err) assert.True(t, decision) @@ -41,7 +43,7 @@ func TestPidDecisions(t *testing.T) { fakeSupplier.memUse = 0.8 fakeSupplier.cpuUse = 0.9 for i := 0; i < 10; i++ { - decision, err := rc.pidDecision(logger) + decision, err := rc.pidDecision(logger, metricsHandler) assert.NoError(t, err) assert.False(t, decision) } @@ -49,7 +51,7 @@ func TestPidDecisions(t *testing.T) { fakeSupplier.memUse = 0.7 fakeSupplier.cpuUse = 0.9 for i := 0; i < 10; i++ { - decision, err := rc.pidDecision(logger) + decision, err := rc.pidDecision(logger, metricsHandler) assert.NoError(t, err) assert.False(t, decision) } @@ -57,8 +59,48 @@ func TestPidDecisions(t *testing.T) { fakeSupplier.memUse = 0.7 fakeSupplier.cpuUse = 0.7 for i := 0; i < 10; i++ { - decision, err := rc.pidDecision(logger) + decision, err := rc.pidDecision(logger, metricsHandler) assert.NoError(t, err) assert.True(t, decision) } } + +func TestPidDecisionEmitsUsageMetrics(t *testing.T) { + logger := &log.NoopLogger{} + metricsHandler := metrics.NewCapturingHandler() + fakeSupplier := &FakeSystemInfoSupplier{memUse: 0.25, cpuUse: 0.75} + + rcOpts := DefaultResourceControllerOptions() + rcOpts.InfoSupplier = fakeSupplier + rc := NewResourceController(rcOpts) + + _, err := rc.pidDecision(logger, metricsHandler) + assert.NoError(t, err) + + gauges := metricsHandler.Gauges() + assert.Len(t, gauges, 2) + + gaugesByName := make(map[string]float64) + for _, gauge := range gauges { + gaugesByName[gauge.Name] = gauge.Value() + } + + assert.Equal(t, 25.0, gaugesByName[metrics.ResourceSlotsMemUsage]) + assert.Equal(t, 75.0, gaugesByName[metrics.ResourceSlotsCPUUsage]) + + fakeSupplier.memUse = 0.7 + fakeSupplier.cpuUse = 0.9 + _, err = rc.pidDecision(logger, metricsHandler) + assert.NoError(t, err) + + gauges = metricsHandler.Gauges() + assert.Len(t, gauges, 2) + + gaugesByName = make(map[string]float64) + for _, gauge := range gauges { + gaugesByName[gauge.Name] = gauge.Value() + } + + assert.Equal(t, 70.0, gaugesByName[metrics.ResourceSlotsMemUsage]) + assert.Equal(t, 90.0, gaugesByName[metrics.ResourceSlotsCPUUsage]) +} diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 6434c1939..1ec5d498c 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -40,6 +40,8 @@ const ( WorkerStartCounter = TemporalMetricsPrefix + "worker_start" WorkerTaskSlotsAvailable = TemporalMetricsPrefix + "worker_task_slots_available" WorkerTaskSlotsUsed = TemporalMetricsPrefix + "worker_task_slots_used" + ResourceSlotsCPUUsage = TemporalMetricsPrefix + "resource_slots_cpu_usage" + ResourceSlotsMemUsage = TemporalMetricsPrefix + "resource_slots_mem_usage" PollerStartCounter = TemporalMetricsPrefix + "poller_start" NumPoller = TemporalMetricsPrefix + "num_pollers"