diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d41798b..7f4e33dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,3 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 > Use [this search for a list of all CHANGELOG.md files in this repo](https://github.com/search?q=repo%3Aalibaba%2Floongsuite-python-agent+path%3A**%2FCHANGELOG.md&type=code). ## Unreleased + +### Added + +- **loongsuite-instrumentation-google-adk**: Add initial support for Google Agent Development Kit (ADK) #35 diff --git a/instrumentation-loongsuite/README.md b/instrumentation-loongsuite/README.md index 53a89c7c..0eccb23c 100644 --- a/instrumentation-loongsuite/README.md +++ b/instrumentation-loongsuite/README.md @@ -4,5 +4,6 @@ | [loongsuite-instrumentation-agentscope](./loongsuite-instrumentation-agentscope) | agentscope >= 0.1.5.dev0 | No | development | [loongsuite-instrumentation-agno](./loongsuite-instrumentation-agno) | agno >= 1.5.0 | No | development | [loongsuite-instrumentation-dify](./loongsuite-instrumentation-dify) | dify | No | development +| [loongsuite-instrumentation-google-adk](./loongsuite-instrumentation-google-adk) | google-adk >= 0.1.0 | Yes | experimental | [loongsuite-instrumentation-langchain](./loongsuite-instrumentation-langchain) | langchain_core >= 0.1.0 | Yes | development | [loongsuite-instrumentation-mcp](./loongsuite-instrumentation-mcp) | mcp>=1.3.0 | Yes | development \ No newline at end of file diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/README.md b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/README.md new file mode 100644 index 00000000..3a2b2089 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/README.md @@ -0,0 +1,331 @@ +# OpenTelemetry Google ADK Instrumentation + +[![OpenTelemetry](https://img.shields.io/badge/OpenTelemetry-GenAI_Semantic_Conventions-blue)](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/) + +Google ADK (Agent Development Kit) Python Agent provides comprehensive observability for Google ADK applications using OpenTelemetry. + +## Features + +- ✅ **Automatic Instrumentation**: Zero-code integration via `opentelemetry-instrument` +- ✅ **Manual Instrumentation**: Programmatic control via `GoogleAdkInstrumentor` +- ✅ **GenAI Semantic Conventions**: Full compliance with OpenTelemetry GenAI standards +- ✅ **Comprehensive Spans**: `invoke_agent`, `chat`, `execute_tool` +- ✅ **Standard Metrics**: Operation duration and token usage +- ✅ **Content Capture**: Optional message and response content capture +- ✅ **Google ADK native instrumentation Compatible**: Works seamlessly with ADK native instrumentation + +## Quick Start + +```bash +# Install +pip install google-adk litellm +pip install ./instrumentation-loongsuite/loongsuite-instrumentation-google-adk + +# Configure +export DASHSCOPE_API_KEY=your-api-key + +# Run with auto instrumentation +opentelemetry-instrument \ + --traces_exporter console \ + --service_name my-adk-app \ + python your_app.py +``` + +For details on LoongSuite and Jaeger setup, refer to [LoongSuite Documentation](https://github.com/alibaba/loongsuite-python-agent/blob/main/README.md). 
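+
+The Quick Start above assumes a `your_app.py`. A minimal sketch of such an application, adapted from [examples/main.py](examples/main.py), looks like this (the DashScope model name and endpoint are only illustrative; any model supported by LiteLLM should work the same way):
+
+```python
+# your_app.py -- minimal sketch; model/endpoint values are illustrative.
+import asyncio
+import os
+from datetime import datetime
+
+from google.adk.agents import LlmAgent
+from google.adk.models.lite_llm import LiteLlm
+from google.adk.runners import Runner
+from google.adk.sessions.in_memory_session_service import InMemorySessionService
+from google.adk.tools import FunctionTool
+from google.genai import types
+
+
+def get_current_time() -> str:
+    """Return the current time as a string."""
+    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+
+async def main() -> None:
+    agent = LlmAgent(
+        name="ToolAgent",
+        model=LiteLlm(
+            model="dashscope/qwen-plus",
+            api_key=os.environ["DASHSCOPE_API_KEY"],
+            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+        ),
+        instruction="Answer the user, calling tools when needed.",
+        tools=[FunctionTool(func=get_current_time)],
+    )
+
+    session_service = InMemorySessionService()
+    runner = Runner(
+        app_name="demo-google-adk", agent=agent, session_service=session_service
+    )
+    session = await session_service.create_session(
+        app_name="demo-google-adk", user_id="user-1", session_id="session-1"
+    )
+
+    message = types.Content(role="user", parts=[types.Part(text="What time is it?")])
+    async for event in runner.run_async(
+        user_id="user-1", session_id=session.id, new_message=message
+    ):
+        if getattr(event, "content", None):
+            print(event.content)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+No instrumentation code is required in the application itself: when it runs under `opentelemetry-instrument` (or `loongsuite-instrument`), the instrumentor wraps `Runner.__init__` and injects its observability plugin automatically.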
+ +## Installing Google ADK Instrumentation + +```shell +# OpenTelemetry Core +pip install opentelemetry-distro opentelemetry-exporter-otlp +opentelemetry-bootstrap -a install + +# Google ADK and LLM Dependencies +pip install google-adk>=0.1.0 +pip install litellm + +# Demo Application Dependencies (optional, only if running examples) +pip install fastapi uvicorn pydantic + +# GoogleAdkInstrumentor +git clone https://github.com/alibaba/loongsuite-python-agent.git +cd loongsuite-python-agent +pip install ./instrumentation-loongsuite/loongsuite-instrumentation-google-adk +``` + +## Collect Data + +Here's a simple demonstration of Google ADK instrumentation. The demo uses: + +- A [Google ADK application](examples/main.py) that demonstrates agent interactions with multiple tools + +### Running the Demo + +> **Note**: The demo uses DashScope (Alibaba Cloud LLM service) by default. You need to set the `DASHSCOPE_API_KEY` environment variable. + +#### Option 1: Using OpenTelemetry Auto Instrumentation + +```bash +# Set your DashScope API key +export DASHSCOPE_API_KEY=your-dashscope-api-key + +# Enable content capture (optional, for debugging) +export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true + +# Run with auto instrumentation +opentelemetry-instrument \ + --traces_exporter console \ + --service_name demo-google-adk \ + python examples/main.py +``` + +#### Option 2: Using Loongsuite + +```bash +# Set your DashScope API key +export DASHSCOPE_API_KEY=your-dashscope-api-key + +# Enable content capture (optional, for debugging) +export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true + +# Run with loongsuite instrumentation +loongsuite-instrument \ + --traces_exporter console \ + --service_name demo-google-adk \ + python examples/main.py +``` + +#### Option 3: Export to Jaeger + +```bash +# Set your DashScope API key +export DASHSCOPE_API_KEY=your-dashscope-api-key + +# Configure OTLP exporter +export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true +export OTEL_TRACES_EXPORTER=otlp +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +export OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Run the application +opentelemetry-instrument \ + --service_name demo-google-adk \ + python examples/main.py +``` + +### Expected Results + +The instrumentation will generate traces showing the Google ADK operations: + +#### Tool Execution Span Example + +```json +{ + "name": "execute_tool get_current_time", + "context": { + "trace_id": "xxx", + "span_id": "xxx", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": "xxx", + "start_time": "2025-10-23T06:36:33.858459Z", + "end_time": "2025-10-23T06:36:33.858779Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "gen_ai.operation.name": "execute_tool", + "gen_ai.tool.name": "get_current_time", + "gen_ai.tool.description": "xxx", + "input.value": "{xxx}", + "output.value": "{xxx}" + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "telemetry.sdk.language": "python", + "telemetry.sdk.name": "opentelemetry", + "telemetry.sdk.version": "1.37.0", + "service.name": "demo-google-adk" + }, + "schema_url": "" + } +} +``` + +#### LLM Chat Span Example + +```json +{ + "name": "chat qwen-max", + "kind": "SpanKind.CLIENT", + "attributes": { + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "qwen-max", + "gen_ai.response.model": "qwen-max", + "gen_ai.usage.input_tokens": 150, + "gen_ai.usage.output_tokens": 45 + } +} +``` + +#### Agent Invocation Span Example + +```json +{ + "name": 
"invoke_agent ToolAgent", + "kind": "SpanKind.CLIENT", + "attributes": { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.agent.name": "ToolAgent", + "input.value": "[{\"role\": \"user\", \"parts\": [{\"type\": \"text\", \"content\": \"现在几点了?\"}]}]", + "output.value": "[{\"role\": \"assistant\", \"parts\": [{\"type\": \"text\", \"content\": \"当前时间是 2025-11-27 14:36:33\"}]}]" + } +} +``` + +### Viewing in Jaeger + +After [setting up Jaeger](https://www.jaegertracing.io/docs/latest/getting-started/), you can visualize the complete trace hierarchy in the Jaeger UI, showing the relationships between Runner, Agent, LLM, and Tool spans + +## Configuration + +### Environment Variables + +The following environment variables can be used to configure the Google ADK instrumentation: + +| Variable | Description | Default | +|----------|-------------|---------| +| `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` | Capture message content in traces | `false` | +| `DASHSCOPE_API_KEY` | DashScope API key (required for demo) | - | + +### Programmatic Configuration + +You can also configure the instrumentation programmatically: + +```python +from opentelemetry.instrumentation.google_adk import GoogleAdkInstrumentor + +# Configure the instrumentor +instrumentor = GoogleAdkInstrumentor() + +# Enable instrumentation with custom configuration +instrumentor.instrument( + tracer_provider=your_tracer_provider, + meter_provider=your_meter_provider +) +``` + +## Supported Features + +### Traces + +The Google ADK instrumentation automatically creates traces for: + +- **Agent Runs**: Complete agent execution cycles +- **Tool Calls**: Individual tool invocations +- **Model Interactions**: LLM requests and responses +- **Session Management**: User session tracking +- **Error Handling**: Exception and error tracking + +### Metrics + +The instrumentation follows the [OpenTelemetry GenAI Semantic Conventions for Metrics](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-metrics.md) and provides the following **standard client metrics**: + +#### 1. `gen_ai.client.operation.duration` (Histogram) + +Records the duration of GenAI operations in seconds. + +**Instrument Type**: Histogram +**Unit**: `s` (seconds) +**Status**: Development + +**Required Attributes**: +- `gen_ai.operation.name`: Operation being performed (e.g., `chat`, `invoke_agent`, `execute_tool`) +- `gen_ai.provider.name`: Provider name (e.g., `google_adk`) + +**Conditionally Required Attributes**: +- `error.type`: Error type (only if operation ended in error) +- `gen_ai.request.model`: Model name (if available) + +**Recommended Attributes**: +- `gen_ai.response.model`: Response model name +- `server.address`: Server address +- `server.port`: Server port + +**Example Values**: +- LLM operation: `gen_ai.operation.name="chat"`, `gen_ai.request.model="gemini-pro"`, `duration=1.5s` +- Agent operation: `gen_ai.operation.name="invoke_agent"`, `gen_ai.request.model="math_tutor"`, `duration=2.3s` +- Tool operation: `gen_ai.operation.name="execute_tool"`, `gen_ai.request.model="calculator"`, `duration=0.5s` + +#### 2. `gen_ai.client.token.usage` (Histogram) + +Records the number of tokens used in GenAI operations. 
+ +**Instrument Type**: Histogram +**Unit**: `{token}` +**Status**: Development + +**Required Attributes**: +- `gen_ai.operation.name`: Operation being performed +- `gen_ai.provider.name`: Provider name +- `gen_ai.token.type`: Token type (`input` or `output`) + +**Conditionally Required Attributes**: +- `gen_ai.request.model`: Model name (if available) + +**Recommended Attributes**: +- `gen_ai.response.model`: Response model name +- `server.address`: Server address +- `server.port`: Server port + +**Example Values**: +- Input tokens: `gen_ai.token.type="input"`, `gen_ai.request.model="gemini-pro"`, `count=100` +- Output tokens: `gen_ai.token.type="output"`, `gen_ai.request.model="gemini-pro"`, `count=50` + +**Note**: These metrics use **Histogram** instrument type (not Counter) and follow the standard OpenTelemetry GenAI semantic conventions. All other metrics (like `genai.agent.runs.count`, etc.) are non-standard and have been removed to ensure compliance with the latest OTel specifications. + +### Semantic Conventions + +This instrumentation follows the OpenTelemetry GenAI semantic conventions: + +- [GenAI Spans](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-spans.md) +- [GenAI Agent Spans](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md) +- [GenAI Metrics](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-metrics.md) + + + +## Troubleshooting + +### Common Issues + +1. **Module Import Error**: If you encounter `No module named 'google.adk.runners'`, ensure that `google-adk` is properly installed: + ```bash + pip install google-adk>=0.1.0 + ``` + +2. **DashScope API Error**: If you see authentication errors, verify your API key is correctly set: + ```bash + export DASHSCOPE_API_KEY=your-api-key + # Verify it's set + echo $DASHSCOPE_API_KEY + ``` + +3. **Instrumentation Not Working**: + - Check that the instrumentation is enabled and the Google ADK application is using the `Runner` class + - Verify you see the log message: `Plugin 'opentelemetry_adk_observability' registered` + - For manual instrumentation, ensure you call `GoogleAdkInstrumentor().instrument()` before creating the Runner + +4. 
**Missing Traces**: + - Verify that the OpenTelemetry exporters are properly configured + - Check the `OTEL_TRACES_EXPORTER` environment variable is set (e.g., `console`, `otlp`) + - For OTLP exporter, ensure the endpoint is reachable + + +## References + +- [OpenTelemetry Project](https://opentelemetry.io/) +- [Google ADK Documentation](https://google.github.io/adk-docs/) +- [GenAI Semantic Conventions](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/examples/main.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/examples/main.py new file mode 100644 index 00000000..24dcc3bb --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/examples/main.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python3 +""" +工具使用示例 (HTTP 服务版本) +展示如何在 ADK Agent 中使用各种工具函数并部署为 HTTP 服务 +""" + +import asyncio +import logging +import os +import sys +from datetime import datetime +from typing import Any, Dict, Optional + +import uvicorn +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +# 设置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# 检查环境变量 +api_key = os.getenv("DASHSCOPE_API_KEY") +if not api_key: + print("❌ 请设置 DASHSCOPE_API_KEY 环境变量:") + print(" export DASHSCOPE_API_KEY='your-dashscope-api-key'") + print("🔗 获取地址: https://dashscope.console.aliyun.com/") + sys.exit(1) + +try: + # 导入 ADK 相关模块 + from google.adk.agents import LlmAgent + from google.adk.models.lite_llm import LiteLlm + from google.adk.runners import Runner + from google.adk.sessions.in_memory_session_service import ( + InMemorySessionService, + ) + from google.adk.tools import FunctionTool + from google.genai import types +except ImportError as e: + print(f"❌ 导入 ADK 模块失败: {e}") + print("📦 请确保已正确安装 Google ADK:") + print(" pip install google-adk") + sys.exit(1) + +# 导入自定义工具 +try: + from tools import ( + calculate_math, + check_prime_numbers, + get_current_time, + get_weather_info, + roll_dice, + search_web, + translate_text, + ) +except ImportError as e: + print(f"❌ 导入自定义工具失败: {e}") + sys.exit(1) + +# 配置阿里云百炼模型 +DASHSCOPE_CONFIG = { + "model": "dashscope/qwen-plus", + "api_key": api_key, + "temperature": 0.7, + "max_tokens": 1000, +} + +# 设置LiteLLM的环境变量 +os.environ["DASHSCOPE_API_KEY"] = api_key + +# ==================== 数据模型定义 ==================== + + +class ToolsRequest(BaseModel): + """工具使用请求模型""" + + task: str + session_id: Optional[str] = None + user_id: Optional[str] = "default_user" + + +class ApiResponse(BaseModel): + """API 响应模型""" + + success: bool + message: str + data: Optional[Dict[str, Any]] = None + timestamp: str + session_id: Optional[str] = None + + +def extract_content_text(content) -> str: + """ + 从 Content 对象中提取文本内容 + + Args: + content: Content 对象,包含 parts 列表 + + Returns: + 提取到的文本内容 + """ + if not content: + return "" + + # 如果 content 是字符串,直接返回 + if isinstance(content, str): + return content + + # 如果 content 有 parts 属性 + if hasattr(content, "parts") and content.parts: + text_parts = [] + for part in content.parts: + if hasattr(part, "text") and part.text: + text_parts.append(part.text) + return "".join(text_parts) + + # 如果 content 有 text 属性 + if hasattr(content, "text") and content.text: + return content.text + + # 如果都没有,返回空字符串 + return "" + + +async def create_agent() -> LlmAgent: + """创建带工具的 LLM Agent 实例""" + + # 创建 LiteLlm 模型实例 + dashscope_model = LiteLlm( + 
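+        # The "dashscope/..." model name and the OpenAI-compatible base_url
+        # below route requests to Alibaba Cloud DashScope; temperature and
+        # max_tokens come from DASHSCOPE_CONFIG defined above.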
model=DASHSCOPE_CONFIG["model"], + api_key=DASHSCOPE_CONFIG["api_key"], + temperature=DASHSCOPE_CONFIG["temperature"], + max_tokens=DASHSCOPE_CONFIG["max_tokens"], + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + ) + + # 创建工具 + time_tool = FunctionTool(func=get_current_time) + calc_tool = FunctionTool(func=calculate_math) + dice_tool = FunctionTool(func=roll_dice) + prime_tool = FunctionTool(func=check_prime_numbers) + weather_tool = FunctionTool(func=get_weather_info) + search_tool = FunctionTool(func=search_web) + translate_tool = FunctionTool(func=translate_text) + + # 创建 Agent + agent = LlmAgent( + name="tools_assistant", + model=dashscope_model, + instruction="""你是一个功能丰富的智能助手,可以使用多种工具来帮助用户。 + +你可以使用的工具包括: +1. 🕐 get_current_time - 获取当前时间 +2. 🧮 calculate_math - 进行数学计算 +3. 🎲 roll_dice - 掷骰子 +4. 🔢 check_prime_numbers - 检查质数 +5. 🌤️ get_weather_info - 获取天气信息 +6. 🔍 search_web - 搜索网络信息 +7. 🌍 translate_text - 翻译文本 + +使用原则: +- 用中文与用户交流 +- 对用户友好和专业 +- 当需要使用工具时,主动调用相应的工具函数 +- 基于工具返回的结果给出完整回答 +- 如果用户请求的功能没有对应工具,要诚实说明""", + description="一个可以使用多种工具的智能助手", + tools=[ + time_tool, + calc_tool, + dice_tool, + prime_tool, + weather_tool, + search_tool, + translate_tool, + ], + ) + + return agent + + +# ==================== 服务实现 ==================== + +# 全局变量存储服务组件 +session_service = None +runner = None +agent = None + + +async def initialize_services(): + """初始化服务组件""" + global session_service, runner, agent + + if session_service is None: + logger.info("🔧 初始化服务组件...") + session_service = InMemorySessionService() + agent = await create_agent() + runner = Runner( + app_name="tools_agent_demo", + agent=agent, + session_service=session_service, + ) + logger.info("✅ 服务组件初始化完成") + + +async def run_conversation( + user_input: str, user_id: str, session_id: str = "default_session" +) -> str: + """运行对话并返回回复""" + try: + # 初始化服务 + await initialize_services() + + # 直接创建新会话,不检查是否存在 + logger.info(f"创建新会话: {session_id}") + session = await session_service.create_session( + app_name="tools_agent_demo", user_id=user_id, session_id=session_id + ) + + logger.info(f"使用会话: {session.id}") + + # 创建用户消息 + user_message = types.Content( + role="user", parts=[types.Part(text=user_input)] + ) + + # 运行对话 + events = [] + async for event in runner.run_async( + user_id=user_id, session_id=session.id, new_message=user_message + ): + events.append(event) + + # 获取回复 + for event in events: + if hasattr(event, "content") and event.content: + # 提取 Content 对象中的文本 + content_text = extract_content_text(event.content) + if content_text: + logger.info(f"收到回复: {content_text[:100]}...") + return content_text + + logger.warning("未收到有效回复") + return "抱歉,我没有收到有效的回复。" + + except Exception as e: + logger.error(f"处理消息时出错: {e}") + import traceback + + logger.error(f"详细错误信息: {traceback.format_exc()}") + raise HTTPException(status_code=500, detail=f"处理消息失败: {str(e)}") + + +# ==================== FastAPI 应用 ==================== + +# 创建 FastAPI 应用 +app = FastAPI( + title="ADK 工具使用 Agent HTTP 服务", + description="基于 Google ADK 框架的工具使用 Agent HTTP 服务", + version="1.0.0", +) + + +@app.on_event("startup") +async def startup_event(): + """应用启动时初始化服务""" + logger.info("🚀 启动工具使用 Agent HTTP 服务...") + await initialize_services() + logger.info("✅ 服务启动完成") + + +@app.get("/") +async def root(): + """服务状态检查""" + return ApiResponse( + success=True, + message="工具使用 Agent HTTP 服务运行正常", + data={ + "service": "ADK Tools Agent HTTP Service", + "version": "1.0.0", + "available_tools": [ + "get_current_time: 获取当前时间", + "calculate_math: 数学计算", + "roll_dice: 投骰子", + 
"check_prime_numbers: 质数检查", + "get_weather_info: 天气信息", + "search_web: 网络搜索", + "translate_text: 文本翻译", + ], + "capabilities": [ + "工具自动调用", + "多种实用功能", + "智能任务处理", + "结果整合分析", + ], + }, + timestamp=datetime.now().isoformat(), + ) + + +@app.post("/tools") +async def tools(request: ToolsRequest): + """工具使用任务处理接口""" + try: + session_id = ( + request.session_id + or f"tools_{request.user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + ) + + response = await run_conversation( + user_input=request.task, + user_id=request.user_id or "default_user", + session_id=session_id, + ) + + return ApiResponse( + success=True, + message="工具任务处理成功", + data={"task": request.task, "response": response}, + timestamp=datetime.now().isoformat(), + session_id=session_id, + ) + + except Exception as e: + logger.error(f"工具任务处理错误: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """全局异常处理""" + logger.error(f"全局异常: {exc}") + return JSONResponse( + status_code=500, + content={ + "success": False, + "message": f"服务器内部错误: {str(exc)}", + "timestamp": datetime.now().isoformat(), + }, + ) + + +def main(): + """主函数 - 启动 HTTP 服务""" + print("🚀 ADK 工具使用 Agent HTTP 服务") + print("=" * 50) + print("🔑 API Key 已设置") + print("🔧 可用工具:") + print(" 1. get_current_time - 获取当前时间") + print(" 2. calculate_math - 数学计算") + print(" 3. roll_dice - 投骰子") + print(" 4. check_prime_numbers - 质数检查") + print(" 5. get_weather_info - 天气信息") + print(" 6. search_web - 网络搜索") + print(" 7. translate_text - 文本翻译") + print("=" * 50) + print("\n📡 HTTP 接口说明:") + print(" GET / - 服务状态检查") + print(" POST /tools - 工具使用任务处理接口") + print("\n💡 示例请求:") + print(" curl -X POST http://localhost:8000/tools \\") + print(" -H 'Content-Type: application/json' \\") + print(' -d \'{"task": "现在几点了?"}\'') + print("\n🌐 启动服务...") + + # 启动 FastAPI 服务 + uvicorn.run( + "main:app", + host="0.0.0.0", + port=8000, + log_level="info", + access_log=True, + ) + + +# 保留原有的命令行测试功能 +async def run_test_conversation(): + """运行测试对话""" + print("🚀 启动工具使用示例") + print("=" * 50) + print("🔑 API Key 已设置") + print(f"🤖 模型: {DASHSCOPE_CONFIG['model']}") + print("=" * 50) + + try: + # 初始化服务 + await initialize_services() + print("✅ Agent 初始化成功") + + # 示例对话 + test_inputs = [ + "现在几点了?", + "计算 123 乘以 456", + "掷一个六面骰子", + "检查 17, 25, 29, 33 是否为质数", + "北京的天气怎么样?", + "搜索人工智能的定义", + "翻译'你好'成英文", + ] + + session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + for i, user_input in enumerate(test_inputs, 1): + print(f"\n💬 测试 {i}: {user_input}") + print("-" * 30) + + response = await run_conversation( + user_input, "default_user", session_id + ) + print(f"🤖 回复: {response}") + + # 添加延迟避免请求过快 + await asyncio.sleep(1) + + print("\n✅ 所有测试已完成,程序结束") + + except Exception as e: + print(f"❌ 运行失败: {e}") + logger.exception("运行失败") + + +def run_test(): + """运行测试对话""" + asyncio.run(run_test_conversation()) + + +if __name__ == "__main__": + # 检查是否要运行测试模式 + if len(sys.argv) > 1 and sys.argv[1] == "test": + run_test() + else: + # 启动 HTTP 服务 + try: + main() + except KeyboardInterrupt: + print("\n👋 服务已停止") + except Exception as e: + print(f"❌ 服务启动失败: {e}") + logger.exception("Service startup failed") diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/examples/tools.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/examples/tools.py new file mode 100644 index 00000000..9ce4d027 --- /dev/null +++ 
b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/examples/tools.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +""" +工具函数定义 +包含各种类型的工具函数供 Agent 使用 +""" + +import math +import random +from datetime import datetime +from typing import Any, Dict, List + + +def get_current_time() -> str: + """ + 获取当前时间 + + Returns: + 当前时间的字符串表示 + """ + return f"当前时间是: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}" + + +def calculate_math(expression: str) -> str: + """ + 数学计算工具函数 + + Args: + expression: 数学表达式字符串 + + Returns: + 计算结果的字符串 + """ + try: + # 安全的数学表达式计算 + allowed_names = { + k: v for k, v in math.__dict__.items() if not k.startswith("__") + } + allowed_names.update( + {"abs": abs, "round": round, "pow": pow, "min": min, "max": max} + ) + + result = eval(expression, {"__builtins__": {}}, allowed_names) + return f"🔢 计算结果:{expression} = {result}" + except Exception as e: + return f"❌ 计算错误:{str(e)}" + + +def roll_dice(sides: int = 6) -> int: + """ + 掷骰子工具函数 + + Args: + sides: 骰子面数,默认为6 + + Returns: + 掷骰子的结果 + """ + if sides < 2: + sides = 6 + return random.randint(1, sides) + + +def check_prime_numbers(numbers: List[int]) -> Dict[str, Any]: + """ + 检查数字是否为质数 + + Args: + numbers: 要检查的数字列表 + + Returns: + 包含检查结果的字典 + """ + + def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + for i in range(3, int(math.sqrt(n)) + 1, 2): + if n % i == 0: + return False + return True + + results = {} + primes = [] + non_primes = [] + + for num in numbers: + if is_prime(num): + primes.append(num) + else: + non_primes.append(num) + results[str(num)] = is_prime(num) + + return { + "results": results, + "primes": primes, + "non_primes": non_primes, + "summary": f"在 {numbers} 中,质数有: {primes},非质数有: {non_primes}", + } + + +def get_weather_info(city: str) -> str: + """ + 获取天气信息工具函数(模拟) + + Args: + city: 城市名称 + + Returns: + 天气信息字符串 + """ + # 模拟天气数据 + weather_data = { + "北京": "晴朗,温度 15°C,湿度 45%,微风", + "上海": "多云,温度 18°C,湿度 60%,东南风", + "深圳": "小雨,温度 25°C,湿度 80%,南风", + "杭州": "阴天,温度 20°C,湿度 55%,西北风", + "广州": "晴朗,温度 28°C,湿度 65%,东风", + } + + weather = weather_data.get(city, f"{city}的天气信息暂时无法获取") + return f"📍 {city}的天气:{weather}" + + +def search_web(query: str) -> str: + """ + 网络搜索工具函数(模拟) + + Args: + query: 搜索查询 + + Returns: + 搜索结果字符串 + """ + # 模拟搜索结果 + mock_results = { + "人工智能": "人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。", + "机器学习": "机器学习是人工智能的一个分支,是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。", + "深度学习": "深度学习是机器学习的一个分支,它基于人工神经网络,利用多层非线性变换对数据进行特征提取和转换。", + "自然语言处理": "自然语言处理是计算机科学领域与人工智能领域中的一个重要方向,它研究能实现人与计算机之间用自然语言进行有效通信的各种理论和方法。", + } + + for key, value in mock_results.items(): + if key in query: + return value + + return f"🔍 关于'{query}'的搜索结果:这是模拟的搜索结果,实际应用中会连接真实的搜索引擎API。" + + +def translate_text(text: str, target_language: str = "en") -> str: + """ + 文本翻译工具函数(模拟) + + Args: + text: 要翻译的文本 + target_language: 目标语言代码 + + Returns: + 翻译结果字符串 + """ + # 模拟翻译结果 + translations = { + "你好": "Hello", + "谢谢": "Thank you", + "再见": "Goodbye", + "人工智能": "Artificial Intelligence", + "机器学习": "Machine Learning", + } + + if target_language.lower() == "en": + return translations.get(text, f"Translated: {text}") + else: + return f"翻译到{target_language}:{text}" + + +# 导出所有工具函数 +__all__ = [ + "get_current_time", + "calculate_math", + "roll_dice", + "check_prime_numbers", + "get_weather_info", + "search_web", + "translate_text", +] diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/pyproject.toml 
b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/pyproject.toml new file mode 100644 index 00000000..6138262f --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/pyproject.toml @@ -0,0 +1,85 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "loongsuite-instrumentation-google-adk" +dynamic = ["version"] +description = "OpenTelemetry instrumentation for Google Agent Development Kit (ADK)" +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.8" +authors = [ + { name = "LoongSuite Python Agent Authors" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] + +dependencies = [ + "opentelemetry-api ~= 1.27", + "opentelemetry-sdk ~= 1.27", + "opentelemetry-semantic-conventions ~= 0.48b0", + "wrapt >= 1.0.0, < 2.0.0", + "google-adk >= 0.1.0", +] + +[project.optional-dependencies] +test = [ + "pytest >= 7.0.0", + "pytest-asyncio >= 0.21.0", + "pytest-cov >= 4.0.0", + "google-adk >= 0.1.0", +] +instruments = [ + "google-adk >= 0.1.0", +] + +[project.urls] +Homepage = "https://github.com/alibaba/loongsuite-python-agent/tree/main/instrumentation-loongsuite/loongsuite-instrumentation-google-adk" +Repository = "https://github.com/alibaba/loongsuite-python-agent" + +[project.entry-points.opentelemetry_instrumentor] +google-adk = "opentelemetry.instrumentation.google_adk:GoogleAdkInstrumentor" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/google_adk/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "src", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + +[tool.pytest.ini_options] +minversion = "7.0" +testpaths = ["tests"] +asyncio_mode = "auto" +addopts = "--cov=opentelemetry/instrumentation/google_adk --cov-report=term-missing --cov-report=html" + +[tool.coverage.run] +source = ["src"] +branch = true + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", + "if __name__ == .__main__.:", + "@abstractmethod", +] + + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/__init__.py new file mode 100644 index 00000000..4d950a58 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/__init__.py @@ -0,0 +1,3 @@ +"""OpenTelemetry namespace package.""" + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/__init__.py new file mode 100644 index 00000000..6dfadabb --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/__init__.py @@ -0,0 +1,3 @@ +"""OpenTelemetry instrumentation namespace package.""" + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git 
a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/__init__.py new file mode 100644 index 00000000..4b5a0f4f --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/__init__.py @@ -0,0 +1,215 @@ +""" +OpenTelemetry Instrumentation for Google ADK. + +This package provides OpenTelemetry instrumentation for Google Agent Development Kit (ADK) +applications, following the OpenTelemetry GenAI semantic conventions. + +Usage: + # Manual instrumentation + from opentelemetry.instrumentation.google_adk import GoogleAdkInstrumentor + GoogleAdkInstrumentor().instrument() + + # Auto instrumentation (via opentelemetry-instrument) + # opentelemetry-instrument python your_app.py +""" + +import logging +from typing import Collection, Optional + +from wrapt import wrap_function_wrapper + +from opentelemetry import metrics as metrics_api +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.semconv.schemas import Schemas + +from .internal._plugin import GoogleAdkObservabilityPlugin +from .version import __version__ + +_logger = logging.getLogger(__name__) + +# Module-level storage for the plugin instance +# This ensures the plugin persists across different instrumentor instances +# and supports both manual and auto instrumentation modes +_global_plugin: Optional[GoogleAdkObservabilityPlugin] = None + + +def _create_plugin_if_needed(tracer_provider=None, meter_provider=None): + """ + Create or get the global plugin instance. + + This function ensures that only one plugin instance exists for the + entire process, which is necessary for auto instrumentation to work + correctly when the instrumentor may be instantiated multiple times. + + Args: + tracer_provider: Optional tracer provider + meter_provider: Optional meter provider + + Returns: + GoogleAdkObservabilityPlugin instance + """ + global _global_plugin + + if _global_plugin is None: + # Get tracer and meter + tracer = trace_api.get_tracer( + __name__, + __version__, + tracer_provider, + schema_url=Schemas.V1_28_0.value, + ) + + meter = metrics_api.get_meter( + __name__, + __version__, + meter_provider, + schema_url=Schemas.V1_28_0.value, + ) + + _global_plugin = GoogleAdkObservabilityPlugin(tracer, meter) + _logger.debug("Created global GoogleAdkObservabilityPlugin instance") + + return _global_plugin + + +def _runner_init_wrapper(wrapped, instance, args, kwargs): + """ + Wrapper for Runner.__init__ to auto-inject the observability plugin. + + This is a module-level function (not a method) to avoid issues with + instance state in auto instrumentation scenarios where the instrumentor + may be instantiated multiple times. 
+ + Args: + wrapped: Original wrapped function + instance: Runner instance + args: Positional arguments + kwargs: Keyword arguments + + Returns: + Result of the original function + """ + # Get or create the plugin + plugin = _create_plugin_if_needed() + + if plugin: + # Get or create plugins list + plugins = kwargs.get("plugins", []) + if not isinstance(plugins, list): + plugins = [plugins] if plugins else [] + + # Add our plugin if not already present + if plugin not in plugins: + plugins.append(plugin) + kwargs["plugins"] = plugins + _logger.debug( + "Injected OpenTelemetry observability plugin into Runner" + ) + + # Call the original __init__ + return wrapped(*args, **kwargs) + + +class GoogleAdkInstrumentor(BaseInstrumentor): + """ + OpenTelemetry instrumentor for Google ADK. + + This instrumentor automatically injects observability into Google ADK applications + following OpenTelemetry GenAI semantic conventions. + + Supports both manual and auto instrumentation modes: + - Manual: GoogleAdkInstrumentor().instrument() + - Auto: opentelemetry-instrument python your_app.py + """ + + def __init__(self): + """Initialize the instrumentor.""" + super().__init__() + + @property + def _plugin(self): + """ + Get the global plugin instance. + + This property provides backward compatibility with code that accesses + self.instrumentor._plugin (e.g., in tests). + + Returns: + The global plugin instance + """ + return _global_plugin + + def instrumentation_dependencies(self) -> Collection[str]: + """ + Return the list of instrumentation dependencies. + + Returns: + Collection of required packages + """ + return ["google-adk >= 0.1.0"] + + def _instrument(self, **kwargs): + """ + Instrument the Google ADK library. + + This method works in both manual and auto instrumentation modes by + using a module-level global plugin instance that persists across + multiple instrumentor instantiations. + + Args: + **kwargs: Optional keyword arguments: + - tracer_provider: Custom tracer provider + - meter_provider: Custom meter provider + """ + # Check if google-adk is installed + import importlib.util + + if importlib.util.find_spec("google.adk.runners") is None: + _logger.warning( + "google-adk not found, instrumentation will not be applied" + ) + return + + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + + # Create or get the global plugin instance + _create_plugin_if_needed(tracer_provider, meter_provider) + + # Wrap the Runner initialization to auto-inject our plugin + try: + wrap_function_wrapper( + "google.adk.runners", + "Runner.__init__", + _runner_init_wrapper, # Use module-level function + ) + _logger.info("Google ADK instrumentation enabled") + except Exception as e: + _logger.exception(f"Failed to instrument Google ADK: {e}") + + def _uninstrument(self, **kwargs): + """ + Uninstrument the Google ADK library. 
+ + Args: + **kwargs: Optional keyword arguments + """ + global _global_plugin + + try: + # Unwrap the Runner initialization + from google.adk.runners import Runner + + unwrap(Runner, "__init__") + + # Clear the global plugin + _global_plugin = None + + _logger.info("Google ADK instrumentation disabled") + except Exception as e: + _logger.exception(f"Failed to uninstrument Google ADK: {e}") + + +__all__ = ["GoogleAdkInstrumentor"] diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/__init__.py new file mode 100644 index 00000000..1d78902e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/__init__.py @@ -0,0 +1 @@ +"""Internal implementation modules for Google ADK instrumentation.""" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_extractors.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_extractors.py new file mode 100644 index 00000000..bfd7abda --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_extractors.py @@ -0,0 +1,561 @@ +""" +ADK Attribute Extractors following OpenTelemetry GenAI Semantic Conventions. + +This module extracts trace attributes from Google ADK objects according +to OpenTelemetry GenAI semantic conventions (latest version). +""" + +import logging +from typing import Any, Dict, Optional + +from google.adk.agents.base_agent import BaseAgent +from google.adk.agents.callback_context import CallbackContext +from google.adk.agents.invocation_context import InvocationContext +from google.adk.models.llm_request import LlmRequest +from google.adk.models.llm_response import LlmResponse +from google.adk.tools.base_tool import BaseTool +from google.adk.tools.tool_context import ToolContext + +from ._utils import ( + extract_content_safely_for_input_output, + process_content, + safe_json_dumps, + safe_json_dumps_for_input_output, + safe_json_dumps_large, + should_capture_content, +) + +_logger = logging.getLogger(__name__) + + +class AdkAttributeExtractors: + """ + Attribute extractors for Google ADK following OpenTelemetry GenAI semantic conventions. + + Extracts trace attributes from ADK objects according to: + - gen_ai.* attributes for GenAI-specific information + - Standard OpenTelemetry attributes for general information + """ + + def extract_common_attributes( + self, + operation_name: str, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Extract common GenAI attributes required for all spans. + + Args: + operation_name: Operation name (chat, invoke_agent, execute_tool, etc.) 
+ conversation_id: Conversation/session ID (optional) + user_id: User ID (optional) + + Returns: + Dictionary of common attributes + """ + attrs = { + "gen_ai.operation.name": operation_name, + "gen_ai.provider.name": "google_adk", # ✅ 使用 provider.name 而非 system + } + + # ✅ conversation.id 而非 session.id + if conversation_id and isinstance(conversation_id, str): + attrs["gen_ai.conversation.id"] = conversation_id + + # ✅ 使用标准 enduser.id 而非 gen_ai.user.id + if user_id and isinstance(user_id, str): + attrs["enduser.id"] = user_id + + return attrs + + def extract_runner_attributes( + self, invocation_context: InvocationContext + ) -> Dict[str, Any]: + """ + Extract attributes for Runner spans (top-level invoke_agent span). + + Args: + invocation_context: ADK invocation context + + Returns: + Dictionary of runner attributes + """ + try: + _logger.debug("Extracting runner attributes") + + # Extract conversation_id and user_id from invocation_context + conversation_id = None + user_id = None + + try: + conversation_id = invocation_context.session.id + except AttributeError: + _logger.debug( + "Failed to extract conversation_id from invocation_context" + ) + + try: + user_id = getattr(invocation_context, "user_id", None) + if not user_id and hasattr(invocation_context, "session"): + user_id = getattr( + invocation_context.session, "user_id", None + ) + except AttributeError: + _logger.debug( + "Failed to extract user_id from invocation_context" + ) + + if conversation_id is None: + _logger.debug( + "conversation_id not found on invocation_context" + ) + if user_id is None: + _logger.debug("user_id not found on invocation_context") + + # ✅ 使用 invoke_agent 操作名称 + attrs = self.extract_common_attributes( + operation_name="invoke_agent", + conversation_id=conversation_id, + user_id=user_id, + ) + + # Add ADK-specific attributes (非标准,作为自定义扩展) + if hasattr(invocation_context, "app_name"): + attrs["google_adk.runner.app_name"] = ( + invocation_context.app_name + ) + + if hasattr(invocation_context, "invocation_id"): + attrs["google_adk.runner.invocation_id"] = ( + invocation_context.invocation_id + ) + + # Agent spans use input.value/output.value + attrs["input.mime_type"] = "application/json" + attrs["output.mime_type"] = "application/json" + + return attrs + + except Exception as e: + _logger.exception(f"Error extracting runner attributes: {e}") + return self.extract_common_attributes("invoke_agent") + + def extract_agent_attributes( + self, agent: BaseAgent, callback_context: CallbackContext + ) -> Dict[str, Any]: + """ + Extract attributes for Agent spans. 
+ + Args: + agent: ADK agent instance + callback_context: ADK callback context + + Returns: + Dictionary of agent attributes + """ + try: + _logger.debug("Extracting agent attributes") + + # Extract conversation_id and user_id from callback_context + conversation_id = None + user_id = None + + try: + conversation_id = ( + callback_context._invocation_context.session.id + ) + except AttributeError: + _logger.debug( + "Failed to extract conversation_id from callback_context" + ) + + try: + user_id = getattr(callback_context, "user_id", None) + if not user_id: + user_id = getattr( + callback_context._invocation_context, "user_id", None + ) + except AttributeError: + _logger.debug( + "Failed to extract user_id from callback_context" + ) + + if conversation_id is None: + _logger.debug("conversation_id not found on callback_context") + if user_id is None: + _logger.debug("user_id not found on callback_context") + + # ✅ 使用 invoke_agent 操作名称(无论是 agent 还是 chain) + attrs = self.extract_common_attributes( + operation_name="invoke_agent", + conversation_id=conversation_id, + user_id=user_id, + ) + + # ✅ 使用 gen_ai.agent.* 属性(带前缀) + if hasattr(agent, "name") and agent.name: + attrs["gen_ai.agent.name"] = agent.name + + # ✅ 尝试获取 agent.id(如果可用) + if hasattr(agent, "id") and agent.id: + attrs["gen_ai.agent.id"] = agent.id + + if hasattr(agent, "description") and agent.description: + attrs["gen_ai.agent.description"] = agent.description + + # Add input/output placeholder + attrs["input.mime_type"] = "application/json" + attrs["output.mime_type"] = "application/json" + + return attrs + + except Exception as e: + _logger.exception(f"Error extracting agent attributes: {e}") + return self.extract_common_attributes("invoke_agent") + + def extract_llm_request_attributes( + self, llm_request: LlmRequest, callback_context: CallbackContext + ) -> Dict[str, Any]: + """ + Extract attributes for LLM request spans. 
+ + Args: + llm_request: ADK LLM request + callback_context: ADK callback context + + Returns: + Dictionary of LLM request attributes + """ + try: + # Extract conversation_id and user_id + conversation_id = None + user_id = None + + try: + conversation_id = ( + callback_context._invocation_context.session.id + ) + except AttributeError: + _logger.debug( + "Failed to extract conversation_id from callback_context" + ) + + try: + user_id = getattr(callback_context, "user_id", None) + if not user_id: + user_id = getattr( + callback_context._invocation_context, "user_id", None + ) + except AttributeError: + _logger.debug( + "Failed to extract user_id from callback_context" + ) + + # ✅ 使用 chat 操作名称 + attrs = self.extract_common_attributes( + operation_name="chat", + conversation_id=conversation_id, + user_id=user_id, + ) + + # Add LLM request attributes according to GenAI conventions + if hasattr(llm_request, "model") and llm_request.model: + # ✅ 只使用 gen_ai.request.model(移除冗余的 model_name) + attrs["gen_ai.request.model"] = llm_request.model + # ✅ 使用 _extract_provider_name 而非 _extract_system_from_model + attrs["gen_ai.provider.name"] = self._extract_provider_name( + llm_request.model + ) + + # Extract request parameters + if hasattr(llm_request, "config") and llm_request.config: + config = llm_request.config + + if hasattr(config, "max_tokens") and config.max_tokens: + attrs["gen_ai.request.max_tokens"] = config.max_tokens + + if ( + hasattr(config, "temperature") + and config.temperature is not None + ): + if isinstance(config.temperature, (int, float)): + attrs["gen_ai.request.temperature"] = ( + config.temperature + ) + + if hasattr(config, "top_p") and config.top_p is not None: + if isinstance(config.top_p, (int, float)): + attrs["gen_ai.request.top_p"] = config.top_p + + if hasattr(config, "top_k") and config.top_k is not None: + if isinstance(config.top_k, (int, float)): + attrs["gen_ai.request.top_k"] = config.top_k + + # Extract input messages (with content capture control) + if ( + should_capture_content() + and hasattr(llm_request, "contents") + and llm_request.contents + ): + try: + input_messages = [] + for content in llm_request.contents: + if hasattr(content, "role") and hasattr( + content, "parts" + ): + # Convert to GenAI message format + message = {"role": content.role, "parts": []} + for part in content.parts: + if hasattr(part, "text"): + message["parts"].append( + { + "type": "text", + "content": process_content( + part.text + ), + } + ) + input_messages.append(message) + + if input_messages: + attrs["gen_ai.input.messages"] = safe_json_dumps_large( + input_messages + ) + + except Exception as e: + _logger.debug(f"Failed to extract input messages: {e}") + + attrs["input.mime_type"] = "application/json" + # ❌ 移除 gen_ai.request.is_stream (非标准属性) + + return attrs + + except Exception as e: + _logger.exception(f"Error extracting LLM request attributes: {e}") + return self.extract_common_attributes("chat") + + def extract_llm_response_attributes( + self, llm_response: LlmResponse + ) -> Dict[str, Any]: + """ + Extract attributes for LLM response. 
+ + Args: + llm_response: ADK LLM response + + Returns: + Dictionary of LLM response attributes + """ + try: + attrs = {} + + # Add response model + if hasattr(llm_response, "model") and llm_response.model: + attrs["gen_ai.response.model"] = llm_response.model + + # ✅ finish_reasons (复数数组) + if hasattr(llm_response, "finish_reason"): + finish_reason = llm_response.finish_reason or "stop" + attrs["gen_ai.response.finish_reasons"] = [ + finish_reason + ] # 必须是数组 + + # Add token usage + if ( + hasattr(llm_response, "usage_metadata") + and llm_response.usage_metadata + ): + usage = llm_response.usage_metadata + + if ( + hasattr(usage, "prompt_token_count") + and usage.prompt_token_count + ): + attrs["gen_ai.usage.input_tokens"] = ( + usage.prompt_token_count + ) + + if ( + hasattr(usage, "candidates_token_count") + and usage.candidates_token_count + ): + attrs["gen_ai.usage.output_tokens"] = ( + usage.candidates_token_count + ) + # ❌ 移除 gen_ai.usage.total_tokens (非标准,可自行计算) + + # Extract output messages (with content capture control) + if ( + should_capture_content() + and hasattr(llm_response, "content") + and llm_response.content + ): + try: + output_messages = [] + # Check if response has text content + if ( + hasattr(llm_response, "text") + and llm_response.text is not None + ): + extracted_text = ( + extract_content_safely_for_input_output( + llm_response.text + ) + ) + message = { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": process_content(extracted_text), + } + ], + "finish_reason": getattr( + llm_response, "finish_reason", None + ) + or "stop", + } + output_messages.append(message) + elif ( + hasattr(llm_response, "content") + and llm_response.content is not None + ): + extracted_text = ( + extract_content_safely_for_input_output( + llm_response.content + ) + ) + message = { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": process_content(extracted_text), + } + ], + "finish_reason": getattr( + llm_response, "finish_reason", None + ) + or "stop", + } + output_messages.append(message) + + if output_messages: + attrs["gen_ai.output.messages"] = ( + safe_json_dumps_large(output_messages) + ) + + except Exception as e: + _logger.debug(f"Failed to extract output messages: {e}") + + attrs["output.mime_type"] = "application/json" + + return attrs + + except Exception as e: + _logger.exception(f"Error extracting LLM response attributes: {e}") + return {} + + def extract_tool_attributes( + self, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: ToolContext, + ) -> Dict[str, Any]: + """ + Extract attributes for Tool spans. 
+ + Args: + tool: ADK tool instance + tool_args: Tool arguments + tool_context: Tool context + + Returns: + Dictionary of tool attributes + """ + try: + # 尝试从tool_context提取conversation_id + conversation_id = None + user_id = None + + if hasattr(tool_context, "session_id"): + conversation_id = tool_context.session_id + elif hasattr(tool_context, "context") and hasattr( + tool_context.context, "session_id" + ): + conversation_id = tool_context.context.session_id + + # ✅ 使用 execute_tool 操作名称 + attrs = self.extract_common_attributes( + operation_name="execute_tool", + conversation_id=conversation_id, + user_id=user_id, + ) + + # ✅ Tool 属性使用 gen_ai.tool.* 前缀 + if hasattr(tool, "name") and tool.name: + attrs["gen_ai.tool.name"] = tool.name + + if hasattr(tool, "description") and tool.description: + attrs["gen_ai.tool.description"] = tool.description + + # ✅ 默认 tool type 为 function + attrs["gen_ai.tool.type"] = "function" + + # ✅ 尝试获取 tool.call.id(如果可用) + if hasattr(tool_context, "call_id") and tool_context.call_id: + attrs["gen_ai.tool.call.id"] = tool_context.call_id + + # ✅ tool.call.arguments 而非 tool.parameters (Opt-In) + if should_capture_content() and tool_args: + attrs["gen_ai.tool.call.arguments"] = safe_json_dumps( + tool_args + ) + attrs["input.value"] = safe_json_dumps_for_input_output( + tool_args + ) + attrs["input.mime_type"] = "application/json" + + return attrs + + except Exception as e: + _logger.exception(f"Error extracting tool attributes: {e}") + return self.extract_common_attributes("execute_tool") + + def _extract_provider_name(self, model_name: str) -> str: + """ + Extract provider name from model name according to OTel GenAI conventions. + + Args: + model_name: Model name string + + Returns: + Provider name following OTel GenAI standard values + """ + if not model_name: + return "google_adk" + + model_lower = model_name.lower() + + # Google models - use standard values from OTel spec + if "gemini" in model_lower: + return "gcp.gemini" # AI Studio API + elif "vertex" in model_lower: + return "gcp.vertex_ai" # Vertex AI + # OpenAI models + elif "gpt" in model_lower or "openai" in model_lower: + return "openai" + # Anthropic models + elif "claude" in model_lower: + return "anthropic" + # Other providers + elif "llama" in model_lower or "meta" in model_lower: + return "meta" + elif "mistral" in model_lower: + return "mistral_ai" + elif "cohere" in model_lower: + return "cohere" + else: + # Default to google_adk for unknown models + return "google_adk" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_metrics.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_metrics.py new file mode 100644 index 00000000..ea3d1374 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_metrics.py @@ -0,0 +1,224 @@ +""" +OpenTelemetry GenAI Metrics Collector for Google ADK. + +This module implements standard OpenTelemetry GenAI metrics collection +following the latest GenAI semantic conventions. +""" + +import logging +from typing import Optional + +from opentelemetry.metrics import Meter +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + +_logger = logging.getLogger(__name__) + + +class Instruments: + """ + Standard OpenTelemetry GenAI instrumentation instruments. 
+ + This class follows the same pattern as openai-v2/instruments.py + and implements only the 2 standard GenAI client metrics. + """ + + def __init__(self, meter: Meter): + """ + Initialize standard GenAI instruments. + + Args: + meter: OpenTelemetry meter instance + """ + # ✅ Standard GenAI client metric 1: Operation duration + self.operation_duration_histogram = ( + gen_ai_metrics.create_gen_ai_client_operation_duration(meter) + ) + + # ✅ Standard GenAI client metric 2: Token usage + self.token_usage_histogram = ( + gen_ai_metrics.create_gen_ai_client_token_usage(meter) + ) + + +class AdkMetricsCollector: + """ + Metrics collector for Google ADK following OpenTelemetry GenAI conventions. + + This collector implements ONLY the 2 standard GenAI client metrics: + - gen_ai.client.operation.duration (Histogram, unit: seconds) + - gen_ai.client.token.usage (Histogram, unit: tokens) + + All ARMS-specific metrics have been removed. + """ + + def __init__(self, meter: Meter): + """ + Initialize the metrics collector. + + Args: + meter: OpenTelemetry meter instance + """ + self._instruments = Instruments(meter) + _logger.debug( + "AdkMetricsCollector initialized with standard OTel GenAI metrics" + ) + + def record_llm_call( + self, + operation_name: str, + model_name: str, + duration: float, + error_type: Optional[str] = None, + prompt_tokens: int = 0, + completion_tokens: int = 0, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + ) -> None: + """ + Record LLM call metrics following standard OTel GenAI conventions. + + Args: + operation_name: Operation name (e.g., "chat") + model_name: Model name + duration: Duration in seconds + error_type: Error type if error occurred + prompt_tokens: Number of prompt tokens + completion_tokens: Number of completion tokens + conversation_id: Conversation ID (not used in metrics due to high cardinality) + user_id: User ID (not used in metrics due to high cardinality) + """ + try: + # ✅ Build standard attributes for operation.duration + attributes = { + "gen_ai.operation.name": operation_name, + "gen_ai.provider.name": "google_adk", # ✅ Required attribute + "gen_ai.request.model": model_name, # ✅ Recommended attribute + } + + # ✅ Add error.type only if error occurred (Conditionally Required) + if error_type: + attributes["error.type"] = error_type + + # ✅ Record operation duration (Histogram, unit: seconds) + self._instruments.operation_duration_histogram.record( + duration, attributes=attributes + ) + + # ✅ Record token usage (Histogram, unit: tokens) + # Note: session_id and user_id are NOT included in metrics (high cardinality) + if prompt_tokens > 0: + self._instruments.token_usage_histogram.record( + prompt_tokens, + attributes={ + **attributes, + "gen_ai.token.type": "input", # ✅ Required for token.usage + }, + ) + + if completion_tokens > 0: + self._instruments.token_usage_histogram.record( + completion_tokens, + attributes={ + **attributes, + "gen_ai.token.type": "output", # ✅ Required for token.usage + }, + ) + + _logger.debug( + f"Recorded LLM metrics: operation={operation_name}, model={model_name}, " + f"duration={duration:.3f}s, prompt_tokens={prompt_tokens}, " + f"completion_tokens={completion_tokens}, error={error_type}" + ) + + except Exception as e: + _logger.exception(f"Error recording LLM metrics: {e}") + + def record_agent_call( + self, + operation_name: str, + agent_name: str, + duration: float, + error_type: Optional[str] = None, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + ) -> None: + 
""" + Record Agent call metrics following standard OTel GenAI conventions. + + Args: + operation_name: Operation name (e.g., "invoke_agent") + agent_name: Agent name + duration: Duration in seconds + error_type: Error type if error occurred + conversation_id: Conversation ID (not used in metrics due to high cardinality) + user_id: User ID (not used in metrics due to high cardinality) + """ + try: + # ✅ Build standard attributes + attributes = { + "gen_ai.operation.name": operation_name, + "gen_ai.provider.name": "google_adk", # ✅ Required + "gen_ai.request.model": agent_name, # ✅ Agent name as model + } + + # ✅ Add error.type only if error occurred + if error_type: + attributes["error.type"] = error_type + + # ✅ Record operation duration (Histogram, unit: seconds) + self._instruments.operation_duration_histogram.record( + duration, attributes=attributes + ) + + _logger.debug( + f"Recorded Agent metrics: operation={operation_name}, agent={agent_name}, " + f"duration={duration:.3f}s, error={error_type}" + ) + + except Exception as e: + _logger.exception(f"Error recording Agent metrics: {e}") + + def record_tool_call( + self, + operation_name: str, + tool_name: str, + duration: float, + error_type: Optional[str] = None, + conversation_id: Optional[str] = None, + user_id: Optional[str] = None, + ) -> None: + """ + Record Tool call metrics following standard OTel GenAI conventions. + + Args: + operation_name: Operation name (e.g., "execute_tool") + tool_name: Tool name + duration: Duration in seconds + error_type: Error type if error occurred + conversation_id: Conversation ID (not used in metrics due to high cardinality) + user_id: User ID (not used in metrics due to high cardinality) + """ + try: + # ✅ Build standard attributes + attributes = { + "gen_ai.operation.name": operation_name, + "gen_ai.provider.name": "google_adk", # ✅ Required + "gen_ai.request.model": tool_name, # ✅ Tool name as model + } + + # ✅ Add error.type only if error occurred + if error_type: + attributes["error.type"] = error_type + + # ✅ Record operation duration (Histogram, unit: seconds) + self._instruments.operation_duration_histogram.record( + duration, attributes=attributes + ) + + _logger.debug( + f"Recorded Tool metrics: operation={operation_name}, tool={tool_name}, " + f"duration={duration:.3f}s, error={error_type}" + ) + + except Exception as e: + _logger.exception(f"Error recording Tool metrics: {e}") diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_plugin.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_plugin.py new file mode 100644 index 00000000..a17cf455 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_plugin.py @@ -0,0 +1,777 @@ +""" +OpenTelemetry ADK Observability Plugin. + +This module implements the core observability plugin using Google ADK's +plugin mechanism with OpenTelemetry GenAI semantic conventions. 
+""" + +import logging +from typing import Any, Dict, Optional + +from google.adk.agents.base_agent import BaseAgent +from google.adk.agents.callback_context import CallbackContext +from google.adk.agents.invocation_context import InvocationContext +from google.adk.events.event import Event +from google.adk.models.llm_request import LlmRequest +from google.adk.models.llm_response import LlmResponse +from google.adk.plugins.base_plugin import BasePlugin +from google.adk.tools.base_tool import BaseTool +from google.adk.tools.tool_context import ToolContext +from google.genai import types + +from opentelemetry import trace as trace_api +from opentelemetry.metrics import Meter +from opentelemetry.trace import SpanKind + +from ._extractors import AdkAttributeExtractors +from ._metrics import AdkMetricsCollector +from ._utils import ( + extract_content_safely_for_input_output, + process_content, + safe_json_dumps_for_input_output, + should_capture_content, +) + +_logger = logging.getLogger(__name__) + + +class GoogleAdkObservabilityPlugin(BasePlugin): + """ + OpenTelemetry ADK Observability Plugin. + + Implements comprehensive observability for Google ADK applications + following OpenTelemetry GenAI semantic conventions. + """ + + def __init__(self, tracer: trace_api.Tracer, meter: Meter): + """ + Initialize the observability plugin. + + Args: + tracer: OpenTelemetry tracer instance + meter: OpenTelemetry meter instance + """ + super().__init__(name="opentelemetry_adk_observability") + self._tracer = tracer + self._metrics = AdkMetricsCollector(meter) + self._extractors = AdkAttributeExtractors() + + # Track active spans for proper nesting + self._active_spans: Dict[str, trace_api.Span] = {} + + # Track user messages and final responses for Runner spans + self._runner_inputs: Dict[str, types.Content] = {} + self._runner_outputs: Dict[str, str] = {} + + # Track llm_request -> model mapping to avoid fallback model names + self._llm_req_models: Dict[str, str] = {} + + # ===== Runner Level Callbacks - Top-level invoke_agent span ===== + + async def before_run_callback( + self, *, invocation_context: InvocationContext + ) -> Optional[Any]: + """ + Start Runner execution - create top-level invoke_agent span. + + According to OTel GenAI conventions, Runner is treated as a top-level agent. 
+ Span name: "invoke_agent {app_name}" + """ + try: + # ✅ Span name follows GenAI conventions + span_name = f"invoke_agent {invocation_context.app_name}" + attributes = self._extractors.extract_runner_attributes( + invocation_context + ) + + # ✅ Use CLIENT span kind (recommended for GenAI) + span = self._tracer.start_span( + name=span_name, kind=SpanKind.CLIENT, attributes=attributes + ) + + # Store span for later use + self._active_spans[ + f"runner_{invocation_context.invocation_id}" + ] = span + + # Check if we already have a stored user message + runner_key = f"runner_{invocation_context.invocation_id}" + if runner_key in self._runner_inputs and should_capture_content(): + user_message = self._runner_inputs[runner_key] + input_messages = self._convert_user_message_to_genai_format( + user_message + ) + + if input_messages: + # For Agent spans, use input.value + span.set_attribute( + "input.value", + safe_json_dumps_for_input_output(input_messages), + ) + _logger.debug( + f"Set input.value on Agent span: {invocation_context.invocation_id}" + ) + + _logger.debug(f"Started Runner span: {span_name}") + + except Exception as e: + _logger.exception(f"Error in before_run_callback: {e}") + + return None + + async def on_user_message_callback( + self, + *, + invocation_context: InvocationContext, + user_message: types.Content, + ) -> Optional[types.Content]: + """ + Capture user input for Runner span. + + This callback is triggered when a user message is received. + """ + try: + # Store user message for later use in Runner span + runner_key = f"runner_{invocation_context.invocation_id}" + self._runner_inputs[runner_key] = user_message + + # Set input messages on active Runner span if it exists and content capture is enabled + span = self._active_spans.get(runner_key) + if span and should_capture_content(): + input_messages = self._convert_user_message_to_genai_format( + user_message + ) + + if input_messages: + # For Agent spans, use input.value + span.set_attribute( + "input.value", + safe_json_dumps_for_input_output(input_messages), + ) + + _logger.debug( + f"Captured user message for Runner: {invocation_context.invocation_id}" + ) + + except Exception as e: + _logger.exception(f"Error in on_user_message_callback: {e}") + + return None # Don't modify the user message + + async def on_event_callback( + self, *, invocation_context: InvocationContext, event: Event + ) -> Optional[Event]: + """ + Capture output events for Runner span. + + This callback is triggered for each event generated during execution. 
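+
+        Accumulated event text is written to output.value as a single
+        assistant message; the shape below is illustrative of what this
+        callback builds:
+
+            [{"role": "assistant",
+              "parts": [{"type": "text", "content": "..."}],
+              "finish_reason": "stop"}]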
+ """ + try: + if not should_capture_content(): + return None + + # Extract text content from event if available + event_content = "" + if hasattr(event, "content") and event.content: + event_content = extract_content_safely_for_input_output( + event.content + ) + elif hasattr(event, "data") and event.data: + event_content = extract_content_safely_for_input_output( + event.data + ) + + if event_content: + runner_key = f"runner_{invocation_context.invocation_id}" + + # Accumulate output content + if runner_key not in self._runner_outputs: + self._runner_outputs[runner_key] = "" + self._runner_outputs[runner_key] += event_content + + # Set output on active Runner span + span = self._active_spans.get(runner_key) + if span: + output_messages = [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": process_content( + self._runner_outputs[runner_key] + ), + } + ], + "finish_reason": "stop", + } + ] + + # For Agent spans, use output.value + span.set_attribute( + "output.value", + safe_json_dumps_for_input_output(output_messages), + ) + + _logger.debug( + f"Captured event for Runner: {invocation_context.invocation_id}" + ) + + except Exception as e: + _logger.exception(f"Error in on_event_callback: {e}") + + return None # Don't modify the event + + async def after_run_callback( + self, *, invocation_context: InvocationContext + ) -> Optional[None]: + """ + End Runner execution - finish top-level invoke_agent span. + """ + try: + span_key = f"runner_{invocation_context.invocation_id}" + span = self._active_spans.pop(span_key, None) + + if span: + # Record metrics + duration = self._calculate_span_duration(span) + + # Extract conversation_id and user_id + conversation_id = ( + invocation_context.session.id + if invocation_context.session + else None + ) + user_id = getattr(invocation_context, "user_id", None) + + self._metrics.record_agent_call( + operation_name="invoke_agent", + agent_name=invocation_context.app_name, + duration=duration, + error_type=None, + conversation_id=conversation_id, + user_id=user_id, + ) + + span.end() + _logger.debug( + f"Finished Runner span for {invocation_context.app_name}" + ) + + # Clean up stored data + runner_key = f"runner_{invocation_context.invocation_id}" + self._runner_inputs.pop(runner_key, None) + self._runner_outputs.pop(runner_key, None) + + except Exception as e: + _logger.exception(f"Error in after_run_callback: {e}") + + # ===== Agent Level Callbacks - invoke_agent span ===== + + async def before_agent_callback( + self, *, agent: BaseAgent, callback_context: CallbackContext + ) -> None: + """ + Start Agent execution - create invoke_agent span. + + Span name: "invoke_agent {agent.name}" + """ + try: + # ✅ Span name follows GenAI conventions + span_name = f"invoke_agent {agent.name}" + attributes = self._extractors.extract_agent_attributes( + agent, callback_context + ) + + # ✅ Use CLIENT span kind + span = self._tracer.start_span( + name=span_name, kind=SpanKind.CLIENT, attributes=attributes + ) + + # Store span + agent_key = f"agent_{id(agent)}_{callback_context._invocation_context.session.id}" + self._active_spans[agent_key] = span + + _logger.debug(f"Started Agent span: {span_name}") + + except Exception as e: + _logger.exception(f"Error in before_agent_callback: {e}") + + async def after_agent_callback( + self, *, agent: BaseAgent, callback_context: CallbackContext + ) -> None: + """ + End Agent execution - finish invoke_agent span and record metrics. 
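+
+        The matching span is popped from the active-span map, its duration is
+        recorded on gen_ai.client.operation.duration with
+        gen_ai.operation.name = "invoke_agent", and the span is ended.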
+ """ + try: + agent_key = f"agent_{id(agent)}_{callback_context._invocation_context.session.id}" + span = self._active_spans.pop(agent_key, None) + + if span: + # Record metrics + duration = self._calculate_span_duration(span) + + # Extract conversation_id and user_id + conversation_id = None + user_id = None + if callback_context and callback_context._invocation_context: + if callback_context._invocation_context.session: + conversation_id = ( + callback_context._invocation_context.session.id + ) + user_id = getattr( + callback_context._invocation_context, "user_id", None + ) + + self._metrics.record_agent_call( + operation_name="invoke_agent", + agent_name=agent.name, + duration=duration, + error_type=None, + conversation_id=conversation_id, + user_id=user_id, + ) + + span.end() + _logger.debug(f"Finished Agent span for {agent.name}") + + except Exception as e: + _logger.exception(f"Error in after_agent_callback: {e}") + + # ===== LLM Level Callbacks - chat span ===== + + async def before_model_callback( + self, *, callback_context: CallbackContext, llm_request: LlmRequest + ) -> None: + """ + Start LLM call - create chat span. + + Span name: "chat {model}" + """ + try: + # ✅ Span name follows GenAI conventions: "{operation_name} {request.model}" + span_name = f"chat {llm_request.model}" + attributes = self._extractors.extract_llm_request_attributes( + llm_request, callback_context + ) + + # ✅ Use CLIENT span kind for LLM calls + span = self._tracer.start_span( + name=span_name, kind=SpanKind.CLIENT, attributes=attributes + ) + + # Store span + session_id = callback_context._invocation_context.session.id + request_key = f"llm_{id(llm_request)}_{session_id}" + self._active_spans[request_key] = span + + # Store the requested model for reliable retrieval later + if hasattr(llm_request, "model") and llm_request.model: + self._llm_req_models[request_key] = llm_request.model + + _logger.debug(f"Started LLM span: {span_name}") + + except Exception as e: + _logger.exception(f"Error in before_model_callback: {e}") + + async def after_model_callback( + self, *, callback_context: CallbackContext, llm_response: LlmResponse + ) -> None: + """ + End LLM call - finish chat span and record metrics. 
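+
+        Response attributes from the extractors are added to the span, token
+        counts are read from llm_response.usage_metadata (prompt_token_count /
+        candidates_token_count), and both the duration and token-usage
+        histograms are recorded before the span is ended.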
+ """ + try: + # Find the matching span + llm_span = None + request_key = None + session_id = callback_context._invocation_context.session.id + for key, span in list(self._active_spans.items()): + if key.startswith("llm_") and session_id in key: + llm_span = self._active_spans.pop(key) + request_key = key + break + + if llm_span: + # Add response attributes + response_attrs = ( + self._extractors.extract_llm_response_attributes( + llm_response + ) + ) + for key, value in response_attrs.items(): + llm_span.set_attribute(key, value) + + # Record metrics + duration = self._calculate_span_duration(llm_span) + + # Resolve model name with robust fallbacks + model_name = self._resolve_model_name( + llm_response, request_key, llm_span + ) + + # Extract conversation_id and user_id + conversation_id = None + user_id = None + if callback_context and callback_context._invocation_context: + if callback_context._invocation_context.session: + conversation_id = ( + callback_context._invocation_context.session.id + ) + user_id = getattr( + callback_context._invocation_context, "user_id", None + ) + + # Extract token usage + prompt_tokens = 0 + completion_tokens = 0 + if llm_response and llm_response.usage_metadata: + prompt_tokens = getattr( + llm_response.usage_metadata, "prompt_token_count", 0 + ) + completion_tokens = getattr( + llm_response.usage_metadata, + "candidates_token_count", + 0, + ) + + self._metrics.record_llm_call( + operation_name="chat", + model_name=model_name, + duration=duration, + error_type=None, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + conversation_id=conversation_id, + user_id=user_id, + ) + + llm_span.end() + _logger.debug(f"Finished LLM span for model {model_name}") + + except Exception as e: + _logger.exception(f"Error in after_model_callback: {e}") + + async def on_model_error_callback( + self, + *, + callback_context: CallbackContext, + llm_request: LlmRequest, + error: Exception, + ) -> Optional[LlmResponse]: + """ + Handle LLM call errors. 
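+
+        The active chat span for this session is ended with status ERROR,
+        error.type is set to the exception class name, and the error is also
+        recorded on the operation-duration metric.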
+ """ + try: + # Find and finish the span with error status + session_id = callback_context._invocation_context.session.id + for key, span in list(self._active_spans.items()): + if key.startswith("llm_") and session_id in key: + span = self._active_spans.pop(key) + + # Set error attributes + error_type = type(error).__name__ + span.set_attribute("error.type", error_type) + + # Record error metrics + duration = self._calculate_span_duration(span) + model_name = ( + llm_request.model if llm_request else "unknown" + ) + + # Extract conversation_id and user_id + conversation_id = None + user_id = None + if ( + callback_context + and callback_context._invocation_context + ): + if callback_context._invocation_context.session: + conversation_id = ( + callback_context._invocation_context.session.id + ) + user_id = getattr( + callback_context._invocation_context, + "user_id", + None, + ) + + self._metrics.record_llm_call( + operation_name="chat", + model_name=model_name, + duration=duration, + error_type=error_type, + prompt_tokens=0, + completion_tokens=0, + conversation_id=conversation_id, + user_id=user_id, + ) + + # ✅ Use standard OTel span status for errors + span.set_status( + trace_api.Status( + trace_api.StatusCode.ERROR, description=str(error) + ) + ) + span.end() + break + + _logger.debug(f"Handled LLM error: {error}") + + except Exception as e: + _logger.exception(f"Error in on_model_error_callback: {e}") + + return None + + # ===== Tool Level Callbacks - execute_tool span ===== + + async def before_tool_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: ToolContext, + ) -> None: + """ + Start Tool execution - create execute_tool span. + + Span name: "execute_tool {tool.name}" + """ + try: + # ✅ Span name follows GenAI conventions + span_name = f"execute_tool {tool.name}" + attributes = self._extractors.extract_tool_attributes( + tool, tool_args, tool_context + ) + + # ✅ Use INTERNAL span kind for tool execution (as per spec) + span = self._tracer.start_span( + name=span_name, kind=SpanKind.INTERNAL, attributes=attributes + ) + + # Store span + tool_key = f"tool_{id(tool)}_{id(tool_args)}" + self._active_spans[tool_key] = span + + _logger.debug(f"Started Tool span: {span_name}") + + except Exception as e: + _logger.exception(f"Error in before_tool_callback: {e}") + + async def after_tool_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: ToolContext, + result: dict, + ) -> None: + """ + End Tool execution - finish execute_tool span and record metrics. 
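+
+        When content capture is enabled, the tool result is attached as
+        gen_ai.tool.call.result and output.value (JSON); the duration is then
+        recorded with gen_ai.operation.name = "execute_tool".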
+ """ + try: + tool_key = f"tool_{id(tool)}_{id(tool_args)}" + span = self._active_spans.pop(tool_key, None) + + if span: + # ✅ Add tool result as gen_ai.tool.call.result (Opt-In) + if should_capture_content() and result: + result_json = safe_json_dumps_for_input_output(result) + span.set_attribute("gen_ai.tool.call.result", result_json) + span.set_attribute("output.value", result_json) + span.set_attribute("output.mime_type", "application/json") + + # Record metrics + duration = self._calculate_span_duration(span) + + # Extract conversation_id and user_id from tool_context + conversation_id = ( + getattr(tool_context, "session_id", None) + if tool_context + else None + ) + user_id = ( + getattr(tool_context, "user_id", None) + if tool_context + else None + ) + + self._metrics.record_tool_call( + operation_name="execute_tool", + tool_name=tool.name, + duration=duration, + error_type=None, + conversation_id=conversation_id, + user_id=user_id, + ) + + span.end() + _logger.debug(f"Finished Tool span for {tool.name}") + + except Exception as e: + _logger.exception(f"Error in after_tool_callback: {e}") + + async def on_tool_error_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: ToolContext, + error: Exception, + ) -> Optional[dict]: + """ + Handle Tool execution errors. + """ + try: + tool_key = f"tool_{id(tool)}_{id(tool_args)}" + span = self._active_spans.pop(tool_key, None) + + if span: + # Set error attributes + error_type = type(error).__name__ + span.set_attribute("error.type", error_type) + + # Record error metrics + duration = self._calculate_span_duration(span) + + # Extract conversation_id and user_id + conversation_id = ( + getattr(tool_context, "session_id", None) + if tool_context + else None + ) + user_id = ( + getattr(tool_context, "user_id", None) + if tool_context + else None + ) + + self._metrics.record_tool_call( + operation_name="execute_tool", + tool_name=tool.name, + duration=duration, + error_type=error_type, + conversation_id=conversation_id, + user_id=user_id, + ) + + # ✅ Use standard OTel span status for errors + span.set_status( + trace_api.Status( + trace_api.StatusCode.ERROR, description=str(error) + ) + ) + span.end() + + _logger.debug(f"Handled Tool error: {error}") + + except Exception as e: + _logger.exception(f"Error in on_tool_error_callback: {e}") + + return None + + # ===== Helper Methods ===== + + def _calculate_span_duration(self, span: trace_api.Span) -> float: + """ + Calculate span duration in seconds. + + Args: + span: OpenTelemetry span + + Returns: + Duration in seconds + """ + import time + + if hasattr(span, "start_time") and span.start_time: + current_time_ns = time.time_ns() + return ( + current_time_ns - span.start_time + ) / 1_000_000_000 # ns to s + return 0.0 + + def _resolve_model_name( + self, llm_response: LlmResponse, request_key: str, span: trace_api.Span + ) -> str: + """ + Resolve model name with robust fallbacks. 
+ + Args: + llm_response: LLM response object + request_key: Request key for stored models + span: Current span + + Returns: + Model name string + """ + model_name = None + + # 1) Prefer llm_response.model if available + if ( + llm_response + and hasattr(llm_response, "model") + and getattr(llm_response, "model") + ): + model_name = getattr(llm_response, "model") + + # 2) Use stored request model by request_key + if ( + not model_name + and request_key + and request_key in self._llm_req_models + ): + model_name = self._llm_req_models.pop(request_key, None) + + # 3) Try span attributes if accessible + if ( + not model_name + and hasattr(span, "attributes") + and getattr(span, "attributes") + ): + model_name = span.attributes.get("gen_ai.request.model") + + # 4) Parse from span name like "chat " + if ( + not model_name + and hasattr(span, "name") + and isinstance(span.name, str) + ): + try: + name = span.name + if name.startswith("chat ") and len(name) > 5: + model_name = name[5:] # Remove "chat " prefix + except Exception: + pass + + # 5) Final fallback + if not model_name: + model_name = "unknown" + + return model_name + + def _convert_user_message_to_genai_format( + self, user_message: types.Content + ) -> list: + """ + Convert ADK user message to GenAI message format. + + Args: + user_message: ADK Content object + + Returns: + List of GenAI formatted messages + """ + input_messages = [] + if ( + user_message + and hasattr(user_message, "role") + and hasattr(user_message, "parts") + ): + message = {"role": user_message.role, "parts": []} + for part in user_message.parts: + if hasattr(part, "text"): + message["parts"].append( + {"type": "text", "content": process_content(part.text)} + ) + input_messages.append(message) + return input_messages diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_utils.py new file mode 100644 index 00000000..5490ba00 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/internal/_utils.py @@ -0,0 +1,255 @@ +""" +Utility functions for Google ADK instrumentation. + +This module provides common utility functions following OpenTelemetry standards. +""" + +import json +import os +from typing import Any, Optional + + +def should_capture_content() -> bool: + """ + Check if content capture is enabled via environment variable. + + Returns: + True if OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT is set to "true" + """ + return ( + os.getenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "false" + ).lower() + == "true" + ) + + +def get_max_content_length() -> Optional[int]: + """ + Get the configured maximum content length from environment variable. + + Returns: + Maximum length in characters, or None if not set + """ + limit_str = os.getenv( + "OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH" + ) + if limit_str: + try: + return int(limit_str) + except ValueError: + pass + return None + + +def process_content(content: str) -> str: + """ + Process content with length limit and truncation. + + This replaces the ARMS SDK process_content() function with standard OTel behavior. 
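+
+    Example (illustrative; assumes content capture is enabled and
+    OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH=20):
+
+        process_content("x" * 100)  # -> "xxxxxxxx [TRUNCATED]" (8 chars + marker)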
+ + Args: + content: Content string to process + + Returns: + Processed content with truncation marker if needed + """ + if not content: + return "" + + if not should_capture_content(): + return "" + + max_length = get_max_content_length() + if max_length is None or len(content) <= max_length: + return content + + # Add truncation marker + truncation_marker = " [TRUNCATED]" + effective_limit = max_length - len(truncation_marker) + if effective_limit <= 0: + return truncation_marker[:max_length] + + return content[:effective_limit] + truncation_marker + + +def safe_json_dumps( + obj: Any, max_length: int = 1024, respect_env_limit: bool = False +) -> str: + """ + Safely serialize an object to JSON with error handling and length limits. + + Args: + obj: Object to serialize + max_length: Maximum length of the resulting string (used as fallback) + respect_env_limit: If True, use environment variable limit instead of max_length + + Returns: + JSON string representation of the object + """ + try: + json_str = json.dumps(obj, ensure_ascii=False, default=str) + + if respect_env_limit: + json_str = process_content(json_str) + elif len(json_str) > max_length: + json_str = json_str[:max_length] + "...[truncated]" + + return json_str + except Exception: + fallback_str = str(obj) + if respect_env_limit: + return process_content(fallback_str) + else: + return fallback_str[:max_length] + + +def safe_json_dumps_large( + obj: Any, max_length: int = 1048576, respect_env_limit: bool = True +) -> str: + """ + Safely serialize large objects to JSON with extended length limits. + + This is specifically designed for content that may be large, such as + LLM input/output messages. + + Args: + obj: Object to serialize + max_length: Maximum length (default 1MB, used as fallback) + respect_env_limit: If True (default), use environment variable limit + + Returns: + JSON string representation of the object + """ + return safe_json_dumps(obj, max_length, respect_env_limit) + + +def extract_content_safely( + content: Any, max_length: int = 1024, respect_env_limit: bool = True +) -> str: + """ + Safely extract text content from various ADK content types. + + Args: + content: Content object (could be types.Content, string, etc.) + max_length: Maximum length of extracted content (used as fallback) + respect_env_limit: If True (default), use environment variable limit + + Returns: + String representation of the content + """ + if not content: + return "" + + try: + # Handle Google genai types.Content objects + if hasattr(content, "parts") and content.parts: + text_parts = [] + for part in content.parts: + if hasattr(part, "text") and part.text: + text_parts.append(part.text) + content_str = "".join(text_parts) + elif hasattr(content, "text"): + content_str = content.text + else: + content_str = str(content) + + # Apply length limit with proper truncation handling + if respect_env_limit: + return process_content(content_str) + elif len(content_str) > max_length: + content_str = content_str[:max_length] + "...[truncated]" + + return content_str + + except Exception: + fallback_str = str(content) if content else "" + if respect_env_limit: + return process_content(fallback_str) + else: + return fallback_str[:max_length] + + +def safe_json_dumps_for_input_output(obj: Any) -> str: + """ + Safely serialize objects for input/output attributes with environment variable length limit. 
+ + This function is specifically designed for input.value and output.value attributes + and always respects the OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH environment variable. + + Args: + obj: Object to serialize + + Returns: + JSON string representation with proper truncation marker if needed + """ + return safe_json_dumps(obj, max_length=1048576, respect_env_limit=True) + + +def extract_content_safely_for_input_output(content: Any) -> str: + """ + Safely extract content for input/output attributes with environment variable length limit. + + This function is specifically designed for input/output content extraction + and always respects the OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH environment variable. + + Args: + content: Content object to extract text from + + Returns: + String representation with proper truncation marker if needed + """ + return extract_content_safely( + content, max_length=1048576, respect_env_limit=True + ) + + +def extract_model_name(model_obj: Any) -> str: + """ + Extract model name from various model object types. + + Args: + model_obj: Model object or model name string + + Returns: + Model name string + """ + if isinstance(model_obj, str): + return model_obj + elif hasattr(model_obj, "model") and model_obj.model: + return model_obj.model + elif hasattr(model_obj, "name") and model_obj.name: + return model_obj.name + else: + return "unknown" + + +def is_slow_call(duration: float, threshold: float = 0.5) -> bool: + """ + Determine if a call should be considered slow. + + Args: + duration: Duration in seconds + threshold: Slow call threshold in seconds (default 500ms) + + Returns: + True if call is considered slow + """ + return duration > threshold + + +def get_error_attributes(error: Exception) -> dict: + """ + Extract error attributes from an exception. + + Args: + error: Exception object + + Returns: + Dictionary of error attributes + """ + return { + "error.type": type(error).__name__, + # Note: error.message is non-standard, OTel recommends using span status + # But we include it for debugging purposes + } diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/package.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/package.py new file mode 100644 index 00000000..ad5a056b --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/package.py @@ -0,0 +1,20 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
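+
+# _instruments declares the instrumented package constraint; by the usual
+# opentelemetry-instrumentation layout it is what the instrumentor returns
+# from instrumentation_dependencies() (assumption: this package follows that
+# convention).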
+ + +_instruments = ("google-adk >= 0.1.0",) + +_supports_metrics = True + +_semconv_status = "experimental" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/version.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/version.py new file mode 100644 index 00000000..a5e99969 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/src/opentelemetry/instrumentation/google_adk/version.py @@ -0,0 +1,3 @@ +"""Version information for OpenTelemetry Google ADK Instrumentation.""" + +__version__ = "0.1.0" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/__init__.py new file mode 100644 index 00000000..7eb68885 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for OpenTelemetry Google ADK Instrumentation.""" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_metrics.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_metrics.py new file mode 100644 index 00000000..0df16c0e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_metrics.py @@ -0,0 +1,697 @@ +""" +Integration tests for Google ADK Metrics with InMemoryMetricReader validation. + +Tests validate that metrics are recorded with correct attributes according to +OpenTelemetry GenAI Semantic Conventions using real plugin callbacks and +InMemoryMetricReader to capture actual metrics data. + +This test follows the same pattern as the commercial ARMS version but validates +against the latest OpenTelemetry GenAI semantic conventions. +""" + +import asyncio +from typing import Any, Dict, List +from unittest.mock import Mock + +import pytest + +from opentelemetry.instrumentation.google_adk import GoogleAdkInstrumentor +from opentelemetry.sdk import metrics as metrics_sdk +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + + +def create_mock_callback_context(session_id="session_123", user_id="user_456"): + """Create properly structured mock CallbackContext.""" + mock_callback_context = Mock() + mock_session = Mock() + mock_session.id = session_id + mock_invocation_context = Mock() + mock_invocation_context.session = mock_session + mock_callback_context._invocation_context = mock_invocation_context + mock_callback_context.user_id = user_id + return mock_callback_context + + +class OTelGenAIMetricsValidator: + """ + Validator for OpenTelemetry GenAI Metrics Semantic Conventions. 
+ + Based on the latest OTel GenAI semantic conventions: + - Only 2 standard metrics: gen_ai.client.operation.duration and gen_ai.client.token.usage + - Required attributes: gen_ai.operation.name, gen_ai.provider.name + - Recommended attributes: gen_ai.request.model, gen_ai.response.model, server.address, server.port + - error.type only present on error + - gen_ai.token.type with values "input" or "output" for token usage + """ + + # Standard OTel GenAI metrics + STANDARD_METRICS = { + "gen_ai.client.operation.duration", # Histogram + "gen_ai.client.token.usage", # Histogram + } + + # Non-standard metrics that should NOT be present + NON_STANDARD_METRICS = { + # ARMS-specific metrics + "calls_count", + "calls_duration_seconds", + "call_error_count", + "llm_usage_tokens", + "llm_first_token_seconds", + # Custom GenAI metrics (non-standard) + "genai_calls_count", + "genai_calls_duration_seconds", + "genai_calls_error_count", + "genai_calls_slow_count", + "genai_llm_first_token_seconds", + "genai_llm_usage_tokens", + "genai_avg_first_token_seconds", + } + + def validate_metrics_data( + self, metric_reader: InMemoryMetricReader + ) -> Dict[str, Any]: + """Validate metrics data against OTel GenAI conventions.""" + validation_result = { + "metrics_found": set(), + "non_standard_found": set(), + "metric_validations": {}, + "errors": [], + "warnings": [], + } + + metrics_data = metric_reader.get_metrics_data() + if not metrics_data: + validation_result["warnings"].append("No metrics data found") + return validation_result + + # Collect all found metrics + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + validation_result["metrics_found"].add(metric.name) + + # Check for non-standard metrics + if metric.name in self.NON_STANDARD_METRICS: + validation_result["non_standard_found"].add( + metric.name + ) + + # Validate individual metric + validation_result["metric_validations"][metric.name] = ( + self._validate_single_metric(metric) + ) + + # Check for non-standard metrics + if validation_result["non_standard_found"]: + validation_result["errors"].append( + f"Found non-standard metrics: {validation_result['non_standard_found']}" + ) + + return validation_result + + def _validate_single_metric(self, metric) -> Dict[str, Any]: + """Validate a single metric's attributes and data.""" + result = { + "name": metric.name, + "type": type(metric.data).__name__, + "data_points": [], + "errors": [], + "warnings": [], + } + + # Get data points + data_points = [] + if hasattr(metric.data, "data_points"): + data_points = metric.data.data_points + + for data_point in data_points: + point_validation = self._validate_data_point( + metric.name, data_point + ) + result["data_points"].append(point_validation) + if point_validation["errors"]: + result["errors"].extend(point_validation["errors"]) + + return result + + def _validate_data_point( + self, metric_name: str, data_point + ) -> Dict[str, Any]: + """Validate data point attributes against OTel GenAI conventions.""" + result = { + "attributes": {}, + "value": None, + "errors": [], + "warnings": [], + } + + # Extract attributes + if hasattr(data_point, "attributes"): + result["attributes"] = ( + dict(data_point.attributes) if data_point.attributes else {} + ) + + # Extract value + if hasattr(data_point, "sum"): + result["value"] = data_point.sum + elif hasattr(data_point, "count"): + result["value"] = data_point.count + + # Validate OTel GenAI attributes + attributes 
= result["attributes"] + + # Check required attributes + if "gen_ai.operation.name" not in attributes: + result["errors"].append( + "Missing required attribute: gen_ai.operation.name" + ) + + if "gen_ai.provider.name" not in attributes: + result["errors"].append( + "Missing required attribute: gen_ai.provider.name" + ) + + # Check for non-standard attributes + non_standard_attrs = { + "callType", + "callKind", + "rpcType", + "spanKind", # ARMS attributes + "modelName", + "usageType", # Should be gen_ai.request.model, gen_ai.token.type + "session_id", + "user_id", # High cardinality, should not be in metrics + } + + for attr in non_standard_attrs: + if attr in attributes: + result["errors"].append( + f"Found non-standard attribute: {attr}" + ) + + # Validate token.type values + if "gen_ai.token.type" in attributes: + token_type = attributes["gen_ai.token.type"] + if token_type not in ["input", "output"]: + result["errors"].append( + f"Invalid gen_ai.token.type value: {token_type}" + ) + + return result + + +class TestGoogleAdkMetricsIntegration: + """Integration tests using InMemoryMetricReader to validate actual metrics.""" + + def setup_method(self): + """Set up test fixtures for each test.""" + # Create independent providers and readers + self.tracer_provider = trace_sdk.TracerProvider() + self.span_exporter = InMemorySpanExporter() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + self.metric_reader = InMemoryMetricReader() + self.meter_provider = metrics_sdk.MeterProvider( + metric_readers=[self.metric_reader] + ) + + # Create instrumentor + self.instrumentor = GoogleAdkInstrumentor() + + # Create validator + self.validator = OTelGenAIMetricsValidator() + + # Clean up any existing instrumentation + if self.instrumentor.is_instrumented_by_opentelemetry: + self.instrumentor.uninstrument() + + # Clear any existing data + self.span_exporter.clear() + + def teardown_method(self): + """Clean up after each test.""" + try: + if self.instrumentor.is_instrumented_by_opentelemetry: + self.instrumentor.uninstrument() + except Exception: + pass + + self.span_exporter.clear() + + def get_metrics_by_name(self, name: str) -> List[Any]: + """Get metrics data by metric name from InMemoryMetricReader.""" + metrics_data = self.metric_reader.get_metrics_data() + if not metrics_data: + return [] + + found_metrics = [] + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + if metric.name == name: + found_metrics.append(metric) + + return found_metrics + + def get_metric_data_points(self, metric_name: str) -> List[Any]: + """Get data points for a specific metric.""" + metrics = self.get_metrics_by_name(metric_name) + if not metrics: + return [] + + data_points = [] + for metric in metrics: + if hasattr(metric.data, "data_points"): + data_points.extend(metric.data.data_points) + + return data_points + + @pytest.mark.asyncio + async def test_llm_metrics_with_standard_otel_attributes(self): + """ + Test that LLM metrics are recorded with standard OTel GenAI attributes. + + Validates: + - gen_ai.client.operation.duration histogram recorded + - gen_ai.client.token.usage histogram recorded + - Required attributes present: gen_ai.operation.name, gen_ai.provider.name + - No non-standard attributes (callType, spanKind, modelName, etc.) 
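+
+        Expected duration data-point attributes (illustrative; values mirror
+        the mocks used below):
+
+            {
+                "gen_ai.operation.name": "chat",
+                "gen_ai.provider.name": "google_adk",
+                "gen_ai.request.model": "gemini-pro",
+            }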
+ """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock LLM request and response + mock_llm_request = Mock() + mock_llm_request.model = "gemini-pro" + mock_llm_request.config = Mock() + mock_llm_request.config.max_tokens = 1000 + mock_llm_request.config.temperature = 0.7 + mock_llm_request.contents = ["test"] + + mock_llm_response = Mock() + mock_llm_response.model = "gemini-pro" + mock_llm_response.finish_reason = "stop" + mock_llm_response.usage_metadata = Mock() + mock_llm_response.usage_metadata.prompt_token_count = 100 + mock_llm_response.usage_metadata.candidates_token_count = 50 + + mock_callback_context = create_mock_callback_context() + + # Execute LLM callbacks + await plugin.before_model_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + ) + + await asyncio.sleep(0.01) # Simulate processing time + + await plugin.after_model_callback( + callback_context=mock_callback_context, + llm_response=mock_llm_response, + ) + + # Validate metrics using InMemoryMetricReader + validation_result = self.validator.validate_metrics_data( + self.metric_reader + ) + + # Check for non-standard metrics + assert ( + len(validation_result["non_standard_found"]) == 0 + ), f"Found non-standard metrics: {validation_result['non_standard_found']}" + + # Check standard metrics are present + assert ( + "gen_ai.client.operation.duration" + in validation_result["metrics_found"] + ), "Should have gen_ai.client.operation.duration metric" + assert ( + "gen_ai.client.token.usage" in validation_result["metrics_found"] + ), "Should have gen_ai.client.token.usage metric" + + # Get actual data points + duration_points = self.get_metric_data_points( + "gen_ai.client.operation.duration" + ) + assert ( + len(duration_points) >= 1 + ), "Should have at least 1 duration data point" + + # Validate duration attributes + duration_attrs = dict(duration_points[0].attributes) + assert ( + duration_attrs.get("gen_ai.operation.name") == "chat" + ), "Should have gen_ai.operation.name = 'chat'" + assert ( + "gen_ai.provider.name" in duration_attrs + ), "Should have gen_ai.provider.name" + assert ( + duration_attrs.get("gen_ai.request.model") == "gemini-pro" + ), "Should have gen_ai.request.model" + + # Validate NO non-standard attributes + assert "callType" not in duration_attrs, "Should NOT have callType" + assert "spanKind" not in duration_attrs, "Should NOT have spanKind" + assert "modelName" not in duration_attrs, "Should NOT have modelName" + assert ( + "session_id" not in duration_attrs + ), "Should NOT have session_id (high cardinality)" + assert ( + "user_id" not in duration_attrs + ), "Should NOT have user_id (high cardinality)" + + # Get token usage data points + token_points = self.get_metric_data_points("gen_ai.client.token.usage") + assert ( + len(token_points) == 2 + ), "Should have 2 token usage data points (input + output)" + + # Validate token types + token_types = { + dict(dp.attributes).get("gen_ai.token.type") for dp in token_points + } + assert token_types == { + "input", + "output", + }, "Should have both input and output token types" + + # Validate token values + input_point = [ + dp + for dp in token_points + if dict(dp.attributes).get("gen_ai.token.type") == "input" + ][0] + output_point = [ + dp + for dp in token_points + if dict(dp.attributes).get("gen_ai.token.type") == "output" + ][0] + + assert input_point.sum == 100, "Should record 100 
input tokens" + assert output_point.sum == 50, "Should record 50 output tokens" + + # Validate NO usageType attribute (should be gen_ai.token.type) + input_attrs = dict(input_point.attributes) + assert ( + "usageType" not in input_attrs + ), "Should NOT have usageType (use gen_ai.token.type)" + + @pytest.mark.asyncio + async def test_llm_metrics_with_error(self): + """ + Test that LLM error metrics include error.type attribute. + + Validates: + - error.type attribute present on error + - Standard attributes still present + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock LLM request + mock_llm_request = Mock() + mock_llm_request.model = "gemini-pro" + mock_llm_request.config = Mock() + + mock_callback_context = create_mock_callback_context() + + # Create error + test_error = Exception("API timeout") + + # Execute error scenario + await plugin.before_model_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + ) + + await plugin.on_model_error_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + error=test_error, + ) + + # Get metrics data + duration_points = self.get_metric_data_points( + "gen_ai.client.operation.duration" + ) + assert len(duration_points) >= 1, "Should have error duration metric" + + # Validate error.type attribute + error_attrs = dict(duration_points[0].attributes) + assert "error.type" in error_attrs, "Should have error.type on error" + assert error_attrs["error.type"] == "Exception" + + @pytest.mark.asyncio + async def test_agent_metrics_use_standard_attributes(self): + """ + Test that Agent metrics use standard OTel GenAI attributes. 
+ + Validates: + - gen_ai.operation.name = "invoke_agent" + - Agent name mapped to gen_ai.request.model + - No ARMS-specific attributes + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock agent + mock_agent = Mock() + mock_agent.name = "math_tutor" + mock_agent.description = "Mathematical tutor agent" + mock_agent.sub_agents = [] + + mock_callback_context = create_mock_callback_context() + + # Execute Agent callbacks + await plugin.before_agent_callback( + agent=mock_agent, callback_context=mock_callback_context + ) + + await asyncio.sleep(0.01) + + await plugin.after_agent_callback( + agent=mock_agent, callback_context=mock_callback_context + ) + + # Get metrics data + duration_points = self.get_metric_data_points( + "gen_ai.client.operation.duration" + ) + assert len(duration_points) >= 1, "Should have agent duration metric" + + # Validate attributes + agent_attrs = dict(duration_points[0].attributes) + assert ( + agent_attrs.get("gen_ai.operation.name") == "invoke_agent" + ), "Should have gen_ai.operation.name = 'invoke_agent'" + assert ( + "gen_ai.provider.name" in agent_attrs + ), "Should have provider name" + + # Agent name should be in gen_ai.request.model + assert ( + agent_attrs.get("gen_ai.request.model") == "math_tutor" + or "gen_ai.agent.name" in agent_attrs + ), "Agent name should be in metrics" + + # Validate NO ARMS attributes + assert "spanKind" not in agent_attrs, "Should NOT have spanKind" + assert ( + "session_id" not in agent_attrs + ), "Should NOT have high-cardinality session_id" + assert ( + "user_id" not in agent_attrs + ), "Should NOT have high-cardinality user_id" + + @pytest.mark.asyncio + async def test_tool_metrics_use_standard_attributes(self): + """ + Test that Tool metrics use standard OTel GenAI attributes. 
+ + Validates: + - gen_ai.operation.name = "execute_tool" + - Tool name mapped to gen_ai.request.model + - Standard metric structure + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock tool + mock_tool = Mock() + mock_tool.name = "calculator" + mock_tool.description = "Mathematical calculator" + + mock_tool_args = {"operation": "add", "a": 5, "b": 3} + mock_tool_context = Mock() + mock_tool_context.session_id = "session_456" + mock_result = {"result": 8} + + # Execute Tool callbacks + await plugin.before_tool_callback( + tool=mock_tool, + tool_args=mock_tool_args, + tool_context=mock_tool_context, + ) + + await asyncio.sleep(0.01) + + await plugin.after_tool_callback( + tool=mock_tool, + tool_args=mock_tool_args, + tool_context=mock_tool_context, + result=mock_result, + ) + + # Get metrics data + duration_points = self.get_metric_data_points( + "gen_ai.client.operation.duration" + ) + assert len(duration_points) >= 1, "Should have tool duration metric" + + # Validate attributes + tool_attrs = dict(duration_points[0].attributes) + assert ( + tool_attrs.get("gen_ai.operation.name") == "execute_tool" + ), "Should have gen_ai.operation.name = 'execute_tool'" + assert "gen_ai.provider.name" in tool_attrs + + # Tool name should be in metrics + assert ( + tool_attrs.get("gen_ai.request.model") == "calculator" + or "gen_ai.tool.name" in tool_attrs + ), "Tool name should be in metrics" + + @pytest.mark.asyncio + async def test_only_two_standard_metrics_recorded(self): + """ + Test that only 2 standard OTel GenAI metrics are recorded. + + Validates: + - Only gen_ai.client.operation.duration + - Only gen_ai.client.token.usage + - NO ARMS metrics (calls_count, calls_duration_seconds, etc.) + - NO custom GenAI metrics (genai_calls_count, genai_llm_first_token_seconds, etc.) 
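+
+        Sketch of the underlying check (illustrative; the reader is walked the
+        same way as in get_metrics_by_name):
+
+            data = self.metric_reader.get_metrics_data()
+            found = {m.name
+                     for rm in data.resource_metrics
+                     for sm in rm.scope_metrics
+                     for m in sm.metrics}
+            assert found == {"gen_ai.client.operation.duration",
+                             "gen_ai.client.token.usage"}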
+ """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Execute various operations + mock_context = create_mock_callback_context() + + # LLM call + mock_llm_request = Mock() + mock_llm_request.model = "gemini-pro" + mock_llm_request.config = Mock() + mock_llm_request.contents = ["test"] + + mock_llm_response = Mock() + mock_llm_response.model = "gemini-pro" + mock_llm_response.finish_reason = "stop" + mock_llm_response.usage_metadata = Mock() + mock_llm_response.usage_metadata.prompt_token_count = 10 + mock_llm_response.usage_metadata.candidates_token_count = 5 + + await plugin.before_model_callback( + callback_context=mock_context, llm_request=mock_llm_request + ) + await plugin.after_model_callback( + callback_context=mock_context, llm_response=mock_llm_response + ) + + # Agent call + mock_agent = Mock() + mock_agent.name = "agent1" + mock_agent.sub_agents = [] + + await plugin.before_agent_callback( + agent=mock_agent, callback_context=mock_context + ) + await plugin.after_agent_callback( + agent=mock_agent, callback_context=mock_context + ) + + # Validate metrics + validation_result = self.validator.validate_metrics_data( + self.metric_reader + ) + + # Should have exactly 2 standard metrics + standard_metrics = ( + validation_result["metrics_found"] + & self.validator.STANDARD_METRICS + ) + assert ( + len(standard_metrics) == 2 + ), f"Should have exactly 2 standard metrics, got {len(standard_metrics)}: {standard_metrics}" + + # Should have NO non-standard metrics + assert ( + len(validation_result["non_standard_found"]) == 0 + ), f"Should have NO non-standard metrics, found: {validation_result['non_standard_found']}" + + # Explicitly check ARMS metrics are NOT present + arms_metrics = { + "calls_count", + "calls_duration_seconds", + "call_error_count", + "llm_usage_tokens", + "llm_first_token_seconds", + } + found_arms_metrics = validation_result["metrics_found"] & arms_metrics + assert ( + len(found_arms_metrics) == 0 + ), f"Should NOT have ARMS metrics, found: {found_arms_metrics}" + + # Explicitly check custom GenAI metrics are NOT present + custom_genai_metrics = { + "genai_calls_count", + "genai_calls_duration_seconds", + "genai_calls_error_count", + "genai_calls_slow_count", + "genai_llm_first_token_seconds", + "genai_llm_usage_tokens", + } + found_custom_metrics = ( + validation_result["metrics_found"] & custom_genai_metrics + ) + assert ( + len(found_custom_metrics) == 0 + ), f"Should NOT have custom GenAI metrics, found: {found_custom_metrics}" + + +# Run tests +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_plugin_integration.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_plugin_integration.py new file mode 100644 index 00000000..1327adcc --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_plugin_integration.py @@ -0,0 +1,639 @@ +""" +Integration tests for Google ADK Plugin with InMemoryExporter validation. + +Tests validate that spans are created with correct attributes according to +OpenTelemetry GenAI Semantic Conventions using real plugin callbacks and +InMemorySpanExporter to capture actual span data. + +This test follows the same pattern as the commercial ARMS version but validates +against the latest OpenTelemetry GenAI semantic conventions. 
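+
+Wiring sketch (mirrors setup_method below; every name is imported in this
+module):
+
+    reader = InMemoryMetricReader()
+    meter_provider = metrics_sdk.MeterProvider(metric_readers=[reader])
+    tracer_provider = trace_sdk.TracerProvider()
+    exporter = InMemorySpanExporter()
+    tracer_provider.add_span_processor(SimpleSpanProcessor(exporter))
+    GoogleAdkInstrumentor().instrument(
+        tracer_provider=tracer_provider, meter_provider=meter_provider
+    )
+    spans = exporter.get_finished_spans()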
+""" + +from typing import Any, Dict +from unittest.mock import Mock + +import pytest + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.google_adk import GoogleAdkInstrumentor +from opentelemetry.sdk import metrics as metrics_sdk +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + + +def create_mock_callback_context(session_id="session_123", user_id="user_456"): + """Create properly structured mock CallbackContext following ADK structure.""" + mock_callback_context = Mock() + mock_session = Mock() + mock_session.id = session_id + mock_invocation_context = Mock() + mock_invocation_context.session = mock_session + mock_callback_context._invocation_context = mock_invocation_context + mock_callback_context.user_id = user_id + return mock_callback_context + + +class OTelGenAISpanValidator: + """ + Validator for OpenTelemetry GenAI Semantic Conventions. + + Based on the latest OTel GenAI semantic conventions: + - gen_ai.provider.name (required, replaces gen_ai.system) + - gen_ai.operation.name (required, replaces gen_ai.span.kind) + - gen_ai.conversation.id (replaces gen_ai.session.id) + - enduser.id (replaces gen_ai.user.id) + - gen_ai.response.finish_reasons (array, replaces gen_ai.response.finish_reason) + - Tool attributes with gen_ai. prefix + - Agent attributes with gen_ai. prefix + """ + + # Required attributes for different operation types + REQUIRED_ATTRIBUTES_BY_OPERATION = { + "chat": { + "required": { + "gen_ai.operation.name", + "gen_ai.provider.name", + "gen_ai.request.model", + }, + "recommended": { + "gen_ai.response.model", + "gen_ai.usage.input_tokens", + "gen_ai.usage.output_tokens", + }, + }, + "invoke_agent": { + "required": {"gen_ai.operation.name"}, + "recommended": {"gen_ai.agent.name", "gen_ai.agent.description"}, + }, + "execute_tool": { + "required": {"gen_ai.operation.name", "gen_ai.tool.name"}, + "recommended": {"gen_ai.tool.description"}, + }, + } + + # Non-standard attributes that should NOT be present + NON_STANDARD_ATTRIBUTES = { + "gen_ai.span.kind", # Use gen_ai.operation.name instead + "gen_ai.system", # Use gen_ai.provider.name instead + "gen_ai.session.id", # Use gen_ai.conversation.id instead + "gen_ai.user.id", # Use enduser.id instead + "gen_ai.framework", # Non-standard + "gen_ai.model_name", # Redundant + "gen_ai.request.is_stream", # Non-standard + "gen_ai.usage.total_tokens", # Non-standard + "gen_ai.input.message_count", # Non-standard + "gen_ai.output.message_count", # Non-standard + } + + def validate_span(self, span, expected_operation: str) -> Dict[str, Any]: + """Validate a single span's attributes against OTel GenAI conventions.""" + validation_result = { + "span_name": span.name, + "expected_operation": expected_operation, + "errors": [], + "warnings": [], + "missing_required": [], + "missing_recommended": [], + "non_standard_found": [], + } + + attributes = getattr(span, "attributes", {}) or {} + + # Validate operation name + actual_operation = attributes.get("gen_ai.operation.name") + if not actual_operation: + validation_result["errors"].append( + "Missing required attribute: gen_ai.operation.name" + ) + elif actual_operation != expected_operation: + validation_result["errors"].append( + f"Expected operation '{expected_operation}', got '{actual_operation}'" + ) + + # Check for 
non-standard attributes + for attr_key in attributes.keys(): + if attr_key in self.NON_STANDARD_ATTRIBUTES: + validation_result["non_standard_found"].append(attr_key) + + # Validate required and recommended attributes + if expected_operation in self.REQUIRED_ATTRIBUTES_BY_OPERATION: + requirements = self.REQUIRED_ATTRIBUTES_BY_OPERATION[ + expected_operation + ] + + # Check required attributes + for attr in requirements["required"]: + if attr not in attributes: + validation_result["missing_required"].append(attr) + + # Check recommended attributes + for attr in requirements["recommended"]: + if attr not in attributes: + validation_result["missing_recommended"].append(attr) + + # Validate specific attribute formats + self._validate_attribute_formats(attributes, validation_result) + + return validation_result + + def _validate_attribute_formats(self, attributes: Dict, result: Dict): + """Validate attribute value formats and types.""" + + # Validate finish_reasons is array + if "gen_ai.response.finish_reasons" in attributes: + finish_reasons = attributes["gen_ai.response.finish_reasons"] + if not isinstance(finish_reasons, (list, tuple)): + result["errors"].append( + f"gen_ai.response.finish_reasons should be array, got {type(finish_reasons)}" + ) + + # Validate numeric attributes + numeric_attrs = [ + "gen_ai.request.max_tokens", + "gen_ai.usage.input_tokens", + "gen_ai.usage.output_tokens", + ] + for attr in numeric_attrs: + if attr in attributes and not isinstance( + attributes[attr], (int, float) + ): + result["errors"].append( + f"Attribute {attr} should be numeric, got {type(attributes[attr])}" + ) + + +class TestGoogleAdkPluginIntegration: + """Integration tests using InMemoryExporter to validate actual spans.""" + + def setup_method(self): + """Set up test fixtures for each test.""" + # Create independent providers and exporters + self.tracer_provider = trace_sdk.TracerProvider() + self.span_exporter = InMemorySpanExporter() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + self.metric_reader = InMemoryMetricReader() + self.meter_provider = metrics_sdk.MeterProvider( + metric_readers=[self.metric_reader] + ) + + # Create instrumentor + self.instrumentor = GoogleAdkInstrumentor() + + # Create validator + self.validator = OTelGenAISpanValidator() + + # Clean up any existing instrumentation + if self.instrumentor.is_instrumented_by_opentelemetry: + self.instrumentor.uninstrument() + + # Clear any existing spans + self.span_exporter.clear() + + def teardown_method(self): + """Clean up after each test.""" + try: + if self.instrumentor.is_instrumented_by_opentelemetry: + self.instrumentor.uninstrument() + except Exception: + pass + + # Clear spans + self.span_exporter.clear() + + @pytest.mark.asyncio + async def test_llm_span_attributes_semantic_conventions(self): + """ + Test that LLM spans follow the latest OTel GenAI semantic conventions. 
+ + Validates: + - Span name format: "chat {model}" + - Required attributes: gen_ai.operation.name, gen_ai.provider.name + - Provider name instead of gen_ai.system + - No non-standard attributes + """ + # Instrument the plugin + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock LLM request + mock_llm_request = Mock() + mock_llm_request.model = "gemini-pro" + mock_llm_request.config = Mock() + mock_llm_request.config.max_tokens = 1000 + mock_llm_request.config.temperature = 0.7 + mock_llm_request.config.top_p = 0.9 + mock_llm_request.config.top_k = 40 + mock_llm_request.contents = ["test message"] + mock_llm_request.stream = False + + # Create mock response + mock_llm_response = Mock() + mock_llm_response.model = "gemini-pro-001" + mock_llm_response.finish_reason = "stop" + mock_llm_response.content = "test response" + mock_llm_response.usage_metadata = Mock() + mock_llm_response.usage_metadata.prompt_token_count = 100 + mock_llm_response.usage_metadata.candidates_token_count = 50 + + mock_callback_context = create_mock_callback_context( + "conv_123", "user_456" + ) + + # Execute LLM span lifecycle + await plugin.before_model_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + ) + await plugin.after_model_callback( + callback_context=mock_callback_context, + llm_response=mock_llm_response, + ) + + # Get finished spans from InMemoryExporter + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1, "Should have exactly 1 LLM span" + + llm_span = spans[0] + + # Validate span name follows OTel convention: "chat {model}" + assert ( + llm_span.name == "chat gemini-pro" + ), f"Expected span name 'chat gemini-pro', got '{llm_span.name}'" + + # Validate span attributes using validator + validation_result = self.validator.validate_span(llm_span, "chat") + + # Check for errors + assert ( + len(validation_result["errors"]) == 0 + ), f"Validation errors: {validation_result['errors']}" + + # Check for non-standard attributes + assert ( + len(validation_result["non_standard_found"]) == 0 + ), f"Found non-standard attributes: {validation_result['non_standard_found']}" + + # Validate specific required attributes + attributes = llm_span.attributes + assert ( + attributes.get("gen_ai.operation.name") == "chat" + ), "Should have gen_ai.operation.name = 'chat'" + assert ( + "gen_ai.provider.name" in attributes + ), "Should have gen_ai.provider.name (not gen_ai.system)" + assert attributes.get("gen_ai.request.model") == "gemini-pro" + assert attributes.get("gen_ai.response.model") == "gemini-pro-001" + + # Validate token usage attributes + assert attributes.get("gen_ai.usage.input_tokens") == 100 + assert attributes.get("gen_ai.usage.output_tokens") == 50 + + # Validate conversation tracking uses correct attributes + assert ( + "gen_ai.conversation.id" in attributes + ), "Should use gen_ai.conversation.id (not gen_ai.session.id)" + assert attributes.get("gen_ai.conversation.id") == "conv_123" + assert ( + "enduser.id" in attributes + ), "Should use enduser.id (not gen_ai.user.id)" + assert attributes.get("enduser.id") == "user_456" + + # Validate finish_reasons is array + assert ( + "gen_ai.response.finish_reasons" in attributes + ), "Should have gen_ai.response.finish_reasons (array)" + finish_reasons = attributes.get("gen_ai.response.finish_reasons") + assert isinstance( + finish_reasons, (list, tuple) + ), "gen_ai.response.finish_reasons 
should be array" + + # Validate NO non-standard attributes + assert ( + "gen_ai.span.kind" not in attributes + ), "Should NOT have gen_ai.span.kind (non-standard)" + assert ( + "gen_ai.system" not in attributes + ), "Should NOT have gen_ai.system (use gen_ai.provider.name)" + assert ( + "gen_ai.framework" not in attributes + ), "Should NOT have gen_ai.framework (non-standard)" + + @pytest.mark.asyncio + async def test_agent_span_attributes_semantic_conventions(self): + """ + Test that Agent spans follow OTel GenAI semantic conventions. + + Validates: + - Span name format: "invoke_agent {agent_name}" + - gen_ai.operation.name = "invoke_agent" + - Agent attributes with gen_ai. prefix + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock agent + mock_agent = Mock() + mock_agent.name = "weather_agent" + mock_agent.description = "Agent for weather queries" + mock_agent.sub_agents = [] # Simple agent, not a chain + + mock_callback_context = create_mock_callback_context( + "session_789", "user_999" + ) + + # Execute Agent span lifecycle + await plugin.before_agent_callback( + agent=mock_agent, callback_context=mock_callback_context + ) + await plugin.after_agent_callback( + agent=mock_agent, callback_context=mock_callback_context + ) + + # Get finished spans + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1, "Should have exactly 1 Agent span" + + agent_span = spans[0] + + # Validate span name: "invoke_agent {agent_name}" + assert ( + agent_span.name == "invoke_agent weather_agent" + ), f"Expected span name 'invoke_agent weather_agent', got '{agent_span.name}'" + + # Validate attributes + validation_result = self.validator.validate_span( + agent_span, "invoke_agent" + ) + assert ( + len(validation_result["errors"]) == 0 + ), f"Validation errors: {validation_result['errors']}" + + attributes = agent_span.attributes + assert attributes.get("gen_ai.operation.name") == "invoke_agent" + + # Validate agent attributes have gen_ai. prefix + assert ( + "gen_ai.agent.name" in attributes or "agent.name" in attributes + ), "Should have agent name attribute" + assert ( + "gen_ai.agent.description" in attributes + or "agent.description" in attributes + ), "Should have agent description attribute" + + @pytest.mark.asyncio + async def test_tool_span_attributes_semantic_conventions(self): + """ + Test that Tool spans follow OTel GenAI semantic conventions. + + Validates: + - Span name format: "execute_tool {tool_name}" + - gen_ai.operation.name = "execute_tool" + - Tool attributes with gen_ai. 
prefix + - SpanKind = INTERNAL (per OTel convention) + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock tool + mock_tool = Mock() + mock_tool.name = "calculator" + mock_tool.description = "Mathematical calculator" + + mock_tool_args = {"operation": "add", "a": 5, "b": 3} + mock_tool_context = Mock() + mock_tool_context.session_id = "session_456" + mock_result = {"result": 8} + + # Execute Tool span lifecycle + await plugin.before_tool_callback( + tool=mock_tool, + tool_args=mock_tool_args, + tool_context=mock_tool_context, + ) + await plugin.after_tool_callback( + tool=mock_tool, + tool_args=mock_tool_args, + tool_context=mock_tool_context, + result=mock_result, + ) + + # Get finished spans + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1, "Should have exactly 1 Tool span" + + tool_span = spans[0] + + # Validate span name: "execute_tool {tool_name}" + assert ( + tool_span.name == "execute_tool calculator" + ), f"Expected span name 'execute_tool calculator', got '{tool_span.name}'" + + # Validate SpanKind (should be INTERNAL per OTel convention) + assert ( + tool_span.kind == trace_api.SpanKind.INTERNAL + ), "Tool spans should use SpanKind.INTERNAL" + + # Validate attributes + validation_result = self.validator.validate_span( + tool_span, "execute_tool" + ) + assert ( + len(validation_result["errors"]) == 0 + ), f"Validation errors: {validation_result['errors']}" + + attributes = tool_span.attributes + assert attributes.get("gen_ai.operation.name") == "execute_tool" + + # Validate tool attributes + assert attributes.get("gen_ai.tool.name") == "calculator" + assert ( + attributes.get("gen_ai.tool.description") + == "Mathematical calculator" + ) + + @pytest.mark.asyncio + async def test_runner_span_attributes(self): + """Test Runner span creation and attributes.""" + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock invocation context + mock_invocation_context = Mock() + mock_invocation_context.invocation_id = "run_12345" + mock_invocation_context.app_name = "test_app" + mock_invocation_context.session = Mock() + mock_invocation_context.session.id = "session_111" + mock_invocation_context.user_id = "user_222" + + # Execute Runner span lifecycle + await plugin.before_run_callback( + invocation_context=mock_invocation_context + ) + await plugin.after_run_callback( + invocation_context=mock_invocation_context + ) + + # Get finished spans + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1, "Should have exactly 1 Runner span" + + runner_span = spans[0] + + # Validate span name (runner uses agent-style naming) + assert ( + runner_span.name == "invoke_agent test_app" + ), f"Expected span name 'invoke_agent test_app', got '{runner_span.name}'" + + # Validate attributes + attributes = runner_span.attributes + assert attributes.get("gen_ai.operation.name") == "invoke_agent" + # Note: runner.app_name is namespaced with google_adk prefix + assert attributes.get("google_adk.runner.app_name") == "test_app" + + @pytest.mark.asyncio + async def test_error_handling_attributes(self): + """ + Test error handling and span status. 
+ + Validates: + - Span status set to ERROR + - error.type attribute (not error.message per OTel) + - Span description contains error message + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create mock LLM request + mock_llm_request = Mock() + mock_llm_request.model = "gemini-pro" + mock_llm_request.config = Mock() + + mock_callback_context = create_mock_callback_context( + "session_err", "user_err" + ) + + # Create error + test_error = Exception("API rate limit exceeded") + + # Execute error scenario + await plugin.before_model_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + ) + await plugin.on_model_error_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + error=test_error, + ) + + # Get finished spans + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1, "Should have exactly 1 error span" + + error_span = spans[0] + + # Validate span status + assert ( + error_span.status.status_code == trace_api.StatusCode.ERROR + ), "Error span should have ERROR status" + assert ( + "API rate limit exceeded" in error_span.status.description + ), "Error description should contain error message" + + # Validate error attributes + attributes = error_span.attributes + assert "error.type" in attributes, "Should have error.type attribute" + assert attributes["error.type"] == "Exception" + + # Note: error.message is non-standard, OTel recommends using span status + # but we may include it for debugging purposes + + @pytest.mark.asyncio + async def test_metrics_recorded_with_correct_dimensions(self): + """ + Test that metrics are recorded with correct OTel GenAI dimensions. 
+ + Validates: + - gen_ai.client.operation.duration histogram + - gen_ai.client.token.usage histogram + - Correct dimension attributes + """ + # Instrument + self.instrumentor.instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + plugin = self.instrumentor._plugin + + # Create and execute LLM span + mock_llm_request = Mock() + mock_llm_request.model = "gemini-pro" + mock_llm_request.config = Mock() + mock_llm_request.config.max_tokens = 500 + mock_llm_request.config.temperature = 0.5 + mock_llm_request.contents = ["test"] + + mock_llm_response = Mock() + mock_llm_response.model = "gemini-pro" + mock_llm_response.finish_reason = "stop" + mock_llm_response.usage_metadata = Mock() + mock_llm_response.usage_metadata.prompt_token_count = 50 + mock_llm_response.usage_metadata.candidates_token_count = 30 + + mock_callback_context = create_mock_callback_context() + + await plugin.before_model_callback( + callback_context=mock_callback_context, + llm_request=mock_llm_request, + ) + await plugin.after_model_callback( + callback_context=mock_callback_context, + llm_response=mock_llm_response, + ) + + # Get metrics data + metrics_data = self.metric_reader.get_metrics_data() + + # Validate metrics exist + assert metrics_data is not None, "Should have metrics data" + + # Note: Detailed metric validation would require iterating through + # metrics_data.resource_metrics to find the specific histograms + # and verify their attributes match OTel GenAI conventions + + +# Run tests +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_utils.py new file mode 100644 index 00000000..9bfa739e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-google-adk/tests/test_utils.py @@ -0,0 +1,296 @@ +""" +Unit tests for utility functions. 
+""" + +import os + +import pytest + +from opentelemetry.instrumentation.google_adk.internal._utils import ( + extract_content_safely, + extract_model_name, + get_error_attributes, + get_max_content_length, + is_slow_call, + process_content, + safe_json_dumps, + should_capture_content, +) + + +class TestContentCapture: + """Tests for content capture utilities.""" + + def test_should_capture_content_default_false(self): + """Test that content capture is disabled by default.""" + # Clear environment variable + os.environ.pop( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", None + ) + # Default is False unless explicitly enabled + assert should_capture_content() is False + + def test_should_capture_content_enabled(self): + """Test that content capture can be explicitly enabled.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + assert should_capture_content() is True + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_should_capture_content_disabled(self): + """Test that content capture can be disabled.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "false" + ) + assert should_capture_content() is False + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_get_max_length_default(self): + """Test default max length.""" + os.environ.pop( + "OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH", None + ) + # Return value is Optional[int], so None is valid + max_len = get_max_content_length() + assert max_len is None or max_len > 0 + + def test_get_max_length_custom(self): + """Test custom max length.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH"] = ( + "1000" + ) + assert get_max_content_length() == 1000 + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH") + + def test_get_max_length_invalid(self): + """Test invalid max length returns None.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH"] = ( + "invalid" + ) + assert get_max_content_length() is None + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH") + + def test_process_content_short_string(self): + """Test processing short content.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + content = "Hello, world!" 
+ result = process_content(content) + assert result == content + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_process_content_long_string(self): + """Test processing long content - may be truncated if max length set.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + os.environ["OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH"] = ( + "1000" + ) + content = "A" * 10000 + result = process_content(content) + # Result should be truncated + assert isinstance(result, str) + assert len(result) <= 1000 + assert "[TRUNCATED]" in result + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH") + + def test_process_content_when_disabled(self): + """Test processing content when capture is disabled.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "false" + ) + content = "B" * 200 + result = process_content(content) + # Should return empty string when disabled + assert result == "" + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_process_content_with_custom_max_length(self): + """Test processing content with custom max length.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + os.environ["OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH"] = ( + "100" + ) + content = "B" * 200 + result = process_content(content) + assert len(result) <= 100 + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_MESSAGE_CONTENT_MAX_LENGTH") + + +class TestJsonUtils: + """Tests for JSON utility functions.""" + + def test_safe_json_dumps_basic(self): + """Test basic JSON serialization.""" + data = {"key": "value", "number": 42} + result = safe_json_dumps(data) + assert '"key": "value"' in result + assert '"number": 42' in result + + def test_safe_json_dumps_nested(self): + """Test nested JSON serialization.""" + data = {"outer": {"inner": ["a", "b", "c"]}} + result = safe_json_dumps(data) + assert "outer" in result + assert "inner" in result + + def test_safe_json_dumps_error_fallback(self): + """Test fallback for non-serializable objects.""" + + class NonSerializable: + pass + + data = {"obj": NonSerializable()} + result = safe_json_dumps(data) + # Should return some string representation without crashing + assert isinstance(result, str) + + def test_extract_content_safely_string(self): + """Test extracting string content.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + result = extract_content_safely("test string") + assert result == "test string" + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_extract_content_safely_dict(self): + """Test extracting dict content.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + data = {"message": "test"} + result = extract_content_safely(data) + assert "message" in result + assert "test" in result + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_extract_content_safely_list(self): + """Test extracting list content.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "true" + ) + data = ["item1", "item2"] + result = extract_content_safely(data) + assert "item1" in result + assert "item2" in result + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def 
test_extract_content_safely_when_disabled(self): + """Test extracting content when capture is disabled.""" + os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = ( + "false" + ) + result = extract_content_safely("test string") + # Should return empty string when capture is disabled + assert result == "" + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT") + + def test_extract_content_safely_none(self): + """Test extracting None content.""" + result = extract_content_safely(None) + assert result == "" + + +class TestModelUtils: + """Tests for model-related utility functions.""" + + def test_extract_model_name_simple(self): + """Test extracting simple model name.""" + result = extract_model_name("gpt-4") + assert result == "gpt-4" + + def test_extract_model_name_with_provider(self): + """Test extracting model name from full path.""" + # extract_model_name returns the string as-is if it's a string + result = extract_model_name("providers/google/models/gemini-pro") + assert result == "providers/google/models/gemini-pro" + + def test_extract_model_name_empty(self): + """Test extracting empty model name.""" + # Empty string is still a string, so it returns as-is + result = extract_model_name("") + assert result == "" + + def test_extract_model_name_none(self): + """Test extracting None model name.""" + result = extract_model_name(None) + assert result == "unknown" + + def test_extract_model_name_from_object(self): + """Test extracting model name from object with model attribute.""" + from unittest.mock import Mock + + mock_obj = Mock() + mock_obj.model = "gemini-pro" + result = extract_model_name(mock_obj) + assert result == "gemini-pro" + + +class TestSpanUtils: + """Tests for span-related utility functions.""" + + def test_is_slow_call_threshold_exceeded(self): + """Test slow call detection when threshold exceeded.""" + # 2 seconds with 1 second threshold + assert is_slow_call(2.0, threshold=1.0) is True + + def test_is_slow_call_threshold_not_exceeded(self): + """Test slow call detection when threshold not exceeded.""" + # 0.5 seconds with 1 second threshold + assert is_slow_call(0.5, threshold=1.0) is False + + def test_is_slow_call_default_threshold(self): + """Test slow call detection with default threshold.""" + # Assuming default threshold is 0.5 seconds + # 1 second should be slow + assert is_slow_call(1.0) is True + # 0.1 seconds should not be slow + assert is_slow_call(0.1) is False + + +class TestErrorUtils: + """Tests for error handling utilities.""" + + def test_get_error_attributes_basic(self): + """Test getting error attributes for basic exception.""" + error = ValueError("test error") + attrs = get_error_attributes(error) + + assert attrs["error.type"] == "ValueError" + + def test_get_error_attributes_timeout(self): + """Test getting error attributes for timeout.""" + error = TimeoutError("Operation timed out") + attrs = get_error_attributes(error) + + assert attrs["error.type"] == "TimeoutError" + + def test_get_error_attributes_custom_exception(self): + """Test getting error attributes for custom exception.""" + + class CustomError(Exception): + pass + + error = CustomError("custom message") + attrs = get_error_attributes(error) + + assert attrs["error.type"] == "CustomError" + + def test_get_error_attributes_none(self): + """Test getting error attributes when None is passed.""" + # Even None has a type, so error.type will be 'NoneType' + attrs = get_error_attributes(None) + assert "error.type" in attrs + assert attrs["error.type"] == "NoneType" 
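+
+
+# Illustrative addition (not part of the original suite): the tests above set
+# and pop OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT directly via
+# os.environ, which leaks the variable into later tests if an assertion fails
+# before the matching os.environ.pop(). The sketch below shows the same checks
+# using pytest's built-in monkeypatch fixture, which restores the environment
+# automatically at teardown; it assumes only the utilities imported above.
+class TestContentCaptureWithMonkeypatch:
+    """Example of env-var isolation via pytest's monkeypatch fixture."""
+
+    def test_capture_toggle_is_isolated(self, monkeypatch):
+        """monkeypatch undoes setenv even if an assertion fails mid-test."""
+        monkeypatch.setenv(
+            "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true"
+        )
+        assert should_capture_content() is True
+
+        monkeypatch.setenv(
+            "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "false"
+        )
+        assert should_capture_content() is False
+        # No explicit cleanup: monkeypatch restores os.environ after the test.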
+ + +if __name__ == "__main__": + pytest.main([__file__, "-v"])
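+
+
+# Illustrative sketch only (appended for reference, not part of the original
+# diff): the metrics integration test notes that detailed metric validation
+# "would require iterating through metrics_data.resource_metrics". A helper
+# like the one below walks the OpenTelemetry SDK MetricsData tree
+# (resource_metrics -> scope_metrics -> metrics) and collects metric names,
+# which could then be checked for the GenAI histogram names listed in that
+# test's docstring.
+def collect_metric_names(metrics_data):
+    """Return the set of metric names found in a MetricsData object.
+
+    Per the OTel GenAI conventions, the instrumentation is expected to emit:
+    - gen_ai.client.operation.duration
+    - gen_ai.client.token.usage
+    """
+    names = set()
+    for resource_metrics in metrics_data.resource_metrics:
+        for scope_metrics in resource_metrics.scope_metrics:
+            for metric in scope_metrics.metrics:
+                names.add(metric.name)
+    return names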