3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## Unreleased

+- Add metrics to LLMInvocation traces
+  ([#3891](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891))
+
 ## Version 0.2b0 (2025-10-14)

 - Add jsonlines support to fsspec uploader
4 changes: 2 additions & 2 deletions util/opentelemetry-util-genai/pyproject.toml
@@ -25,8 +25,8 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"opentelemetry-instrumentation ~= 0.57b0",
"opentelemetry-semantic-conventions ~= 0.57b0",
"opentelemetry-instrumentation ~= 0.58b0",
"opentelemetry-semantic-conventions ~= 0.58b0",
"opentelemetry-api>=1.31.0",
]

util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -64,16 +64,19 @@
 from typing import Iterator, Optional

 from opentelemetry import context as otel_context
+from opentelemetry.metrics import MeterProvider, get_meter
 from opentelemetry.semconv._incubating.attributes import (
     gen_ai_attributes as GenAI,
 )
 from opentelemetry.semconv.schemas import Schemas
 from opentelemetry.trace import (
+    Span,
     SpanKind,
     TracerProvider,
     get_tracer,
     set_span_in_context,
 )
+from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
 from opentelemetry.util.genai.span_utils import (
     _apply_error_attributes,
     _apply_finish_attributes,
@@ -88,13 +91,41 @@ class TelemetryHandler:
     them as spans, metrics, and events.
     """

-    def __init__(self, tracer_provider: TracerProvider | None = None):
+    def __init__(
+        self,
+        tracer_provider: TracerProvider | None = None,
+        meter_provider: MeterProvider | None = None,
+    ):
         self._tracer = get_tracer(
             __name__,
             __version__,
             tracer_provider,
-            schema_url=Schemas.V1_36_0.value,
+            schema_url=Schemas.V1_37_0.value,
         )
+        self._metrics_recorder: Optional[InvocationMetricsRecorder] = None
+        try:
+            meter = get_meter(__name__, meter_provider=meter_provider)
+            self._metrics_recorder = InvocationMetricsRecorder(meter)
+        except Exception:  # pragma: no cover - defensive fallback  # pylint: disable=broad-exception-caught
+            self._metrics_recorder = None
+
+    def _record_llm_metrics(
+        self,
+        invocation: LLMInvocation,
+        span: Optional[Span],
+        *,
+        error_type: Optional[str] = None,
+    ) -> None:
+        if self._metrics_recorder is None or span is None:
+            return
+        try:
+            self._metrics_recorder.record(
+                span,
+                invocation,
+                error_type=error_type,
+            )
+        except Exception:  # pragma: no cover - defensive fallback  # pylint: disable=broad-exception-caught
+            pass

     def start_llm(
         self,
@@ -118,10 +149,12 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation:  # pylint: disab
             # TODO: Provide feedback that this invocation was not started
             return invocation

-        _apply_finish_attributes(invocation.span, invocation)
+        span = invocation.span
+        _apply_finish_attributes(span, invocation)
+        self._record_llm_metrics(invocation, span)
         # Detach context and end span
         otel_context.detach(invocation.context_token)
-        invocation.span.end()
+        span.end()
         return invocation

     def fail_llm(  # pylint: disable=no-self-use
@@ -132,10 +165,13 @@ def fail_llm(  # pylint: disable=no-self-use
             # TODO: Provide feedback that this invocation was not started
             return invocation

-        _apply_error_attributes(invocation.span, error)
+        span = invocation.span
+        _apply_error_attributes(span, error)
+        error_type = getattr(error.type, "__qualname__", None)
+        self._record_llm_metrics(invocation, span, error_type=error_type)
         # Detach context and end span
         otel_context.detach(invocation.context_token)
-        invocation.span.end()
+        span.end()
         return invocation

     @contextmanager
@@ -165,6 +201,7 @@ def llm(

 def get_telemetry_handler(
     tracer_provider: TracerProvider | None = None,
+    meter_provider: MeterProvider | None = None,
 ) -> TelemetryHandler:
     """
     Returns a singleton TelemetryHandler instance.
@@ -173,6 +210,9 @@
         get_telemetry_handler, "_default_handler", None
     )
     if handler is None:
-        handler = TelemetryHandler(tracer_provider=tracer_provider)
+        handler = TelemetryHandler(
+            tracer_provider=tracer_provider,
+            meter_provider=meter_provider,
+        )
     setattr(get_telemetry_handler, "_default_handler", handler)
     return handler
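
Reviewer note: a minimal end-to-end sketch of the new meter_provider wiring, not part of the diff. The LLMInvocation field names (request_model, input_tokens, output_tokens) are assumed from opentelemetry.util.genai.types:

# Sketch: spans flow to the tracer provider; the new duration/token
# histograms flow to the meter provider passed into the handler.
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import LLMInvocation

reader = InMemoryMetricReader()
handler = get_telemetry_handler(
    tracer_provider=TracerProvider(),
    meter_provider=MeterProvider(metric_readers=[reader]),
)

invocation = LLMInvocation(request_model="demo-model")  # assumed field name
handler.start_llm(invocation)
invocation.input_tokens = 12  # assumed field names for usage counts
invocation.output_tokens = 34
handler.stop_llm(invocation)

# gen_ai.client.operation.duration and gen_ai.client.token.usage
# data points should now be visible in the in-memory reader.
print(reader.get_metrics_data())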
util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py
@@ -0,0 +1,52 @@
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics

_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
    0.01,
    0.02,
    0.04,
    0.08,
    0.16,
    0.32,
    0.64,
    1.28,
    2.56,
    5.12,
    10.24,
    20.48,
    40.96,
    81.92,
]

_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
    1,
    4,
    16,
    64,
    256,
    1024,
    4096,
    16384,
    65536,
    262144,
    1048576,
    4194304,
    16777216,
    67108864,
]


class Instruments:
    def __init__(self, meter: Meter):
        self.operation_duration_histogram: Histogram = meter.create_histogram(
            name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
            description="Duration of GenAI client operation",
            unit="s",
            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
        )
        self.token_usage_histogram: Histogram = meter.create_histogram(
            name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
            description="Number of input and output tokens used by GenAI clients",
            unit="{token}",
            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
        )
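
Note: explicit_bucket_boundaries_advisory is only advisory; the SDK's default explicit-bucket aggregation should honor it unless a View overrides the boundaries. A quick sanity check (sketch, not part of the diff):

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.util.genai.instruments import Instruments

reader = InMemoryMetricReader()
meter = MeterProvider(metric_readers=[reader]).get_meter("advisory-check")
instruments = Instruments(meter)

instruments.operation_duration_histogram.record(0.05)

point = (
    reader.get_metrics_data()
    .resource_metrics[0]
    .scope_metrics[0]
    .metrics[0]
    .data.data_points[0]
)
# Should print the advisory boundaries: (0.01, 0.02, ..., 81.92)
print(point.explicit_bounds)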
util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py
@@ -0,0 +1,125 @@
"""Helpers for emitting GenAI metrics from LLM invocations."""

from __future__ import annotations

import time
from numbers import Number
from typing import Dict, Optional

from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAI,
)
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.util.genai.instruments import Instruments
from opentelemetry.util.genai.types import LLMInvocation
from opentelemetry.util.types import AttributeValue

_NS_PER_SECOND = 1_000_000_000


def _now_ns() -> int:
    return time.time_ns()


def _get_span_start_time_ns(span: Optional[Span]) -> Optional[int]:
    if span is None:
        return None
    for attr in ("start_time", "_start_time"):
        value = getattr(span, attr, None)
        if isinstance(value, int):
            return value
    return None


def _calculate_duration_seconds(span: Optional[Span]) -> Optional[float]:
    """Calculate duration in seconds from span start time to now."""
    start_time_ns = _get_span_start_time_ns(span)
    if start_time_ns is None:
        return None
    elapsed_ns = max(_now_ns() - start_time_ns, 0)
    return elapsed_ns / _NS_PER_SECOND


class InvocationMetricsRecorder:
    """Records duration and token usage histograms for GenAI invocations."""

    def __init__(self, meter: Meter):
        instruments = Instruments(meter)
        self._duration_histogram: Histogram = (
            instruments.operation_duration_histogram
        )
        self._token_histogram: Histogram = instruments.token_usage_histogram

    def record(
        self,
        span: Optional[Span],
        invocation: LLMInvocation,
        *,
        error_type: Optional[str] = None,
    ) -> None:
        """Record duration and token metrics for an invocation if possible."""
        if span is None:
            return

        tokens: list[tuple[int, str]] = []
        if isinstance(invocation.input_tokens, int):
            tokens.append(
                (
                    invocation.input_tokens,
                    GenAI.GenAiTokenTypeValues.INPUT.value,
                )
            )
        if isinstance(invocation.output_tokens, int):
            tokens.append(
                (
                    invocation.output_tokens,
                    GenAI.GenAiTokenTypeValues.COMPLETION.value,
                )
            )

        attributes: Dict[str, AttributeValue] = {
            GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value
        }
        if invocation.request_model:
            attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
        if invocation.provider:
            attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
        if invocation.response_model_name:
            attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
                invocation.response_model_name
            )

        # Calculate duration from span timing
        duration_seconds = _calculate_duration_seconds(span)

        span_context = set_span_in_context(span)
        if error_type:
            attributes["error.type"] = error_type

        # Record duration even when no token counts are available, so that
        # failed invocations (which usually carry no usage data) still
        # contribute to the duration histogram.
        if (
            duration_seconds is not None
            and isinstance(duration_seconds, Number)
            and duration_seconds >= 0
        ):
            duration_attributes: Dict[str, AttributeValue] = dict(attributes)
            self._duration_histogram.record(
                float(duration_seconds),
                attributes=duration_attributes,
                context=span_context,
            )

        for token_count, token_type in tokens:
            token_attributes: Dict[str, AttributeValue] = dict(attributes)
            token_attributes[GenAI.GEN_AI_TOKEN_TYPE] = token_type
            self._token_histogram.record(
                token_count,
                attributes=token_attributes,
                context=span_context,
            )


__all__ = ["InvocationMetricsRecorder"]
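
For completeness, a direct exercise of the recorder against an SDK span (sketch, not part of the diff; the LLMInvocation usage fields are assumed as above):

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.types import LLMInvocation

reader = InMemoryMetricReader()
recorder = InvocationMetricsRecorder(
    MeterProvider(metric_readers=[reader]).get_meter("recorder-demo")
)

tracer = TracerProvider().get_tracer("recorder-demo")
with tracer.start_as_current_span("chat demo-model") as span:
    invocation = LLMInvocation(request_model="demo-model")
    invocation.input_tokens = 12  # assumed field names
    invocation.output_tokens = 34
    # Duration is derived from the span's start_time; token counts are
    # emitted as one data point per gen_ai.token.type.
    recorder.record(span, invocation)

print(reader.get_metrics_data())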