Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions contrib/resourcetuner/resourcetuner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package resourcetuner
import (
"context"
"errors"
"go.temporal.io/sdk/internal/common/metrics"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be using an internal package in an external package.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I am surprised this isn't failing with a lint error.

"runtime"
"sync"
"time"
Expand Down Expand Up @@ -157,7 +158,7 @@ func (r *ResourceBasedSlotSupplier) TryReserveSlot(info worker.SlotReservationIn
numIssued := info.NumIssuedSlots()
if numIssued < r.options.MinSlots || (numIssued < r.options.MaxSlots &&
time.Since(r.lastSlotIssuedAt) > r.options.RampThrottle) {
decision, err := r.controller.pidDecision(info.Logger())
decision, err := r.controller.pidDecision(info.Logger(), info.MetricsHandler())
if err != nil {
info.Logger().Error("Error calculating resource usage", "error", err)
return nil
Expand Down Expand Up @@ -276,7 +277,7 @@ func NewResourceController(options ResourceControllerOptions) *ResourceControlle
}
}

func (rc *ResourceController) pidDecision(logger log.Logger) (bool, error) {
func (rc *ResourceController) pidDecision(logger log.Logger, metricsHandler metrics.Handler) (bool, error) {
rc.mu.Lock()
defer rc.mu.Unlock()

Expand All @@ -288,6 +289,7 @@ func (rc *ResourceController) pidDecision(logger log.Logger) (bool, error) {
if err != nil {
return false, err
}
rc.publishResourceMetrics(metricsHandler, memUsage, cpuUsage)
if memUsage >= rc.options.MemTargetPercent {
// Never allow going over the memory target
return false, nil
Expand All @@ -314,6 +316,14 @@ func (rc *ResourceController) pidDecision(logger log.Logger) (bool, error) {
rc.cpuPid.State.ControlSignal > rc.options.CpuOutputThreshold, nil
}

// publishResourceMetrics records the supplied memory and CPU usage readings as
// gauges on metricsHandler, under the ResourceSlotsMemUsage and
// ResourceSlotsCPUUsage metric names respectively. Each reading is multiplied
// by 100 before being recorded (presumably converting a 0-1 fraction into a
// percentage; confirm against the info supplier's contract).
//
// When metricsHandler is nil nothing is recorded.
func (rc *ResourceController) publishResourceMetrics(metricsHandler metrics.Handler, memUsage, cpuUsage float64) {
	if metricsHandler != nil {
		memGauge := metricsHandler.Gauge(metrics.ResourceSlotsMemUsage)
		memGauge.Update(memUsage * 100)

		cpuGauge := metricsHandler.Gauge(metrics.ResourceSlotsCPUUsage)
		cpuGauge.Update(cpuUsage * 100)
	}
}

type psUtilSystemInfoSupplier struct {
logger log.Logger
mu sync.Mutex
Expand Down
50 changes: 46 additions & 4 deletions contrib/resourcetuner/resourcetuner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package resourcetuner

import (
"go.temporal.io/sdk/internal/common/metrics"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -22,6 +23,7 @@ func (f FakeSystemInfoSupplier) GetCpuUsage(_ *SystemInfoContext) (float64, erro

func TestPidDecisions(t *testing.T) {
logger := &log.NoopLogger{}
metricsHandler := metrics.NopHandler
fakeSupplier := &FakeSystemInfoSupplier{memUse: 0.5, cpuUse: 0.5}
rcOpts := DefaultResourceControllerOptions()
rcOpts.MemTargetPercent = 0.8
Expand All @@ -30,7 +32,7 @@ func TestPidDecisions(t *testing.T) {
rc := NewResourceController(rcOpts)

for i := 0; i < 10; i++ {
decision, err := rc.pidDecision(logger)
decision, err := rc.pidDecision(logger, metricsHandler)
assert.NoError(t, err)
assert.True(t, decision)

Expand All @@ -41,24 +43,64 @@ func TestPidDecisions(t *testing.T) {
fakeSupplier.memUse = 0.8
fakeSupplier.cpuUse = 0.9
for i := 0; i < 10; i++ {
decision, err := rc.pidDecision(logger)
decision, err := rc.pidDecision(logger, metricsHandler)
assert.NoError(t, err)
assert.False(t, decision)
}

fakeSupplier.memUse = 0.7
fakeSupplier.cpuUse = 0.9
for i := 0; i < 10; i++ {
decision, err := rc.pidDecision(logger)
decision, err := rc.pidDecision(logger, metricsHandler)
assert.NoError(t, err)
assert.False(t, decision)
}

fakeSupplier.memUse = 0.7
fakeSupplier.cpuUse = 0.7
for i := 0; i < 10; i++ {
decision, err := rc.pidDecision(logger)
decision, err := rc.pidDecision(logger, metricsHandler)
assert.NoError(t, err)
assert.True(t, decision)
}
}

func TestPidDecisionEmitsUsageMetrics(t *testing.T) {
logger := &log.NoopLogger{}
metricsHandler := metrics.NewCapturingHandler()
fakeSupplier := &FakeSystemInfoSupplier{memUse: 0.25, cpuUse: 0.75}

rcOpts := DefaultResourceControllerOptions()
rcOpts.InfoSupplier = fakeSupplier
rc := NewResourceController(rcOpts)

_, err := rc.pidDecision(logger, metricsHandler)
assert.NoError(t, err)

gauges := metricsHandler.Gauges()
assert.Len(t, gauges, 2)

gaugesByName := make(map[string]float64)
for _, gauge := range gauges {
gaugesByName[gauge.Name] = gauge.Value()
}

assert.Equal(t, 25.0, gaugesByName[metrics.ResourceSlotsMemUsage])
assert.Equal(t, 75.0, gaugesByName[metrics.ResourceSlotsCPUUsage])

fakeSupplier.memUse = 0.7
fakeSupplier.cpuUse = 0.9
_, err = rc.pidDecision(logger, metricsHandler)
assert.NoError(t, err)

gauges = metricsHandler.Gauges()
assert.Len(t, gauges, 2)

gaugesByName = make(map[string]float64)
for _, gauge := range gauges {
gaugesByName[gauge.Name] = gauge.Value()
}

assert.Equal(t, 70.0, gaugesByName[metrics.ResourceSlotsMemUsage])
assert.Equal(t, 90.0, gaugesByName[metrics.ResourceSlotsCPUUsage])
}
2 changes: 2 additions & 0 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
WorkerStartCounter = TemporalMetricsPrefix + "worker_start"
WorkerTaskSlotsAvailable = TemporalMetricsPrefix + "worker_task_slots_available"
WorkerTaskSlotsUsed = TemporalMetricsPrefix + "worker_task_slots_used"
ResourceSlotsCPUUsage = TemporalMetricsPrefix + "resource_slots_cpu_usage"
ResourceSlotsMemUsage = TemporalMetricsPrefix + "resource_slots_mem_usage"
PollerStartCounter = TemporalMetricsPrefix + "poller_start"
NumPoller = TemporalMetricsPrefix + "num_pollers"

Expand Down
Loading