diff --git a/docs/howtos/integrations/_ag_ui.md b/docs/howtos/integrations/_ag_ui.md new file mode 100644 index 0000000000..cf9e056a5b --- /dev/null +++ b/docs/howtos/integrations/_ag_ui.md @@ -0,0 +1,318 @@ +# AG-UI Integration +Ragas can evaluate agents that stream events via the [AG-UI protocol](https://docs.ag-ui.com/). This notebook shows how to build evaluation datasets, configure metrics, and score AG-UI endpoints. + + +## Prerequisites +- Install optional dependencies with `pip install "ragas[ag-ui]" langchain-openai python-dotenv nest_asyncio` +- Start an AG-UI compatible agent locally (Google ADK, PydanticAI, CrewAI, etc.) +- Create an `.env` file with your evaluator LLM credentials (e.g. `OPENAI_API_KEY`, `GOOGLE_API_KEY`, etc.) +- If you run this notebook, call `nest_asyncio.apply()` (shown below) so you can `await` coroutines in-place. + + + +```python +# !pip install "ragas[ag-ui]" langchain-openai python-dotenv nest_asyncio + +``` + +## Imports and environment setup +Load environment variables and import the classes used throughout the walkthrough. + + + +```python +import asyncio + +from dotenv import load_dotenv +import nest_asyncio +from IPython.display import display +from langchain_openai import ChatOpenAI + +from ragas.dataset_schema import EvaluationDataset, SingleTurnSample, MultiTurnSample +from ragas.integrations.ag_ui import ( + evaluate_ag_ui_agent, + convert_to_ragas_messages, + convert_messages_snapshot, +) +from ragas.messages import HumanMessage, ToolCall +from ragas.metrics import FactualCorrectness, ToolCallF1 +from ragas.llms import LangchainLLMWrapper +from ag_ui.core import ( + MessagesSnapshotEvent, + TextMessageChunkEvent, + UserMessage, + AssistantMessage, +) + +load_dotenv() +# Patch the existing notebook loop so we can await coroutines safely +nest_asyncio.apply() + +``` + +## Build single-turn evaluation data +Create `SingleTurnSample` entries when you only need to grade the final answer text. + + + +```python +scientist_questions = EvaluationDataset( + samples=[ + SingleTurnSample( + user_input="Who originated the theory of relativity?", + reference="Albert Einstein originated the theory of relativity.", + ), + SingleTurnSample( + user_input="Who discovered penicillin and when?", + reference="Alexander Fleming discovered penicillin in 1928.", + ), + ] +) + +scientist_questions + +``` + + + + + EvaluationDataset(features=['user_input', 'reference'], len=2) + + + +## Build multi-turn conversations +For tool-usage metrics, extend the dataset with `MultiTurnSample` and expected tool calls. + + + +```python +weather_queries = EvaluationDataset( + samples=[ + MultiTurnSample( + user_input=[HumanMessage(content="What's the weather in Paris?")], + reference_tool_calls=[ + ToolCall(name="weatherTool", args={"location": "Paris"}) + ], + ) + ] +) + +weather_queries + +``` + + + + + EvaluationDataset(features=['user_input', 'reference_tool_calls'], len=1) + + + +## Configure metrics and the evaluator LLM +Wrap your grading model with the appropriate adapter and instantiate the metrics you plan to use. + + + +```python +evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini")) + +qa_metrics = [FactualCorrectness(llm=evaluator_llm)] +tool_metrics = [ToolCallF1()] # rule-based, no LLM required + +``` + + /var/folders/8k/tf3xr1rd1fl_dz35dfhfp_tc0000gn/T/ipykernel_93918/2135722072.py:1: DeprecationWarning: LangchainLLMWrapper is deprecated and will be removed in a future version. 
Use llm_factory instead: from openai import OpenAI; from ragas.llms import llm_factory; llm = llm_factory('gpt-4o-mini', client=OpenAI(api_key='...')) + evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini")) + + +## Evaluate a live AG-UI endpoint +Set the endpoint URL exposed by your agent. Toggle the flags when you are ready to run the evaluations. +In Jupyter/IPython you can `await` the helpers directly once `nest_asyncio.apply()` has been called. + + + +```python +AG_UI_ENDPOINT = "http://localhost:8000/agentic_chat" # Update to match your agent + +RUN_FACTUAL_EVAL = False +RUN_TOOL_EVAL = False + +``` + + +```python +async def evaluate_factual(): + return await evaluate_ag_ui_agent( + endpoint_url=AG_UI_ENDPOINT, + dataset=scientist_questions, + metrics=qa_metrics, + evaluator_llm=evaluator_llm, + metadata=True, + ) + +if RUN_FACTUAL_EVAL: + factual_result = await evaluate_factual() + factual_df = factual_result.to_pandas() + display(factual_df) + +``` + + + Calling AG-UI Agent: 0%| | 0/2 [00:00 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
|   | user_input | retrieved_contexts | response | reference | factual_correctness(mode=f1) |
|---|------------|--------------------|----------|-----------|------------------------------|
| 0 | Who originated the theory of relativity? | [] | The theory of relativity was originated by Alb... | Albert Einstein originated the theory of relat... | 0.33 |
| 1 | Who discovered penicillin and when? | [] | Penicillin was discovered by Alexander Fleming... | Alexander Fleming discovered penicillin in 1928. | 1.00 |
+ + + + +```python +async def evaluate_tool_usage(): + return await evaluate_ag_ui_agent( + endpoint_url=AG_UI_ENDPOINT, + dataset=weather_queries, + metrics=tool_metrics, + evaluator_llm=evaluator_llm, + ) + +if RUN_TOOL_EVAL: + tool_result = await evaluate_tool_usage() + tool_df = tool_result.to_pandas() + display(tool_df) + +``` + + + Calling AG-UI Agent: 0%| | 0/1 [00:00 + + + + + + + + + + + + + + + + + + +
|   | user_input | reference_tool_calls | tool_call_f1 |
|---|------------|----------------------|--------------|
| 0 | [{'content': 'What's the weather in Paris?', '... | [{'name': 'weatherTool', 'args': {'location': ... | 0.0 |
+ + + +## Convert recorded AG-UI events +Use the conversion helpers when you already have an event log to grade offline. + + + +```python +events = [ + TextMessageChunkEvent( + message_id="assistant-1", + role="assistant", + delta="Hello from AG-UI!", + ) +] + +messages_from_stream = convert_to_ragas_messages(events, metadata=True) + +snapshot = MessagesSnapshotEvent( + messages=[ + UserMessage(id="msg-1", content="Hello?"), + AssistantMessage(id="msg-2", content="Hi! How can I help you today?"), + ] +) + +messages_from_snapshot = convert_messages_snapshot(snapshot) + +messages_from_stream, messages_from_snapshot + +``` + + + + + ([AIMessage(content='Hello from AG-UI!', metadata={'timestamp': None, 'message_id': 'assistant-1'}, type='ai', tool_calls=None)], + [HumanMessage(content='Hello?', metadata=None, type='human'), + AIMessage(content='Hi! How can I help you today?', metadata=None, type='ai', tool_calls=None)]) + + + + +```python + +``` diff --git a/docs/howtos/integrations/ag_ui.ipynb b/docs/howtos/integrations/ag_ui.ipynb new file mode 100644 index 0000000000..1faa0e53d8 --- /dev/null +++ b/docs/howtos/integrations/ag_ui.ipynb @@ -0,0 +1,516 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "cdcdd4d1", + "metadata": {}, + "source": [ + "# AG-UI Integration\n", + "Ragas can evaluate agents that stream events via the [AG-UI protocol](https://docs.ag-ui.com/). This notebook shows how to build evaluation datasets, configure metrics, and score AG-UI endpoints.\n" + ] + }, + { + "cell_type": "markdown", + "id": "ca0af3e1", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "- Install optional dependencies with `pip install \"ragas[ag-ui]\" langchain-openai python-dotenv nest_asyncio`\n", + "- Start an AG-UI compatible agent locally (Google ADK, PydanticAI, CrewAI, etc.)\n", + "- Create an `.env` file with your evaluator LLM credentials (e.g. 
`OPENAI_API_KEY`, `GOOGLE_API_KEY`, etc.)\n", + "- If you run this notebook, call `nest_asyncio.apply()` (shown below) so you can `await` coroutines in-place.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67b16d64", + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install \"ragas[ag-ui]\" langchain-openai python-dotenv nest_asyncio\n" + ] + }, + { + "cell_type": "markdown", + "id": "7486082d", + "metadata": {}, + "source": [ + "## Imports and environment setup\n", + "Load environment variables and import the classes used throughout the walkthrough.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "c051059b", + "metadata": {}, + "outputs": [], + "source": [ + "import nest_asyncio\n", + "from ag_ui.core import (\n", + " AssistantMessage,\n", + " MessagesSnapshotEvent,\n", + " TextMessageChunkEvent,\n", + " UserMessage,\n", + ")\n", + "from dotenv import load_dotenv\n", + "from IPython.display import display\n", + "from langchain_openai import ChatOpenAI\n", + "\n", + "from ragas.dataset_schema import EvaluationDataset, MultiTurnSample, SingleTurnSample\n", + "from ragas.integrations.ag_ui import (\n", + " convert_messages_snapshot,\n", + " convert_to_ragas_messages,\n", + " evaluate_ag_ui_agent,\n", + ")\n", + "from ragas.llms import LangchainLLMWrapper\n", + "from ragas.messages import HumanMessage, ToolCall\n", + "from ragas.metrics import FactualCorrectness, ToolCallF1\n", + "\n", + "load_dotenv()\n", + "# Patch the existing notebook loop so we can await coroutines safely\n", + "nest_asyncio.apply()" + ] + }, + { + "cell_type": "markdown", + "id": "7e69bc6c", + "metadata": {}, + "source": [ + "## Build single-turn evaluation data\n", + "Create `SingleTurnSample` entries when you only need to grade the final answer text.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "803cc334", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "EvaluationDataset(features=['user_input', 'reference'], len=2)" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "scientist_questions = EvaluationDataset(\n", + " samples=[\n", + " SingleTurnSample(\n", + " user_input=\"Who originated the theory of relativity?\",\n", + " reference=\"Albert Einstein originated the theory of relativity.\",\n", + " ),\n", + " SingleTurnSample(\n", + " user_input=\"Who discovered penicillin and when?\",\n", + " reference=\"Alexander Fleming discovered penicillin in 1928.\",\n", + " ),\n", + " ]\n", + ")\n", + "\n", + "scientist_questions" + ] + }, + { + "cell_type": "markdown", + "id": "d4f1bbb7", + "metadata": {}, + "source": [ + "## Build multi-turn conversations\n", + "For tool-usage metrics, extend the dataset with `MultiTurnSample` and expected tool calls.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7a55eb0a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "EvaluationDataset(features=['user_input', 'reference_tool_calls'], len=1)" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "weather_queries = EvaluationDataset(\n", + " samples=[\n", + " MultiTurnSample(\n", + " user_input=[HumanMessage(content=\"What's the weather in Paris?\")],\n", + " reference_tool_calls=[\n", + " ToolCall(name=\"weatherTool\", args={\"location\": \"Paris\"})\n", + " ],\n", + " )\n", + " ]\n", + ")\n", + "\n", + "weather_queries" + ] + }, + { + "cell_type": "markdown", + "id": 
"14c3da95", + "metadata": {}, + "source": [ + "## Configure metrics and the evaluator LLM\n", + "Wrap your grading model with the appropriate adapter and instantiate the metrics you plan to use.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "05a59dde", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/var/folders/8k/tf3xr1rd1fl_dz35dfhfp_tc0000gn/T/ipykernel_93918/2135722072.py:1: DeprecationWarning: LangchainLLMWrapper is deprecated and will be removed in a future version. Use llm_factory instead: from openai import OpenAI; from ragas.llms import llm_factory; llm = llm_factory('gpt-4o-mini', client=OpenAI(api_key='...'))\n", + " evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model=\"gpt-4o-mini\"))\n" + ] + } + ], + "source": [ + "evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model=\"gpt-4o-mini\"))\n", + "\n", + "qa_metrics = [FactualCorrectness(llm=evaluator_llm)]\n", + "tool_metrics = [ToolCallF1()] # rule-based, no LLM required" + ] + }, + { + "cell_type": "markdown", + "id": "9e65fe39", + "metadata": {}, + "source": [ + "## Evaluate a live AG-UI endpoint\n", + "Set the endpoint URL exposed by your agent. Toggle the flags when you are ready to run the evaluations.\n", + "In Jupyter/IPython you can `await` the helpers directly once `nest_asyncio.apply()` has been called.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "b9808e04", + "metadata": {}, + "outputs": [], + "source": [ + "AG_UI_ENDPOINT = \"http://localhost:8000/agentic_chat\" # Update to match your agent\n", + "\n", + "RUN_FACTUAL_EVAL = False\n", + "RUN_TOOL_EVAL = False" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "79e80383", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "23ae31282b934d0390f316f966690d44", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Calling AG-UI Agent: 0%| | 0/2 [00:00\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
user_inputretrieved_contextsresponsereferencefactual_correctness(mode=f1)
0Who originated the theory of relativity?[]The theory of relativity was originated by Alb...Albert Einstein originated the theory of relat...0.33
1Who discovered penicillin and when?[]Penicillin was discovered by Alexander Fleming...Alexander Fleming discovered penicillin in 1928.1.00
\n", + "" + ], + "text/plain": [ + " user_input retrieved_contexts \\\n", + "0 Who originated the theory of relativity? [] \n", + "1 Who discovered penicillin and when? [] \n", + "\n", + " response \\\n", + "0 The theory of relativity was originated by Alb... \n", + "1 Penicillin was discovered by Alexander Fleming... \n", + "\n", + " reference \\\n", + "0 Albert Einstein originated the theory of relat... \n", + "1 Alexander Fleming discovered penicillin in 1928. \n", + "\n", + " factual_correctness(mode=f1) \n", + "0 0.33 \n", + "1 1.00 " + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "async def evaluate_factual():\n", + " return await evaluate_ag_ui_agent(\n", + " endpoint_url=AG_UI_ENDPOINT,\n", + " dataset=scientist_questions,\n", + " metrics=qa_metrics,\n", + " evaluator_llm=evaluator_llm,\n", + " metadata=True,\n", + " )\n", + "\n", + "\n", + "if RUN_FACTUAL_EVAL:\n", + " factual_result = await evaluate_factual()\n", + " factual_df = factual_result.to_pandas()\n", + " display(factual_df)" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "8b731189", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "351ca0c016cc46cd9c0321d43d283f05", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Calling AG-UI Agent: 0%| | 0/1 [00:00\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
user_inputreference_tool_callstool_call_f1
0[{'content': 'What's the weather in Paris?', '...[{'name': 'weatherTool', 'args': {'location': ...0.0
\n", + "" + ], + "text/plain": [ + " user_input \\\n", + "0 [{'content': 'What's the weather in Paris?', '... \n", + "\n", + " reference_tool_calls tool_call_f1 \n", + "0 [{'name': 'weatherTool', 'args': {'location': ... 0.0 " + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "async def evaluate_tool_usage():\n", + " return await evaluate_ag_ui_agent(\n", + " endpoint_url=AG_UI_ENDPOINT,\n", + " dataset=weather_queries,\n", + " metrics=tool_metrics,\n", + " evaluator_llm=evaluator_llm,\n", + " )\n", + "\n", + "\n", + "if RUN_TOOL_EVAL:\n", + " tool_result = await evaluate_tool_usage()\n", + " tool_df = tool_result.to_pandas()\n", + " display(tool_df)" + ] + }, + { + "cell_type": "markdown", + "id": "452627cf", + "metadata": {}, + "source": [ + "## Convert recorded AG-UI events\n", + "Use the conversion helpers when you already have an event log to grade offline.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "b691bcf7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "([AIMessage(content='Hello from AG-UI!', metadata={'timestamp': None, 'message_id': 'assistant-1'}, type='ai', tool_calls=None)],\n", + " [HumanMessage(content='Hello?', metadata=None, type='human'),\n", + " AIMessage(content='Hi! How can I help you today?', metadata=None, type='ai', tool_calls=None)])" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "events = [\n", + " TextMessageChunkEvent(\n", + " message_id=\"assistant-1\",\n", + " role=\"assistant\",\n", + " delta=\"Hello from AG-UI!\",\n", + " )\n", + "]\n", + "\n", + "messages_from_stream = convert_to_ragas_messages(events, metadata=True)\n", + "\n", + "snapshot = MessagesSnapshotEvent(\n", + " messages=[\n", + " UserMessage(id=\"msg-1\", content=\"Hello?\"),\n", + " AssistantMessage(id=\"msg-2\", content=\"Hi! How can I help you today?\"),\n", + " ]\n", + ")\n", + "\n", + "messages_from_snapshot = convert_messages_snapshot(snapshot)\n", + "\n", + "messages_from_stream, messages_from_snapshot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cf6235fd-ec1c-4e87-a53f-a2ebf89a29b6", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/howtos/integrations/ag_ui.md b/docs/howtos/integrations/ag_ui.md new file mode 100644 index 0000000000..353a8445e0 --- /dev/null +++ b/docs/howtos/integrations/ag_ui.md @@ -0,0 +1,197 @@ +# AG-UI + +[AG-UI](https://docs.ag-ui.com/) is an event-based protocol for streaming agent updates to user interfaces. The protocol standardizes message, tool-call, and state events, which makes it easy to plug different agent runtimes into visual frontends. The `ragas.integrations.ag_ui` module helps you transform those event streams into Ragas message objects and evaluate live AG-UI endpoints with the same metrics used across the rest of the Ragas ecosystem. 
+ +This guide assumes you already have an AG-UI compatible agent running (for example, one built with Google ADK, PydanticAI, or CrewAI) and that you are familiar with creating evaluation datasets in Ragas. + +## Install the integration + +The AG-UI helpers live behind an optional extra. Install it together with the dependencies required by your evaluator LLM. When running inside Jupyter or IPython, include `nest_asyncio` so you can reuse the notebook's event loop. + +```bash +pip install "ragas[ag-ui]" langchain-openai python-dotenv nest_asyncio +``` + +Configure your evaluator LLM credentials. For example, if you are using OpenAI models: + +```bash +# .env +OPENAI_API_KEY=sk-... +``` + +Load the environment variables inside Python before running the examples: + +```python +from dotenv import load_dotenv +import nest_asyncio + +load_dotenv() + +# If you're inside Jupyter/IPython, patch the running event loop once. +nest_asyncio.apply() +``` + +## Build an evaluation dataset + +`EvaluationDataset` can contain single-turn or multi-turn samples. With AG-UI you can evaluate either pattern—single questions with free-form responses, or longer conversations that can include tool calls. + +### Single-turn samples + +Use `SingleTurnSample` when you only need the final answer text. + +```python +from ragas.dataset_schema import EvaluationDataset, SingleTurnSample + +scientist_questions = EvaluationDataset( + samples=[ + SingleTurnSample( + user_input="Who originated the theory of relativity?", + reference="Albert Einstein originated the theory of relativity." + ), + SingleTurnSample( + user_input="Who discovered penicillin and when?", + reference="Alexander Fleming discovered penicillin in 1928." + ), + ] +) +``` + +### Multi-turn samples with tool expectations + +When you want to grade intermediate agent behavior—like whether it calls tools correctly—switch to `MultiTurnSample`. Provide an initial conversation history and (optionally) expected tool calls. + +```python +from ragas.dataset_schema import EvaluationDataset, MultiTurnSample +from ragas.messages import HumanMessage, ToolCall + +weather_queries = EvaluationDataset( + samples=[ + MultiTurnSample( + user_input=[HumanMessage(content="What's the weather in Paris?")], + reference_tool_calls=[ + ToolCall(name="weatherTool", args={"location": "Paris"}) + ] + ) + ] +) +``` + +## Choose metrics and evaluator model + +The integration works with any Ragas metric. For most text-based evaluations you will want a grading LLM. Wrap your model with the appropriate adapter (LangChain shown here, but llama-index and LiteLLM wrappers work as well). + +```python +from ragas.metrics import FactualCorrectness, ToolCallF1 +from ragas.llms import LangchainLLMWrapper +from langchain_openai import ChatOpenAI + +evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini")) + +qa_metrics = [FactualCorrectness(llm=evaluator_llm)] +tool_metrics = [ToolCallF1()] # rule-based metric, no LLM required +``` + +## Evaluate a live AG-UI endpoint + +`evaluate_ag_ui_agent` calls your FastAPI endpoint, captures the AG-UI Server-Sent Events (SSE) stream, converts those events into Ragas messages, and runs the metrics you selected. + +> ⚠️ The endpoint must expose the AG-UI SSE stream. Common paths include `/chat`, `/agent`, or `/agentic_chat`. + +### Evaluate factual responses + +In Jupyter or IPython, use top-level `await` (after `nest_asyncio.apply()`) instead of `asyncio.run` to avoid the "event loop is already running" error. For scripts you can keep `asyncio.run`. 
+ +```python +import asyncio +from ragas.integrations.ag_ui import evaluate_ag_ui_agent + +async def run_factual_eval(): + result = await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agentic_chat", + dataset=scientist_questions, + metrics=qa_metrics, + evaluator_llm=evaluator_llm, + metadata=True, # optional, keeps run/thread metadata on messages + ) + return result + +# In Jupyter/IPython (after calling nest_asyncio.apply()) +factual_result = await run_factual_eval() + +# In a standalone script, use: +# factual_result = asyncio.run(run_factual_eval()) +factual_result.to_pandas() +``` + +The resulting dataframe includes per-sample scores, raw agent responses, and any retrieved contexts (if provided by the agent). You can save it with `result.save()` or export to CSV through pandas. + +### Evaluate tool usage + +The same function supports multi-turn datasets. Agent responses (AI messages and tool outputs) are appended to the existing conversation before scoring. + +```python +async def run_tool_eval(): + result = await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agentic_chat", + dataset=weather_queries, + metrics=tool_metrics, + evaluator_llm=evaluator_llm, + ) + return result + +# In Jupyter/IPython +tool_result = await run_tool_eval() + +# Or in a script +# tool_result = asyncio.run(run_tool_eval()) +tool_result.to_pandas() +``` + +If a request fails, the executor logs the error and marks the corresponding sample with `NaN` scores so you can retry or inspect the endpoint logs. + +## Working directly with AG-UI events + +Sometimes you may want to collect event logs separately—perhaps from a recorded run or a staging environment—and evaluate them offline. The conversion helpers expose the same parsing logic used by `evaluate_ag_ui_agent`. + +```python +from ragas.integrations.ag_ui import convert_to_ragas_messages +from ag_ui.core import TextMessageChunkEvent + +events = [ + TextMessageChunkEvent( + message_id="assistant-1", + role="assistant", + delta="Hello from AG-UI!", + timestamp="2024-12-01T00:00:00Z", + ) +] + +ragas_messages = convert_to_ragas_messages(events, metadata=True) +``` + +If you already have a `MessagesSnapshotEvent` you can skip streaming reconstruction and call `convert_messages_snapshot`. + +```python +from ragas.integrations.ag_ui import convert_messages_snapshot +from ag_ui.core import MessagesSnapshotEvent, UserMessage, AssistantMessage + +snapshot = MessagesSnapshotEvent( + messages=[ + UserMessage(id="msg-1", content="Hello?"), + AssistantMessage(id="msg-2", content="Hi! How can I help you today?"), + ] +) + +ragas_messages = convert_messages_snapshot(snapshot) +``` + +The converted messages can be plugged into `EvaluationDataset` objects or passed directly to lower-level Ragas evaluation APIs if you need custom workflows. + +## Tips for production evaluations + +- **Batch size**: use the `batch_size` argument to control parallel requests to your agent. +- **Custom headers**: pass authentication tokens or tenant IDs via `extra_headers`. +- **Timeouts**: tune the `timeout` parameter if your agent performs long-running tool calls. +- **Metadata debugging**: set `metadata=True` to keep AG-UI run, thread, and message IDs on every `RagasMessage` for easier traceability. + +Once you are satisfied with your scoring setup, consider wrapping the snippets in a script or notebook. An example walkthrough notebook is available at `docs/howtos/integrations/ag_ui.ipynb`. 
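
As a starting point for such a script, here is a minimal sketch that combines the tuning options listed above. The argument values are illustrative; `scientist_questions`, `qa_metrics`, and `evaluator_llm` are the objects defined in the earlier sections.

```python
from ragas.integrations.ag_ui import evaluate_ag_ui_agent

# Inside an async function, or a notebook cell after nest_asyncio.apply()
result = await evaluate_ag_ui_agent(
    endpoint_url="http://localhost:8000/agentic_chat",
    dataset=scientist_questions,
    metrics=qa_metrics,
    evaluator_llm=evaluator_llm,
    batch_size=4,                # cap parallel requests to the agent
    extra_headers={"Authorization": "Bearer <token>"},  # placeholder token
    timeout=120.0,               # seconds to wait on slow tool calls
    metadata=True,               # keep AG-UI run/thread/message IDs
)

result.to_pandas().to_csv("ag_ui_eval_results.csv", index=False)
```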
diff --git a/examples/ragas_examples/ag_ui_agent_evals/README.md b/examples/ragas_examples/ag_ui_agent_evals/README.md new file mode 100644 index 0000000000..0846b6b483 --- /dev/null +++ b/examples/ragas_examples/ag_ui_agent_evals/README.md @@ -0,0 +1,315 @@ +# AG-UI Agent Evaluation Examples + +This example demonstrates how to evaluate agents built with the **AG-UI protocol** using Ragas metrics. + +## What is AG-UI? + +AG-UI (Agent-to-UI) is a protocol for streaming agent events from backend to frontend. It defines a standardized event format for agent-to-UI communication, enabling real-time streaming of agent actions, tool calls, and responses. + +## Prerequisites + +Before running these examples, you need to have an AG-UI compatible agent running. Follow the [AG-UI Quickstart Guide](https://docs.ag-ui.com/quickstart/applications) to set up your agent. + +### Popular AG-UI Compatible Frameworks + +- **Google ADK (Agent Development Kit)** - Google's framework for building AI agents +- **Pydantic AI** - Type-safe agent framework using Pydantic +- **Mastra** - Modular, TypeScript-based agentic AI framework +- **Crew.ai** - Python framework for orchestrating collaborative, specialized AI agent teams +- And more... + +### Example Setup + +Here's a quick overview of setting up an AG-UI agent (refer to the [official documentation](https://docs.ag-ui.com/quickstart/applications) for detailed instructions): + +1. Choose your agent framework (e.g., Google ADK, Pydantic AI) +2. Implement your agent with the required tools +3. Start the AG-UI server (typically runs at `http://localhost:8000/chat` or `http://localhost:8000/agentic_chat`) +4. Verify the endpoint is accessible + +## Installation + +Install the required dependencies: + +```bash +# From the ragas repository root +uv pip install -e ".[dev]" + +# Or install specific dependencies +pip install ragas langchain-openai +``` + +## Evaluation Scenarios + +This example includes two evaluation scenarios: + +### 1. Scientist Biographies (Factual Correctness) + +Tests the agent's ability to provide factually correct information about famous scientists. + +- **Metric**: `FactualCorrectness` - Measures how accurate the agent's responses are compared to reference answers +- **Dataset**: `test_data/scientist_biographies.csv` - 5 questions about scientists (Einstein, Fleming, Newton, etc.) +- **Sample Type**: `SingleTurnSample` - Simple question-answer pairs + +### 2. Weather Tool Usage (Tool Call F1) + +Tests the agent's ability to correctly invoke the weather tool when appropriate. 
+ +- **Metric**: `ToolCallF1` - F1 score measuring precision and recall of tool invocations +- **Dataset**: `test_data/weather_tool_calls.csv` - 5 queries requiring weather tool calls +- **Sample Type**: `MultiTurnSample` - Multi-turn conversations with tool call expectations + +## Usage + +### Basic Usage + +Run both evaluation scenarios: + +```bash +cd examples/ragas_examples/ag_ui_agent_evals +python evals.py --endpoint-url http://localhost:8000/agentic_chat +``` + +### Command Line Options + +```bash +# Specify a different endpoint +python evals.py --endpoint-url http://localhost:8010/chat + +# Use a different evaluator model +python evals.py --evaluator-model gpt-4o + +# Skip the factual correctness evaluation +python evals.py --skip-factual + +# Skip the tool call evaluation +python evals.py --skip-tool-eval + +# Specify output directory for results +python evals.py --output-dir ./results + +# Combine options +python evals.py \ + --endpoint-url http://localhost:8000/agentic_chat \ + --evaluator-model gpt-4o-mini \ + --output-dir ./my_results +``` + +### Using uv (Recommended) + +```bash +# Run with uv from the examples directory +cd examples +uv run python ragas_examples/ag_ui_agent_evals/evals.py --endpoint-url http://localhost:8000/agentic_chat +``` + +## Expected Output + +### Console Output + +The script will print detailed evaluation results: + +``` +================================================================================ +Starting Scientist Biographies Evaluation +================================================================================ +Loading scientist biographies dataset from .../test_data/scientist_biographies.csv +Loaded 5 scientist biography samples +Evaluating against endpoint: http://localhost:8000/agentic_chat + +================================================================================ +Scientist Biographies Evaluation Results +================================================================================ + user_input ... factual_correctness(mode=f1) +0 Who originated the theory of relativity... ... 0.75 +1 Who discovered penicillin and when... ... 1.00 +... + +Average Factual Correctness: 0.7160 +Perfect scores (1.0): 2/5 + +Results saved to: .../scientist_biographies_results_20250101_143022.csv + +================================================================================ +Starting Weather Tool Usage Evaluation +================================================================================ +... +Average Tool Call F1: 1.0000 +Perfect scores (F1=1.0): 5/5 +Failed scores (F1=0.0): 0/5 + +Results saved to: .../weather_tool_calls_results_20250101_143045.csv + +================================================================================ +All evaluations completed successfully! 
+================================================================================ +``` + +### CSV Output Files + +Results are saved as timestamped CSV files: + +- `scientist_biographies_results_YYYYMMDD_HHMMSS.csv` +- `weather_tool_calls_results_YYYYMMDD_HHMMSS.csv` + +Example CSV structure: + +```csv +user_input,response,reference,factual_correctness(mode=f1) +"Who originated the theory of relativity...","Albert Einstein...","Albert Einstein originated...",0.75 +``` + +## Customizing the Evaluation + +### Adding New Test Cases + +#### For Factual Correctness + +Edit `test_data/scientist_biographies.csv`: + +```csv +user_input,reference +"Your question here","Your reference answer here" +``` + +#### For Tool Call Evaluation + +Edit `test_data/weather_tool_calls.csv`: + +```csv +user_input,reference_tool_calls +"What's the weather in Paris?","[{\"name\": \"weatherTool\", \"args\": {\"location\": \"Paris\"}}]" +``` + +### Using Different Metrics + +Modify `evals.py` to include additional Ragas metrics: + +```python +from ragas.metrics import AnswerRelevancy, ContextPrecision + +# In evaluate_scientist_biographies function: +metrics = [ + FactualCorrectness(), + AnswerRelevancy(), # Add additional metrics +] +``` + +### Evaluating Your Own Agent + +1. **Ensure your agent supports AG-UI protocol** + - Agent must expose an endpoint that accepts AG-UI messages + - Agent must return Server-Sent Events (SSE) with AG-UI event format + +2. **Update the endpoint URL** + ```bash + python evals.py --endpoint-url http://your-agent:port/your-endpoint + ``` + +3. **Customize test data** + - Create new CSV files with your test cases + - Update the loader functions in `evals.py` if needed + +## Troubleshooting + +### Connection Errors + +``` +Error: Connection refused at http://localhost:8000/agentic_chat +``` + +**Solution**: Ensure your AG-UI agent is running and accessible at the specified endpoint. + +### Import Errors + +``` +ImportError: No module named 'ragas' +``` + +**Solution**: Install ragas and its dependencies: +```bash +pip install ragas langchain-openai +``` + +### API Key Errors + +``` +Error: OpenAI API key not found +``` + +**Solution**: Set your OpenAI API key: +```bash +export OPENAI_API_KEY='your-api-key-here' +``` + +### Agent Timeout + +``` +Error: Request timeout after 60.0 seconds +``` + +**Solution**: Your agent may be slow to respond. You can increase the timeout in the code or optimize your agent's performance. 
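If you call the integration directly from Python, you can also pass a larger `timeout` to `evaluate_ag_ui_agent` (the parameter is described in the Ragas AG-UI integration guide; the value below is illustrative):

```python
from ragas.integrations.ag_ui import evaluate_ag_ui_agent

# Inside an async function:
result = await evaluate_ag_ui_agent(
    endpoint_url="http://localhost:8000/agentic_chat",
    dataset=dataset,
    metrics=metrics,
    evaluator_llm=evaluator_llm,
    timeout=180.0,  # seconds; raise this for long-running tool calls
)
```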
+ +## Understanding the Results + +### Factual Correctness Metric + +- **Range**: 0.0 to 1.0 +- **1.0**: Perfect match between response and reference +- **0.5-0.9**: Partially correct with some missing or incorrect information +- **<0.5**: Significant discrepancies with the reference + +### Tool Call F1 Metric + +- **Range**: 0.0 to 1.0 +- **1.0**: Perfect tool call accuracy (correct tools with correct arguments) +- **0.5-0.9**: Some correct tools but missing some or calling extra tools +- **0.0**: Incorrect tool usage or no tool calls when expected + +## Integration with Your Workflow + +### CI/CD Integration + +You can integrate these evaluations into your CI/CD pipeline: + +```bash +# In your CI script +python evals.py \ + --endpoint-url http://staging-agent:8000/chat \ + --output-dir ./test-results \ + || exit 1 +``` + +### Tracking Performance Over Time + +Save results with timestamps to track improvements: + +```bash +# Run evaluations regularly +python evals.py --output-dir ./historical-results/$(date +%Y%m%d) +``` + +### Automated Testing + +Create a simple test harness: + +```python +import subprocess +import sys + +result = subprocess.run( + ["python", "evals.py", "--endpoint-url", "http://localhost:8000/chat"], + capture_output=True +) + +if result.returncode != 0: + print("Evaluation failed!") + sys.exit(1) +``` + +## Additional Resources + +- [AG-UI Documentation](https://docs.ag-ui.com) +- [AG-UI Quickstart](https://docs.ag-ui.com/quickstart/applications) +- [Ragas Documentation](https://docs.ragas.io) +- [Ragas AG-UI Integration Guide](https://docs.ragas.io/integrations/ag-ui) diff --git a/examples/ragas_examples/ag_ui_agent_evals/__init__.py b/examples/ragas_examples/ag_ui_agent_evals/__init__.py new file mode 100644 index 0000000000..7b75b49c7f --- /dev/null +++ b/examples/ragas_examples/ag_ui_agent_evals/__init__.py @@ -0,0 +1,52 @@ +""" +AG-UI Agent Evaluation Examples + +This package demonstrates how to evaluate agents built with the AG-UI protocol +using Ragas metrics. + +## What is AG-UI? + +AG-UI (Agent-to-UI) is a protocol for streaming agent events from backend to frontend. +It defines a standardized event format for agent-to-UI communication. + +## Getting Started + +Before running these examples, you'll need to have an AG-UI compatible agent running. +Follow the AG-UI quickstart guide to set up your agent: + +https://docs.ag-ui.com/quickstart/applications + +Popular agent frameworks that support AG-UI include: +- Google ADK (Agent Development Kit) +- Pydantic AI +- And more... + +## Running the Examples + +Once you have your AG-UI agent endpoint running (typically at +http://localhost:8000/chat or http://localhost:8000/agentic_chat), you can run +the evaluation examples: + +```bash +# From the examples directory +cd ragas_examples/ag_ui_agent_evals +uv run python evals.py --endpoint-url http://localhost:8000/agentic_chat +``` + +## Evaluation Scenarios + +This package includes two evaluation scenarios: + +1. **Scientist Biographies** - Tests factual correctness of agent responses + using the FactualCorrectness metric with SingleTurnSample datasets. + +2. **Weather Tool Usage** - Tests tool calling accuracy using the ToolCallF1 + metric with MultiTurnSample datasets. + +## Results + +Evaluation results are saved as CSV files with timestamps for tracking performance +over time. 
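
You can also launch both scenarios programmatically instead of via the CLI. A
sketch (it reuses the script's `main()` coroutine, which parses `sys.argv`, so
run it from a plain Python process without extra arguments):

```python
import asyncio

from ragas_examples.ag_ui_agent_evals.evals import main

# Runs both evaluations against the default endpoint
# (http://localhost:8000/agentic_chat).
asyncio.run(main())
```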
+""" + +__version__ = "0.1.0" diff --git a/examples/ragas_examples/ag_ui_agent_evals/evals.py b/examples/ragas_examples/ag_ui_agent_evals/evals.py new file mode 100644 index 0000000000..fbf8229170 --- /dev/null +++ b/examples/ragas_examples/ag_ui_agent_evals/evals.py @@ -0,0 +1,314 @@ +""" +AG-UI Agent Evaluation Script + +This script demonstrates how to evaluate agents built with the AG-UI protocol +using Ragas metrics. It includes two evaluation scenarios: + +1. Scientist Biographies - Tests factual correctness of agent responses +2. Weather Tool Usage - Tests tool calling accuracy + +Prerequisites: +- An AG-UI compatible agent running at the specified endpoint URL +- See https://docs.ag-ui.com/quickstart/applications for agent setup + +Usage: + python evals.py --endpoint-url http://localhost:8000/agentic_chat + python evals.py --endpoint-url http://localhost:8000/chat --skip-tool-eval +""" + +import argparse +import asyncio +import csv +import json +import logging +import os +from datetime import datetime +from pathlib import Path +from typing import List + +from langchain_openai import ChatOpenAI + +from ragas.dataset_schema import ( + EvaluationDataset, + MultiTurnSample, + SingleTurnSample, +) +from ragas.integrations.ag_ui import evaluate_ag_ui_agent +from ragas.llms import LangchainLLMWrapper +from ragas.messages import HumanMessage, ToolCall +from ragas.metrics import FactualCorrectness, ToolCallF1 + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Get the directory where this script is located +SCRIPT_DIR = Path(__file__).parent +TEST_DATA_DIR = SCRIPT_DIR / "test_data" + + +def load_scientist_dataset() -> EvaluationDataset: + """ + Load the scientist biographies dataset from CSV. + + Returns: + EvaluationDataset with SingleTurnSample entries for testing factual correctness. + """ + csv_path = TEST_DATA_DIR / "scientist_biographies.csv" + logger.info(f"Loading scientist biographies dataset from {csv_path}") + + samples = [] + with open(csv_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + sample = SingleTurnSample( + user_input=row["user_input"], reference=row["reference"] + ) + samples.append(sample) + + logger.info(f"Loaded {len(samples)} scientist biography samples") + return EvaluationDataset(samples=samples) + + +def load_weather_dataset() -> EvaluationDataset: + """ + Load the weather tool call dataset from CSV. + + Returns: + EvaluationDataset with MultiTurnSample entries for testing tool call accuracy. 
+ """ + csv_path = TEST_DATA_DIR / "weather_tool_calls.csv" + logger.info(f"Loading weather tool call dataset from {csv_path}") + + samples = [] + with open(csv_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + # Parse the reference_tool_calls JSON + tool_calls_data = json.loads(row["reference_tool_calls"]) + tool_calls = [ + ToolCall(name=tc["name"], args=tc["args"]) for tc in tool_calls_data + ] + + # Create MultiTurnSample with user_input as a list of HumanMessage + sample = MultiTurnSample( + user_input=[HumanMessage(content=row["user_input"])], + reference_tool_calls=tool_calls, + ) + samples.append(sample) + + logger.info(f"Loaded {len(samples)} weather tool call samples") + return EvaluationDataset(samples=samples) + + +async def evaluate_scientist_biographies( + endpoint_url: str, evaluator_llm: LangchainLLMWrapper +) -> tuple: + """ + Evaluate the agent's ability to provide factually correct information + about scientists. + + Args: + endpoint_url: The AG-UI endpoint URL + evaluator_llm: The LLM to use for evaluation + + Returns: + Tuple of (result, dataframe) where result is the EvaluationResult + and dataframe is the pandas DataFrame with results. + """ + logger.info("=" * 80) + logger.info("Starting Scientist Biographies Evaluation") + logger.info("=" * 80) + + # Load dataset + dataset = load_scientist_dataset() + + # Define metrics + metrics = [FactualCorrectness()] + + # Run evaluation + logger.info(f"Evaluating against endpoint: {endpoint_url}") + result = await evaluate_ag_ui_agent( + endpoint_url=endpoint_url, + dataset=dataset, + metrics=metrics, + evaluator_llm=evaluator_llm, + ) + + # Convert to DataFrame and clean up + df = result.to_pandas() + df = df.drop(columns=["retrieved_contexts"], errors="ignore") + + # Print summary + logger.info("\n" + "=" * 80) + logger.info("Scientist Biographies Evaluation Results") + logger.info("=" * 80) + logger.info(f"\nDataFrame shape: {df.shape}") + logger.info(f"\n{df.to_string()}") + + if "factual_correctness(mode=f1)" in df.columns: + avg_correctness = df["factual_correctness(mode=f1)"].mean() + logger.info(f"\nAverage Factual Correctness: {avg_correctness:.4f}") + logger.info( + f"Perfect scores (1.0): {(df['factual_correctness(mode=f1)'] == 1.0).sum()}/{len(df)}" + ) + + return result, df + + +async def evaluate_weather_tool_use( + endpoint_url: str, evaluator_llm: LangchainLLMWrapper +) -> tuple: + """ + Evaluate the agent's ability to correctly call the weather tool. + + Args: + endpoint_url: The AG-UI endpoint URL + evaluator_llm: The LLM to use for evaluation + + Returns: + Tuple of (result, dataframe) where result is the EvaluationResult + and dataframe is the pandas DataFrame with results. 
+ """ + logger.info("\n" + "=" * 80) + logger.info("Starting Weather Tool Usage Evaluation") + logger.info("=" * 80) + + # Load dataset + dataset = load_weather_dataset() + + # Define metrics + metrics = [ToolCallF1()] + + # Run evaluation + logger.info(f"Evaluating against endpoint: {endpoint_url}") + result = await evaluate_ag_ui_agent( + endpoint_url=endpoint_url, + dataset=dataset, + metrics=metrics, + evaluator_llm=evaluator_llm, + ) + + # Convert to DataFrame and clean up + df = result.to_pandas() + columns_to_drop = [ + col for col in ["retrieved_contexts", "reference"] if col in df.columns + ] + if columns_to_drop: + df = df.drop(columns=columns_to_drop) + + # Print summary + logger.info("\n" + "=" * 80) + logger.info("Weather Tool Usage Evaluation Results") + logger.info("=" * 80) + logger.info(f"\nDataFrame shape: {df.shape}") + logger.info(f"\n{df.to_string()}") + + if "tool_call_f1" in df.columns: + avg_f1 = df["tool_call_f1"].mean() + logger.info(f"\nAverage Tool Call F1: {avg_f1:.4f}") + logger.info( + f"Perfect scores (F1=1.0): {(df['tool_call_f1'] == 1.0).sum()}/{len(df)}" + ) + logger.info( + f"Failed scores (F1=0.0): {(df['tool_call_f1'] == 0.0).sum()}/{len(df)}" + ) + + return result, df + + +def save_results(df, scenario_name: str, output_dir: Path = None): + """ + Save evaluation results to a timestamped CSV file. + + Args: + df: The pandas DataFrame with evaluation results + scenario_name: Name of the evaluation scenario + output_dir: Directory to save results (defaults to script directory) + """ + if output_dir is None: + output_dir = SCRIPT_DIR + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{scenario_name}_results_{timestamp}.csv" + filepath = output_dir / filename + + df.to_csv(filepath, index=False) + logger.info(f"\nResults saved to: {filepath}") + + +async def main(): + """Main execution function.""" + # Parse command line arguments + parser = argparse.ArgumentParser( + description="Evaluate AG-UI agents using Ragas metrics" + ) + parser.add_argument( + "--endpoint-url", + type=str, + default="http://localhost:8000/agentic_chat", + help="AG-UI endpoint URL (default: http://localhost:8000/agentic_chat)", + ) + parser.add_argument( + "--evaluator-model", + type=str, + default="gpt-4o-mini", + help="OpenAI model to use for evaluation (default: gpt-4o-mini)", + ) + parser.add_argument( + "--skip-factual", + action="store_true", + help="Skip the factual correctness evaluation", + ) + parser.add_argument( + "--skip-tool-eval", + action="store_true", + help="Skip the tool call evaluation", + ) + parser.add_argument( + "--output-dir", + type=Path, + default=None, + help="Directory to save results (default: script directory)", + ) + + args = parser.parse_args() + + # Setup evaluator LLM + logger.info(f"Setting up evaluator LLM: {args.evaluator_model}") + llm = ChatOpenAI(model=args.evaluator_model) + evaluator_llm = LangchainLLMWrapper(llm) + + # Run evaluations + try: + if not args.skip_factual: + result, df = await evaluate_scientist_biographies( + args.endpoint_url, evaluator_llm + ) + save_results(df, "scientist_biographies", args.output_dir) + + if not args.skip_tool_eval: + result, df = await evaluate_weather_tool_use( + args.endpoint_url, evaluator_llm + ) + save_results(df, "weather_tool_calls", args.output_dir) + + logger.info("\n" + "=" * 80) + logger.info("All evaluations completed successfully!") + logger.info("=" * 80) + + except Exception as e: + logger.error(f"\nEvaluation failed with error: {e}") + logger.error( + "\nPlease ensure 
your AG-UI agent is running at the specified endpoint." + ) + logger.error( + "See https://docs.ag-ui.com/quickstart/applications for setup instructions." + ) + raise + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/ragas_examples/ag_ui_agent_evals/test_data/scientist_biographies.csv b/examples/ragas_examples/ag_ui_agent_evals/test_data/scientist_biographies.csv new file mode 100644 index 0000000000..9bae4b1a9b --- /dev/null +++ b/examples/ragas_examples/ag_ui_agent_evals/test_data/scientist_biographies.csv @@ -0,0 +1,6 @@ +user_input,reference +"Who originated the theory of relativity and where were they born?","Albert Einstein originated the theory of relativity. He was born in Ulm, in the Kingdom of Württemberg, Germany." +"Who discovered penicillin and when was it discovered?","Alexander Fleming discovered penicillin in 1928." +"Who proposed the law of universal gravitation and in what century?","Isaac Newton proposed the law of universal gravitation in the 17th century." +"Who is known as the father of modern chemistry and why?","Antoine Lavoisier is known as the father of modern chemistry for establishing the law of conservation of mass." +"Who developed the polio vaccine and where was it first tested?","Jonas Salk developed the polio vaccine, first tested in the United States." diff --git a/examples/ragas_examples/ag_ui_agent_evals/test_data/weather_tool_calls.csv b/examples/ragas_examples/ag_ui_agent_evals/test_data/weather_tool_calls.csv new file mode 100644 index 0000000000..7dd4a0ea55 --- /dev/null +++ b/examples/ragas_examples/ag_ui_agent_evals/test_data/weather_tool_calls.csv @@ -0,0 +1,6 @@ +user_input,reference_tool_calls +"What's the weather like in San Francisco?","[{""name"": ""weatherTool"", ""args"": {""location"": ""San Francisco""}}]" +"Can you check the weather in Tokyo?","[{""name"": ""weatherTool"", ""args"": {""location"": ""Tokyo""}}]" +"What is the temperature like in Paris today?","[{""name"": ""weatherTool"", ""args"": {""location"": ""Paris""}}]" +"Is it sunny in Rome?","[{""name"": ""weatherTool"", ""args"": {""location"": ""Rome""}}]" +"Is it raining in London right now?","[{""name"": ""weatherTool"", ""args"": {""location"": ""London""}}]" diff --git a/mkdocs.yml b/mkdocs.yml index 673f45b0c0..62ce82f979 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -130,6 +130,7 @@ nav: - Evaluate and Improve a RAG App: howtos/applications/evaluate-and-improve-rag.md - Integrations: - howtos/integrations/index.md + - AG-UI: howtos/integrations/ag_ui.md - Arize: howtos/integrations/_arize.md - Amazon Bedrock: howtos/integrations/amazon_bedrock.md - Haystack: howtos/integrations/haystack.md diff --git a/pyproject.toml b/pyproject.toml index 436b89bab2..7cfc6c8e2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,7 @@ gdrive = [ ] ai-frameworks = ["haystack-ai"] oci = ["oci>=2.160.1"] +ag-ui = ["ag-ui-protocol>=0.1.9", "httpx>=0.27.0"] # Minimal dev dependencies for fast development setup (used by make install-minimal) dev-minimal = [ diff --git a/src/ragas/integrations/__init__.py b/src/ragas/integrations/__init__.py index 141ed39a4b..c9c40446bf 100644 --- a/src/ragas/integrations/__init__.py +++ b/src/ragas/integrations/__init__.py @@ -10,6 +10,7 @@ - Observability: Helicone, Langsmith, Opik - Platforms: Amazon Bedrock, R2R - AI Systems: Swarm for multi-agent evaluation +- Protocols: AG-UI for event-based agent communication Import tracing integrations: ```python diff --git a/src/ragas/integrations/ag_ui.py 
b/src/ragas/integrations/ag_ui.py new file mode 100644 index 0000000000..69bc928dd8 --- /dev/null +++ b/src/ragas/integrations/ag_ui.py @@ -0,0 +1,1312 @@ +""" +AG-UI Protocol Integration for Ragas. + +This module provides conversion utilities and evaluation functions for AG-UI +protocol agents. It supports converting AG-UI streaming events to Ragas message +format and evaluating AG-UI FastAPI endpoints. + +AG-UI is an event-based protocol for agent-to-UI communication that uses typed +events for streaming text messages, tool calls, and state synchronization. This +integration supports both streaming events (Start-Content-End triads) and +convenience chunk events (TextMessageChunk, ToolCallChunk) for complete messages. + +Functions: + convert_to_ragas_messages: Convert AG-UI event sequences to Ragas messages + convert_messages_snapshot: Convert AG-UI message snapshots to Ragas messages + evaluate_ag_ui_agent: Batch evaluate an AG-UI FastAPI endpoint + +Examples: + Convert streaming AG-UI events to Ragas messages:: + + from ragas.integrations.ag_ui import convert_to_ragas_messages + from ag_ui.core import Event + + # List of AG-UI events from agent run + ag_ui_events: List[Event] = [...] + + # Convert to Ragas messages + ragas_messages = convert_to_ragas_messages(ag_ui_events, metadata=True) + + Evaluate an AG-UI agent endpoint (single-turn):: + + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + from ragas.dataset_schema import EvaluationDataset, SingleTurnSample + from ragas.metrics import AspectCritic + + dataset = EvaluationDataset(samples=[ + SingleTurnSample(user_input="What's the weather in SF?") + ]) + + result = await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[AspectCritic()] + ) + + Evaluate with multi-turn conversations and tool calls:: + + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + from ragas.dataset_schema import EvaluationDataset, MultiTurnSample + from ragas.messages import HumanMessage, ToolCall + from ragas.metrics import ToolCallF1 + + dataset = EvaluationDataset(samples=[ + MultiTurnSample( + user_input=[HumanMessage(content="What's the weather in SF?")], + reference_tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})] + ) + ]) + + result = await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[ToolCallF1()] + ) +""" + +from __future__ import annotations + +import json +import logging +import math +import typing as t +import uuid +from typing import Any, Dict, List, Optional, Union + +from ragas.dataset_schema import ( + EvaluationDataset, + EvaluationResult, + MultiTurnSample, + SingleTurnSample, +) +from ragas.evaluation import evaluate as ragas_evaluate +from ragas.executor import Executor +from ragas.messages import AIMessage, HumanMessage, ToolCall, ToolMessage +from ragas.run_config import RunConfig + +if t.TYPE_CHECKING: + from ragas.metrics.base import Metric + +logger = logging.getLogger(__name__) + + +# Lazy imports for ag_ui to avoid hard dependency +def _import_ag_ui_core(): + """Import AG-UI core types with helpful error message.""" + try: + from ag_ui.core import ( + BaseEvent, + Event, + EventType, + MessagesSnapshotEvent, + TextMessageChunkEvent, + TextMessageContentEvent, + TextMessageEndEvent, + TextMessageStartEvent, + ToolCallArgsEvent, + ToolCallChunkEvent, + ToolCallEndEvent, + ToolCallResultEvent, + ToolCallStartEvent, + ) + + return ( + BaseEvent, + Event, + EventType, + MessagesSnapshotEvent, + 
TextMessageStartEvent, + TextMessageContentEvent, + TextMessageEndEvent, + TextMessageChunkEvent, + ToolCallStartEvent, + ToolCallArgsEvent, + ToolCallEndEvent, + ToolCallResultEvent, + ToolCallChunkEvent, + ) + except ImportError as e: + raise ImportError( + "AG-UI integration requires the ag-ui-protocol package. " + "Install it with: pip install ag-ui-protocol" + ) from e + + +class AGUIEventCollector: + """ + Collects and reconstructs complete messages from streaming AG-UI events. + + AG-UI uses an event-based streaming protocol where messages are delivered + incrementally through Start->Content->End event sequences (triads). This + collector accumulates these events and reconstructs complete Ragas messages. + It also supports convenience chunk events (TextMessageChunk, ToolCallChunk) + for complete messages delivered in a single event. + + Attributes + ---------- + messages : List[Union[HumanMessage, AIMessage, ToolMessage]] + Accumulated complete messages ready for Ragas evaluation. + include_metadata : bool + Whether to include AG-UI metadata in converted messages. + + Example + ------- + >>> collector = AGUIEventCollector(metadata=True) + >>> for event in ag_ui_event_stream: + ... collector.process_event(event) + >>> ragas_messages = collector.get_messages() + """ + + def __init__(self, metadata: bool = False): + """ + Initialize the event collector. + + Parameters + ---------- + metadata : bool, optional + Whether to include AG-UI event metadata in Ragas messages (default: False) + """ + self.include_metadata = metadata + self.messages: List[Union[HumanMessage, AIMessage, ToolMessage]] = [] + + # State tracking for streaming message reconstruction + self._active_text_messages: Dict[str, Dict[str, Any]] = {} + self._active_tool_calls: Dict[str, Dict[str, Any]] = {} + self._completed_tool_calls: Dict[str, ToolCall] = {} + + # Context tracking for metadata + self._current_run_id: Optional[str] = None + self._current_thread_id: Optional[str] = None + self._current_step: Optional[str] = None + + # Cache AG-UI imports to avoid repeated import calls + ( + self._BaseEvent, + self._Event, + self._EventType, + self._MessagesSnapshotEvent, + self._TextMessageStartEvent, + self._TextMessageContentEvent, + self._TextMessageEndEvent, + self._TextMessageChunkEvent, + self._ToolCallStartEvent, + self._ToolCallArgsEvent, + self._ToolCallEndEvent, + self._ToolCallResultEvent, + self._ToolCallChunkEvent, + ) = _import_ag_ui_core() + + def _get_pending_tool_calls(self) -> Optional[List[ToolCall]]: + """ + Retrieve and clear any completed tool calls waiting to be attached to a message. + + Returns + ------- + Optional[List[ToolCall]] + List of pending tool calls if any exist, None otherwise. + """ + if self._completed_tool_calls: + tool_calls = list(self._completed_tool_calls.values()) + self._completed_tool_calls.clear() + return tool_calls + return None + + def process_event(self, event: Any) -> None: + """ + Process a single AG-UI event and update internal state. 
+ + Parameters + ---------- + event : Event + An AG-UI protocol event from ag_ui.core + + Notes + ----- + This method handles different event types: + - Lifecycle events (RUN_STARTED, STEP_STARTED): Update context + - Text message events: Accumulate and reconstruct messages (streaming triads or chunks) + - Tool call events: Reconstruct tool calls and results (streaming triads or chunks) + - Other events: Silently ignored + """ + # Use cached AG-UI imports + EventType = self._EventType + + event_type = event.type + + # Update context from lifecycle events + if event_type == EventType.RUN_STARTED: + self._current_run_id = event.run_id + self._current_thread_id = event.thread_id + elif event_type == EventType.STEP_STARTED: + self._current_step = event.step_name + elif event_type == EventType.STEP_FINISHED: + if event.step_name == self._current_step: + self._current_step = None + + # Handle text message events + elif event_type == EventType.TEXT_MESSAGE_START: + self._handle_text_message_start(event) + elif event_type == EventType.TEXT_MESSAGE_CONTENT: + self._handle_text_message_content(event) + elif event_type == EventType.TEXT_MESSAGE_END: + self._handle_text_message_end(event) + elif event_type == EventType.TEXT_MESSAGE_CHUNK: + self._handle_text_message_chunk(event) + + # Handle tool call events + elif event_type == EventType.TOOL_CALL_START: + self._handle_tool_call_start(event) + elif event_type == EventType.TOOL_CALL_ARGS: + self._handle_tool_call_args(event) + elif event_type == EventType.TOOL_CALL_END: + self._handle_tool_call_end(event) + elif event_type == EventType.TOOL_CALL_RESULT: + self._handle_tool_call_result(event) + elif event_type == EventType.TOOL_CALL_CHUNK: + self._handle_tool_call_chunk(event) + + # MessagesSnapshot provides complete history + elif event_type == EventType.MESSAGES_SNAPSHOT: + self._handle_messages_snapshot(event) + + # Ignore lifecycle, state management, and other events + else: + logger.debug(f"Ignoring AG-UI event type: {event_type}") + + def _handle_text_message_start(self, event: Any) -> None: + """Initialize a new streaming text message.""" + self._active_text_messages[event.message_id] = { + "message_id": event.message_id, + "role": event.role, + "content_chunks": [], + "timestamp": event.timestamp, + } + + def _handle_text_message_content(self, event: Any) -> None: + """Accumulate text content chunk for a streaming message.""" + if event.message_id in self._active_text_messages: + self._active_text_messages[event.message_id]["content_chunks"].append( + event.delta + ) + else: + logger.warning( + f"Received TextMessageContent for unknown message_id: {event.message_id}" + ) + + def _handle_text_message_end(self, event: Any) -> None: + """Finalize a streaming text message and convert to Ragas format.""" + if event.message_id not in self._active_text_messages: + logger.warning( + f"Received TextMessageEnd for unknown message_id: {event.message_id}" + ) + return + + msg_data = self._active_text_messages.pop(event.message_id) + content = "".join(msg_data["content_chunks"]) + role = msg_data["role"] + + # Build metadata if requested + metadata = None + if self.include_metadata: + metadata = { + "message_id": msg_data["message_id"], + "timestamp": msg_data["timestamp"], + } + if self._current_run_id: + metadata["run_id"] = self._current_run_id + if self._current_thread_id: + metadata["thread_id"] = self._current_thread_id + if self._current_step: + metadata["step_name"] = self._current_step + + # Convert to appropriate Ragas message type + if role == 
"assistant": + # Check if there are completed tool calls for this message + # Tool calls are associated by being emitted before the message end + tool_calls = self._get_pending_tool_calls() + + self.messages.append( + AIMessage(content=content, tool_calls=tool_calls, metadata=metadata) + ) + elif role == "user": + self.messages.append(HumanMessage(content=content, metadata=metadata)) + else: + logger.warning(f"Unexpected message role: {role}") + + def _handle_tool_call_start(self, event: Any) -> None: + """Initialize a new streaming tool call.""" + self._active_tool_calls[event.tool_call_id] = { + "tool_call_id": event.tool_call_id, + "tool_call_name": event.tool_call_name, + "parent_message_id": getattr(event, "parent_message_id", None), + "args_chunks": [], + "timestamp": event.timestamp, + } + + def _handle_tool_call_args(self, event: Any) -> None: + """Accumulate tool argument chunks.""" + if event.tool_call_id in self._active_tool_calls: + self._active_tool_calls[event.tool_call_id]["args_chunks"].append( + event.delta + ) + else: + logger.warning( + f"Received ToolCallArgs for unknown tool_call_id: {event.tool_call_id}" + ) + + def _handle_tool_call_end(self, event: Any) -> None: + """Finalize a tool call specification (args are complete, but not yet executed).""" + if event.tool_call_id not in self._active_tool_calls: + logger.warning( + f"Received ToolCallEnd for unknown tool_call_id: {event.tool_call_id}" + ) + return + + tool_data = self._active_tool_calls.pop(event.tool_call_id) + args_json = "".join(tool_data["args_chunks"]) + + # Parse tool arguments + try: + args = json.loads(args_json) if args_json else {} + except json.JSONDecodeError: + logger.error( + f"Failed to parse tool call arguments for {tool_data['tool_call_name']}: {args_json}" + ) + args = {"raw_args": args_json} + + # Store completed tool call for association with next AI message + self._completed_tool_calls[event.tool_call_id] = ToolCall( + name=tool_data["tool_call_name"], args=args + ) + + def _handle_tool_call_result(self, event: Any) -> None: + """ + Convert tool call result to Ragas ToolMessage. + + Also ensures that the most recent AIMessage has tool_calls attached, + which is required for MultiTurnSample validation (ToolMessage must be + preceded by an AIMessage with tool_calls). + """ + # Find the most recent AIMessage + ai_msg_idx = None + for i in range(len(self.messages) - 1, -1, -1): + if isinstance(self.messages[i], AIMessage): + ai_msg_idx = i + break + + # Ensure the AIMessage has tool_calls + if ai_msg_idx is not None: + ai_msg_candidate = self.messages[ai_msg_idx] + + if not isinstance(ai_msg_candidate, AIMessage): + logger.warning( + "Expected AIMessage when handling tool call result, " + f"received {type(ai_msg_candidate).__name__}" + ) + return + + ai_msg = ai_msg_candidate + + # If it doesn't have tool_calls, we need to add them + if ai_msg.tool_calls is None or len(ai_msg.tool_calls) == 0: + # Check if there are unclaimed tool calls + if self._completed_tool_calls: + # Attach unclaimed tool calls + new_tool_calls = list(self._completed_tool_calls.values()) + self.messages[ai_msg_idx] = AIMessage( + content=ai_msg.content, + metadata=ai_msg.metadata, + tool_calls=new_tool_calls, + ) + self._completed_tool_calls.clear() + else: + # No unclaimed tool calls, create a synthetic one + # This can happen if tool calls were already attached but lost somehow + logger.warning( + f"ToolCallResult for {event.tool_call_id} but preceding AIMessage " + f"has no tool_calls. Creating synthetic tool call." 
+                    )
+                    synthetic_tool_call = ToolCall(
+                        name="unknown_tool",  # We don't have the tool name
+                        args={},
+                    )
+                    self.messages[ai_msg_idx] = AIMessage(
+                        content=ai_msg.content,
+                        metadata=ai_msg.metadata,
+                        tool_calls=[synthetic_tool_call],
+                    )
+            elif self._completed_tool_calls:
+                # AIMessage already has tool_calls, but there are unclaimed ones
+                # Append them
+                existing_tool_calls = ai_msg.tool_calls or []
+                new_tool_calls = list(self._completed_tool_calls.values())
+                self.messages[ai_msg_idx] = AIMessage(
+                    content=ai_msg.content,
+                    metadata=ai_msg.metadata,
+                    tool_calls=existing_tool_calls + new_tool_calls,
+                )
+                self._completed_tool_calls.clear()
+        else:
+            # No AIMessage found at all - create one
+            logger.warning(
+                "ToolCallResult received but no AIMessage found. Creating synthetic AIMessage."
+            )
+            if self._completed_tool_calls:
+                new_tool_calls = list(self._completed_tool_calls.values())
+            else:
+                new_tool_calls = [ToolCall(name="unknown_tool", args={})]
+
+            self.messages.append(
+                AIMessage(content="", metadata=None, tool_calls=new_tool_calls)
+            )
+            self._completed_tool_calls.clear()
+
+        metadata = None
+        if self.include_metadata:
+            metadata = {
+                "tool_call_id": event.tool_call_id,
+                "message_id": event.message_id,
+                "timestamp": event.timestamp,
+            }
+            if self._current_run_id:
+                metadata["run_id"] = self._current_run_id
+            if self._current_thread_id:
+                metadata["thread_id"] = self._current_thread_id
+
+        self.messages.append(ToolMessage(content=event.content, metadata=metadata))
+
+    def _handle_text_message_chunk(self, event: Any) -> None:
+        """
+        Process a TextMessageChunkEvent - a convenience event combining start, content, and end.
+
+        This handler processes complete messages available at once, bypassing the
+        Start-Content-End streaming sequence.
+        """
+        # Extract message data from chunk event
+        message_id = getattr(event, "message_id", None)
+        role = getattr(event, "role", "assistant")
+        content = getattr(event, "delta", "")
+
+        # Build metadata if requested
+        metadata = None
+        if self.include_metadata:
+            metadata = {
+                "timestamp": event.timestamp,
+            }
+            if message_id:
+                metadata["message_id"] = message_id
+            if self._current_run_id:
+                metadata["run_id"] = self._current_run_id
+            if self._current_thread_id:
+                metadata["thread_id"] = self._current_thread_id
+            if self._current_step:
+                metadata["step_name"] = self._current_step
+
+        # Convert to appropriate Ragas message type
+        if role == "assistant":
+            # Check if there are completed tool calls for this message
+            tool_calls = self._get_pending_tool_calls()
+
+            self.messages.append(
+                AIMessage(content=content, tool_calls=tool_calls, metadata=metadata)
+            )
+        elif role == "user":
+            self.messages.append(HumanMessage(content=content, metadata=metadata))
+        else:
+            logger.warning(f"Unexpected message role in chunk event: {role}")
+
+    def _handle_tool_call_chunk(self, event: Any) -> None:
+        """
+        Process a ToolCallChunkEvent - a convenience event that delivers a
+        complete tool call specification in a single event.
+
+        This handler processes complete tool calls available at once, bypassing the
+        Start-Args-End streaming sequence.
+        """
+        # Extract tool call data from chunk event
+        tool_call_id = getattr(event, "tool_call_id", None)
+        tool_call_name = getattr(event, "tool_call_name", None)
+        args_delta = getattr(event, "delta", None)
+
+        if not tool_call_name:
+            logger.warning("Received ToolCallChunk without tool_call_name")
+            return
+
+        # Parse tool arguments from delta if provided
+        args = {}
+        if args_delta:
+            if isinstance(args_delta, str):
+                try:
+                    args = json.loads(args_delta)
+                except json.JSONDecodeError:
+                    logger.error(
+                        f"Failed to parse tool call arguments for {tool_call_name}: {args_delta}"
+                    )
+                    args = {"raw_args": args_delta}
+            elif isinstance(args_delta, dict):
+                args = args_delta
+            else:
+                args = {"raw_args": str(args_delta)}
+
+        # Store completed tool call for association with next AI message
+        if tool_call_id:
+            self._completed_tool_calls[tool_call_id] = ToolCall(
+                name=tool_call_name, args=args
+            )
+        else:
+            # If no ID provided, generate one
+            temp_id = f"chunk_{len(self._completed_tool_calls)}"
+            self._completed_tool_calls[temp_id] = ToolCall(
+                name=tool_call_name, args=args
+            )
+
+    def _handle_messages_snapshot(self, event: Any) -> None:
+        """
+        Process a MessagesSnapshotEvent containing complete message history.
+
+        This bypasses streaming reconstruction and directly converts
+        AG-UI Message objects to Ragas format using type-based checking.
+        """
+        # Import AG-UI message types for type checking
+        try:
+            from ag_ui.core import (
+                AssistantMessage,
+                ToolMessage as AGUIToolMessage,
+                UserMessage,
+            )
+        except ImportError as e:
+            raise ImportError(
+                "AG-UI message types are required for snapshot processing. "
+                "Install with: pip install ag-ui-protocol"
+            ) from e
+
+        for msg in event.messages:
+            content = str(getattr(msg, "content", ""))
+
+            metadata = None
+            if self.include_metadata:
+                metadata = {"source": "messages_snapshot"}
+                if hasattr(msg, "id"):
+                    metadata["message_id"] = msg.id
+
+            # Type-based checking for AG-UI Message objects
+            if isinstance(msg, AssistantMessage):
+                # Check for tool calls in message
+                tool_calls = None
+                if hasattr(msg, "tool_calls") and msg.tool_calls:
+                    tool_calls = []
+                    for tc in msg.tool_calls:
+                        tc_obj = t.cast(Any, tc)
+                        # AG-UI tool calls nest name/arguments inside a
+                        # FunctionCall (see _convert_ragas_messages_to_ag_ui);
+                        # fall back to flat attributes for other shapes
+                        func = getattr(tc_obj, "function", None)
+                        name = t.cast(
+                            str,
+                            getattr(func, "name", None)
+                            or getattr(tc_obj, "name", "unknown_tool"),
+                        )
+                        raw_args = getattr(func, "arguments", None) or getattr(
+                            tc_obj, "args", {}
+                        )
+                        if isinstance(raw_args, str):
+                            try:
+                                raw_args = json.loads(raw_args) if raw_args else {}
+                            except json.JSONDecodeError:
+                                raw_args = {"raw_args": raw_args}
+                        if not isinstance(raw_args, dict):
+                            raw_args = {"raw_args": raw_args}
+                        tool_calls.append(
+                            ToolCall(
+                                name=name,
+                                args=t.cast(Dict[str, Any], raw_args),
+                            )
+                        )
+                self.messages.append(
+                    AIMessage(content=content, tool_calls=tool_calls, metadata=metadata)
+                )
+            elif isinstance(msg, UserMessage):
+                self.messages.append(HumanMessage(content=content, metadata=metadata))
+            elif isinstance(msg, AGUIToolMessage):
+                self.messages.append(ToolMessage(content=content, metadata=metadata))
+            else:
+                logger.debug(
+                    f"Skipping message with unknown type: {type(msg).__name__}"
+                )
+
+    def get_messages(self) -> List[Union[HumanMessage, AIMessage, ToolMessage]]:
+        """
+        Retrieve all accumulated Ragas messages.
+
+        Returns
+        -------
+        List[Union[HumanMessage, AIMessage, ToolMessage]]
+            Complete list of Ragas messages reconstructed from AG-UI events.
+
+        Notes
+        -----
+        This returns a copy of the accumulated messages. The collector's
+        internal state is not cleared, so calling this multiple times
+        returns the same messages.
+        """
+        return self.messages.copy()
+
+    def clear(self) -> None:
+        """
+        Clear all accumulated messages and reset internal state.
+
+        Useful for reusing the same collector instance for multiple
+        conversation sessions.
+ """ + self.messages.clear() + self._active_text_messages.clear() + self._active_tool_calls.clear() + self._completed_tool_calls.clear() + self._current_run_id = None + self._current_thread_id = None + self._current_step = None + + +def convert_to_ragas_messages( + events: List[Any], + metadata: bool = False, +) -> List[Union[HumanMessage, AIMessage, ToolMessage]]: + """ + Convert a sequence of AG-UI protocol events to Ragas message format. + + This function processes AG-UI events and reconstructs complete messages + from streaming event sequences (Start->Content->End patterns). It handles + text messages, tool calls, and filters out non-message events like + lifecycle and state management events. + + Parameters + ---------- + events : List[Event] + List of AG-UI protocol events from ag_ui.core. Can contain any mix + of event types - non-message events are automatically filtered out. + metadata : bool, optional + Whether to include AG-UI event metadata (run_id, thread_id, timestamps) + in the converted Ragas messages (default: False). + + Returns + ------- + List[Union[HumanMessage, AIMessage, ToolMessage]] + List of Ragas messages ready for evaluation. Messages preserve + conversation order and tool call associations. + + Raises + ------ + ImportError + If the ag-ui-protocol package is not installed. + + Examples + -------- + Convert AG-UI events from an agent run:: + + >>> from ragas.integrations.ag_ui import convert_to_ragas_messages + >>> from ag_ui.core import ( + ... RunStartedEvent, TextMessageStartEvent, + ... TextMessageContentEvent, TextMessageEndEvent + ... ) + >>> + >>> events = [ + ... RunStartedEvent(run_id="run-1", thread_id="thread-1"), + ... TextMessageStartEvent(message_id="msg-1", role="assistant"), + ... TextMessageContentEvent(message_id="msg-1", delta="Hello"), + ... TextMessageContentEvent(message_id="msg-1", delta=" world"), + ... TextMessageEndEvent(message_id="msg-1"), + ... ] + >>> messages = convert_to_ragas_messages(events, metadata=True) + >>> messages[0].content + 'Hello world' + + Process events with tool calls:: + + >>> events = [ + ... TextMessageStartEvent(message_id="msg-1", role="assistant"), + ... TextMessageContentEvent(message_id="msg-1", delta="Let me check"), + ... TextMessageEndEvent(message_id="msg-1"), + ... ToolCallStartEvent( + ... tool_call_id="tc-1", + ... tool_call_name="get_weather", + ... parent_message_id="msg-1" + ... ), + ... ToolCallArgsEvent(tool_call_id="tc-1", delta='{"city": "SF"}'), + ... ToolCallEndEvent(tool_call_id="tc-1"), + ... ToolCallResultEvent( + ... tool_call_id="tc-1", + ... message_id="result-1", + ... content="Sunny, 72°F" + ... ), + ... 
]
+    >>> messages = convert_to_ragas_messages(events)
+    >>> len(messages)
+    2  # AI message + Tool result message
+
+    Notes
+    -----
+    - Streaming events (Start->Content->End) are automatically reconstructed
+    - Tool calls are associated with the preceding AI message
+    - Non-message events (lifecycle, state) are silently filtered
+    - Incomplete event sequences are logged as warnings
+    - AG-UI metadata can be preserved in message.metadata when metadata=True
+
+    See Also
+    --------
+    convert_messages_snapshot : Convert complete message history from snapshot
+    AGUIEventCollector : Lower-level API for streaming event collection
+    """
+    collector = AGUIEventCollector(metadata=metadata)
+
+    for event in events:
+        collector.process_event(event)
+
+    return collector.get_messages()
+
+
+def convert_messages_snapshot(
+    snapshot_event: Any,
+    metadata: bool = False,
+) -> List[Union[HumanMessage, AIMessage, ToolMessage]]:
+    """
+    Convert an AG-UI MessagesSnapshotEvent to Ragas message format.
+
+    MessagesSnapshotEvent provides a complete conversation history in a
+    single event, bypassing the need to reconstruct from streaming events.
+    This is more efficient when the complete history is already available.
+
+    Parameters
+    ----------
+    snapshot_event : MessagesSnapshotEvent
+        AG-UI event containing complete message history array.
+    metadata : bool, optional
+        Whether to include metadata in converted messages (default: False).
+
+    Returns
+    -------
+    List[Union[HumanMessage, AIMessage, ToolMessage]]
+        List of Ragas messages from the snapshot.
+
+    Raises
+    ------
+    ImportError
+        If the ag-ui-protocol package is not installed.
+
+    Examples
+    --------
+    >>> from ragas.integrations.ag_ui import convert_messages_snapshot
+    >>> from ag_ui.core import AssistantMessage, MessagesSnapshotEvent, UserMessage
+    >>>
+    >>> snapshot = MessagesSnapshotEvent(messages=[
+    ...     UserMessage(id="msg-1", content="What's the weather?"),
+    ...     AssistantMessage(id="msg-2", content="Let me check for you."),
+    ... ])
+    >>> messages = convert_messages_snapshot(snapshot)
+    >>> len(messages)
+    2
+
+    Notes
+    -----
+    This is the preferred method when working with complete conversation
+    history. It's faster than processing streaming events and avoids the
+    complexity of event sequence reconstruction.
+
+    See Also
+    --------
+    convert_to_ragas_messages : Convert streaming event sequences
+    """
+    collector = AGUIEventCollector(metadata=metadata)
+
+    # Type check using cached import from collector
+    if not isinstance(snapshot_event, collector._MessagesSnapshotEvent):
+        raise TypeError(
+            f"Expected MessagesSnapshotEvent, got {type(snapshot_event).__name__}"
+        )
+    collector._handle_messages_snapshot(snapshot_event)
+    return collector.get_messages()
+
+
+def _convert_ragas_messages_to_ag_ui(
+    messages: List[Union[HumanMessage, AIMessage, ToolMessage]],
+) -> List[Any]:
+    """
+    Convert Ragas messages to AG-UI message format.
+
+    This function transforms a list of Ragas message objects into AG-UI protocol
+    message format for sending to AG-UI endpoints. It handles conversion of:
+    - HumanMessage → UserMessage
+    - AIMessage → AssistantMessage (with tool_calls if present)
+    - ToolMessage → skipped with a warning (AG-UI's ToolMessage requires a
+      toolCallId that Ragas ToolMessage does not carry)
+
+    Parameters
+    ----------
+    messages : List[Union[HumanMessage, AIMessage, ToolMessage]]
+        List of Ragas messages from MultiTurnSample.user_input
+
+    Returns
+    -------
+    List[Any]
+        List of AG-UI protocol messages (UserMessage, AssistantMessage)
+
+    Examples
+    --------
+    >>> from ragas.messages import HumanMessage, AIMessage, ToolCall
+    >>> messages = [
+    ...     HumanMessage(content="What's the weather?"),
+    ...     AIMessage(content="Let me check", tool_calls=[
+    ...         ToolCall(name="get-weather", args={"location": "SF"})
+    ...     ])
+    ... ]
+    >>> ag_ui_messages = _convert_ragas_messages_to_ag_ui(messages)
+    """
+    try:
+        from ag_ui.core import (
+            AssistantMessage,
+            FunctionCall,
+            ToolCall as AGUIToolCall,
+            UserMessage,
+        )
+    except ImportError as e:
+        raise ImportError(
+            "ag-ui-protocol package is required for AG-UI integration. "
+            "Install it with: pip install ag-ui-protocol"
+        ) from e
+
+    ag_ui_messages = []
+
+    for idx, msg in enumerate(messages):
+        msg_id = str(idx + 1)
+
+        if isinstance(msg, HumanMessage):
+            ag_ui_messages.append(UserMessage(id=msg_id, content=msg.content))
+
+        elif isinstance(msg, AIMessage):
+            # Convert Ragas ToolCall to AG-UI ToolCall format
+            tool_calls = None
+            if msg.tool_calls:
+                tool_calls = [
+                    AGUIToolCall(
+                        id=f"tc-{idx}-{tc_idx}",
+                        function=FunctionCall(
+                            name=tc.name,
+                            arguments=json.dumps(tc.args)
+                            if isinstance(tc.args, dict)
+                            else tc.args,
+                        ),
+                    )
+                    for tc_idx, tc in enumerate(msg.tool_calls)
+                ]
+
+            ag_ui_messages.append(
+                AssistantMessage(
+                    id=msg_id, content=msg.content or "", tool_calls=tool_calls
+                )
+            )
+
+        elif isinstance(msg, ToolMessage):
+            # Note: AG-UI ToolMessage requires toolCallId which Ragas ToolMessage doesn't have.
+            # ToolMessage is typically sent FROM agent, not TO agent in initial conversation.
+            # For now, we skip ToolMessage in the conversion.
+            logger.warning(
+                "Skipping ToolMessage in AG-UI conversion - ToolMessage is typically "
+                "sent from agent, not to agent"
+            )
+            continue
+
+    return ag_ui_messages
+
+
+async def _call_ag_ui_endpoint(
+    endpoint_url: str,
+    user_input: Union[str, List[Union[HumanMessage, AIMessage, ToolMessage]]],
+    thread_id: Optional[str] = None,
+    agent_config: Optional[Dict[str, Any]] = None,
+    timeout: float = 60.0,
+    extra_headers: Optional[Dict[str, str]] = None,
+) -> List[Any]:
+    """
+    Call an AG-UI FastAPI endpoint and collect streaming events.
+
+    Makes an HTTP POST request to an AG-UI compatible FastAPI endpoint
+    and parses the Server-Sent Events (SSE) stream to collect all events.
+
+    Parameters
+    ----------
+    endpoint_url : str
+        The URL of the AG-UI FastAPI endpoint (e.g., "http://localhost:8000/agent").
+    user_input : Union[str, List[Union[HumanMessage, AIMessage, ToolMessage]]]
+        The user message/query to send to the agent. Can be either:
+        - A string for single-turn queries
+        - A list of Ragas messages for multi-turn conversations
+    thread_id : str, optional
+        Optional thread ID for conversation continuity.
+    agent_config : dict, optional
+        Optional agent configuration parameters. Accepted for forward
+        compatibility; currently not included in the request payload.
+    timeout : float, optional
+        Request timeout in seconds (default: 60.0).
+    extra_headers : dict, optional
+        Optional extra HTTP headers to include in the request (default: None).
+        These will be merged with the default "Accept: text/event-stream" header.
+
+    Returns
+    -------
+    List[Event]
+        List of AG-UI events collected from the SSE stream.
+
+    Raises
+    ------
+    ImportError
+        If httpx is not installed.
+    httpx.HTTPError
+        If the HTTP request fails.
+
+    Notes
+    -----
+    This function expects the endpoint to return Server-Sent Events (SSE)
+    with content type "text/event-stream". Each event should be in the format:
+
+        data: {"type": "...", ...}\\n\\n
+
+    The request payload is built with AG-UI's RunAgentInput model; each SSE
+    event is then deserialized with a pydantic TypeAdapter over AG-UI's
+    Event discriminated union.
+    """
+    try:
+        import httpx
+    except ImportError as e:
+        raise ImportError(
+            "AG-UI FastAPI integration requires httpx. "
+            "Install it with: pip install httpx"
+        ) from e
+
+    # Import AG-UI types
+    try:
+        from ag_ui.core import Event, RunAgentInput, UserMessage
+        from pydantic import TypeAdapter
+    except ImportError as e:
+        raise ImportError(
+            "AG-UI integration requires the ag-ui-protocol package. "
+            "Install it with: pip install ag-ui-protocol"
+        ) from e
+
+    # Create TypeAdapter for Event discriminated union
+    # This properly handles the union of all event types based on the 'type' discriminator
+    event_adapter = TypeAdapter(Event)
+
+    # Convert user_input to AG-UI messages
+    ag_ui_messages: List[Any]
+    if isinstance(user_input, str):
+        # Single-turn: simple string input
+        ag_ui_messages = t.cast(List[Any], [UserMessage(id="1", content=user_input)])
+    else:
+        # Multi-turn: list of Ragas messages
+        ag_ui_messages = _convert_ragas_messages_to_ag_ui(user_input)
+
+    # Prepare request payload
+    payload = RunAgentInput(
+        thread_id=thread_id
+        or f"thread_{uuid.uuid4()}",  # Generate thread ID if not provided
+        run_id=f"run_{uuid.uuid4()}",  # Generate a unique run ID
+        messages=t.cast(Any, ag_ui_messages),
+        state={},
+        tools=[],
+        context=[],
+        forwarded_props={},
+    )
+
+    # Collect events from SSE stream
+    events: List[Any] = []
+
+    # Merge default headers with extra headers
+    headers = {"Accept": "text/event-stream"}
+    if extra_headers:
+        headers.update(extra_headers)
+
+    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
+        async with client.stream(
+            "POST",
+            endpoint_url,
+            json=payload.model_dump(exclude_none=True),
+            headers=headers,
+        ) as response:
+            response.raise_for_status()
+
+            # Parse SSE stream line by line
+            async for line in response.aiter_lines():
+                line = line.strip()
+
+                # SSE format: "data: {...}"
+                if line.startswith("data: "):
+                    json_data = line[6:]  # Remove "data: " prefix
+
+                    try:
+                        # Parse JSON and convert to Event using TypeAdapter
+                        # TypeAdapter properly handles discriminated unions based on 'type' field
+                        event_dict = json.loads(json_data)
+                        event = event_adapter.validate_python(event_dict)
+                        events.append(event)
+                    except (json.JSONDecodeError, ValueError) as e:
+                        logger.warning(f"Failed to parse SSE event: {e}")
+                        continue
+
+    return events
+
+
+async def evaluate_ag_ui_agent(
+    endpoint_url: str,
+    dataset: EvaluationDataset,
+    metrics: List["Metric"],
+    metadata: bool = False,
+    run_config: Optional[RunConfig] = None,
+    batch_size: Optional[int] = None,
+    raise_exceptions: bool = False,
+    show_progress: bool = True,
+    timeout: float = 60.0,
+    evaluator_llm: Optional[Any] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+) -> EvaluationResult:
+    """
+    Evaluate an AG-UI agent by calling its FastAPI endpoint with test queries.
+
+    This function runs a batch evaluation by:
+    1. Calling the AG-UI FastAPI endpoint for each query in the dataset
+    2. Collecting streaming AG-UI events from each response
+    3. Converting events to Ragas message format
+    4. Evaluating with specified metrics
+
+    Supports both single-turn and multi-turn evaluations:
+    - Single-turn: Response extracted to sample.response field
+    - Multi-turn: Agent responses appended to sample.user_input conversation
+
+    Parameters
+    ----------
+    endpoint_url : str
+        URL of the AG-UI FastAPI endpoint (e.g., "http://localhost:8000/agent").
+    dataset : EvaluationDataset
+        Dataset containing test queries. Can contain either:
+        - SingleTurnSample: user_input as string
+        - MultiTurnSample: user_input as list of messages
+    metrics : List[Metric]
+        List of Ragas metrics to evaluate (e.g., AspectCritic, ToolCallF1).
+    metadata : bool, optional
+        Whether to include AG-UI metadata in converted messages (default: False).
+    run_config : RunConfig, optional
+        Configuration for the evaluation run.
+    batch_size : int, optional
+        Number of queries to process in parallel (default: None = auto).
+    raise_exceptions : bool, optional
+        Whether to raise exceptions or log warnings (default: False).
+    show_progress : bool, optional
+        Whether to show progress bar (default: True).
+    timeout : float, optional
+        HTTP request timeout in seconds (default: 60.0).
+    evaluator_llm : Any, optional
+        Optional LLM to use for evaluation metrics (default: None).
+    extra_headers : dict, optional
+        Optional extra HTTP headers to include in requests to the agent endpoint (default: None).
+        These will be merged with the default "Accept: text/event-stream" header.
+
+    Returns
+    -------
+    EvaluationResult
+        Results containing metric scores for the dataset.
+
+    Raises
+    ------
+    ImportError
+        If required packages (httpx, ag-ui-protocol) are not installed.
+    ValueError
+        If dataset is not of type EvaluationDataset.
+
+    Examples
+    --------
+    Evaluate an AG-UI agent endpoint with standard metrics::
+
+        >>> from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+        >>> from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+        >>> from ragas.metrics import AspectCritic, Faithfulness
+        >>>
+        >>> dataset = EvaluationDataset(samples=[
+        ...     SingleTurnSample(
+        ...         user_input="What's the weather in San Francisco?",
+        ...         reference="Use the weather API to check SF weather"
+        ...     )
+        ... ])
+        >>>
+        >>> critic = AspectCritic(
+        ...     name="helpfulness",
+        ...     definition="Is the response helpful to the user?",
+        ... )
+        >>> result = await evaluate_ag_ui_agent(
+        ...     endpoint_url="http://localhost:8000/agent",
+        ...     dataset=dataset,
+        ...     metrics=[critic, Faithfulness()]
+        ... )
+
+    With AG-UI metadata included::
+
+        >>> result = await evaluate_ag_ui_agent(
+        ...     endpoint_url="http://localhost:8000/agent",
+        ...     dataset=dataset,
+        ...     metrics=[critic],
+        ...     metadata=True  # Include run_id, thread_id, etc.
+        ... )
+
+    Multi-turn evaluation with tool call metrics::
+
+        >>> from ragas.dataset_schema import MultiTurnSample
+        >>> from ragas.messages import HumanMessage, ToolCall
+        >>> from ragas.metrics import ToolCallF1
+        >>>
+        >>> multi_dataset = EvaluationDataset(samples=[
+        ...     MultiTurnSample(
+        ...         user_input=[
+        ...             HumanMessage(content="What's the weather in SF?")
+        ...         ],
+        ...         reference_tool_calls=[
+        ...             ToolCall(name="get-weather", args={"location": "SF"})
+        ...         ]
+        ...     )
+        ... ])
+        >>>
+        >>> result = await evaluate_ag_ui_agent(
+        ...     endpoint_url="http://localhost:8000/agent",
+        ...     dataset=multi_dataset,
+        ...     metrics=[ToolCallF1()]
+        ...
) + + Notes + ----- + - The endpoint must return Server-Sent Events (SSE) with AG-UI protocol events + - Each query is sent as a separate HTTP request with RunAgentInput payload + - Queries are executed in parallel using Ragas Executor + - Failed queries are logged and recorded as NaN in results + - **Single-turn**: Response text extracted to sample.response field + - **Multi-turn**: Agent responses (AIMessage, ToolMessage) appended to sample.user_input + + See Also + -------- + convert_to_ragas_messages : Convert AG-UI events to Ragas messages + _call_ag_ui_endpoint : HTTP client helper for calling endpoints + """ + # Validate dataset + if dataset is None or not isinstance(dataset, EvaluationDataset): + raise ValueError("Please provide a dataset that is of type EvaluationDataset") + + # Support both single-turn and multi-turn evaluations + is_multi_turn = dataset.is_multi_turn() + if is_multi_turn: + samples = t.cast(List[MultiTurnSample], dataset.samples) + else: + samples = t.cast(List[SingleTurnSample], dataset.samples) + + # Create executor for parallel HTTP calls + executor = Executor( + desc="Calling AG-UI Agent", + keep_progress_bar=True, + show_progress=show_progress, + raise_exceptions=raise_exceptions, + run_config=run_config, + batch_size=batch_size, + ) + + # Submit HTTP calls for all queries + queries = [sample.user_input for sample in samples] + for i, query in enumerate(queries): + executor.submit( + _call_ag_ui_endpoint, + endpoint_url=endpoint_url, + user_input=query, + thread_id=f"thread-eval-{i}", + agent_config=None, + timeout=timeout, + extra_headers=extra_headers, + ) + + # Collect results and convert to messages + results = executor.results() + + if is_multi_turn: + # Multi-turn: append agent responses to conversation + for i, result in enumerate(results): + # Handle failed jobs which are recorded as NaN in the executor + if isinstance(result, float) and math.isnan(result): + logger.warning(f"AG-UI agent call failed for query {i}: '{queries[i]}'") + continue + + # Convert AG-UI events to Ragas messages + events = t.cast(List[Any], result) + try: + logger.info(f"Processing query {i}, received {len(events)} events") + messages = convert_to_ragas_messages(events, metadata=metadata) + logger.info(f"Converted to {len(messages)} messages") + + # Append agent's response messages to the conversation + # Filter out only new messages from agent (AIMessage and ToolMessage) + new_messages = [ + msg for msg in messages if isinstance(msg, (AIMessage, ToolMessage)) + ] + + # Update the sample's user_input with complete conversation + sample = t.cast(MultiTurnSample, samples[i]) + sample.user_input = sample.user_input + new_messages + + logger.info( + f"Query {i} - Appended {len(new_messages)} messages to conversation" + ) + + except Exception as e: + logger.warning( + f"Failed to convert events for query {i}: {e}", exc_info=True + ) + else: + # Single-turn: extract response and contexts + responses: List[Optional[str]] = [] + retrieved_contexts: List[Optional[List[str]]] = [] + + for i, result in enumerate(results): + # Handle failed jobs which are recorded as NaN in the executor + if isinstance(result, float) and math.isnan(result): + responses.append(None) + retrieved_contexts.append(None) + logger.warning(f"AG-UI agent call failed for query {i}: '{queries[i]}'") + continue + + # Convert AG-UI events to Ragas messages + events = t.cast(List[Any], result) + try: + logger.info(f"Processing query {i}, received {len(events)} events") + messages = convert_to_ragas_messages(events, 
metadata=metadata) + logger.info(f"Converted to {len(messages)} messages") + + # Extract response text from AI messages + response_text = "" + context_list: List[str] = [] + + for msg in messages: + if isinstance(msg, AIMessage) and msg.content: + response_text += msg.content + logger.debug( + f"Found AI message with content: {msg.content[:100]}..." + ) + # Tool results could contain retrieved context + elif isinstance(msg, ToolMessage) and msg.content: + context_list.append(msg.content) + logger.debug( + f"Found tool message with content: {msg.content[:100]}..." + ) + + logger.info( + f"Query {i} - Response length: {len(response_text)}, Contexts: {len(context_list)}" + ) + responses.append(response_text or None) + retrieved_contexts.append(context_list if context_list else None) + + except Exception as e: + logger.warning( + f"Failed to convert events for query {i}: {e}", exc_info=True + ) + responses.append(None) + retrieved_contexts.append(None) + + # Update samples in place with responses and retrieved_contexts + # This ensures the dataset includes all fields needed for evaluation + for i, sample in enumerate(samples): + single_sample = t.cast(SingleTurnSample, sample) + single_sample.response = responses[i] if responses[i] is not None else "" + single_sample.retrieved_contexts = ( + retrieved_contexts[i] if retrieved_contexts[i] is not None else [] + ) + + # Run evaluation with metrics + evaluation_result = ragas_evaluate( + dataset=dataset, + metrics=metrics, + raise_exceptions=raise_exceptions, + show_progress=show_progress, + run_config=run_config or RunConfig(), + return_executor=False, + llm=evaluator_llm, + ) + + # Type assertion since return_executor=False guarantees EvaluationResult + return t.cast(EvaluationResult, evaluation_result) diff --git a/tests/unit/integrations/test_ag_ui.py b/tests/unit/integrations/test_ag_ui.py new file mode 100644 index 0000000000..b2704d24fd --- /dev/null +++ b/tests/unit/integrations/test_ag_ui.py @@ -0,0 +1,1231 @@ +"""Tests for AG-UI integration.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from ragas.messages import AIMessage, HumanMessage, ToolMessage + +# Check if ag_ui is available +try: + from ag_ui.core import ( + AssistantMessage, + EventType, + MessagesSnapshotEvent, + RunFinishedEvent, + RunStartedEvent, + StepFinishedEvent, + StepStartedEvent, + TextMessageChunkEvent, + TextMessageContentEvent, + TextMessageEndEvent, + TextMessageStartEvent, + ToolCallArgsEvent, + ToolCallChunkEvent, + ToolCallEndEvent, + ToolCallResultEvent, + ToolCallStartEvent, + UserMessage, + ) + + AG_UI_AVAILABLE = True +except ImportError: + AG_UI_AVAILABLE = False + +pytestmark = pytest.mark.skipif( + not AG_UI_AVAILABLE, reason="ag-ui-protocol not installed" +) + + +# Mock event class for non-message events +class MockEvent: + """Simple mock for non-message events like STATE_SNAPSHOT.""" + + def __init__(self, event_type: str, **kwargs): + self.type = event_type + self.timestamp = kwargs.get("timestamp", 1234567890) + for key, value in kwargs.items(): + setattr(self, key, value) + + +@pytest.fixture +def basic_text_message_events(): + """Create a basic streaming text message event sequence.""" + return [ + RunStartedEvent(run_id="run-123", thread_id="thread-456"), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Hello"), + TextMessageContentEvent(message_id="msg-1", delta=" world"), + TextMessageEndEvent(message_id="msg-1"), + 
TextMessageStartEvent(message_id="msg-2", role="assistant"), + TextMessageContentEvent(message_id="msg-2", delta="Hi"), + TextMessageContentEvent(message_id="msg-2", delta=" there!"), + TextMessageEndEvent(message_id="msg-2"), + ] + + +@pytest.fixture +def tool_call_events(): + """Create events with tool calls.""" + return [ + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Let me check the weather"), + TextMessageEndEvent(message_id="msg-1"), + ToolCallStartEvent( + tool_call_id="tc-1", tool_call_name="get_weather", parent_message_id="msg-1" + ), + ToolCallArgsEvent(tool_call_id="tc-1", delta='{"city": "San Francisco"'), + ToolCallArgsEvent(tool_call_id="tc-1", delta=', "units": "fahrenheit"}'), + ToolCallEndEvent(tool_call_id="tc-1"), + ToolCallResultEvent( + tool_call_id="tc-1", + message_id="result-1", + content="Temperature: 72°F, Conditions: Sunny", + ), + TextMessageStartEvent(message_id="msg-2", role="assistant"), + TextMessageContentEvent( + message_id="msg-2", delta="It's sunny and 72°F in San Francisco" + ), + TextMessageEndEvent(message_id="msg-2"), + ] + + +def test_import_error_without_ag_ui_protocol(): + """Test that appropriate error is raised without ag-ui-protocol package.""" + from ragas.integrations.ag_ui import _import_ag_ui_core + + # Mock the actual ag_ui import + with patch.dict("sys.modules", {"ag_ui": None, "ag_ui.core": None}): + with pytest.raises( + ImportError, match="AG-UI integration requires the ag-ui-protocol package" + ): + _import_ag_ui_core() + + +def test_basic_text_message_conversion(basic_text_message_events): + """Test converting basic streaming text messages.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + messages = convert_to_ragas_messages(basic_text_message_events) + + assert len(messages) == 2 + assert isinstance(messages[0], AIMessage) + assert messages[0].content == "Hello world" + assert isinstance(messages[1], AIMessage) + assert messages[1].content == "Hi there!" 
+ + +def test_message_with_metadata(basic_text_message_events): + """Test that metadata is included when requested.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + messages = convert_to_ragas_messages(basic_text_message_events, metadata=True) + + assert len(messages) == 2 + assert messages[0].metadata is not None + assert "message_id" in messages[0].metadata + assert messages[0].metadata["message_id"] == "msg-1" + assert "run_id" in messages[0].metadata + assert messages[0].metadata["run_id"] == "run-123" + assert "thread_id" in messages[0].metadata + assert messages[0].metadata["thread_id"] == "thread-456" + + +def test_message_without_metadata(basic_text_message_events): + """Test that metadata is excluded when not requested.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + messages = convert_to_ragas_messages(basic_text_message_events, metadata=False) + + assert len(messages) == 2 + assert messages[0].metadata is None + assert messages[1].metadata is None + + +def test_tool_call_conversion(tool_call_events): + """Test converting tool calls with arguments and results.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + messages = convert_to_ragas_messages(tool_call_events) + + # Should have: AI message, Tool result, AI message + assert len(messages) == 3 + + # First message: AI initiating tool call + assert isinstance(messages[0], AIMessage) + assert messages[0].content == "Let me check the weather" + + # Second message: Tool result + assert isinstance(messages[1], ToolMessage) + assert "72°F" in messages[1].content + + # Third message: AI with response + assert isinstance(messages[2], AIMessage) + assert "sunny" in messages[2].content.lower() + + +def test_tool_call_with_metadata(tool_call_events): + """Test that tool call metadata is preserved.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + messages = convert_to_ragas_messages(tool_call_events, metadata=True) + + tool_message = next(msg for msg in messages if isinstance(msg, ToolMessage)) + assert tool_message.metadata is not None + assert "tool_call_id" in tool_message.metadata + assert tool_message.metadata["tool_call_id"] == "tc-1" + + +def test_step_context_in_metadata(): + """Test that step context is included in metadata.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + StepStartedEvent(step_name="analyze_query"), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Processing..."), + TextMessageEndEvent(message_id="msg-1"), + StepFinishedEvent(step_name="analyze_query"), + ] + + messages = convert_to_ragas_messages(events, metadata=True) + + assert len(messages) == 1 + assert "step_name" in messages[0].metadata + assert messages[0].metadata["step_name"] == "analyze_query" + + +def test_messages_snapshot_conversion(): + """Test converting MessagesSnapshotEvent.""" + from ragas.integrations.ag_ui import convert_messages_snapshot + + snapshot = MessagesSnapshotEvent( + messages=[ + UserMessage(id="msg-1", content="What's 2+2?"), + AssistantMessage(id="msg-2", content="4"), + UserMessage(id="msg-3", content="Thanks!"), + ] + ) + + messages = convert_messages_snapshot(snapshot) + + assert len(messages) == 3 + assert isinstance(messages[0], HumanMessage) + assert messages[0].content == "What's 2+2?" 
+ assert isinstance(messages[1], AIMessage) + assert messages[1].content == "4" + assert isinstance(messages[2], HumanMessage) + assert messages[2].content == "Thanks!" + + +def test_snapshot_with_metadata(): + """Test that snapshot conversion includes metadata when requested.""" + from ragas.integrations.ag_ui import convert_messages_snapshot + + snapshot = MessagesSnapshotEvent( + messages=[UserMessage(id="msg-1", content="Hello")] + ) + + messages = convert_messages_snapshot(snapshot, metadata=True) + + assert messages[0].metadata is not None + assert "message_id" in messages[0].metadata + assert messages[0].metadata["message_id"] == "msg-1" + + +def test_non_message_events_filtered(): + """Test that non-message events are silently filtered.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + MockEvent(EventType.STATE_SNAPSHOT, snapshot={"key": "value"}), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Hello"), + TextMessageEndEvent(message_id="msg-1"), + MockEvent("RUN_FINISHED", result="success"), + ] + + messages = convert_to_ragas_messages(events) + + # Should only have the text message, other events filtered + assert len(messages) == 1 + assert messages[0].content == "Hello" + + +def test_incomplete_message_stream(caplog): + """Test handling of incomplete message streams.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + # Message with content but no end event + events = [ + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Hello"), + # Missing TextMessageEndEvent + ] + + messages = convert_to_ragas_messages(events) + + # Should not create message without end event + assert len(messages) == 0 + + +def test_orphaned_content_event(caplog): + """Test handling of content event without corresponding start.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + # Content event without start + TextMessageContentEvent(message_id="msg-unknown", delta="Orphaned content"), + ] + + messages = convert_to_ragas_messages(events) + + assert len(messages) == 0 + + +def test_tool_call_argument_parsing_error(caplog): + """Test handling of invalid JSON in tool arguments.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Using tool"), + ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="broken_tool"), + ToolCallArgsEvent(tool_call_id="tc-1", delta="{invalid json"), + ToolCallEndEvent(tool_call_id="tc-1"), + TextMessageEndEvent(message_id="msg-1"), # Message ends AFTER tool call + ] + + messages = convert_to_ragas_messages(events) + + # Should still create message with tool call containing raw_args + assert len(messages) == 1 + assert isinstance(messages[0], AIMessage) + assert messages[0].tool_calls is not None + assert len(messages[0].tool_calls) == 1 + assert messages[0].tool_calls[0].name == "broken_tool" + # Invalid JSON should be stored in raw_args + assert "raw_args" in messages[0].tool_calls[0].args + assert messages[0].tool_calls[0].args["raw_args"] == "{invalid json" + + +def test_tool_call_result_retroactive_attachment(): + """ + Tests that ToolCallResultEvent correctly finds the previous AIMessage + and attaches the tool call specification if it was missing. 
+
+    With the event order below, ToolCallEndEvent arrives before
+    TextMessageEndEvent, so the pending tool call is normally claimed when
+    the AIMessage is finalized; if it is not, _handle_tool_call_result must
+    attach it retroactively. The assertions accept either path.
+    """
+    from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+    # Scenario: the tool call completes before the message ends, so the
+    # pending tool call should be claimed when the AIMessage is created
+    events = [
+        # AI message starts
+        TextMessageStartEvent(message_id="msg-1", role="assistant"),
+        TextMessageContentEvent(message_id="msg-1", delta="Let me check that"),
+        # Tool call happens
+        ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="search_tool"),
+        ToolCallArgsEvent(tool_call_id="tc-1", delta='{"query": "weather"}'),
+        ToolCallEndEvent(tool_call_id="tc-1"),
+        # Message ends AFTER tool call ends
+        TextMessageEndEvent(message_id="msg-1"),
+        # Tool result arrives
+        ToolCallResultEvent(
+            tool_call_id="tc-1", message_id="result-1", content="Sunny, 75F"
+        ),
+    ]
+
+    messages = convert_to_ragas_messages(events)
+
+    # Should have AI message with tool call, then Tool message
+    assert len(messages) == 2
+    assert isinstance(messages[0], AIMessage)
+    assert isinstance(messages[1], ToolMessage)
+
+    # The AIMessage should have the tool_calls attached (either from normal flow
+    # or retroactively attached by _handle_tool_call_result)
+    assert messages[0].tool_calls is not None
+    assert len(messages[0].tool_calls) >= 1
+    # At least one tool call should be present (could be synthetic if needed)
+    assert any(
+        tc.name in ["search_tool", "unknown_tool"] for tc in messages[0].tool_calls
+    )
+
+    # Tool message should contain the result
+    assert messages[1].content == "Sunny, 75F"
+
+
+def test_event_collector_reuse(basic_text_message_events):
+    """Test that AGUIEventCollector can be cleared and reused."""
+    from ragas.integrations.ag_ui import AGUIEventCollector
+
+    collector = AGUIEventCollector()
+
+    # Process first batch
+    for event in basic_text_message_events[:5]:  # First message
+        collector.process_event(event)
+
+    messages1 = collector.get_messages()
+    assert len(messages1) == 1
+
+    # Clear and process second batch
+    collector.clear()
+    for event in basic_text_message_events[5:]:  # Second message
+        collector.process_event(event)
+
+    messages2 = collector.get_messages()
+    assert len(messages2) == 1
+    assert messages2[0].content != messages1[0].content
+
+
+def test_multiple_tool_calls_in_sequence():
+    """Test handling multiple tool calls in sequence."""
+    from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+    events = [
+        ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="tool1"),
+        ToolCallArgsEvent(tool_call_id="tc-1", delta='{"param": "value1"}'),
+        ToolCallEndEvent(tool_call_id="tc-1"),
+        ToolCallStartEvent(tool_call_id="tc-2", tool_call_name="tool2"),
+        ToolCallArgsEvent(tool_call_id="tc-2", delta='{"param": "value2"}'),
+        ToolCallEndEvent(tool_call_id="tc-2"),
+        TextMessageStartEvent(message_id="msg-1", role="assistant"),
+        TextMessageContentEvent(message_id="msg-1", delta="Done"),
+        TextMessageEndEvent(message_id="msg-1"),
+    ]
+
+    messages = convert_to_ragas_messages(events)
+
+    # Should create AI message with both tool calls
+    assert len(messages) == 1
+    assert isinstance(messages[0], AIMessage)
+    assert messages[0].tool_calls is not None
+    assert len(messages[0].tool_calls) == 2
+    assert messages[0].tool_calls[0].name == "tool1"
+    assert messages[0].tool_calls[1].name == "tool2"
+
+
+def test_empty_event_list():
+    """Test handling of empty event list."""
+    from
ragas.integrations.ag_ui import convert_to_ragas_messages + + messages = convert_to_ragas_messages([]) + assert len(messages) == 0 + + +def test_wrong_snapshot_type_error(): + """Test that convert_messages_snapshot validates input type.""" + from ragas.integrations.ag_ui import convert_messages_snapshot + + with pytest.raises(TypeError, match="Expected MessagesSnapshotEvent"): + convert_messages_snapshot(MockEvent("WRONG_TYPE")) + + +def test_role_mapping(): + """Test that different roles map correctly to Ragas message types.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + TextMessageStartEvent(message_id="msg-1", role="user"), + TextMessageContentEvent(message_id="msg-1", delta="User message"), + TextMessageEndEvent(message_id="msg-1"), + TextMessageStartEvent(message_id="msg-2", role="assistant"), + TextMessageContentEvent(message_id="msg-2", delta="Assistant message"), + TextMessageEndEvent(message_id="msg-2"), + ] + + messages = convert_to_ragas_messages(events) + + assert len(messages) == 2 + assert isinstance(messages[0], HumanMessage) + assert messages[0].content == "User message" + assert isinstance(messages[1], AIMessage) + assert messages[1].content == "Assistant message" + + +def test_complex_conversation_flow(): + """Test a complex multi-turn conversation with tool calls.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + # User asks + TextMessageStartEvent(message_id="msg-1", role="user"), + TextMessageContentEvent(message_id="msg-1", delta="What's the weather?"), + TextMessageEndEvent(message_id="msg-1"), + # Assistant responds and calls tool + TextMessageStartEvent(message_id="msg-2", role="assistant"), + TextMessageContentEvent(message_id="msg-2", delta="Let me check"), + TextMessageEndEvent(message_id="msg-2"), + ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="weather_api"), + ToolCallArgsEvent(tool_call_id="tc-1", delta='{"location": "SF"}'), + ToolCallEndEvent(tool_call_id="tc-1"), + # Tool returns result + ToolCallResultEvent( + tool_call_id="tc-1", message_id="result-1", content="Sunny, 70F" + ), + # Assistant responds with answer + TextMessageStartEvent(message_id="msg-3", role="assistant"), + TextMessageContentEvent(message_id="msg-3", delta="It's sunny and 70F"), + TextMessageEndEvent(message_id="msg-3"), + # User thanks + TextMessageStartEvent(message_id="msg-4", role="user"), + TextMessageContentEvent(message_id="msg-4", delta="Thanks!"), + TextMessageEndEvent(message_id="msg-4"), + ] + + messages = convert_to_ragas_messages(events, metadata=True) + + # Should have: Human, AI (with tool_calls), Tool, AI, Human + assert len(messages) == 5 + assert isinstance(messages[0], HumanMessage) + assert isinstance(messages[1], AIMessage) + assert isinstance(messages[2], ToolMessage) + assert isinstance(messages[3], AIMessage) + assert isinstance(messages[4], HumanMessage) + + # Check content + assert "weather" in messages[0].content.lower() + assert "check" in messages[1].content.lower() + assert "sunny" in messages[2].content.lower() + assert "sunny" in messages[3].content.lower() + assert "thanks" in messages[4].content.lower() + + # Check metadata + assert all(msg.metadata is not None for msg in messages) + assert all("run_id" in msg.metadata for msg in messages) + + +def test_text_message_chunk(): + """Test TEXT_MESSAGE_CHUNK event handling.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + 
TextMessageChunkEvent( + message_id="msg-1", role="assistant", delta="Complete message" + ), + ] + + messages = convert_to_ragas_messages(events) + + assert len(messages) == 1 + assert isinstance(messages[0], AIMessage) + assert messages[0].content == "Complete message" + + +def test_tool_call_chunk(): + """Test TOOL_CALL_CHUNK event handling.""" + from ragas.integrations.ag_ui import convert_to_ragas_messages + + events = [ + ToolCallChunkEvent( + tool_call_id="tc-1", tool_call_name="search", delta='{"query": "test"}' + ), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Done"), + TextMessageEndEvent(message_id="msg-1"), + ] + + messages = convert_to_ragas_messages(events) + + assert len(messages) == 1 + assert isinstance(messages[0], AIMessage) + assert messages[0].tool_calls is not None + assert len(messages[0].tool_calls) == 1 + assert messages[0].tool_calls[0].name == "search" + assert messages[0].tool_calls[0].args == {"query": "test"} + + +def test_tool_call_chunk_with_dict_delta(): + """ + Test that _handle_tool_call_chunk can handle delta as dict. + + While the AG-UI protocol specifies delta as a string, the handler code + defensively handles dict deltas. We test this by directly calling the + handler with a mock event object. + """ + from ragas.integrations.ag_ui import AGUIEventCollector + + collector = AGUIEventCollector() + + # Create a mock event with dict delta (bypassing Pydantic validation) + class MockToolCallChunkEvent: + type = "TOOL_CALL_CHUNK" + tool_call_id = "tc-1" + tool_call_name = "calculate" + delta = {"operation": "add", "values": [1, 2, 3]} # dict instead of string + timestamp = "2025-01-01T00:00:00Z" + + # Process the mock event directly + collector._handle_tool_call_chunk(MockToolCallChunkEvent()) + + # Now add an AI message to pick up the tool call + from ag_ui.core import ( + TextMessageContentEvent, + TextMessageEndEvent, + TextMessageStartEvent, + ) + + collector.process_event(TextMessageStartEvent(message_id="msg-1", role="assistant")) + collector.process_event( + TextMessageContentEvent(message_id="msg-1", delta="Result is 6") + ) + collector.process_event(TextMessageEndEvent(message_id="msg-1")) + + messages = collector.get_messages() + + assert len(messages) == 1 + assert isinstance(messages[0], AIMessage) + assert messages[0].tool_calls is not None + assert len(messages[0].tool_calls) == 1 + assert messages[0].tool_calls[0].name == "calculate" + assert messages[0].tool_calls[0].args == {"operation": "add", "values": [1, 2, 3]} + + +# ===== FastAPI Integration Tests ===== + + +# Helper to check if FastAPI dependencies are available +def _has_fastapi_deps(): + try: + import httpx # noqa: F401 + + return AG_UI_AVAILABLE + except ImportError: + return False + + +@pytest.mark.skipif( + not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed" +) +@pytest.mark.asyncio +async def test_call_ag_ui_endpoint(): + """Test HTTP client helper for calling AG-UI endpoints.""" + from unittest.mock import AsyncMock, MagicMock + + from ragas.integrations.ag_ui import _call_ag_ui_endpoint + + # Mock SSE response data + sse_lines = [ + 'data: {"type": "RUN_STARTED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567890}', + "", + 'data: {"type": "TEXT_MESSAGE_START", "message_id": "msg-1", "role": "assistant", "timestamp": 1234567891}', + "", + 'data: {"type": "TEXT_MESSAGE_CONTENT", "message_id": "msg-1", "delta": "Hello!", "timestamp": 1234567892}', + "", + 'data: {"type": 
"TEXT_MESSAGE_END", "message_id": "msg-1", "timestamp": 1234567893}', + "", + 'data: {"type": "RUN_FINISHED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567894}', + "", + ] + + # Create async iterator for SSE lines + async def mock_aiter_lines(): + for line in sse_lines: + yield line + + # Mock httpx response + mock_response = MagicMock() + mock_response.aiter_lines = mock_aiter_lines + mock_response.raise_for_status = MagicMock() + + # Mock httpx client + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client.stream = MagicMock() + mock_client.stream.return_value.__aenter__ = AsyncMock(return_value=mock_response) + mock_client.stream.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("httpx.AsyncClient", return_value=mock_client): + events = await _call_ag_ui_endpoint( + endpoint_url="http://localhost:8000/agent", + user_input="Hello", + ) + + # Should have collected 5 events + assert len(events) == 5 + assert events[0].type == "RUN_STARTED" + assert events[1].type == "TEXT_MESSAGE_START" + assert events[2].type == "TEXT_MESSAGE_CONTENT" + assert events[3].type == "TEXT_MESSAGE_END" + assert events[4].type == "RUN_FINISHED" + + +@pytest.mark.skipif( + not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed" +) +@pytest.mark.asyncio +async def test_call_ag_ui_endpoint_with_config(): + """Test HTTP client with thread_id and agent_config.""" + from unittest.mock import AsyncMock, MagicMock + + from ragas.integrations.ag_ui import _call_ag_ui_endpoint + + sse_lines = [ + 'data: {"type": "RUN_STARTED", "run_id": "run-1", "thread_id": "my-thread", "timestamp": 1234567890}', + "", + 'data: {"type": "RUN_FINISHED", "run_id": "run-1", "thread_id": "my-thread", "timestamp": 1234567891}', + "", + ] + + async def mock_aiter_lines(): + for line in sse_lines: + yield line + + mock_response = MagicMock() + mock_response.aiter_lines = mock_aiter_lines + mock_response.raise_for_status = MagicMock() + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client.stream = MagicMock() + mock_client.stream.return_value.__aenter__ = AsyncMock(return_value=mock_response) + mock_client.stream.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("httpx.AsyncClient", return_value=mock_client): + events = await _call_ag_ui_endpoint( + endpoint_url="http://localhost:8000/agent", + user_input="Test query", + thread_id="my-thread", + agent_config={"temperature": 0.7}, + ) + + assert len(events) == 2 + # Check that thread_id was passed through + assert events[0].thread_id == "my-thread" + + +@pytest.mark.skipif( + not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed" +) +@pytest.mark.asyncio +async def test_call_ag_ui_endpoint_malformed_json(): + """Test HTTP client handles malformed JSON gracefully.""" + from unittest.mock import AsyncMock, MagicMock + + from ragas.integrations.ag_ui import _call_ag_ui_endpoint + + sse_lines = [ + 'data: {"type": "RUN_STARTED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567890}', + "", + "data: {invalid json}", # Malformed + "", + 'data: {"type": "RUN_FINISHED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567891}', + "", + ] + + async def mock_aiter_lines(): + for line in sse_lines: + yield line + + mock_response = MagicMock() + mock_response.aiter_lines = 
mock_aiter_lines + mock_response.raise_for_status = MagicMock() + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client.stream = MagicMock() + mock_client.stream.return_value.__aenter__ = AsyncMock(return_value=mock_response) + mock_client.stream.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("httpx.AsyncClient", return_value=mock_client): + events = await _call_ag_ui_endpoint( + endpoint_url="http://localhost:8000/agent", + user_input="Test", + ) + + # Should skip malformed event but collect valid ones + assert len(events) == 2 + assert events[0].type == "RUN_STARTED" + assert events[1].type == "RUN_FINISHED" + + +@pytest.mark.skipif( + not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed" +) +@pytest.mark.asyncio +async def test_evaluate_ag_ui_agent(): + """Test batch evaluation of AG-UI agent endpoint.""" + from unittest.mock import MagicMock + + from ragas.dataset_schema import EvaluationDataset, SingleTurnSample + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + + # Create mock dataset + dataset = EvaluationDataset( + samples=[ + SingleTurnSample( + user_input="What's the weather?", + reference="Check weather API", + ), + SingleTurnSample( + user_input="Tell me a joke", + reference="Respond with humor", + ), + ] + ) + + # Mock events for first query (weather) + weather_events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="It's sunny and 72F"), + TextMessageEndEvent(message_id="msg-1"), + RunFinishedEvent(run_id="run-1", thread_id="thread-1"), + ] + + # Mock events for second query (joke) + joke_events = [ + RunStartedEvent(run_id="run-2", thread_id="thread-2"), + TextMessageStartEvent(message_id="msg-2", role="assistant"), + TextMessageContentEvent( + message_id="msg-2", delta="Why don't scientists trust atoms?" + ), + TextMessageContentEvent(message_id="msg-2", delta=" They make up everything!"), + TextMessageEndEvent(message_id="msg-2"), + RunFinishedEvent(run_id="run-2", thread_id="thread-2"), + ] + + # Mock _call_ag_ui_endpoint to return different events based on input + async def mock_call_endpoint(endpoint_url, user_input, **kwargs): + if "weather" in user_input.lower(): + return weather_events + else: + return joke_events + + # Mock ragas_evaluate to return a simple result + mock_result = MagicMock() + mock_result.to_pandas = MagicMock(return_value=MagicMock()) + + with ( + patch( + "ragas.integrations.ag_ui._call_ag_ui_endpoint", + side_effect=mock_call_endpoint, + ), + patch( + "ragas.integrations.ag_ui.ragas_evaluate", + return_value=mock_result, + ), + ): + result = await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[], # Empty for testing + ) + + # Check that dataset was populated + assert dataset.samples[0].response == "It's sunny and 72F" + assert ( + dataset.samples[1].response + == "Why don't scientists trust atoms? They make up everything!" 
+ ) + + # Check that evaluation was called + assert result == mock_result + + +@pytest.mark.skipif( + not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed" +) +@pytest.mark.asyncio +async def test_evaluate_ag_ui_agent_with_tool_calls(): + """Test evaluation with tool calls in response.""" + from unittest.mock import MagicMock + + from ragas.dataset_schema import EvaluationDataset, SingleTurnSample + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + + dataset = EvaluationDataset( + samples=[ + SingleTurnSample( + user_input="Search for Python tutorials", + ), + ] + ) + + # Mock events with tool call + search_events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Let me search for that"), + TextMessageEndEvent(message_id="msg-1"), + ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="search"), + ToolCallArgsEvent(tool_call_id="tc-1", delta='{"query": "Python tutorials"}'), + ToolCallEndEvent(tool_call_id="tc-1"), + ToolCallResultEvent( + tool_call_id="tc-1", + message_id="result-1", + content="Found: tutorial1.com, tutorial2.com", + ), + RunFinishedEvent(run_id="run-1", thread_id="thread-1"), + ] + + async def mock_call_endpoint(endpoint_url, user_input, **kwargs): + return search_events + + mock_result = MagicMock() + + with ( + patch( + "ragas.integrations.ag_ui._call_ag_ui_endpoint", + side_effect=mock_call_endpoint, + ), + patch( + "ragas.integrations.ag_ui.ragas_evaluate", + return_value=mock_result, + ), + ): + await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[], + ) + + # Check that response was extracted + assert dataset.samples[0].response == "Let me search for that" + # Check that tool results are in retrieved_contexts + assert dataset.samples[0].retrieved_contexts is not None + assert len(dataset.samples[0].retrieved_contexts) == 1 + assert "tutorial1.com" in dataset.samples[0].retrieved_contexts[0] + + +@pytest.mark.skipif( + not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed" +) +@pytest.mark.asyncio +async def test_evaluate_ag_ui_agent_handles_failures(): + """Test evaluation handles failed endpoint calls gracefully.""" + import math + from unittest.mock import MagicMock + + from ragas.dataset_schema import EvaluationDataset, SingleTurnSample + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + + dataset = EvaluationDataset( + samples=[ + SingleTurnSample(user_input="Query 1"), + SingleTurnSample(user_input="Query 2"), + ] + ) + + # Mock events - first succeeds, second fails (returns NaN from executor) + success_events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Success response"), + TextMessageEndEvent(message_id="msg-1"), + RunFinishedEvent(run_id="run-1", thread_id="thread-1"), + ] + + mock_result = MagicMock() + + # Executor is patched below, so the endpoint is never actually called; + # results() returns NaN for the second sample to simulate a failed call. + class MockExecutor: + def __init__(self, *args, **kwargs): + pass + + def submit(self, func, *args, **kwargs): + pass + + def results(self): + # First result succeeds, second is NaN (failed) + return 
[success_events, math.nan] + + with ( + patch( + "ragas.integrations.ag_ui.Executor", + MockExecutor, + ), + patch( + "ragas.integrations.ag_ui.ragas_evaluate", + return_value=mock_result, + ), + ): + await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[], + ) + + # First sample should have response, second should be empty string + assert dataset.samples[0].response == "Success response" + assert dataset.samples[1].response == "" + assert dataset.samples[1].retrieved_contexts == [] + + +# ============================================================================ +# Multi-turn evaluation tests +# ============================================================================ + + +def test_convert_ragas_messages_to_ag_ui(): + """Test converting Ragas messages to AG-UI format.""" + from ragas.integrations.ag_ui import _convert_ragas_messages_to_ag_ui + from ragas.messages import ToolCall + + messages = [ + HumanMessage(content="What's the weather?"), + AIMessage( + content="Let me check", + tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})], + ), + HumanMessage(content="Thanks!"), + ] + + ag_ui_messages = _convert_ragas_messages_to_ag_ui(messages) + + assert len(ag_ui_messages) == 3 + + # Check UserMessage + assert ag_ui_messages[0].id == "1" + assert ag_ui_messages[0].content == "What's the weather?" + + # Check AssistantMessage with tool calls + assert ag_ui_messages[1].id == "2" + assert ag_ui_messages[1].content == "Let me check" + assert ag_ui_messages[1].tool_calls is not None + assert len(ag_ui_messages[1].tool_calls) == 1 + assert ag_ui_messages[1].tool_calls[0].function.name == "get-weather" + assert '"location": "SF"' in ag_ui_messages[1].tool_calls[0].function.arguments + + # Check second UserMessage + assert ag_ui_messages[2].id == "3" + assert ag_ui_messages[2].content == "Thanks!" 
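+ + +# NOTE: added sketch, not part of the original suite. It assumes that +# _convert_ragas_messages_to_ag_ui serialises ToolCall.args as a JSON object +# string (the containment assertion above only checks a substring). +def test_convert_ragas_tool_call_args_json_round_trip(): + """Sketch: serialized tool-call arguments should parse back to the args dict.""" + import json + + from ragas.integrations.ag_ui import _convert_ragas_messages_to_ag_ui + from ragas.messages import ToolCall + + messages = [ + HumanMessage(content="What's the weather?"), + AIMessage( + content="Let me check", + tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})], + ), + ] + + ag_ui_messages = _convert_ragas_messages_to_ag_ui(messages) + + # Parsing the serialized arguments should recover the original args dict + assert json.loads(ag_ui_messages[1].tool_calls[0].function.arguments) == { + "location": "SF" + }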
+ + +@pytest.mark.asyncio +async def test_evaluate_multi_turn_basic(): + """Test basic multi-turn evaluation.""" + from unittest.mock import MagicMock, patch + + from ragas.dataset_schema import EvaluationDataset, MultiTurnSample + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + from ragas.messages import ToolCall + + # Create multi-turn sample + sample = MultiTurnSample( + user_input=[HumanMessage(content="What's the weather in SF?")], + reference_tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})], + ) + + dataset = EvaluationDataset(samples=[sample]) + + # Mock events that agent would return + # Note: Tool calls are completed before message, so they attach to the next AIMessage + agent_events = [ + RunStartedEvent(run_id="run-1", thread_id="thread-1"), + ToolCallStartEvent( + tool_call_id="tc-1", + tool_call_name="get-weather", + parent_message_id="msg-1", + ), + ToolCallArgsEvent(tool_call_id="tc-1", delta='{"location": "SF"}'), + ToolCallEndEvent(tool_call_id="tc-1"), + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Let me check the weather"), + TextMessageEndEvent(message_id="msg-1"), + ToolCallResultEvent( + tool_call_id="tc-1", + message_id="result-1", + content="Temperature: 72°F", + ), + RunFinishedEvent(run_id="run-1", thread_id="thread-1"), + ] + + mock_result = MagicMock() + + # Mock Executor + class MockExecutor: + def __init__(self, *args, **kwargs): + pass + + def submit(self, func, *args, **kwargs): + pass + + def results(self): + return [agent_events] + + with ( + patch( + "ragas.integrations.ag_ui.Executor", + MockExecutor, + ), + patch( + "ragas.integrations.ag_ui.ragas_evaluate", + return_value=mock_result, + ), + ): + await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[], + ) + + # Verify that agent responses were appended to conversation + assert len(sample.user_input) > 1 # Should have original + agent responses + + # Check that we have AIMessage and ToolMessage appended + ai_messages = [msg for msg in sample.user_input if isinstance(msg, AIMessage)] + tool_messages = [msg for msg in sample.user_input if isinstance(msg, ToolMessage)] + + assert len(ai_messages) >= 1 # At least one AI message + assert len(tool_messages) >= 1 # At least one tool message + + # Verify tool calls in AIMessage (tool calls completed before message, so attached to it) + assert ai_messages[0].tool_calls is not None + assert len(ai_messages[0].tool_calls) > 0 + assert ai_messages[0].tool_calls[0].name == "get-weather" + + +@pytest.mark.asyncio +async def test_evaluate_multi_turn_with_existing_conversation(): + """Test multi-turn evaluation with pre-existing conversation.""" + from unittest.mock import MagicMock, patch + + from ragas.dataset_schema import EvaluationDataset, MultiTurnSample + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + from ragas.messages import ToolCall + + # Create sample with existing conversation + sample = MultiTurnSample( + user_input=[ + HumanMessage(content="Hello"), + AIMessage(content="Hi there!"), + HumanMessage(content="What's the weather in SF?"), + ], + reference_tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})], + ) + + original_length = len(sample.user_input) + dataset = EvaluationDataset(samples=[sample]) + + # Mock agent events + agent_events = [ + TextMessageStartEvent(message_id="msg-1", role="assistant"), + TextMessageContentEvent(message_id="msg-1", delta="Let me check the 
weather"), + TextMessageEndEvent(message_id="msg-1"), + ToolCallStartEvent( + tool_call_id="tc-1", + tool_call_name="get-weather", + parent_message_id="msg-1", + ), + ToolCallArgsEvent(tool_call_id="tc-1", delta='{"location": "SF"}'), + ToolCallEndEvent(tool_call_id="tc-1"), + ] + + mock_result = MagicMock() + + class MockExecutor: + def __init__(self, *args, **kwargs): + pass + + def submit(self, func, *args, **kwargs): + pass + + def results(self): + return [agent_events] + + with ( + patch( + "ragas.integrations.ag_ui.Executor", + MockExecutor, + ), + patch( + "ragas.integrations.ag_ui.ragas_evaluate", + return_value=mock_result, + ), + ): + await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[], + ) + + # Verify conversation was extended, not replaced + assert len(sample.user_input) > original_length + + # First 3 messages should be unchanged + assert isinstance(sample.user_input[0], HumanMessage) + assert sample.user_input[0].content == "Hello" + assert isinstance(sample.user_input[1], AIMessage) + assert sample.user_input[1].content == "Hi there!" + assert isinstance(sample.user_input[2], HumanMessage) + assert sample.user_input[2].content == "What's the weather in SF?" + + # New messages should be appended + new_messages = sample.user_input[original_length:] + assert len(new_messages) > 0 + assert any(isinstance(msg, AIMessage) for msg in new_messages) + + +@pytest.mark.asyncio +async def test_evaluate_multi_turn_failed_query(): + """Test multi-turn evaluation handles failed queries correctly.""" + import math + from unittest.mock import MagicMock, patch + + from ragas.dataset_schema import EvaluationDataset, MultiTurnSample + from ragas.integrations.ag_ui import evaluate_ag_ui_agent + + # Create multi-turn sample + sample = MultiTurnSample( + user_input=[HumanMessage(content="Test query")], + reference_tool_calls=[], + ) + + original_length = len(sample.user_input) + dataset = EvaluationDataset(samples=[sample]) + + mock_result = MagicMock() + + class MockExecutor: + def __init__(self, *args, **kwargs): + pass + + def submit(self, func, *args, **kwargs): + pass + + def results(self): + # Return NaN to simulate failure + return [math.nan] + + with ( + patch( + "ragas.integrations.ag_ui.Executor", + MockExecutor, + ), + patch( + "ragas.integrations.ag_ui.ragas_evaluate", + return_value=mock_result, + ), + ): + await evaluate_ag_ui_agent( + endpoint_url="http://localhost:8000/agent", + dataset=dataset, + metrics=[], + ) + + # Conversation should remain unchanged after failure + assert len(sample.user_input) == original_length + assert isinstance(sample.user_input[0], HumanMessage) + assert sample.user_input[0].content == "Test query"