Skip to content

Commit fe380b9

Browse files
committed
Add phase 1 metrics
1 parent 9cf404c commit fe380b9

File tree

10 files changed

+466
-13
lines changed

10 files changed

+466
-13
lines changed

extra/redisotel-native/metrics.go

Lines changed: 165 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ const (
2020

2121
// metricsRecorder implements the otel.Recorder interface
2222
type metricsRecorder struct {
23-
operationDuration metric.Float64Histogram
24-
connectionCount metric.Int64UpDownCounter
25-
connectionCreateTime metric.Float64Histogram
23+
operationDuration metric.Float64Histogram
24+
connectionCount metric.Int64UpDownCounter
25+
connectionCreateTime metric.Float64Histogram
26+
connectionRelaxedTimeout metric.Int64UpDownCounter
27+
connectionHandoff metric.Int64Counter
28+
clientErrors metric.Int64Counter
29+
maintenanceNotifications metric.Int64Counter
2630

2731
// Client configuration for attributes (used for operation metrics only)
2832
serverAddr string
@@ -54,7 +58,7 @@ func (r *metricsRecorder) RecordOperationDuration(
5458
attribute.Bool("redis.client.operation.blocking", isBlockingCommand(cmd)),
5559

5660
// Recommended attributes
57-
attribute.String("db.system", "redis"),
61+
attribute.String("db.system.name", "redis"),
5862
attribute.String("server.address", r.serverAddr),
5963
}
6064

@@ -326,6 +330,12 @@ func extractServerInfo(cn redis.ConnInfo) (addr, port string) {
326330
return host, portStr
327331
}
328332

333+
// extractPeerInfo extracts peer network address and port from connection info
334+
// For client connections, this is the same as the server address (remote endpoint)
335+
func extractPeerInfo(cn redis.ConnInfo) (addr, port string) {
336+
return extractServerInfo(cn)
337+
}
338+
329339
// RecordConnectionCreateTime records the time it took to create a new connection
330340
func (r *metricsRecorder) RecordConnectionCreateTime(
331341
ctx context.Context,
@@ -364,3 +374,154 @@ func (r *metricsRecorder) RecordConnectionCreateTime(
364374
// Record the histogram
365375
r.connectionCreateTime.Record(ctx, durationSeconds, metric.WithAttributes(attrs...))
366376
}
377+
378+
// RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed
379+
func (r *metricsRecorder) RecordConnectionRelaxedTimeout(
380+
ctx context.Context,
381+
delta int,
382+
cn redis.ConnInfo,
383+
poolName, notificationType string,
384+
) {
385+
if r.connectionRelaxedTimeout == nil {
386+
return
387+
}
388+
389+
// Extract server address from connection
390+
serverAddr, serverPort := extractServerInfo(cn)
391+
392+
// Build attributes
393+
attrs := []attribute.KeyValue{
394+
attribute.String("db.system.name", "redis"),
395+
attribute.String("server.address", serverAddr),
396+
attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())),
397+
attribute.String("db.client.connection.pool.name", poolName),
398+
attribute.String("redis.client.connection.notification", notificationType),
399+
}
400+
401+
// Add server.port if not default
402+
if serverPort != "" && serverPort != "6379" {
403+
attrs = append(attrs, attribute.String("server.port", serverPort))
404+
}
405+
406+
// Record the counter (delta can be +1 or -1)
407+
r.connectionRelaxedTimeout.Add(ctx, int64(delta), metric.WithAttributes(attrs...))
408+
}
409+
410+
// RecordConnectionHandoff records when a connection is handed off to another node
411+
func (r *metricsRecorder) RecordConnectionHandoff(
412+
ctx context.Context,
413+
cn redis.ConnInfo,
414+
poolName string,
415+
) {
416+
if r.connectionHandoff == nil {
417+
return
418+
}
419+
420+
// Extract server address from connection
421+
serverAddr, serverPort := extractServerInfo(cn)
422+
423+
// Build attributes
424+
attrs := []attribute.KeyValue{
425+
attribute.String("db.system", "redis"),
426+
attribute.String("server.address", serverAddr),
427+
attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())),
428+
attribute.String("db.client.connection.pool.name", poolName),
429+
}
430+
431+
// Add server.port if not default
432+
if serverPort != "" && serverPort != "6379" {
433+
attrs = append(attrs, attribute.String("server.port", serverPort))
434+
}
435+
436+
// Record the counter
437+
r.connectionHandoff.Add(ctx, 1, metric.WithAttributes(attrs...))
438+
}
439+
440+
// RecordError records client errors (ASK, MOVED, handshake failures, etc.)
441+
func (r *metricsRecorder) RecordError(
442+
ctx context.Context,
443+
errorType string,
444+
cn redis.ConnInfo,
445+
statusCode string,
446+
isInternal bool,
447+
retryAttempts int,
448+
) {
449+
if r.clientErrors == nil {
450+
return
451+
}
452+
453+
// Extract server address and peer address from connection (may be nil for some errors)
454+
var serverAddr, serverPort, peerAddr, peerPort string
455+
if cn != nil {
456+
serverAddr, serverPort = extractServerInfo(cn)
457+
peerAddr, peerPort = extractPeerInfo(cn)
458+
}
459+
460+
// Build attributes
461+
attrs := []attribute.KeyValue{
462+
attribute.String("db.system.name", "redis"),
463+
attribute.String("error.type", errorType),
464+
attribute.String("db.response.status_code", statusCode),
465+
attribute.Bool("redis.client.errors.internal", isInternal),
466+
attribute.Int("redis.client.operation.retry_attempts", retryAttempts),
467+
attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())),
468+
}
469+
470+
// Add server info if available
471+
if serverAddr != "" {
472+
attrs = append(attrs, attribute.String("server.address", serverAddr))
473+
if serverPort != "" && serverPort != "6379" {
474+
attrs = append(attrs, attribute.String("server.port", serverPort))
475+
}
476+
}
477+
478+
// Add peer info if available
479+
if peerAddr != "" {
480+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
481+
if peerPort != "" {
482+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
483+
}
484+
}
485+
486+
// Record the counter
487+
r.clientErrors.Add(ctx, 1, metric.WithAttributes(attrs...))
488+
}
489+
490+
// RecordMaintenanceNotification records when a maintenance notification is received
491+
func (r *metricsRecorder) RecordMaintenanceNotification(
492+
ctx context.Context,
493+
cn redis.ConnInfo,
494+
notificationType string,
495+
) {
496+
if r.maintenanceNotifications == nil {
497+
return
498+
}
499+
500+
// Extract server address and peer address from connection
501+
serverAddr, serverPort := extractServerInfo(cn)
502+
peerAddr, peerPort := extractPeerInfo(cn)
503+
504+
// Build attributes
505+
attrs := []attribute.KeyValue{
506+
attribute.String("db.system.name", "redis"),
507+
attribute.String("server.address", serverAddr),
508+
attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())),
509+
attribute.String("redis.client.connection.notification", notificationType),
510+
}
511+
512+
// Add server.port if not default
513+
if serverPort != "" && serverPort != "6379" {
514+
attrs = append(attrs, attribute.String("server.port", serverPort))
515+
}
516+
517+
// Add peer info if available
518+
if peerAddr != "" {
519+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
520+
if peerPort != "" {
521+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
522+
}
523+
}
524+
525+
// Record the counter
526+
r.maintenanceNotifications.Add(ctx, 1, metric.WithAttributes(attrs...))
527+
}

extra/redisotel-native/redisotel.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,57 @@ func initOnce(client redis.UniversalClient, opts ...Option) error {
145145
return fmt.Errorf("failed to create connection create time histogram: %w", err)
146146
}
147147

148+
// Create UpDownCounter for relaxed timeout tracking
149+
connectionRelaxedTimeout, err := meter.Int64UpDownCounter(
150+
"redis.client.connection.relaxed_timeout",
151+
metric.WithDescription("How many times the connection timeout has been increased/decreased (after a server maintenance notification)"),
152+
metric.WithUnit("{relaxation}"),
153+
)
154+
if err != nil {
155+
return fmt.Errorf("failed to create connection relaxed timeout metric: %w", err)
156+
}
157+
158+
// Create Counter for connection handoffs
159+
connectionHandoff, err := meter.Int64Counter(
160+
"redis.client.connection.handoff",
161+
metric.WithDescription("Connections that have been handed off to another node (e.g after a MOVING notification)"),
162+
)
163+
if err != nil {
164+
return fmt.Errorf("failed to create connection handoff metric: %w", err)
165+
}
166+
167+
// Create Counter for client errors
168+
clientErrors, err := meter.Int64Counter(
169+
"redis.client.errors",
170+
metric.WithDescription("Number of errors handled by the Redis client"),
171+
metric.WithUnit("{error}"),
172+
)
173+
if err != nil {
174+
return fmt.Errorf("failed to create client errors metric: %w", err)
175+
}
176+
177+
// Create Counter for maintenance notifications
178+
maintenanceNotifications, err := meter.Int64Counter(
179+
"redis.client.maintenance.notifications",
180+
metric.WithDescription("Number of maintenance notifications received"),
181+
metric.WithUnit("{notification}"),
182+
)
183+
if err != nil {
184+
return fmt.Errorf("failed to create maintenance notifications metric: %w", err)
185+
}
186+
148187
// Create recorder
149188
recorder := &metricsRecorder{
150-
operationDuration: operationDuration,
151-
connectionCount: connectionCount,
152-
connectionCreateTime: connectionCreateTime,
153-
serverAddr: serverAddr,
154-
serverPort: serverPort,
155-
dbIndex: dbIndex,
189+
operationDuration: operationDuration,
190+
connectionCount: connectionCount,
191+
connectionCreateTime: connectionCreateTime,
192+
connectionRelaxedTimeout: connectionRelaxedTimeout,
193+
connectionHandoff: connectionHandoff,
194+
clientErrors: clientErrors,
195+
maintenanceNotifications: maintenanceNotifications,
196+
serverAddr: serverAddr,
197+
serverPort: serverPort,
198+
dbIndex: dbIndex,
156199
}
157200

158201
// Register global recorder

internal/otel/metrics.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,27 @@ type Recorder interface {
2727

2828
// RecordConnectionCreateTime records the time it took to create a new connection
2929
RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn)
30+
31+
// RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed
32+
// delta: +1 for relaxed, -1 for unrelaxed
33+
// poolName: name of the connection pool (e.g., "main", "pubsub")
34+
// notificationType: the notification type that triggered the timeout relaxation (e.g., "MOVING")
35+
RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string)
36+
37+
// RecordConnectionHandoff records when a connection is handed off to another node
38+
// poolName: name of the connection pool (e.g., "main", "pubsub")
39+
RecordConnectionHandoff(ctx context.Context, cn *pool.Conn, poolName string)
40+
41+
// RecordError records client errors (ASK, MOVED, handshake failures, etc.)
42+
// errorType: type of error (e.g., "ASK", "MOVED", "HANDSHAKE_FAILED")
43+
// statusCode: Redis response status code if available (e.g., "MOVED", "ASK")
44+
// isInternal: whether this is an internal error
45+
// retryAttempts: number of retry attempts made
46+
RecordError(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int)
47+
48+
// RecordMaintenanceNotification records when a maintenance notification is received
49+
// notificationType: the type of notification (e.g., "MOVING", "MIGRATING", etc.)
50+
RecordMaintenanceNotification(ctx context.Context, cn *pool.Conn, notificationType string)
3051
}
3152

3253
// Global recorder instance (initialized by extra/redisotel-native)
@@ -39,6 +60,10 @@ func SetGlobalRecorder(r Recorder) {
3960
// Unregister pool callbacks
4061
pool.SetConnectionStateChangeCallback(nil)
4162
pool.SetConnectionCreateTimeCallback(nil)
63+
pool.SetConnectionRelaxedTimeoutCallback(nil)
64+
pool.SetConnectionHandoffCallback(nil)
65+
pool.SetErrorCallback(nil)
66+
pool.SetMaintenanceNotificationCallback(nil)
4267
return
4368
}
4469
globalRecorder = r
@@ -52,6 +77,26 @@ func SetGlobalRecorder(r Recorder) {
5277
pool.SetConnectionCreateTimeCallback(func(ctx context.Context, duration time.Duration, cn *pool.Conn) {
5378
globalRecorder.RecordConnectionCreateTime(ctx, duration, cn)
5479
})
80+
81+
// Register pool callback to forward connection relaxed timeout changes to recorder
82+
pool.SetConnectionRelaxedTimeoutCallback(func(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) {
83+
globalRecorder.RecordConnectionRelaxedTimeout(ctx, delta, cn, poolName, notificationType)
84+
})
85+
86+
// Register pool callback to forward connection handoffs to recorder
87+
pool.SetConnectionHandoffCallback(func(ctx context.Context, cn *pool.Conn, poolName string) {
88+
globalRecorder.RecordConnectionHandoff(ctx, cn, poolName)
89+
})
90+
91+
// Register pool callback to forward errors to recorder
92+
pool.SetErrorCallback(func(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) {
93+
globalRecorder.RecordError(ctx, errorType, cn, statusCode, isInternal, retryAttempts)
94+
})
95+
96+
// Register pool callback to forward maintenance notifications to recorder
97+
pool.SetMaintenanceNotificationCallback(func(ctx context.Context, cn *pool.Conn, notificationType string) {
98+
globalRecorder.RecordMaintenanceNotification(ctx, cn, notificationType)
99+
})
55100
}
56101

57102
// RecordOperationDuration records the total operation duration.
@@ -76,5 +121,10 @@ func RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn
76121
type noopRecorder struct{}
77122

78123
func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {}
79-
func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {}
80-
func (noopRecorder) RecordConnectionCreateTime(context.Context, time.Duration, *pool.Conn) {}
124+
func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {}
125+
func (noopRecorder) RecordConnectionCreateTime(context.Context, time.Duration, *pool.Conn) {}
126+
func (noopRecorder) RecordConnectionRelaxedTimeout(context.Context, int, *pool.Conn, string, string) {
127+
}
128+
func (noopRecorder) RecordConnectionHandoff(context.Context, *pool.Conn, string) {}
129+
func (noopRecorder) RecordError(context.Context, string, *pool.Conn, string, bool, int) {}
130+
func (noopRecorder) RecordMaintenanceNotification(context.Context, *pool.Conn, string) {}

internal/pool/conn.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ func (cn *Conn) SetUsable(usable bool) {
241241
// SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades.
242242
// These timeouts will be used for all subsequent commands until the deadline expires.
243243
// Uses atomic operations for lock-free access.
244+
// Note: Metrics should be recorded by the caller (notification handler) which has context about
245+
// the notification type and pool name.
244246
func (cn *Conn) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
245247
cn.relaxedCounter.Add(1)
246248
cn.relaxedReadTimeoutNs.Store(int64(readTimeout))
@@ -275,6 +277,11 @@ func (cn *Conn) clearRelaxedTimeout() {
275277
cn.relaxedWriteTimeoutNs.Store(0)
276278
cn.relaxedDeadlineNs.Store(0)
277279
cn.relaxedCounter.Store(0)
280+
281+
// Note: Metrics for timeout unrelaxing are not recorded here because we don't have
282+
// context about which notification type or pool triggered the relaxation.
283+
// In practice, relaxed timeouts expire automatically via deadline, so explicit
284+
// unrelaxing metrics are less critical than the initial relaxation metrics.
278285
}
279286

280287
// HasRelaxedTimeout returns true if relaxed timeouts are currently active on this connection.

0 commit comments

Comments
 (0)