Skip to content

Commit c17657c

Browse files
Adds connection state metrics
Signed-off-by: Elena Kolevska <elena@kolevska.com>
1 parent 65d0922 commit c17657c

File tree

5 files changed

+135
-2
lines changed

5 files changed

+135
-2
lines changed

extra/redisotel-native/metrics.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ const (
2121
// metricsRecorder implements the otel.Recorder interface
2222
type metricsRecorder struct {
2323
operationDuration metric.Float64Histogram
24+
connectionCount metric.Int64UpDownCounter
2425

25-
// Client configuration for attributes
26+
// Client configuration for attributes (used for operation metrics only)
2627
serverAddr string
2728
serverPort string
2829
dbIndex string
@@ -267,3 +268,59 @@ func formatDBIndex(db int) string {
267268
}
268269
return strconv.Itoa(db)
269270
}
271+
272+
// RecordConnectionStateChange records a change in connection state
273+
// This is called from the pool when connections transition between states
274+
func (r *metricsRecorder) RecordConnectionStateChange(
275+
ctx context.Context,
276+
cn redis.ConnInfo,
277+
fromState, toState string,
278+
) {
279+
if r.connectionCount == nil {
280+
return
281+
}
282+
283+
// Extract server address from connection
284+
serverAddr, serverPort := extractServerInfo(cn)
285+
286+
// Build base attributes
287+
attrs := []attribute.KeyValue{
288+
attribute.String("db.system", "redis"),
289+
attribute.String("server.address", serverAddr),
290+
}
291+
292+
// Add server.port if not default
293+
if serverPort != "" && serverPort != "6379" {
294+
attrs = append(attrs, attribute.String("server.port", serverPort))
295+
}
296+
297+
// Decrement old state (if not empty)
298+
if fromState != "" {
299+
fromAttrs := append([]attribute.KeyValue{}, attrs...)
300+
fromAttrs = append(fromAttrs, attribute.String("state", fromState))
301+
r.connectionCount.Add(ctx, -1, metric.WithAttributes(fromAttrs...))
302+
}
303+
304+
// Increment new state
305+
if toState != "" {
306+
toAttrs := append([]attribute.KeyValue{}, attrs...)
307+
toAttrs = append(toAttrs, attribute.String("state", toState))
308+
r.connectionCount.Add(ctx, 1, metric.WithAttributes(toAttrs...))
309+
}
310+
}
311+
312+
// extractServerInfo extracts server address and port from connection info
313+
func extractServerInfo(cn redis.ConnInfo) (addr, port string) {
314+
if cn == nil {
315+
return "", ""
316+
}
317+
318+
remoteAddr := cn.RemoteAddr()
319+
if remoteAddr == nil {
320+
return "", ""
321+
}
322+
323+
addrStr := remoteAddr.String()
324+
host, portStr := parseAddr(addrStr)
325+
return host, portStr
326+
}

extra/redisotel-native/redisotel.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,20 @@ func initOnce(client redis.UniversalClient, opts ...Option) error {
125125
return fmt.Errorf("failed to create operation duration histogram: %w", err)
126126
}
127127

128+
// Create synchronous UpDownCounter for connection count
129+
connectionCount, err := meter.Int64UpDownCounter(
130+
"db.client.connection.count",
131+
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
132+
metric.WithUnit("{connection}"),
133+
)
134+
if err != nil {
135+
return fmt.Errorf("failed to create connection count metric: %w", err)
136+
}
137+
128138
// Create recorder
129139
recorder := &metricsRecorder{
130140
operationDuration: operationDuration,
141+
connectionCount: connectionCount,
131142
serverAddr: serverAddr,
132143
serverPort: serverPort,
133144
dbIndex: dbIndex,

internal/otel/metrics.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ type Cmder interface {
2121
type Recorder interface {
2222
// RecordOperationDuration records the total operation duration (including all retries)
2323
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn)
24+
25+
// RecordConnectionStateChange records when a connection changes state
26+
RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string)
2427
}
2528

2629
// Global recorder instance (initialized by extra/redisotel-native)
@@ -30,9 +33,16 @@ var globalRecorder Recorder = noopRecorder{}
3033
func SetGlobalRecorder(r Recorder) {
3134
if r == nil {
3235
globalRecorder = noopRecorder{}
36+
// Unregister pool callback
37+
pool.SetConnectionStateChangeCallback(nil)
3338
return
3439
}
3540
globalRecorder = r
41+
42+
// Register pool callback to forward state changes to recorder
43+
pool.SetConnectionStateChangeCallback(func(ctx context.Context, cn *pool.Conn, fromState, toState string) {
44+
globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
45+
})
3646
}
3747

3848
// RecordOperationDuration records the total operation duration.
@@ -41,7 +51,14 @@ func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cm
4151
globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn)
4252
}
4353

54+
// RecordConnectionStateChange records when a connection changes state.
55+
// This is called from pool.go when connections transition between states.
56+
func RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
57+
globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
58+
}
59+
4460
// noopRecorder is a no-op implementation (zero overhead when metrics disabled)
4561
type noopRecorder struct{}
4662

4763
func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {}
64+
func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {}

internal/pool/pool.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ var (
2424
// ErrPoolTimeout timed out waiting to get a connection from the connection pool.
2525
ErrPoolTimeout = errors.New("redis: connection pool timeout")
2626

27+
// Global callback for connection state changes (set by otel package)
28+
connectionStateChangeCallback func(ctx context.Context, cn *Conn, fromState, toState string)
29+
2730
// popAttempts is the maximum number of attempts to find a usable connection
2831
// when popping from the idle connection pool. This handles cases where connections
2932
// are temporarily marked as unusable (e.g., during maintenanceNotifications upgrades or network issues).
@@ -42,6 +45,12 @@ var (
4245
noExpiration = maxTime
4346
)
4447

48+
// SetConnectionStateChangeCallback sets the global callback for connection state changes.
49+
// This is called by the otel package to register metrics recording.
50+
func SetConnectionStateChangeCallback(fn func(ctx context.Context, cn *Conn, fromState, toState string)) {
51+
connectionStateChangeCallback = fn
52+
}
53+
4554
var timers = sync.Pool{
4655
New: func() interface{} {
4756
t := time.NewTimer(time.Hour)
@@ -468,6 +477,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
468477
}
469478

470479
atomic.AddUint32(&p.stats.Hits, 1)
480+
481+
// Notify metrics: connection moved from idle to used
482+
if connectionStateChangeCallback != nil {
483+
connectionStateChangeCallback(ctx, cn, "idle", "used")
484+
}
485+
471486
return cn, nil
472487
}
473488

@@ -492,6 +507,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
492507
return nil, err
493508
}
494509
}
510+
511+
// Notify metrics: new connection is created and used
512+
if connectionStateChangeCallback != nil {
513+
connectionStateChangeCallback(ctx, newcn, "", "used")
514+
}
515+
495516
return newcn, nil
496517
}
497518

@@ -659,9 +680,19 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
659680
p.connsMu.Unlock()
660681
}
661682
p.idleConnsLen.Add(1)
683+
684+
// Notify metrics: connection moved from used to idle
685+
if connectionStateChangeCallback != nil {
686+
connectionStateChangeCallback(ctx, cn, "used", "idle")
687+
}
662688
} else {
663689
p.removeConnWithLock(cn)
664690
shouldCloseConn = true
691+
692+
// Notify metrics: connection removed (used -> nothing)
693+
if connectionStateChangeCallback != nil {
694+
connectionStateChangeCallback(ctx, cn, "used", "")
695+
}
665696
}
666697

667698
p.freeTurn()
@@ -671,11 +702,16 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
671702
}
672703
}
673704

674-
func (p *ConnPool) Remove(_ context.Context, cn *Conn, reason error) {
705+
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
675706
p.removeConnWithLock(cn)
676707

677708
p.freeTurn()
678709

710+
// Notify metrics: connection removed (assume from used state)
711+
if connectionStateChangeCallback != nil {
712+
connectionStateChangeCallback(ctx, cn, "used", "")
713+
}
714+
679715
_ = p.closeConn(cn)
680716

681717
// Check if we need to create new idle connections to maintain MinIdleConns

otel.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type ConnInfo interface {
2424
type OTelRecorder interface {
2525
// RecordOperationDuration records the total operation duration (including all retries)
2626
RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo)
27+
28+
// RecordConnectionStateChange records when a connection changes state (e.g., idle -> used)
29+
RecordConnectionStateChange(ctx context.Context, cn ConnInfo, fromState, toState string)
2730
}
2831

2932
// SetOTelRecorder sets the global OpenTelemetry recorder.
@@ -55,3 +58,12 @@ func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, durat
5558
}
5659
}
5760

61+
func (a *otelRecorderAdapter) RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
62+
// Convert internal pool.Conn to public ConnInfo
63+
var connInfo ConnInfo
64+
if cn != nil {
65+
connInfo = cn
66+
}
67+
a.recorder.RecordConnectionStateChange(ctx, connInfo, fromState, toState)
68+
}
69+

0 commit comments

Comments
 (0)