diff --git a/docs/howtos/integrations/_ag_ui.md b/docs/howtos/integrations/_ag_ui.md
new file mode 100644
index 0000000000..cf9e056a5b
--- /dev/null
+++ b/docs/howtos/integrations/_ag_ui.md
@@ -0,0 +1,318 @@
+# AG-UI Integration
+Ragas can evaluate agents that stream events via the [AG-UI protocol](https://docs.ag-ui.com/). This notebook shows how to build evaluation datasets, configure metrics, and score AG-UI endpoints.
+
+
+## Prerequisites
+- Install optional dependencies with `pip install "ragas[ag-ui]" langchain-openai python-dotenv nest_asyncio`
+- Start an AG-UI compatible agent locally (Google ADK, PydanticAI, CrewAI, etc.)
+- Create a `.env` file with your evaluator LLM credentials (e.g. `OPENAI_API_KEY` or `GOOGLE_API_KEY`)
+- If you run this notebook, call `nest_asyncio.apply()` (shown below) so you can `await` coroutines in-place.
+
+
+
+```python
+# !pip install "ragas[ag-ui]" langchain-openai python-dotenv nest_asyncio
+
+```
+
+## Imports and environment setup
+Load environment variables and import the classes used throughout the walkthrough.
+
+
+
+```python
+import nest_asyncio
+from ag_ui.core import (
+    AssistantMessage,
+    MessagesSnapshotEvent,
+    TextMessageChunkEvent,
+    UserMessage,
+)
+from dotenv import load_dotenv
+from IPython.display import display
+from langchain_openai import ChatOpenAI
+
+from ragas.dataset_schema import EvaluationDataset, MultiTurnSample, SingleTurnSample
+from ragas.integrations.ag_ui import (
+    convert_messages_snapshot,
+    convert_to_ragas_messages,
+    evaluate_ag_ui_agent,
+)
+from ragas.llms import LangchainLLMWrapper
+from ragas.messages import HumanMessage, ToolCall
+from ragas.metrics import FactualCorrectness, ToolCallF1
+
+load_dotenv()
+# Patch the existing notebook loop so we can await coroutines safely
+nest_asyncio.apply()
+
+```
+
+## Build single-turn evaluation data
+Create `SingleTurnSample` entries when you only need to grade the final answer text.
+
+
+
+```python
+scientist_questions = EvaluationDataset(
+ samples=[
+ SingleTurnSample(
+ user_input="Who originated the theory of relativity?",
+ reference="Albert Einstein originated the theory of relativity.",
+ ),
+ SingleTurnSample(
+ user_input="Who discovered penicillin and when?",
+ reference="Alexander Fleming discovered penicillin in 1928.",
+ ),
+ ]
+)
+
+scientist_questions
+
+```
+
+
+
+
+ EvaluationDataset(features=['user_input', 'reference'], len=2)
+
+
+
+## Build multi-turn conversations
+For tool-usage metrics, extend the dataset with `MultiTurnSample` and expected tool calls.
+
+
+
+```python
+weather_queries = EvaluationDataset(
+ samples=[
+ MultiTurnSample(
+ user_input=[HumanMessage(content="What's the weather in Paris?")],
+ reference_tool_calls=[
+ ToolCall(name="weatherTool", args={"location": "Paris"})
+ ],
+ )
+ ]
+)
+
+weather_queries
+
+```
+
+
+
+
+ EvaluationDataset(features=['user_input', 'reference_tool_calls'], len=1)
+
+
+
+## Configure metrics and the evaluator LLM
+Wrap your grading model with the appropriate adapter and instantiate the metrics you plan to use.
+
+
+
+```python
+evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))
+
+qa_metrics = [FactualCorrectness(llm=evaluator_llm)]
+tool_metrics = [ToolCallF1()] # rule-based, no LLM required
+
+```
+
+ /var/folders/8k/tf3xr1rd1fl_dz35dfhfp_tc0000gn/T/ipykernel_93918/2135722072.py:1: DeprecationWarning: LangchainLLMWrapper is deprecated and will be removed in a future version. Use llm_factory instead: from openai import OpenAI; from ragas.llms import llm_factory; llm = llm_factory('gpt-4o-mini', client=OpenAI(api_key='...'))
+ evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))
+
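+The wrapper still works, but the deprecation notice above points at the newer `llm_factory` API. A minimal sketch based on that warning text (check the current Ragas docs for the exact signature and whether the returned object is accepted everywhere the wrapper is):
+
+```python
+from openai import OpenAI
+
+from ragas.llms import llm_factory
+
+# OpenAI() picks up OPENAI_API_KEY from the environment (loaded from .env above)
+llm = llm_factory("gpt-4o-mini", client=OpenAI())
+```
+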
+
+## Evaluate a live AG-UI endpoint
+Set the endpoint URL exposed by your agent. Toggle the flags when you are ready to run the evaluations.
+In Jupyter/IPython you can `await` the helpers directly once `nest_asyncio.apply()` has been called.
+
+
+
+```python
+AG_UI_ENDPOINT = "http://localhost:8000/agentic_chat" # Update to match your agent
+
+RUN_FACTUAL_EVAL = False
+RUN_TOOL_EVAL = False
+
+```
+
+
+```python
+async def evaluate_factual():
+ return await evaluate_ag_ui_agent(
+ endpoint_url=AG_UI_ENDPOINT,
+ dataset=scientist_questions,
+ metrics=qa_metrics,
+ evaluator_llm=evaluator_llm,
+ metadata=True,
+ )
+
+if RUN_FACTUAL_EVAL:
+ factual_result = await evaluate_factual()
+ factual_df = factual_result.to_pandas()
+ display(factual_df)
+
+```
+
+
+    Calling AG-UI Agent:   0%|          | 0/2 [00:00<?, ?it/s]
+
+
+
+    Evaluating:   0%|          | 0/2 [00:00<?, ?it/s]
+
+
+
+
+
+|   | user_input | retrieved_contexts | response | reference | factual_correctness(mode=f1) |
+| --- | --- | --- | --- | --- | --- |
+| 0 | Who originated the theory of relativity? | [] | The theory of relativity was originated by Alb... | Albert Einstein originated the theory of relat... | 0.33 |
+| 1 | Who discovered penicillin and when? | [] | Penicillin was discovered by Alexander Fleming... | Alexander Fleming discovered penicillin in 1928. | 1.00 |
+
+
+```python
+async def evaluate_tool_usage():
+ return await evaluate_ag_ui_agent(
+ endpoint_url=AG_UI_ENDPOINT,
+ dataset=weather_queries,
+ metrics=tool_metrics,
+ evaluator_llm=evaluator_llm,
+ )
+
+if RUN_TOOL_EVAL:
+ tool_result = await evaluate_tool_usage()
+ tool_df = tool_result.to_pandas()
+ display(tool_df)
+
+```
+
+
+    Calling AG-UI Agent:   0%|          | 0/1 [00:00<?, ?it/s]
+
+
+
+    Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]
+
+
+
+
+|   | user_input | reference_tool_calls | tool_call_f1 |
+| --- | --- | --- | --- |
+| 0 | [{'content': 'What's the weather in Paris?', '... | [{'name': 'weatherTool', 'args': {'location': ... | 0.0 |
+
+
+## Convert recorded AG-UI events
+Use the conversion helpers when you already have an event log to grade offline.
+
+
+
+```python
+events = [
+ TextMessageChunkEvent(
+ message_id="assistant-1",
+ role="assistant",
+ delta="Hello from AG-UI!",
+ )
+]
+
+messages_from_stream = convert_to_ragas_messages(events, metadata=True)
+
+snapshot = MessagesSnapshotEvent(
+ messages=[
+ UserMessage(id="msg-1", content="Hello?"),
+ AssistantMessage(id="msg-2", content="Hi! How can I help you today?"),
+ ]
+)
+
+messages_from_snapshot = convert_messages_snapshot(snapshot)
+
+messages_from_stream, messages_from_snapshot
+
+```
+
+
+
+
+ ([AIMessage(content='Hello from AG-UI!', metadata={'timestamp': None, 'message_id': 'assistant-1'}, type='ai', tool_calls=None)],
+ [HumanMessage(content='Hello?', metadata=None, type='human'),
+ AIMessage(content='Hi! How can I help you today?', metadata=None, type='ai', tool_calls=None)])
+
+
+
+
diff --git a/docs/howtos/integrations/ag_ui.ipynb b/docs/howtos/integrations/ag_ui.ipynb
new file mode 100644
index 0000000000..1faa0e53d8
--- /dev/null
+++ b/docs/howtos/integrations/ag_ui.ipynb
@@ -0,0 +1,516 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "cdcdd4d1",
+ "metadata": {},
+ "source": [
+ "# AG-UI Integration\n",
+ "Ragas can evaluate agents that stream events via the [AG-UI protocol](https://docs.ag-ui.com/). This notebook shows how to build evaluation datasets, configure metrics, and score AG-UI endpoints.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ca0af3e1",
+ "metadata": {},
+ "source": [
+ "## Prerequisites\n",
+ "- Install optional dependencies with `pip install \"ragas[ag-ui]\" langchain-openai python-dotenv nest_asyncio`\n",
+ "- Start an AG-UI compatible agent locally (Google ADK, PydanticAI, CrewAI, etc.)\n",
+ "- Create an `.env` file with your evaluator LLM credentials (e.g. `OPENAI_API_KEY`, `GOOGLE_API_KEY`, etc.)\n",
+ "- If you run this notebook, call `nest_asyncio.apply()` (shown below) so you can `await` coroutines in-place.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "67b16d64",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# !pip install \"ragas[ag-ui]\" langchain-openai python-dotenv nest_asyncio\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7486082d",
+ "metadata": {},
+ "source": [
+ "## Imports and environment setup\n",
+ "Load environment variables and import the classes used throughout the walkthrough.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "c051059b",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import nest_asyncio\n",
+ "from ag_ui.core import (\n",
+ " AssistantMessage,\n",
+ " MessagesSnapshotEvent,\n",
+ " TextMessageChunkEvent,\n",
+ " UserMessage,\n",
+ ")\n",
+ "from dotenv import load_dotenv\n",
+ "from IPython.display import display\n",
+ "from langchain_openai import ChatOpenAI\n",
+ "\n",
+ "from ragas.dataset_schema import EvaluationDataset, MultiTurnSample, SingleTurnSample\n",
+ "from ragas.integrations.ag_ui import (\n",
+ " convert_messages_snapshot,\n",
+ " convert_to_ragas_messages,\n",
+ " evaluate_ag_ui_agent,\n",
+ ")\n",
+ "from ragas.llms import LangchainLLMWrapper\n",
+ "from ragas.messages import HumanMessage, ToolCall\n",
+ "from ragas.metrics import FactualCorrectness, ToolCallF1\n",
+ "\n",
+ "load_dotenv()\n",
+ "# Patch the existing notebook loop so we can await coroutines safely\n",
+ "nest_asyncio.apply()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7e69bc6c",
+ "metadata": {},
+ "source": [
+ "## Build single-turn evaluation data\n",
+ "Create `SingleTurnSample` entries when you only need to grade the final answer text.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "803cc334",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "EvaluationDataset(features=['user_input', 'reference'], len=2)"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "scientist_questions = EvaluationDataset(\n",
+ " samples=[\n",
+ " SingleTurnSample(\n",
+ " user_input=\"Who originated the theory of relativity?\",\n",
+ " reference=\"Albert Einstein originated the theory of relativity.\",\n",
+ " ),\n",
+ " SingleTurnSample(\n",
+ " user_input=\"Who discovered penicillin and when?\",\n",
+ " reference=\"Alexander Fleming discovered penicillin in 1928.\",\n",
+ " ),\n",
+ " ]\n",
+ ")\n",
+ "\n",
+ "scientist_questions"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d4f1bbb7",
+ "metadata": {},
+ "source": [
+ "## Build multi-turn conversations\n",
+ "For tool-usage metrics, extend the dataset with `MultiTurnSample` and expected tool calls.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "7a55eb0a",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "EvaluationDataset(features=['user_input', 'reference_tool_calls'], len=1)"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "weather_queries = EvaluationDataset(\n",
+ " samples=[\n",
+ " MultiTurnSample(\n",
+ " user_input=[HumanMessage(content=\"What's the weather in Paris?\")],\n",
+ " reference_tool_calls=[\n",
+ " ToolCall(name=\"weatherTool\", args={\"location\": \"Paris\"})\n",
+ " ],\n",
+ " )\n",
+ " ]\n",
+ ")\n",
+ "\n",
+ "weather_queries"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "14c3da95",
+ "metadata": {},
+ "source": [
+ "## Configure metrics and the evaluator LLM\n",
+ "Wrap your grading model with the appropriate adapter and instantiate the metrics you plan to use.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "05a59dde",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/var/folders/8k/tf3xr1rd1fl_dz35dfhfp_tc0000gn/T/ipykernel_93918/2135722072.py:1: DeprecationWarning: LangchainLLMWrapper is deprecated and will be removed in a future version. Use llm_factory instead: from openai import OpenAI; from ragas.llms import llm_factory; llm = llm_factory('gpt-4o-mini', client=OpenAI(api_key='...'))\n",
+ " evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model=\"gpt-4o-mini\"))\n"
+ ]
+ }
+ ],
+ "source": [
+ "evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model=\"gpt-4o-mini\"))\n",
+ "\n",
+ "qa_metrics = [FactualCorrectness(llm=evaluator_llm)]\n",
+ "tool_metrics = [ToolCallF1()] # rule-based, no LLM required"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "9e65fe39",
+ "metadata": {},
+ "source": [
+ "## Evaluate a live AG-UI endpoint\n",
+ "Set the endpoint URL exposed by your agent. Toggle the flags when you are ready to run the evaluations.\n",
+ "In Jupyter/IPython you can `await` the helpers directly once `nest_asyncio.apply()` has been called.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 32,
+ "id": "b9808e04",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "AG_UI_ENDPOINT = \"http://localhost:8000/agentic_chat\" # Update to match your agent\n",
+ "\n",
+ "RUN_FACTUAL_EVAL = False\n",
+ "RUN_TOOL_EVAL = False"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 34,
+ "id": "79e80383",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "23ae31282b934d0390f316f966690d44",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Calling AG-UI Agent: 0%| | 0/2 [00:00, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "dd7449e7458a477da9b3bcfb5826b4bb",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Evaluating: 0%| | 0/2 [00:00, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " user_input | \n",
+ " retrieved_contexts | \n",
+ " response | \n",
+ " reference | \n",
+ " factual_correctness(mode=f1) | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " | 0 | \n",
+ " Who originated the theory of relativity? | \n",
+ " [] | \n",
+ " The theory of relativity was originated by Alb... | \n",
+ " Albert Einstein originated the theory of relat... | \n",
+ " 0.33 | \n",
+ "
\n",
+ " \n",
+ " | 1 | \n",
+ " Who discovered penicillin and when? | \n",
+ " [] | \n",
+ " Penicillin was discovered by Alexander Fleming... | \n",
+ " Alexander Fleming discovered penicillin in 1928. | \n",
+ " 1.00 | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " user_input retrieved_contexts \\\n",
+ "0 Who originated the theory of relativity? [] \n",
+ "1 Who discovered penicillin and when? [] \n",
+ "\n",
+ " response \\\n",
+ "0 The theory of relativity was originated by Alb... \n",
+ "1 Penicillin was discovered by Alexander Fleming... \n",
+ "\n",
+ " reference \\\n",
+ "0 Albert Einstein originated the theory of relat... \n",
+ "1 Alexander Fleming discovered penicillin in 1928. \n",
+ "\n",
+ " factual_correctness(mode=f1) \n",
+ "0 0.33 \n",
+ "1 1.00 "
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "async def evaluate_factual():\n",
+ " return await evaluate_ag_ui_agent(\n",
+ " endpoint_url=AG_UI_ENDPOINT,\n",
+ " dataset=scientist_questions,\n",
+ " metrics=qa_metrics,\n",
+ " evaluator_llm=evaluator_llm,\n",
+ " metadata=True,\n",
+ " )\n",
+ "\n",
+ "\n",
+ "if RUN_FACTUAL_EVAL:\n",
+ " factual_result = await evaluate_factual()\n",
+ " factual_df = factual_result.to_pandas()\n",
+ " display(factual_df)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 35,
+ "id": "8b731189",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "351ca0c016cc46cd9c0321d43d283f05",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Calling AG-UI Agent: 0%| | 0/1 [00:00, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "39ea65a7bddf47b699d35813571e4d88",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Evaluating: 0%| | 0/1 [00:00, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " user_input | \n",
+ " reference_tool_calls | \n",
+ " tool_call_f1 | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " | 0 | \n",
+ " [{'content': 'What's the weather in Paris?', '... | \n",
+ " [{'name': 'weatherTool', 'args': {'location': ... | \n",
+ " 0.0 | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " user_input \\\n",
+ "0 [{'content': 'What's the weather in Paris?', '... \n",
+ "\n",
+ " reference_tool_calls tool_call_f1 \n",
+ "0 [{'name': 'weatherTool', 'args': {'location': ... 0.0 "
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "async def evaluate_tool_usage():\n",
+ " return await evaluate_ag_ui_agent(\n",
+ " endpoint_url=AG_UI_ENDPOINT,\n",
+ " dataset=weather_queries,\n",
+ " metrics=tool_metrics,\n",
+ " evaluator_llm=evaluator_llm,\n",
+ " )\n",
+ "\n",
+ "\n",
+ "if RUN_TOOL_EVAL:\n",
+ " tool_result = await evaluate_tool_usage()\n",
+ " tool_df = tool_result.to_pandas()\n",
+ " display(tool_df)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "452627cf",
+ "metadata": {},
+ "source": [
+ "## Convert recorded AG-UI events\n",
+ "Use the conversion helpers when you already have an event log to grade offline.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "b691bcf7",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "([AIMessage(content='Hello from AG-UI!', metadata={'timestamp': None, 'message_id': 'assistant-1'}, type='ai', tool_calls=None)],\n",
+ " [HumanMessage(content='Hello?', metadata=None, type='human'),\n",
+ " AIMessage(content='Hi! How can I help you today?', metadata=None, type='ai', tool_calls=None)])"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "events = [\n",
+ " TextMessageChunkEvent(\n",
+ " message_id=\"assistant-1\",\n",
+ " role=\"assistant\",\n",
+ " delta=\"Hello from AG-UI!\",\n",
+ " )\n",
+ "]\n",
+ "\n",
+ "messages_from_stream = convert_to_ragas_messages(events, metadata=True)\n",
+ "\n",
+ "snapshot = MessagesSnapshotEvent(\n",
+ " messages=[\n",
+ " UserMessage(id=\"msg-1\", content=\"Hello?\"),\n",
+ " AssistantMessage(id=\"msg-2\", content=\"Hi! How can I help you today?\"),\n",
+ " ]\n",
+ ")\n",
+ "\n",
+ "messages_from_snapshot = convert_messages_snapshot(snapshot)\n",
+ "\n",
+ "messages_from_stream, messages_from_snapshot"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "cf6235fd-ec1c-4e87-a53f-a2ebf89a29b6",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.14"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/docs/howtos/integrations/ag_ui.md b/docs/howtos/integrations/ag_ui.md
new file mode 100644
index 0000000000..353a8445e0
--- /dev/null
+++ b/docs/howtos/integrations/ag_ui.md
@@ -0,0 +1,197 @@
+# AG-UI
+
+[AG-UI](https://docs.ag-ui.com/) is an event-based protocol for streaming agent updates to user interfaces. The protocol standardizes message, tool-call, and state events, which makes it easy to plug different agent runtimes into visual frontends. The `ragas.integrations.ag_ui` module helps you transform those event streams into Ragas message objects and evaluate live AG-UI endpoints with the same metrics used across the rest of the Ragas ecosystem.
+
+This guide assumes you already have an AG-UI compatible agent running (for example, one built with Google ADK, PydanticAI, or CrewAI) and that you are familiar with creating evaluation datasets in Ragas.
+
+## Install the integration
+
+The AG-UI helpers live behind an optional extra. Install it together with the dependencies required by your evaluator LLM. When running inside Jupyter or IPython, include `nest_asyncio` so you can reuse the notebook's event loop.
+
+```bash
+pip install "ragas[ag-ui]" langchain-openai python-dotenv nest_asyncio
+```
+
+Configure your evaluator LLM credentials. For example, if you are using OpenAI models:
+
+```bash
+# .env
+OPENAI_API_KEY=sk-...
+```
+
+Load the environment variables inside Python before running the examples:
+
+```python
+from dotenv import load_dotenv
+import nest_asyncio
+
+load_dotenv()
+
+# If you're inside Jupyter/IPython, patch the running event loop once.
+nest_asyncio.apply()
+```
+
+## Build an evaluation dataset
+
+`EvaluationDataset` can contain single-turn or multi-turn samples. With AG-UI you can evaluate either pattern—single questions with free-form responses, or longer conversations that can include tool calls.
+
+### Single-turn samples
+
+Use `SingleTurnSample` when you only need the final answer text.
+
+```python
+from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+
+scientist_questions = EvaluationDataset(
+ samples=[
+ SingleTurnSample(
+ user_input="Who originated the theory of relativity?",
+ reference="Albert Einstein originated the theory of relativity."
+ ),
+ SingleTurnSample(
+ user_input="Who discovered penicillin and when?",
+ reference="Alexander Fleming discovered penicillin in 1928."
+ ),
+ ]
+)
+```
+
+### Multi-turn samples with tool expectations
+
+When you want to grade intermediate agent behavior—like whether it calls tools correctly—switch to `MultiTurnSample`. Provide an initial conversation history and (optionally) expected tool calls.
+
+```python
+from ragas.dataset_schema import EvaluationDataset, MultiTurnSample
+from ragas.messages import HumanMessage, ToolCall
+
+weather_queries = EvaluationDataset(
+ samples=[
+ MultiTurnSample(
+ user_input=[HumanMessage(content="What's the weather in Paris?")],
+ reference_tool_calls=[
+ ToolCall(name="weatherTool", args={"location": "Paris"})
+ ]
+ )
+ ]
+)
+```
+
+## Choose metrics and evaluator model
+
+The integration works with any Ragas metric. For most text-based evaluations you will want a grading LLM. Wrap your model with the appropriate adapter (LangChain shown here; LlamaIndex and LiteLLM wrappers work as well).
+
+```python
+from ragas.metrics import FactualCorrectness, ToolCallF1
+from ragas.llms import LangchainLLMWrapper
+from langchain_openai import ChatOpenAI
+
+evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))
+
+qa_metrics = [FactualCorrectness(llm=evaluator_llm)]
+tool_metrics = [ToolCallF1()] # rule-based metric, no LLM required
+```
+
+## Evaluate a live AG-UI endpoint
+
+`evaluate_ag_ui_agent` calls your FastAPI endpoint, captures the AG-UI Server-Sent Events (SSE) stream, converts those events into Ragas messages, and runs the metrics you selected.
+
+> ⚠️ The endpoint must expose the AG-UI SSE stream. Common paths include `/chat`, `/agent`, or `/agentic_chat`.
+
+### Evaluate factual responses
+
+In Jupyter or IPython, use top-level `await` (after `nest_asyncio.apply()`) instead of `asyncio.run` to avoid the "event loop is already running" error. For scripts you can keep `asyncio.run`.
+
+```python
+import asyncio
+from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+
+async def run_factual_eval():
+ result = await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agentic_chat",
+ dataset=scientist_questions,
+ metrics=qa_metrics,
+ evaluator_llm=evaluator_llm,
+ metadata=True, # optional, keeps run/thread metadata on messages
+ )
+ return result
+
+# In Jupyter/IPython (after calling nest_asyncio.apply())
+factual_result = await run_factual_eval()
+
+# In a standalone script, use:
+# factual_result = asyncio.run(run_factual_eval())
+factual_result.to_pandas()
+```
+
+The resulting dataframe includes per-sample scores, raw agent responses, and any retrieved contexts (if provided by the agent). You can save it with `result.save()` or export to CSV through pandas.
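+
+For example, to keep a CSV copy of the factual-correctness run (a minimal sketch, assuming the call above completed and returned `factual_result`):
+
+```python
+factual_df = factual_result.to_pandas()
+
+# Persist per-sample scores for later comparison
+factual_df.to_csv("factual_eval_results.csv", index=False)
+```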
+
+### Evaluate tool usage
+
+The same function supports multi-turn datasets. Agent responses (AI messages and tool outputs) are appended to the existing conversation before scoring.
+
+```python
+async def run_tool_eval():
+ result = await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agentic_chat",
+ dataset=weather_queries,
+ metrics=tool_metrics,
+ evaluator_llm=evaluator_llm,
+ )
+ return result
+
+# In Jupyter/IPython
+tool_result = await run_tool_eval()
+
+# Or in a script
+# tool_result = asyncio.run(run_tool_eval())
+tool_result.to_pandas()
+```
+
+If a request fails, the executor logs the error and marks the corresponding sample with `NaN` scores so you can retry or inspect the endpoint logs.
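+
+A quick way to find those failed samples before retrying (a sketch using pandas, assuming the tool evaluation above produced `tool_result`):
+
+```python
+tool_df = tool_result.to_pandas()
+
+# Rows whose metric came back as NaN correspond to failed requests
+failed = tool_df[tool_df["tool_call_f1"].isna()]
+print(f"{len(failed)} of {len(tool_df)} samples failed; check the endpoint logs.")
+```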
+
+## Working directly with AG-UI events
+
+Sometimes you may want to collect event logs separately—perhaps from a recorded run or a staging environment—and evaluate them offline. The conversion helpers expose the same parsing logic used by `evaluate_ag_ui_agent`.
+
+```python
+from ragas.integrations.ag_ui import convert_to_ragas_messages
+from ag_ui.core import TextMessageChunkEvent
+
+events = [
+ TextMessageChunkEvent(
+ message_id="assistant-1",
+ role="assistant",
+ delta="Hello from AG-UI!",
+ timestamp="2024-12-01T00:00:00Z",
+ )
+]
+
+ragas_messages = convert_to_ragas_messages(events, metadata=True)
+```
+
+If you already have a `MessagesSnapshotEvent` you can skip streaming reconstruction and call `convert_messages_snapshot`.
+
+```python
+from ragas.integrations.ag_ui import convert_messages_snapshot
+from ag_ui.core import MessagesSnapshotEvent, UserMessage, AssistantMessage
+
+snapshot = MessagesSnapshotEvent(
+ messages=[
+ UserMessage(id="msg-1", content="Hello?"),
+ AssistantMessage(id="msg-2", content="Hi! How can I help you today?"),
+ ]
+)
+
+ragas_messages = convert_messages_snapshot(snapshot)
+```
+
+The converted messages can be plugged into `EvaluationDataset` objects or passed directly to lower-level Ragas evaluation APIs if you need custom workflows.
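+
+For instance, a converted conversation can seed a multi-turn sample directly (a minimal sketch; add `reference` or `reference_tool_calls` fields as your metrics require):
+
+```python
+from ragas.dataset_schema import EvaluationDataset, MultiTurnSample
+
+offline_dataset = EvaluationDataset(
+    samples=[MultiTurnSample(user_input=ragas_messages)]
+)
+```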
+
+## Tips for production evaluations
+
+- **Batch size**: use the `batch_size` argument to control parallel requests to your agent.
+- **Custom headers**: pass authentication tokens or tenant IDs via `extra_headers`.
+- **Timeouts**: tune the `timeout` parameter if your agent performs long-running tool calls.
+- **Metadata debugging**: set `metadata=True` to keep AG-UI run, thread, and message IDs on every `RagasMessage` for easier traceability.
+
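+The snippet below combines these options in one call (a sketch reusing the datasets, metrics, and evaluator defined earlier; the parameter values are illustrative):
+
+```python
+result = await evaluate_ag_ui_agent(
+    endpoint_url="http://localhost:8000/agentic_chat",
+    dataset=scientist_questions,
+    metrics=qa_metrics,
+    evaluator_llm=evaluator_llm,
+    batch_size=4,  # parallel requests to the agent
+    extra_headers={"Authorization": "Bearer <token>"},  # auth / tenant headers
+    timeout=120.0,  # seconds; raise for long-running tool calls
+    metadata=True,  # keep AG-UI run/thread/message IDs
+)
+```
+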
+Once you are satisfied with your scoring setup, consider wrapping the snippets in a script or notebook. An example walkthrough notebook is available at `docs/howtos/integrations/ag_ui.ipynb`.
diff --git a/examples/ragas_examples/ag_ui_agent_evals/README.md b/examples/ragas_examples/ag_ui_agent_evals/README.md
new file mode 100644
index 0000000000..0846b6b483
--- /dev/null
+++ b/examples/ragas_examples/ag_ui_agent_evals/README.md
@@ -0,0 +1,315 @@
+# AG-UI Agent Evaluation Examples
+
+This example demonstrates how to evaluate agents built with the **AG-UI protocol** using Ragas metrics.
+
+## What is AG-UI?
+
+AG-UI (the Agent-User Interaction protocol) is a protocol for streaming agent events from backend to frontend. It defines a standardized event format for agent-to-UI communication, enabling real-time streaming of agent actions, tool calls, and responses.
+
+## Prerequisites
+
+Before running these examples, you need to have an AG-UI compatible agent running. Follow the [AG-UI Quickstart Guide](https://docs.ag-ui.com/quickstart/applications) to set up your agent.
+
+### Popular AG-UI Compatible Frameworks
+
+- **Google ADK (Agent Development Kit)** - Google's framework for building AI agents
+- **Pydantic AI** - Type-safe agent framework using Pydantic
+- **Mastra** - Modular, TypeScript-based agentic AI framework
+- **CrewAI** - Python framework for orchestrating collaborative, specialized AI agent teams
+- And more...
+
+### Example Setup
+
+Here's a quick overview of setting up an AG-UI agent (refer to the [official documentation](https://docs.ag-ui.com/quickstart/applications) for detailed instructions):
+
+1. Choose your agent framework (e.g., Google ADK, Pydantic AI)
+2. Implement your agent with the required tools
+3. Start the AG-UI server (typically runs at `http://localhost:8000/chat` or `http://localhost:8000/agentic_chat`)
+4. Verify the endpoint is accessible
+
+## Installation
+
+Install the required dependencies:
+
+```bash
+# From the ragas repository root
+uv pip install -e ".[dev]"
+
+# Or install specific dependencies
+pip install ragas langchain-openai
+```
+
+## Evaluation Scenarios
+
+This example includes two evaluation scenarios:
+
+### 1. Scientist Biographies (Factual Correctness)
+
+Tests the agent's ability to provide factually correct information about famous scientists.
+
+- **Metric**: `FactualCorrectness` - Measures how accurate the agent's responses are compared to reference answers
+- **Dataset**: `test_data/scientist_biographies.csv` - 5 questions about scientists (Einstein, Fleming, Newton, etc.)
+- **Sample Type**: `SingleTurnSample` - Simple question-answer pairs
+
+### 2. Weather Tool Usage (Tool Call F1)
+
+Tests the agent's ability to correctly invoke the weather tool when appropriate.
+
+- **Metric**: `ToolCallF1` - F1 score measuring precision and recall of tool invocations
+- **Dataset**: `test_data/weather_tool_calls.csv` - 5 queries requiring weather tool calls
+- **Sample Type**: `MultiTurnSample` - Multi-turn conversations with tool call expectations
+
+## Usage
+
+### Basic Usage
+
+Run both evaluation scenarios:
+
+```bash
+cd examples/ragas_examples/ag_ui_agent_evals
+python evals.py --endpoint-url http://localhost:8000/agentic_chat
+```
+
+### Command Line Options
+
+```bash
+# Specify a different endpoint
+python evals.py --endpoint-url http://localhost:8010/chat
+
+# Use a different evaluator model
+python evals.py --evaluator-model gpt-4o
+
+# Skip the factual correctness evaluation
+python evals.py --skip-factual
+
+# Skip the tool call evaluation
+python evals.py --skip-tool-eval
+
+# Specify output directory for results
+python evals.py --output-dir ./results
+
+# Combine options
+python evals.py \
+ --endpoint-url http://localhost:8000/agentic_chat \
+ --evaluator-model gpt-4o-mini \
+ --output-dir ./my_results
+```
+
+### Using uv (Recommended)
+
+```bash
+# Run with uv from the examples directory
+cd examples
+uv run python ragas_examples/ag_ui_agent_evals/evals.py --endpoint-url http://localhost:8000/agentic_chat
+```
+
+## Expected Output
+
+### Console Output
+
+The script will print detailed evaluation results:
+
+```
+================================================================================
+Starting Scientist Biographies Evaluation
+================================================================================
+Loading scientist biographies dataset from .../test_data/scientist_biographies.csv
+Loaded 5 scientist biography samples
+Evaluating against endpoint: http://localhost:8000/agentic_chat
+
+================================================================================
+Scientist Biographies Evaluation Results
+================================================================================
+ user_input ... factual_correctness(mode=f1)
+0 Who originated the theory of relativity... ... 0.75
+1 Who discovered penicillin and when... ... 1.00
+...
+
+Average Factual Correctness: 0.7160
+Perfect scores (1.0): 2/5
+
+Results saved to: .../scientist_biographies_results_20250101_143022.csv
+
+================================================================================
+Starting Weather Tool Usage Evaluation
+================================================================================
+...
+Average Tool Call F1: 1.0000
+Perfect scores (F1=1.0): 5/5
+Failed scores (F1=0.0): 0/5
+
+Results saved to: .../weather_tool_calls_results_20250101_143045.csv
+
+================================================================================
+All evaluations completed successfully!
+================================================================================
+```
+
+### CSV Output Files
+
+Results are saved as timestamped CSV files:
+
+- `scientist_biographies_results_YYYYMMDD_HHMMSS.csv`
+- `weather_tool_calls_results_YYYYMMDD_HHMMSS.csv`
+
+Example CSV structure:
+
+```csv
+user_input,response,reference,factual_correctness(mode=f1)
+"Who originated the theory of relativity...","Albert Einstein...","Albert Einstein originated...",0.75
+```
+
+## Customizing the Evaluation
+
+### Adding New Test Cases
+
+#### For Factual Correctness
+
+Edit `test_data/scientist_biographies.csv`:
+
+```csv
+user_input,reference
+"Your question here","Your reference answer here"
+```
+
+#### For Tool Call Evaluation
+
+Edit `test_data/weather_tool_calls.csv`:
+
+```csv
+user_input,reference_tool_calls
+"What's the weather in Paris?","[{\"name\": \"weatherTool\", \"args\": {\"location\": \"Paris\"}}]"
+```
+
+### Using Different Metrics
+
+Modify `evals.py` to include additional Ragas metrics:
+
+```python
+from ragas.metrics import AnswerRelevancy
+
+# In evaluate_scientist_biographies function:
+metrics = [
+ FactualCorrectness(),
+ AnswerRelevancy(), # Add additional metrics
+]
+```
+
+### Evaluating Your Own Agent
+
+1. **Ensure your agent supports AG-UI protocol**
+ - Agent must expose an endpoint that accepts AG-UI messages
+ - Agent must return Server-Sent Events (SSE) with AG-UI event format
+
+2. **Update the endpoint URL**
+ ```bash
+ python evals.py --endpoint-url http://your-agent:port/your-endpoint
+ ```
+
+3. **Customize test data**
+ - Create new CSV files with your test cases
+   - Update the loader functions in `evals.py` if needed (see the sketch below)
+
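+If you add your own CSV files, a custom loader can mirror the ones in `evals.py` (a sketch; the path and column names are placeholders):
+
+```python
+import csv
+from pathlib import Path
+
+from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+
+
+def load_my_dataset(csv_path: Path) -> EvaluationDataset:
+    """Build a single-turn dataset from a user_input/reference CSV."""
+    samples = []
+    with open(csv_path, "r", encoding="utf-8") as f:
+        for row in csv.DictReader(f):
+            samples.append(
+                SingleTurnSample(
+                    user_input=row["user_input"], reference=row["reference"]
+                )
+            )
+    return EvaluationDataset(samples=samples)
+```
+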
+## Troubleshooting
+
+### Connection Errors
+
+```
+Error: Connection refused at http://localhost:8000/agentic_chat
+```
+
+**Solution**: Ensure your AG-UI agent is running and accessible at the specified endpoint.
+
+### Import Errors
+
+```
+ImportError: No module named 'ragas'
+```
+
+**Solution**: Install ragas and its dependencies:
+```bash
+pip install ragas langchain-openai
+```
+
+### API Key Errors
+
+```
+Error: OpenAI API key not found
+```
+
+**Solution**: Set your OpenAI API key:
+```bash
+export OPENAI_API_KEY='your-api-key-here'
+```
+
+### Agent Timeout
+
+```
+Error: Request timeout after 60.0 seconds
+```
+
+**Solution**: Your agent may be slow to respond. You can increase the timeout in the code or optimize your agent's performance.
+
+## Understanding the Results
+
+### Factual Correctness Metric
+
+- **Range**: 0.0 to 1.0
+- **1.0**: Perfect match between response and reference
+- **0.5-0.9**: Partially correct with some missing or incorrect information
+- **<0.5**: Significant discrepancies with the reference
+
+### Tool Call F1 Metric
+
+- **Range**: 0.0 to 1.0
+- **1.0**: Perfect tool call accuracy (correct tools with correct arguments)
+- **0.5-0.9**: Some expected tool calls matched, but with missed or extra calls
+- **0.0**: Incorrect tool usage or no tool calls when expected
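+
+As a worked example: if the agent makes two tool calls and only one matches the single expected call, precision is 1/2 and recall is 1/1, so F1 = 2 × (0.5 × 1.0) / (0.5 + 1.0) ≈ 0.67.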
+
+## Integration with Your Workflow
+
+### CI/CD Integration
+
+You can integrate these evaluations into your CI/CD pipeline:
+
+```bash
+# In your CI script
+python evals.py \
+ --endpoint-url http://staging-agent:8000/chat \
+ --output-dir ./test-results \
+ || exit 1
+```
+
+### Tracking Performance Over Time
+
+Save results with timestamps to track improvements:
+
+```bash
+# Run evaluations regularly
+python evals.py --output-dir ./historical-results/$(date +%Y%m%d)
+```
+
+### Automated Testing
+
+Create a simple test harness:
+
+```python
+import subprocess
+import sys
+
+result = subprocess.run(
+ ["python", "evals.py", "--endpoint-url", "http://localhost:8000/chat"],
+ capture_output=True
+)
+
+if result.returncode != 0:
+ print("Evaluation failed!")
+ sys.exit(1)
+```
+
+## Additional Resources
+
+- [AG-UI Documentation](https://docs.ag-ui.com)
+- [AG-UI Quickstart](https://docs.ag-ui.com/quickstart/applications)
+- [Ragas Documentation](https://docs.ragas.io)
+- [Ragas AG-UI Integration Guide](https://docs.ragas.io/integrations/ag-ui)
diff --git a/examples/ragas_examples/ag_ui_agent_evals/__init__.py b/examples/ragas_examples/ag_ui_agent_evals/__init__.py
new file mode 100644
index 0000000000..7b75b49c7f
--- /dev/null
+++ b/examples/ragas_examples/ag_ui_agent_evals/__init__.py
@@ -0,0 +1,52 @@
+"""
+AG-UI Agent Evaluation Examples
+
+This package demonstrates how to evaluate agents built with the AG-UI protocol
+using Ragas metrics.
+
+## What is AG-UI?
+
+AG-UI (the Agent-User Interaction protocol) is a protocol for streaming agent events
+from backend to frontend. It defines a standardized event format for agent-to-UI
+communication.
+
+## Getting Started
+
+Before running these examples, you'll need to have an AG-UI compatible agent running.
+Follow the AG-UI quickstart guide to set up your agent:
+
+https://docs.ag-ui.com/quickstart/applications
+
+Popular agent frameworks that support AG-UI include:
+- Google ADK (Agent Development Kit)
+- Pydantic AI
+- And more...
+
+## Running the Examples
+
+Once you have your AG-UI agent endpoint running (typically at
+http://localhost:8000/chat or http://localhost:8000/agentic_chat), you can run
+the evaluation examples:
+
+```bash
+# From the examples directory
+cd ragas_examples/ag_ui_agent_evals
+uv run python evals.py --endpoint-url http://localhost:8000/agentic_chat
+```
+
+## Evaluation Scenarios
+
+This package includes two evaluation scenarios:
+
+1. **Scientist Biographies** - Tests factual correctness of agent responses
+ using the FactualCorrectness metric with SingleTurnSample datasets.
+
+2. **Weather Tool Usage** - Tests tool calling accuracy using the ToolCallF1
+ metric with MultiTurnSample datasets.
+
+## Results
+
+Evaluation results are saved as CSV files with timestamps for tracking performance
+over time.
+"""
+
+__version__ = "0.1.0"
diff --git a/examples/ragas_examples/ag_ui_agent_evals/evals.py b/examples/ragas_examples/ag_ui_agent_evals/evals.py
new file mode 100644
index 0000000000..fbf8229170
--- /dev/null
+++ b/examples/ragas_examples/ag_ui_agent_evals/evals.py
@@ -0,0 +1,314 @@
+"""
+AG-UI Agent Evaluation Script
+
+This script demonstrates how to evaluate agents built with the AG-UI protocol
+using Ragas metrics. It includes two evaluation scenarios:
+
+1. Scientist Biographies - Tests factual correctness of agent responses
+2. Weather Tool Usage - Tests tool calling accuracy
+
+Prerequisites:
+- An AG-UI compatible agent running at the specified endpoint URL
+- See https://docs.ag-ui.com/quickstart/applications for agent setup
+
+Usage:
+ python evals.py --endpoint-url http://localhost:8000/agentic_chat
+ python evals.py --endpoint-url http://localhost:8000/chat --skip-tool-eval
+"""
+
+import argparse
+import asyncio
+import csv
+import json
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+
+from langchain_openai import ChatOpenAI
+
+from ragas.dataset_schema import (
+ EvaluationDataset,
+ MultiTurnSample,
+ SingleTurnSample,
+)
+from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+from ragas.llms import LangchainLLMWrapper
+from ragas.messages import HumanMessage, ToolCall
+from ragas.metrics import FactualCorrectness, ToolCallF1
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+# Get the directory where this script is located
+SCRIPT_DIR = Path(__file__).parent
+TEST_DATA_DIR = SCRIPT_DIR / "test_data"
+
+
+def load_scientist_dataset() -> EvaluationDataset:
+ """
+ Load the scientist biographies dataset from CSV.
+
+ Returns:
+ EvaluationDataset with SingleTurnSample entries for testing factual correctness.
+ """
+ csv_path = TEST_DATA_DIR / "scientist_biographies.csv"
+ logger.info(f"Loading scientist biographies dataset from {csv_path}")
+
+ samples = []
+ with open(csv_path, "r", encoding="utf-8") as f:
+ reader = csv.DictReader(f)
+ for row in reader:
+ sample = SingleTurnSample(
+ user_input=row["user_input"], reference=row["reference"]
+ )
+ samples.append(sample)
+
+ logger.info(f"Loaded {len(samples)} scientist biography samples")
+ return EvaluationDataset(samples=samples)
+
+
+def load_weather_dataset() -> EvaluationDataset:
+ """
+ Load the weather tool call dataset from CSV.
+
+ Returns:
+ EvaluationDataset with MultiTurnSample entries for testing tool call accuracy.
+ """
+ csv_path = TEST_DATA_DIR / "weather_tool_calls.csv"
+ logger.info(f"Loading weather tool call dataset from {csv_path}")
+
+ samples = []
+ with open(csv_path, "r", encoding="utf-8") as f:
+ reader = csv.DictReader(f)
+ for row in reader:
+ # Parse the reference_tool_calls JSON
+ tool_calls_data = json.loads(row["reference_tool_calls"])
+ tool_calls = [
+ ToolCall(name=tc["name"], args=tc["args"]) for tc in tool_calls_data
+ ]
+
+ # Create MultiTurnSample with user_input as a list of HumanMessage
+ sample = MultiTurnSample(
+ user_input=[HumanMessage(content=row["user_input"])],
+ reference_tool_calls=tool_calls,
+ )
+ samples.append(sample)
+
+ logger.info(f"Loaded {len(samples)} weather tool call samples")
+ return EvaluationDataset(samples=samples)
+
+
+async def evaluate_scientist_biographies(
+ endpoint_url: str, evaluator_llm: LangchainLLMWrapper
+) -> tuple:
+ """
+ Evaluate the agent's ability to provide factually correct information
+ about scientists.
+
+ Args:
+ endpoint_url: The AG-UI endpoint URL
+ evaluator_llm: The LLM to use for evaluation
+
+ Returns:
+ Tuple of (result, dataframe) where result is the EvaluationResult
+ and dataframe is the pandas DataFrame with results.
+ """
+ logger.info("=" * 80)
+ logger.info("Starting Scientist Biographies Evaluation")
+ logger.info("=" * 80)
+
+ # Load dataset
+ dataset = load_scientist_dataset()
+
+ # Define metrics
+ metrics = [FactualCorrectness()]
+
+ # Run evaluation
+ logger.info(f"Evaluating against endpoint: {endpoint_url}")
+ result = await evaluate_ag_ui_agent(
+ endpoint_url=endpoint_url,
+ dataset=dataset,
+ metrics=metrics,
+ evaluator_llm=evaluator_llm,
+ )
+
+ # Convert to DataFrame and clean up
+ df = result.to_pandas()
+ df = df.drop(columns=["retrieved_contexts"], errors="ignore")
+
+ # Print summary
+ logger.info("\n" + "=" * 80)
+ logger.info("Scientist Biographies Evaluation Results")
+ logger.info("=" * 80)
+ logger.info(f"\nDataFrame shape: {df.shape}")
+ logger.info(f"\n{df.to_string()}")
+
+ if "factual_correctness(mode=f1)" in df.columns:
+ avg_correctness = df["factual_correctness(mode=f1)"].mean()
+ logger.info(f"\nAverage Factual Correctness: {avg_correctness:.4f}")
+ logger.info(
+ f"Perfect scores (1.0): {(df['factual_correctness(mode=f1)'] == 1.0).sum()}/{len(df)}"
+ )
+
+ return result, df
+
+
+async def evaluate_weather_tool_use(
+ endpoint_url: str, evaluator_llm: LangchainLLMWrapper
+) -> tuple:
+ """
+ Evaluate the agent's ability to correctly call the weather tool.
+
+ Args:
+ endpoint_url: The AG-UI endpoint URL
+ evaluator_llm: The LLM to use for evaluation
+
+ Returns:
+ Tuple of (result, dataframe) where result is the EvaluationResult
+ and dataframe is the pandas DataFrame with results.
+ """
+ logger.info("\n" + "=" * 80)
+ logger.info("Starting Weather Tool Usage Evaluation")
+ logger.info("=" * 80)
+
+ # Load dataset
+ dataset = load_weather_dataset()
+
+ # Define metrics
+ metrics = [ToolCallF1()]
+
+ # Run evaluation
+ logger.info(f"Evaluating against endpoint: {endpoint_url}")
+ result = await evaluate_ag_ui_agent(
+ endpoint_url=endpoint_url,
+ dataset=dataset,
+ metrics=metrics,
+ evaluator_llm=evaluator_llm,
+ )
+
+ # Convert to DataFrame and clean up
+ df = result.to_pandas()
+ columns_to_drop = [
+ col for col in ["retrieved_contexts", "reference"] if col in df.columns
+ ]
+ if columns_to_drop:
+ df = df.drop(columns=columns_to_drop)
+
+ # Print summary
+ logger.info("\n" + "=" * 80)
+ logger.info("Weather Tool Usage Evaluation Results")
+ logger.info("=" * 80)
+ logger.info(f"\nDataFrame shape: {df.shape}")
+ logger.info(f"\n{df.to_string()}")
+
+ if "tool_call_f1" in df.columns:
+ avg_f1 = df["tool_call_f1"].mean()
+ logger.info(f"\nAverage Tool Call F1: {avg_f1:.4f}")
+ logger.info(
+ f"Perfect scores (F1=1.0): {(df['tool_call_f1'] == 1.0).sum()}/{len(df)}"
+ )
+ logger.info(
+ f"Failed scores (F1=0.0): {(df['tool_call_f1'] == 0.0).sum()}/{len(df)}"
+ )
+
+ return result, df
+
+
+def save_results(df, scenario_name: str, output_dir: Optional[Path] = None):
+ """
+ Save evaluation results to a timestamped CSV file.
+
+ Args:
+ df: The pandas DataFrame with evaluation results
+ scenario_name: Name of the evaluation scenario
+ output_dir: Directory to save results (defaults to script directory)
+ """
+ if output_dir is None:
+ output_dir = SCRIPT_DIR
+
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"{scenario_name}_results_{timestamp}.csv"
+ filepath = output_dir / filename
+
+ df.to_csv(filepath, index=False)
+ logger.info(f"\nResults saved to: {filepath}")
+
+
+async def main():
+ """Main execution function."""
+ # Parse command line arguments
+ parser = argparse.ArgumentParser(
+ description="Evaluate AG-UI agents using Ragas metrics"
+ )
+ parser.add_argument(
+ "--endpoint-url",
+ type=str,
+ default="http://localhost:8000/agentic_chat",
+ help="AG-UI endpoint URL (default: http://localhost:8000/agentic_chat)",
+ )
+ parser.add_argument(
+ "--evaluator-model",
+ type=str,
+ default="gpt-4o-mini",
+ help="OpenAI model to use for evaluation (default: gpt-4o-mini)",
+ )
+ parser.add_argument(
+ "--skip-factual",
+ action="store_true",
+ help="Skip the factual correctness evaluation",
+ )
+ parser.add_argument(
+ "--skip-tool-eval",
+ action="store_true",
+ help="Skip the tool call evaluation",
+ )
+ parser.add_argument(
+ "--output-dir",
+ type=Path,
+ default=None,
+ help="Directory to save results (default: script directory)",
+ )
+
+ args = parser.parse_args()
+
+ # Setup evaluator LLM
+ logger.info(f"Setting up evaluator LLM: {args.evaluator_model}")
+ llm = ChatOpenAI(model=args.evaluator_model)
+ evaluator_llm = LangchainLLMWrapper(llm)
+
+ # Run evaluations
+ try:
+ if not args.skip_factual:
+ result, df = await evaluate_scientist_biographies(
+ args.endpoint_url, evaluator_llm
+ )
+ save_results(df, "scientist_biographies", args.output_dir)
+
+ if not args.skip_tool_eval:
+ result, df = await evaluate_weather_tool_use(
+ args.endpoint_url, evaluator_llm
+ )
+ save_results(df, "weather_tool_calls", args.output_dir)
+
+ logger.info("\n" + "=" * 80)
+ logger.info("All evaluations completed successfully!")
+ logger.info("=" * 80)
+
+ except Exception as e:
+ logger.error(f"\nEvaluation failed with error: {e}")
+ logger.error(
+ "\nPlease ensure your AG-UI agent is running at the specified endpoint."
+ )
+ logger.error(
+ "See https://docs.ag-ui.com/quickstart/applications for setup instructions."
+ )
+ raise
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/ragas_examples/ag_ui_agent_evals/test_data/scientist_biographies.csv b/examples/ragas_examples/ag_ui_agent_evals/test_data/scientist_biographies.csv
new file mode 100644
index 0000000000..9bae4b1a9b
--- /dev/null
+++ b/examples/ragas_examples/ag_ui_agent_evals/test_data/scientist_biographies.csv
@@ -0,0 +1,6 @@
+user_input,reference
+"Who originated the theory of relativity and where were they born?","Albert Einstein originated the theory of relativity. He was born in Ulm, in the Kingdom of Württemberg, Germany."
+"Who discovered penicillin and when was it discovered?","Alexander Fleming discovered penicillin in 1928."
+"Who proposed the law of universal gravitation and in what century?","Isaac Newton proposed the law of universal gravitation in the 17th century."
+"Who is known as the father of modern chemistry and why?","Antoine Lavoisier is known as the father of modern chemistry for establishing the law of conservation of mass."
+"Who developed the polio vaccine and where was it first tested?","Jonas Salk developed the polio vaccine, first tested in the United States."
diff --git a/examples/ragas_examples/ag_ui_agent_evals/test_data/weather_tool_calls.csv b/examples/ragas_examples/ag_ui_agent_evals/test_data/weather_tool_calls.csv
new file mode 100644
index 0000000000..7dd4a0ea55
--- /dev/null
+++ b/examples/ragas_examples/ag_ui_agent_evals/test_data/weather_tool_calls.csv
@@ -0,0 +1,6 @@
+user_input,reference_tool_calls
+"What's the weather like in San Francisco?","[{""name"": ""weatherTool"", ""args"": {""location"": ""San Francisco""}}]"
+"Can you check the weather in Tokyo?","[{""name"": ""weatherTool"", ""args"": {""location"": ""Tokyo""}}]"
+"What is the temperature like in Paris today?","[{""name"": ""weatherTool"", ""args"": {""location"": ""Paris""}}]"
+"Is it sunny in Rome?","[{""name"": ""weatherTool"", ""args"": {""location"": ""Rome""}}]"
+"Is it raining in London right now?","[{""name"": ""weatherTool"", ""args"": {""location"": ""London""}}]"
diff --git a/mkdocs.yml b/mkdocs.yml
index 673f45b0c0..62ce82f979 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -130,6 +130,7 @@ nav:
- Evaluate and Improve a RAG App: howtos/applications/evaluate-and-improve-rag.md
- Integrations:
- howtos/integrations/index.md
+ - AG-UI: howtos/integrations/ag_ui.md
- Arize: howtos/integrations/_arize.md
- Amazon Bedrock: howtos/integrations/amazon_bedrock.md
- Haystack: howtos/integrations/haystack.md
diff --git a/pyproject.toml b/pyproject.toml
index 436b89bab2..7cfc6c8e2c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -60,6 +60,7 @@ gdrive = [
]
ai-frameworks = ["haystack-ai"]
oci = ["oci>=2.160.1"]
+ag-ui = ["ag-ui-protocol>=0.1.9", "httpx>=0.27.0"]
# Minimal dev dependencies for fast development setup (used by make install-minimal)
dev-minimal = [
diff --git a/src/ragas/integrations/__init__.py b/src/ragas/integrations/__init__.py
index 141ed39a4b..c9c40446bf 100644
--- a/src/ragas/integrations/__init__.py
+++ b/src/ragas/integrations/__init__.py
@@ -10,6 +10,7 @@
- Observability: Helicone, Langsmith, Opik
- Platforms: Amazon Bedrock, R2R
- AI Systems: Swarm for multi-agent evaluation
+- Protocols: AG-UI for event-based agent communication
Import tracing integrations:
```python
diff --git a/src/ragas/integrations/ag_ui.py b/src/ragas/integrations/ag_ui.py
new file mode 100644
index 0000000000..69bc928dd8
--- /dev/null
+++ b/src/ragas/integrations/ag_ui.py
@@ -0,0 +1,1312 @@
+"""
+AG-UI Protocol Integration for Ragas.
+
+This module provides conversion utilities and evaluation functions for AG-UI
+protocol agents. It supports converting AG-UI streaming events to Ragas message
+format and evaluating AG-UI FastAPI endpoints.
+
+AG-UI is an event-based protocol for agent-to-UI communication that uses typed
+events for streaming text messages, tool calls, and state synchronization. This
+integration supports both streaming events (Start-Content-End triads) and
+convenience chunk events (TextMessageChunk, ToolCallChunk) for complete messages.
+
+Functions:
+ convert_to_ragas_messages: Convert AG-UI event sequences to Ragas messages
+ convert_messages_snapshot: Convert AG-UI message snapshots to Ragas messages
+ evaluate_ag_ui_agent: Batch evaluate an AG-UI FastAPI endpoint
+
+Examples:
+ Convert streaming AG-UI events to Ragas messages::
+
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+ from ag_ui.core import Event
+
+ # List of AG-UI events from agent run
+ ag_ui_events: List[Event] = [...]
+
+ # Convert to Ragas messages
+ ragas_messages = convert_to_ragas_messages(ag_ui_events, metadata=True)
+
+ Evaluate an AG-UI agent endpoint (single-turn)::
+
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+ from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+ from ragas.metrics import AspectCritic
+
+ dataset = EvaluationDataset(samples=[
+ SingleTurnSample(user_input="What's the weather in SF?")
+ ])
+
+ result = await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[AspectCritic()]
+ )
+
+ Evaluate with multi-turn conversations and tool calls::
+
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+ from ragas.dataset_schema import EvaluationDataset, MultiTurnSample
+ from ragas.messages import HumanMessage, ToolCall
+ from ragas.metrics import ToolCallF1
+
+ dataset = EvaluationDataset(samples=[
+ MultiTurnSample(
+ user_input=[HumanMessage(content="What's the weather in SF?")],
+ reference_tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})]
+ )
+ ])
+
+ result = await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[ToolCallF1()]
+ )
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import math
+import typing as t
+import uuid
+from typing import Any, Dict, List, Optional, Union
+
+from ragas.dataset_schema import (
+ EvaluationDataset,
+ EvaluationResult,
+ MultiTurnSample,
+ SingleTurnSample,
+)
+from ragas.evaluation import evaluate as ragas_evaluate
+from ragas.executor import Executor
+from ragas.messages import AIMessage, HumanMessage, ToolCall, ToolMessage
+from ragas.run_config import RunConfig
+
+if t.TYPE_CHECKING:
+ from ragas.metrics.base import Metric
+
+logger = logging.getLogger(__name__)
+
+
+# Lazy imports for ag_ui to avoid hard dependency
+def _import_ag_ui_core():
+ """Import AG-UI core types with helpful error message."""
+ try:
+ from ag_ui.core import (
+ BaseEvent,
+ Event,
+ EventType,
+ MessagesSnapshotEvent,
+ TextMessageChunkEvent,
+ TextMessageContentEvent,
+ TextMessageEndEvent,
+ TextMessageStartEvent,
+ ToolCallArgsEvent,
+ ToolCallChunkEvent,
+ ToolCallEndEvent,
+ ToolCallResultEvent,
+ ToolCallStartEvent,
+ )
+
+ return (
+ BaseEvent,
+ Event,
+ EventType,
+ MessagesSnapshotEvent,
+ TextMessageStartEvent,
+ TextMessageContentEvent,
+ TextMessageEndEvent,
+ TextMessageChunkEvent,
+ ToolCallStartEvent,
+ ToolCallArgsEvent,
+ ToolCallEndEvent,
+ ToolCallResultEvent,
+ ToolCallChunkEvent,
+ )
+ except ImportError as e:
+ raise ImportError(
+ "AG-UI integration requires the ag-ui-protocol package. "
+ "Install it with: pip install ag-ui-protocol"
+ ) from e
+
+
+class AGUIEventCollector:
+ """
+ Collects and reconstructs complete messages from streaming AG-UI events.
+
+ AG-UI uses an event-based streaming protocol where messages are delivered
+ incrementally through Start->Content->End event sequences (triads). This
+ collector accumulates these events and reconstructs complete Ragas messages.
+ It also supports convenience chunk events (TextMessageChunk, ToolCallChunk)
+ for complete messages delivered in a single event.
+
+ Attributes
+ ----------
+ messages : List[Union[HumanMessage, AIMessage, ToolMessage]]
+ Accumulated complete messages ready for Ragas evaluation.
+ include_metadata : bool
+ Whether to include AG-UI metadata in converted messages.
+
+ Example
+ -------
+ >>> collector = AGUIEventCollector(metadata=True)
+ >>> for event in ag_ui_event_stream:
+ ... collector.process_event(event)
+ >>> ragas_messages = collector.get_messages()
+ """
+
+ def __init__(self, metadata: bool = False):
+ """
+ Initialize the event collector.
+
+ Parameters
+ ----------
+ metadata : bool, optional
+ Whether to include AG-UI event metadata in Ragas messages (default: False)
+ """
+ self.include_metadata = metadata
+ self.messages: List[Union[HumanMessage, AIMessage, ToolMessage]] = []
+
+ # State tracking for streaming message reconstruction
+ self._active_text_messages: Dict[str, Dict[str, Any]] = {}
+ self._active_tool_calls: Dict[str, Dict[str, Any]] = {}
+ self._completed_tool_calls: Dict[str, ToolCall] = {}
+
+ # Context tracking for metadata
+ self._current_run_id: Optional[str] = None
+ self._current_thread_id: Optional[str] = None
+ self._current_step: Optional[str] = None
+
+ # Cache AG-UI imports to avoid repeated import calls
+ (
+ self._BaseEvent,
+ self._Event,
+ self._EventType,
+ self._MessagesSnapshotEvent,
+ self._TextMessageStartEvent,
+ self._TextMessageContentEvent,
+ self._TextMessageEndEvent,
+ self._TextMessageChunkEvent,
+ self._ToolCallStartEvent,
+ self._ToolCallArgsEvent,
+ self._ToolCallEndEvent,
+ self._ToolCallResultEvent,
+ self._ToolCallChunkEvent,
+ ) = _import_ag_ui_core()
+
+ def _get_pending_tool_calls(self) -> Optional[List[ToolCall]]:
+ """
+ Retrieve and clear any completed tool calls waiting to be attached to a message.
+
+ Returns
+ -------
+ Optional[List[ToolCall]]
+ List of pending tool calls if any exist, None otherwise.
+ """
+ if self._completed_tool_calls:
+ tool_calls = list(self._completed_tool_calls.values())
+ self._completed_tool_calls.clear()
+ return tool_calls
+ return None
+
+ def process_event(self, event: Any) -> None:
+ """
+ Process a single AG-UI event and update internal state.
+
+ Parameters
+ ----------
+ event : Event
+ An AG-UI protocol event from ag_ui.core
+
+ Notes
+ -----
+ This method handles different event types:
+ - Lifecycle events (RUN_STARTED, STEP_STARTED): Update context
+ - Text message events: Accumulate and reconstruct messages (streaming triads or chunks)
+ - Tool call events: Reconstruct tool calls and results (streaming triads or chunks)
+ - Other events: Silently ignored
+ """
+ # Use cached AG-UI imports
+ EventType = self._EventType
+
+ event_type = event.type
+
+ # Update context from lifecycle events
+ if event_type == EventType.RUN_STARTED:
+ self._current_run_id = event.run_id
+ self._current_thread_id = event.thread_id
+ elif event_type == EventType.STEP_STARTED:
+ self._current_step = event.step_name
+ elif event_type == EventType.STEP_FINISHED:
+ if event.step_name == self._current_step:
+ self._current_step = None
+
+ # Handle text message events
+ elif event_type == EventType.TEXT_MESSAGE_START:
+ self._handle_text_message_start(event)
+ elif event_type == EventType.TEXT_MESSAGE_CONTENT:
+ self._handle_text_message_content(event)
+ elif event_type == EventType.TEXT_MESSAGE_END:
+ self._handle_text_message_end(event)
+ elif event_type == EventType.TEXT_MESSAGE_CHUNK:
+ self._handle_text_message_chunk(event)
+
+ # Handle tool call events
+ elif event_type == EventType.TOOL_CALL_START:
+ self._handle_tool_call_start(event)
+ elif event_type == EventType.TOOL_CALL_ARGS:
+ self._handle_tool_call_args(event)
+ elif event_type == EventType.TOOL_CALL_END:
+ self._handle_tool_call_end(event)
+ elif event_type == EventType.TOOL_CALL_RESULT:
+ self._handle_tool_call_result(event)
+ elif event_type == EventType.TOOL_CALL_CHUNK:
+ self._handle_tool_call_chunk(event)
+
+ # MessagesSnapshot provides complete history
+ elif event_type == EventType.MESSAGES_SNAPSHOT:
+ self._handle_messages_snapshot(event)
+
+ # Ignore lifecycle, state management, and other events
+ else:
+ logger.debug(f"Ignoring AG-UI event type: {event_type}")
+
+ def _handle_text_message_start(self, event: Any) -> None:
+ """Initialize a new streaming text message."""
+ self._active_text_messages[event.message_id] = {
+ "message_id": event.message_id,
+ "role": event.role,
+ "content_chunks": [],
+ "timestamp": event.timestamp,
+ }
+
+ def _handle_text_message_content(self, event: Any) -> None:
+ """Accumulate text content chunk for a streaming message."""
+ if event.message_id in self._active_text_messages:
+ self._active_text_messages[event.message_id]["content_chunks"].append(
+ event.delta
+ )
+ else:
+ logger.warning(
+ f"Received TextMessageContent for unknown message_id: {event.message_id}"
+ )
+
+ def _handle_text_message_end(self, event: Any) -> None:
+ """Finalize a streaming text message and convert to Ragas format."""
+ if event.message_id not in self._active_text_messages:
+ logger.warning(
+ f"Received TextMessageEnd for unknown message_id: {event.message_id}"
+ )
+ return
+
+ msg_data = self._active_text_messages.pop(event.message_id)
+ content = "".join(msg_data["content_chunks"])
+ role = msg_data["role"]
+
+ # Build metadata if requested
+ metadata = None
+ if self.include_metadata:
+ metadata = {
+ "message_id": msg_data["message_id"],
+ "timestamp": msg_data["timestamp"],
+ }
+ if self._current_run_id:
+ metadata["run_id"] = self._current_run_id
+ if self._current_thread_id:
+ metadata["thread_id"] = self._current_thread_id
+ if self._current_step:
+ metadata["step_name"] = self._current_step
+
+ # Convert to appropriate Ragas message type
+ if role == "assistant":
+ # Check if there are completed tool calls for this message
+ # Tool calls are associated by being emitted before the message end
+ tool_calls = self._get_pending_tool_calls()
+
+ self.messages.append(
+ AIMessage(content=content, tool_calls=tool_calls, metadata=metadata)
+ )
+ elif role == "user":
+ self.messages.append(HumanMessage(content=content, metadata=metadata))
+ else:
+ logger.warning(f"Unexpected message role: {role}")
+
+ def _handle_tool_call_start(self, event: Any) -> None:
+ """Initialize a new streaming tool call."""
+ self._active_tool_calls[event.tool_call_id] = {
+ "tool_call_id": event.tool_call_id,
+ "tool_call_name": event.tool_call_name,
+ "parent_message_id": getattr(event, "parent_message_id", None),
+ "args_chunks": [],
+ "timestamp": event.timestamp,
+ }
+
+ def _handle_tool_call_args(self, event: Any) -> None:
+ """Accumulate tool argument chunks."""
+ if event.tool_call_id in self._active_tool_calls:
+ self._active_tool_calls[event.tool_call_id]["args_chunks"].append(
+ event.delta
+ )
+ else:
+ logger.warning(
+ f"Received ToolCallArgs for unknown tool_call_id: {event.tool_call_id}"
+ )
+
+ def _handle_tool_call_end(self, event: Any) -> None:
+ """Finalize a tool call specification (args are complete, but not yet executed)."""
+ if event.tool_call_id not in self._active_tool_calls:
+ logger.warning(
+ f"Received ToolCallEnd for unknown tool_call_id: {event.tool_call_id}"
+ )
+ return
+
+ tool_data = self._active_tool_calls.pop(event.tool_call_id)
+ args_json = "".join(tool_data["args_chunks"])
+
+ # Parse tool arguments
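+        # e.g. chunks ['{"city": "SF"', ', "units": "F"}'] join into one JSON
+        # document; malformed JSON is preserved under a "raw_args" key rather
+        # than dropped, so downstream metrics still see the call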
+ try:
+ args = json.loads(args_json) if args_json else {}
+ except json.JSONDecodeError:
+ logger.error(
+ f"Failed to parse tool call arguments for {tool_data['tool_call_name']}: {args_json}"
+ )
+ args = {"raw_args": args_json}
+
+ # Store completed tool call for association with next AI message
+ self._completed_tool_calls[event.tool_call_id] = ToolCall(
+ name=tool_data["tool_call_name"], args=args
+ )
+
+ def _handle_tool_call_result(self, event: Any) -> None:
+ """
+ Convert tool call result to Ragas ToolMessage.
+
+ Also ensures that the most recent AIMessage has tool_calls attached,
+ which is required for MultiTurnSample validation (ToolMessage must be
+ preceded by an AIMessage with tool_calls).
+ """
+ # Find the most recent AIMessage
+ ai_msg_idx = None
+ for i in range(len(self.messages) - 1, -1, -1):
+ if isinstance(self.messages[i], AIMessage):
+ ai_msg_idx = i
+ break
+
+ # Ensure the AIMessage has tool_calls
+ if ai_msg_idx is not None:
+ ai_msg_candidate = self.messages[ai_msg_idx]
+
+ if not isinstance(ai_msg_candidate, AIMessage):
+ logger.warning(
+ "Expected AIMessage when handling tool call result, "
+ f"received {type(ai_msg_candidate).__name__}"
+ )
+ return
+
+ ai_msg = ai_msg_candidate
+
+ # If it doesn't have tool_calls, we need to add them
+ if ai_msg.tool_calls is None or len(ai_msg.tool_calls) == 0:
+ # Check if there are unclaimed tool calls
+ if self._completed_tool_calls:
+ # Attach unclaimed tool calls
+ new_tool_calls = list(self._completed_tool_calls.values())
+ self.messages[ai_msg_idx] = AIMessage(
+ content=ai_msg.content,
+ metadata=ai_msg.metadata,
+ tool_calls=new_tool_calls,
+ )
+ self._completed_tool_calls.clear()
+ else:
+                # No unclaimed tool calls, so create a synthetic placeholder.
+                # This can happen if the tool call's Start/Args/End events were
+                # never received, leaving no specification to attach.
+ logger.warning(
+ f"ToolCallResult for {event.tool_call_id} but preceding AIMessage "
+ f"has no tool_calls. Creating synthetic tool call."
+ )
+ synthetic_tool_call = ToolCall(
+ name="unknown_tool", # We don't have the tool name
+ args={},
+ )
+ self.messages[ai_msg_idx] = AIMessage(
+ content=ai_msg.content,
+ metadata=ai_msg.metadata,
+ tool_calls=[synthetic_tool_call],
+ )
+ elif self._completed_tool_calls:
+ # AIMessage already has tool_calls, but there are unclaimed ones
+ # Append them
+ existing_tool_calls = ai_msg.tool_calls or []
+ new_tool_calls = list(self._completed_tool_calls.values())
+ self.messages[ai_msg_idx] = AIMessage(
+ content=ai_msg.content,
+ metadata=ai_msg.metadata,
+ tool_calls=existing_tool_calls + new_tool_calls,
+ )
+ self._completed_tool_calls.clear()
+ else:
+ # No AIMessage found at all - create one
+ logger.warning(
+ "ToolCallResult received but no AIMessage found. Creating synthetic AIMessage."
+ )
+ if self._completed_tool_calls:
+ new_tool_calls = list(self._completed_tool_calls.values())
+ else:
+ new_tool_calls = [ToolCall(name="unknown_tool", args={})]
+
+ self.messages.append(
+ AIMessage(content="", metadata=None, tool_calls=new_tool_calls)
+ )
+ self._completed_tool_calls.clear()
+
+ metadata = None
+ if self.include_metadata:
+ metadata = {
+ "tool_call_id": event.tool_call_id,
+ "message_id": event.message_id,
+ "timestamp": event.timestamp,
+ }
+ if self._current_run_id:
+ metadata["run_id"] = self._current_run_id
+ if self._current_thread_id:
+ metadata["thread_id"] = self._current_thread_id
+
+ self.messages.append(ToolMessage(content=event.content, metadata=metadata))
+
+ def _handle_text_message_chunk(self, event: Any) -> None:
+ """
+ Process a TextMessageChunkEvent - a convenience event combining start, content, and end.
+
+ This handler processes complete messages available at once, bypassing the
+ Start-Content-End streaming sequence.
+ """
+ # Extract message data from chunk event
+ message_id = getattr(event, "message_id", None)
+ role = getattr(event, "role", "assistant")
+ content = getattr(event, "delta", "")
+
+ # Build metadata if requested
+ metadata = None
+ if self.include_metadata:
+ metadata = {
+ "timestamp": event.timestamp,
+ }
+ if message_id:
+ metadata["message_id"] = message_id
+ if self._current_run_id:
+ metadata["run_id"] = self._current_run_id
+ if self._current_thread_id:
+ metadata["thread_id"] = self._current_thread_id
+ if self._current_step:
+ metadata["step_name"] = self._current_step
+
+ # Convert to appropriate Ragas message type
+ if role == "assistant":
+ # Check if there are completed tool calls for this message
+ tool_calls = self._get_pending_tool_calls()
+
+ self.messages.append(
+ AIMessage(content=content, tool_calls=tool_calls, metadata=metadata)
+ )
+ elif role == "user":
+ self.messages.append(HumanMessage(content=content, metadata=metadata))
+ else:
+ logger.warning(f"Unexpected message role in chunk event: {role}")
+
+ def _handle_tool_call_chunk(self, event: Any) -> None:
+ """
+    Process a ToolCallChunkEvent - a convenience event carrying a complete tool call specification.
+
+ This handler processes complete tool calls available at once, bypassing the
+ Start-Args-End streaming sequence.
+ """
+ # Extract tool call data from chunk event
+ tool_call_id = getattr(event, "tool_call_id", None)
+ tool_call_name = getattr(event, "tool_call_name", None)
+ args_delta = getattr(event, "delta", None)
+
+ if not tool_call_name:
+ logger.warning("Received ToolCallChunk without tool_call_name")
+ return
+
+ # Parse tool arguments from delta if provided
+ args = {}
+ if args_delta:
+ if isinstance(args_delta, str):
+ try:
+ args = json.loads(args_delta)
+ except json.JSONDecodeError:
+ logger.error(
+ f"Failed to parse tool call arguments for {tool_call_name}: {args_delta}"
+ )
+ args = {"raw_args": args_delta}
+ elif isinstance(args_delta, dict):
+ args = args_delta
+ else:
+ args = {"raw_args": str(args_delta)}
+
+ # Store completed tool call for association with next AI message
+ if tool_call_id:
+ self._completed_tool_calls[tool_call_id] = ToolCall(
+ name=tool_call_name, args=args
+ )
+ else:
+ # If no ID provided, generate one
+ temp_id = f"chunk_{len(self._completed_tool_calls)}"
+ self._completed_tool_calls[temp_id] = ToolCall(
+ name=tool_call_name, args=args
+ )
+
+ def _handle_messages_snapshot(self, event: Any) -> None:
+ """
+ Process a MessagesSnapshotEvent containing complete message history.
+
+ This bypasses streaming reconstruction and directly converts
+ AG-UI Message objects to Ragas format using type-based checking.
+ """
+ # Import AG-UI message types for type checking
+ try:
+ from ag_ui.core import (
+ AssistantMessage,
+ ToolMessage as AGUIToolMessage,
+ UserMessage,
+ )
+ except ImportError as e:
+ raise ImportError(
+ "AG-UI message types are required for snapshot processing. "
+ "Install with: pip install ag-ui-protocol"
+ ) from e
+
+ for msg in event.messages:
+ content = str(getattr(msg, "content", ""))
+
+ metadata = None
+ if self.include_metadata:
+ metadata = {"source": "messages_snapshot"}
+ if hasattr(msg, "id"):
+ metadata["message_id"] = msg.id
+
+ # Type-based checking for AG-UI Message objects
+ if isinstance(msg, AssistantMessage):
+ # Check for tool calls in message
+ tool_calls = None
+ if hasattr(msg, "tool_calls") and msg.tool_calls:
+ tool_calls = []
+ for tc in msg.tool_calls:
+ tc_obj = t.cast(Any, tc)
+ name = t.cast(str, getattr(tc_obj, "name", "unknown_tool"))
+ raw_args = getattr(tc_obj, "args", {})
+ if not isinstance(raw_args, dict):
+ raw_args = {"raw_args": raw_args}
+ tool_calls.append(
+ ToolCall(
+ name=name,
+ args=t.cast(Dict[str, Any], raw_args),
+ )
+ )
+ self.messages.append(
+ AIMessage(content=content, tool_calls=tool_calls, metadata=metadata)
+ )
+ elif isinstance(msg, UserMessage):
+ self.messages.append(HumanMessage(content=content, metadata=metadata))
+ elif isinstance(msg, AGUIToolMessage):
+ self.messages.append(ToolMessage(content=content, metadata=metadata))
+ else:
+ logger.debug(
+ f"Skipping message with unknown type: {type(msg).__name__}"
+ )
+
+ def get_messages(self) -> List[Union[HumanMessage, AIMessage, ToolMessage]]:
+ """
+ Retrieve all accumulated Ragas messages.
+
+ Returns
+ -------
+ List[Union[HumanMessage, AIMessage, ToolMessage]]
+ Complete list of Ragas messages reconstructed from AG-UI events.
+
+ Notes
+ -----
+ This returns a copy of the accumulated messages. The collector's
+ internal state is not cleared, so calling this multiple times
+ returns the same messages.
+ """
+ return self.messages.copy()
+
+ def clear(self) -> None:
+ """
+ Clear all accumulated messages and reset internal state.
+
+ Useful for reusing the same collector instance for multiple
+ conversation sessions.
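+
+        Example
+        -------
+        Illustrative reuse pattern (``sessions`` is a hypothetical iterable of
+        AG-UI event lists)::
+
+            >>> collector = AGUIEventCollector()
+            >>> for session_events in sessions:
+            ...     for event in session_events:
+            ...         collector.process_event(event)
+            ...     transcript = collector.get_messages()
+            ...     collector.clear()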
+ """
+ self.messages.clear()
+ self._active_text_messages.clear()
+ self._active_tool_calls.clear()
+ self._completed_tool_calls.clear()
+ self._current_run_id = None
+ self._current_thread_id = None
+ self._current_step = None
+
+
+def convert_to_ragas_messages(
+ events: List[Any],
+ metadata: bool = False,
+) -> List[Union[HumanMessage, AIMessage, ToolMessage]]:
+ """
+ Convert a sequence of AG-UI protocol events to Ragas message format.
+
+ This function processes AG-UI events and reconstructs complete messages
+ from streaming event sequences (Start->Content->End patterns). It handles
+ text messages, tool calls, and filters out non-message events like
+ lifecycle and state management events.
+
+ Parameters
+ ----------
+ events : List[Event]
+ List of AG-UI protocol events from ag_ui.core. Can contain any mix
+ of event types - non-message events are automatically filtered out.
+ metadata : bool, optional
+ Whether to include AG-UI event metadata (run_id, thread_id, timestamps)
+ in the converted Ragas messages (default: False).
+
+ Returns
+ -------
+ List[Union[HumanMessage, AIMessage, ToolMessage]]
+ List of Ragas messages ready for evaluation. Messages preserve
+ conversation order and tool call associations.
+
+ Raises
+ ------
+ ImportError
+ If the ag-ui-protocol package is not installed.
+
+ Examples
+ --------
+ Convert AG-UI events from an agent run::
+
+ >>> from ragas.integrations.ag_ui import convert_to_ragas_messages
+ >>> from ag_ui.core import (
+ ... RunStartedEvent, TextMessageStartEvent,
+ ... TextMessageContentEvent, TextMessageEndEvent
+ ... )
+ >>>
+ >>> events = [
+ ... RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ ... TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ ... TextMessageContentEvent(message_id="msg-1", delta="Hello"),
+ ... TextMessageContentEvent(message_id="msg-1", delta=" world"),
+ ... TextMessageEndEvent(message_id="msg-1"),
+ ... ]
+ >>> messages = convert_to_ragas_messages(events, metadata=True)
+ >>> messages[0].content
+ 'Hello world'
+
+ Process events with tool calls::
+
+ >>> events = [
+ ... TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ ... TextMessageContentEvent(message_id="msg-1", delta="Let me check"),
+ ... TextMessageEndEvent(message_id="msg-1"),
+ ... ToolCallStartEvent(
+ ... tool_call_id="tc-1",
+ ... tool_call_name="get_weather",
+ ... parent_message_id="msg-1"
+ ... ),
+ ... ToolCallArgsEvent(tool_call_id="tc-1", delta='{"city": "SF"}'),
+ ... ToolCallEndEvent(tool_call_id="tc-1"),
+ ... ToolCallResultEvent(
+ ... tool_call_id="tc-1",
+ ... message_id="result-1",
+ ... content="Sunny, 72°F"
+ ... ),
+ ... ]
+ >>> messages = convert_to_ragas_messages(events)
+    >>> len(messages)  # AI message + tool result message
+    2
+
+ Notes
+ -----
+ - Streaming events (Start->Content->End) are automatically reconstructed
+ - Tool calls are associated with the preceding AI message
+ - Non-message events (lifecycle, state) are silently filtered
+ - Incomplete event sequences are logged as warnings
+ - AG-UI metadata can be preserved in message.metadata when metadata=True
+
+ See Also
+ --------
+ convert_messages_snapshot : Convert complete message history from snapshot
+ AGUIEventCollector : Lower-level API for streaming event collection
+ """
+ collector = AGUIEventCollector(metadata=metadata)
+
+ for event in events:
+ collector.process_event(event)
+
+ return collector.get_messages()
+
+
+def convert_messages_snapshot(
+ snapshot_event: Any,
+ metadata: bool = False,
+) -> List[Union[HumanMessage, AIMessage, ToolMessage]]:
+ """
+ Convert an AG-UI MessagesSnapshotEvent to Ragas message format.
+
+ MessagesSnapshotEvent provides a complete conversation history in a
+ single event, bypassing the need to reconstruct from streaming events.
+ This is more efficient when the complete history is already available.
+
+ Parameters
+ ----------
+ snapshot_event : MessagesSnapshotEvent
+ AG-UI event containing complete message history array.
+ metadata : bool, optional
+ Whether to include metadata in converted messages (default: False).
+
+ Returns
+ -------
+ List[Union[HumanMessage, AIMessage, ToolMessage]]
+ List of Ragas messages from the snapshot.
+
+ Raises
+ ------
+ ImportError
+ If the ag-ui-protocol package is not installed.
+
+ Examples
+ --------
+ >>> from ragas.integrations.ag_ui import convert_messages_snapshot
+    >>> from ag_ui.core import AssistantMessage, MessagesSnapshotEvent, UserMessage
+    >>>
+    >>> snapshot = MessagesSnapshotEvent(messages=[
+    ...     UserMessage(id="1", content="What's the weather?"),
+    ...     AssistantMessage(id="2", content="Let me check for you."),
+    ... ])
+ >>> messages = convert_messages_snapshot(snapshot)
+ >>> len(messages)
+ 2
+
+ Notes
+ -----
+ This is the preferred method when working with complete conversation
+ history. It's faster than processing streaming events and avoids the
+ complexity of event sequence reconstruction.
+
+ See Also
+ --------
+ convert_to_ragas_messages : Convert streaming event sequences
+ """
+ collector = AGUIEventCollector(metadata=metadata)
+
+ # Type check using cached import from collector
+ if not isinstance(snapshot_event, collector._MessagesSnapshotEvent):
+ raise TypeError(
+ f"Expected MessagesSnapshotEvent, got {type(snapshot_event).__name__}"
+ )
+ collector._handle_messages_snapshot(snapshot_event)
+ return collector.get_messages()
+
+
+def _convert_ragas_messages_to_ag_ui(
+ messages: List[Union[HumanMessage, AIMessage, ToolMessage]],
+) -> List[Any]:
+ """
+ Convert Ragas messages to AG-UI message format.
+
+ This function transforms a list of Ragas message objects into AG-UI protocol
+ message format for sending to AG-UI endpoints. It handles conversion of:
+ - HumanMessage → UserMessage
+ - AIMessage → AssistantMessage (with tool_calls if present)
+ - ToolMessage → ToolMessage (AG-UI format)
+
+ Parameters
+ ----------
+ messages : List[Union[HumanMessage, AIMessage, ToolMessage]]
+ List of Ragas messages from MultiTurnSample.user_input
+
+ Returns
+ -------
+ List[Any]
+ List of AG-UI protocol messages (UserMessage, AssistantMessage, ToolMessage)
+
+ Examples
+ --------
+ >>> from ragas.messages import HumanMessage, AIMessage, ToolCall
+ >>> messages = [
+ ... HumanMessage(content="What's the weather?"),
+ ... AIMessage(content="Let me check", tool_calls=[
+ ... ToolCall(name="get-weather", args={"location": "SF"})
+ ... ])
+ ... ]
+ >>> ag_ui_messages = _convert_ragas_messages_to_ag_ui(messages)
+ """
+ try:
+ from ag_ui.core import (
+ AssistantMessage,
+ FunctionCall,
+ ToolCall as AGUIToolCall,
+ UserMessage,
+ )
+ except ImportError as e:
+ raise ImportError(
+ "ag-ui-protocol package is required for AG-UI integration. "
+ "Install it with: pip install ag-ui-protocol"
+ ) from e
+
+ ag_ui_messages = []
+
+ for idx, msg in enumerate(messages):
+ msg_id = str(idx + 1)
+
+ if isinstance(msg, HumanMessage):
+ ag_ui_messages.append(UserMessage(id=msg_id, content=msg.content))
+
+ elif isinstance(msg, AIMessage):
+ # Convert Ragas ToolCall to AG-UI ToolCall format
+ tool_calls = None
+ if msg.tool_calls:
+ tool_calls = [
+ AGUIToolCall(
+ id=f"tc-{idx}-{tc_idx}",
+ function=FunctionCall(
+ name=tc.name,
+ arguments=json.dumps(tc.args)
+ if isinstance(tc.args, dict)
+ else tc.args,
+ ),
+ )
+ for tc_idx, tc in enumerate(msg.tool_calls)
+ ]
+
+ ag_ui_messages.append(
+ AssistantMessage(
+ id=msg_id, content=msg.content or "", tool_calls=tool_calls
+ )
+ )
+
+ elif isinstance(msg, ToolMessage):
+ # Note: AG-UI ToolMessage requires toolCallId which Ragas ToolMessage doesn't have.
+ # ToolMessage is typically sent FROM agent, not TO agent in initial conversation.
+ # For now, we skip ToolMessage in the conversion.
+ logger.warning(
+ "Skipping ToolMessage in AG-UI conversion - ToolMessage is typically "
+ "sent from agent, not to agent"
+ )
+ continue
+
+ return ag_ui_messages
+
+
+async def _call_ag_ui_endpoint(
+ endpoint_url: str,
+ user_input: Union[str, List[Union[HumanMessage, AIMessage, ToolMessage]]],
+ thread_id: Optional[str] = None,
+ agent_config: Optional[Dict[str, Any]] = None,
+ timeout: float = 60.0,
+ extra_headers: Optional[Dict[str, str]] = None,
+) -> List[Any]:
+ """
+ Call an AG-UI FastAPI endpoint and collect streaming events.
+
+ Makes an HTTP POST request to an AG-UI compatible FastAPI endpoint
+ and parses the Server-Sent Events (SSE) stream to collect all events.
+
+ Parameters
+ ----------
+ endpoint_url : str
+ The URL of the AG-UI FastAPI endpoint (e.g., "http://localhost:8000/agent").
+ user_input : Union[str, List[Union[HumanMessage, AIMessage, ToolMessage]]]
+ The user message/query to send to the agent. Can be either:
+ - A string for single-turn queries
+ - A list of Ragas messages for multi-turn conversations
+ thread_id : str, optional
+ Optional thread ID for conversation continuity.
+ agent_config : dict, optional
+ Optional agent configuration parameters.
+ timeout : float, optional
+ Request timeout in seconds (default: 60.0).
+ extra_headers : dict, optional
+ Optional extra HTTP headers to include in the request (default: None).
+ These will be merged with the default "Accept: text/event-stream" header.
+
+ Returns
+ -------
+ List[Event]
+ List of AG-UI events collected from the SSE stream.
+
+ Raises
+ ------
+ ImportError
+ If httpx is not installed.
+ httpx.HTTPError
+ If the HTTP request fails.
+
+ Notes
+ -----
+ This function expects the endpoint to return Server-Sent Events (SSE)
+ with content type "text/event-stream". Each event should be in the format:
+
+ data: {"type": "...", ...}\\n\\n
+
+    The function will parse the SSE stream and deserialize each event with a
+    pydantic TypeAdapter over AG-UI's Event discriminated union.
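+
+    Examples
+    --------
+    Minimal usage sketch (assumes an AG-UI agent is serving SSE locally)::
+
+        >>> events = await _call_ag_ui_endpoint(
+        ...     endpoint_url="http://localhost:8000/agent",
+        ...     user_input="What's the weather in SF?",
+        ... )
+        >>> any(e.type == "TEXT_MESSAGE_CONTENT" for e in events)
+        True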
+ """
+ try:
+ import httpx
+ except ImportError as e:
+ raise ImportError(
+ "AG-UI FastAPI integration requires httpx. "
+ "Install it with: pip install httpx"
+ ) from e
+
+ # Import AG-UI types
+ try:
+ from ag_ui.core import Event, RunAgentInput, UserMessage
+ from pydantic import TypeAdapter
+ except ImportError as e:
+ raise ImportError(
+ "AG-UI integration requires the ag-ui-protocol package. "
+ "Install it with: pip install ag-ui-protocol"
+ ) from e
+
+ # Create TypeAdapter for Event discriminated union
+ # This properly handles the union of all event types based on the 'type' discriminator
+ event_adapter = TypeAdapter(Event)
+
+ # Convert user_input to AG-UI messages
+ ag_ui_messages: List[Any]
+ if isinstance(user_input, str):
+ # Single-turn: simple string input
+ ag_ui_messages = t.cast(List[Any], [UserMessage(id="1", content=user_input)])
+ else:
+ # Multi-turn: list of Ragas messages
+ ag_ui_messages = _convert_ragas_messages_to_ag_ui(user_input)
+
+ # Prepare request payload
+ payload = RunAgentInput(
+ thread_id=thread_id
+ or f"thread_{uuid.uuid4()}", # Generate thread ID if not provided
+ run_id=f"run_{uuid.uuid4()}", # Generate a unique run ID
+ messages=t.cast(Any, ag_ui_messages),
+ state={},
+ tools=[],
+ context=[],
+ forwarded_props={},
+ )
+
+ # Collect events from SSE stream
+ events: List[Any] = []
+
+ # Merge default headers with extra headers
+ headers = {"Accept": "text/event-stream"}
+ if extra_headers:
+ headers.update(extra_headers)
+
+ async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
+ async with client.stream(
+ "POST",
+ endpoint_url,
+ json=payload.model_dump(exclude_none=True),
+ headers=headers,
+ ) as response:
+ response.raise_for_status()
+
+ # Parse SSE stream line by line
+ async for line in response.aiter_lines():
+ line = line.strip()
+
+ # SSE format: "data: {...}"
+ if line.startswith("data: "):
+ json_data = line[6:] # Remove "data: " prefix
+
+ try:
+ # Parse JSON and convert to Event using TypeAdapter
+ # TypeAdapter properly handles discriminated unions based on 'type' field
+ event_dict = json.loads(json_data)
+ event = event_adapter.validate_python(event_dict)
+ events.append(event)
+ except (json.JSONDecodeError, ValueError) as e:
+ logger.warning(f"Failed to parse SSE event: {e}")
+ continue
+
+ return events
+
+
+async def evaluate_ag_ui_agent(
+ endpoint_url: str,
+ dataset: EvaluationDataset,
+ metrics: List["Metric"],
+ metadata: bool = False,
+ run_config: Optional[RunConfig] = None,
+ batch_size: Optional[int] = None,
+ raise_exceptions: bool = False,
+ show_progress: bool = True,
+ timeout: float = 60.0,
+ evaluator_llm: Optional[Any] = None,
+ extra_headers: Optional[Dict[str, str]] = None,
+) -> EvaluationResult:
+ """
+ Evaluate an AG-UI agent by calling its FastAPI endpoint with test queries.
+
+ This function runs a batch evaluation by:
+ 1. Calling the AG-UI FastAPI endpoint for each query in the dataset
+ 2. Collecting streaming AG-UI events from each response
+ 3. Converting events to Ragas message format
+ 4. Evaluating with specified metrics
+
+ Supports both single-turn and multi-turn evaluations:
+ - Single-turn: Response extracted to sample.response field
+ - Multi-turn: Agent responses appended to sample.user_input conversation
+
+ Parameters
+ ----------
+ endpoint_url : str
+ URL of the AG-UI FastAPI endpoint (e.g., "http://localhost:8000/agent").
+ dataset : EvaluationDataset
+ Dataset containing test queries. Can contain either:
+ - SingleTurnSample: user_input as string
+ - MultiTurnSample: user_input as list of messages
+ metrics : List[Metric]
+ List of Ragas metrics to evaluate (e.g., AspectCritic, ToolCallF1).
+ metadata : bool, optional
+ Whether to include AG-UI metadata in converted messages (default: False).
+ run_config : RunConfig, optional
+ Configuration for the evaluation run.
+ batch_size : int, optional
+ Number of queries to process in parallel (default: None = auto).
+ raise_exceptions : bool, optional
+ Whether to raise exceptions or log warnings (default: False).
+ show_progress : bool, optional
+ Whether to show progress bar (default: True).
+ timeout : float, optional
+ HTTP request timeout in seconds (default: 60.0).
+ evaluator_llm : Any, optional
+ Optional LLM to use for evaluation metrics (default: None).
+ extra_headers : dict, optional
+ Optional extra HTTP headers to include in requests to the agent endpoint (default: None).
+ These will be merged with the default "Accept: text/event-stream" header.
+
+ Returns
+ -------
+ EvaluationResult
+ Results containing metric scores for the dataset.
+
+ Raises
+ ------
+ ImportError
+ If required packages (httpx, ag-ui-protocol) are not installed.
+ ValueError
+ If dataset is not of type EvaluationDataset.
+
+ Examples
+ --------
+ Evaluate an AG-UI agent endpoint with standard metrics::
+
+ >>> from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+ >>> from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+ >>> from ragas.metrics import AspectCritic, Faithfulness
+ >>>
+ >>> dataset = EvaluationDataset(samples=[
+ ... SingleTurnSample(
+ ... user_input="What's the weather in San Francisco?",
+ ... reference="Use the weather API to check SF weather"
+ ... )
+ ... ])
+ >>>
+ >>> result = await evaluate_ag_ui_agent(
+ ... endpoint_url="http://localhost:8000/agent",
+ ... dataset=dataset,
+    ...     metrics=[
+    ...         AspectCritic(name="helpfulness", definition="Is the answer helpful?"),
+    ...         Faithfulness(),
+    ...     ]
+ ... )
+
+ With AG-UI metadata included::
+
+ >>> result = await evaluate_ag_ui_agent(
+ ... endpoint_url="http://localhost:8000/agent",
+ ... dataset=dataset,
+    ...     metrics=[AspectCritic(name="helpfulness", definition="Is the answer helpful?")],
+ ... metadata=True # Include run_id, thread_id, etc.
+ ... )
+
+ Multi-turn evaluation with tool call metrics::
+
+ >>> from ragas.dataset_schema import MultiTurnSample
+ >>> from ragas.messages import HumanMessage, ToolCall
+ >>> from ragas.metrics import ToolCallF1
+ >>>
+ >>> multi_dataset = EvaluationDataset(samples=[
+ ... MultiTurnSample(
+ ... user_input=[
+ ... HumanMessage(content="What's the weather in SF?")
+ ... ],
+ ... reference_tool_calls=[
+ ... ToolCall(name="get-weather", args={"location": "SF"})
+ ... ]
+ ... )
+ ... ])
+ >>>
+ >>> result = await evaluate_ag_ui_agent(
+ ... endpoint_url="http://localhost:8000/agent",
+ ... dataset=multi_dataset,
+ ... metrics=[ToolCallF1()]
+ ... )
+
+ Notes
+ -----
+ - The endpoint must return Server-Sent Events (SSE) with AG-UI protocol events
+    - Each query is sent as a separate HTTP request with a RunAgentInput payload
+ - Queries are executed in parallel using Ragas Executor
+ - Failed queries are logged and recorded as NaN in results
+ - **Single-turn**: Response text extracted to sample.response field
+ - **Multi-turn**: Agent responses (AIMessage, ToolMessage) appended to sample.user_input
+
+ See Also
+ --------
+ convert_to_ragas_messages : Convert AG-UI events to Ragas messages
+ _call_ag_ui_endpoint : HTTP client helper for calling endpoints
+ """
+ # Validate dataset
+ if dataset is None or not isinstance(dataset, EvaluationDataset):
+ raise ValueError("Please provide a dataset that is of type EvaluationDataset")
+
+ # Support both single-turn and multi-turn evaluations
+ is_multi_turn = dataset.is_multi_turn()
+ if is_multi_turn:
+ samples = t.cast(List[MultiTurnSample], dataset.samples)
+ else:
+ samples = t.cast(List[SingleTurnSample], dataset.samples)
+
+ # Create executor for parallel HTTP calls
+ executor = Executor(
+ desc="Calling AG-UI Agent",
+ keep_progress_bar=True,
+ show_progress=show_progress,
+ raise_exceptions=raise_exceptions,
+ run_config=run_config,
+ batch_size=batch_size,
+ )
+
+ # Submit HTTP calls for all queries
+ queries = [sample.user_input for sample in samples]
+ for i, query in enumerate(queries):
+ executor.submit(
+ _call_ag_ui_endpoint,
+ endpoint_url=endpoint_url,
+ user_input=query,
+ thread_id=f"thread-eval-{i}",
+ agent_config=None,
+ timeout=timeout,
+ extra_headers=extra_headers,
+ )
+
+ # Collect results and convert to messages
+ results = executor.results()
+
+ if is_multi_turn:
+ # Multi-turn: append agent responses to conversation
+ for i, result in enumerate(results):
+ # Handle failed jobs which are recorded as NaN in the executor
+ if isinstance(result, float) and math.isnan(result):
+ logger.warning(f"AG-UI agent call failed for query {i}: '{queries[i]}'")
+ continue
+
+ # Convert AG-UI events to Ragas messages
+ events = t.cast(List[Any], result)
+ try:
+ logger.info(f"Processing query {i}, received {len(events)} events")
+ messages = convert_to_ragas_messages(events, metadata=metadata)
+ logger.info(f"Converted to {len(messages)} messages")
+
+                # Append the agent's response messages to the conversation,
+                # keeping only new agent output (AIMessage and ToolMessage) and
+                # dropping any echoed user messages
+ new_messages = [
+ msg for msg in messages if isinstance(msg, (AIMessage, ToolMessage))
+ ]
+
+ # Update the sample's user_input with complete conversation
+ sample = t.cast(MultiTurnSample, samples[i])
+ sample.user_input = sample.user_input + new_messages
+
+ logger.info(
+ f"Query {i} - Appended {len(new_messages)} messages to conversation"
+ )
+
+ except Exception as e:
+ logger.warning(
+ f"Failed to convert events for query {i}: {e}", exc_info=True
+ )
+ else:
+ # Single-turn: extract response and contexts
+ responses: List[Optional[str]] = []
+ retrieved_contexts: List[Optional[List[str]]] = []
+
+ for i, result in enumerate(results):
+ # Handle failed jobs which are recorded as NaN in the executor
+ if isinstance(result, float) and math.isnan(result):
+ responses.append(None)
+ retrieved_contexts.append(None)
+ logger.warning(f"AG-UI agent call failed for query {i}: '{queries[i]}'")
+ continue
+
+ # Convert AG-UI events to Ragas messages
+ events = t.cast(List[Any], result)
+ try:
+ logger.info(f"Processing query {i}, received {len(events)} events")
+ messages = convert_to_ragas_messages(events, metadata=metadata)
+ logger.info(f"Converted to {len(messages)} messages")
+
+ # Extract response text from AI messages
+ response_text = ""
+ context_list: List[str] = []
+
+ for msg in messages:
+ if isinstance(msg, AIMessage) and msg.content:
+ response_text += msg.content
+ logger.debug(
+ f"Found AI message with content: {msg.content[:100]}..."
+ )
+ # Tool results could contain retrieved context
+ elif isinstance(msg, ToolMessage) and msg.content:
+ context_list.append(msg.content)
+ logger.debug(
+ f"Found tool message with content: {msg.content[:100]}..."
+ )
+
+ logger.info(
+ f"Query {i} - Response length: {len(response_text)}, Contexts: {len(context_list)}"
+ )
+ responses.append(response_text or None)
+ retrieved_contexts.append(context_list if context_list else None)
+
+ except Exception as e:
+ logger.warning(
+ f"Failed to convert events for query {i}: {e}", exc_info=True
+ )
+ responses.append(None)
+ retrieved_contexts.append(None)
+
+ # Update samples in place with responses and retrieved_contexts
+ # This ensures the dataset includes all fields needed for evaluation
+ for i, sample in enumerate(samples):
+ single_sample = t.cast(SingleTurnSample, sample)
+ single_sample.response = responses[i] if responses[i] is not None else ""
+ single_sample.retrieved_contexts = (
+ retrieved_contexts[i] if retrieved_contexts[i] is not None else []
+ )
+
+ # Run evaluation with metrics
+ evaluation_result = ragas_evaluate(
+ dataset=dataset,
+ metrics=metrics,
+ raise_exceptions=raise_exceptions,
+ show_progress=show_progress,
+ run_config=run_config or RunConfig(),
+ return_executor=False,
+ llm=evaluator_llm,
+ )
+
+ # Type assertion since return_executor=False guarantees EvaluationResult
+ return t.cast(EvaluationResult, evaluation_result)
diff --git a/tests/unit/integrations/test_ag_ui.py b/tests/unit/integrations/test_ag_ui.py
new file mode 100644
index 0000000000..b2704d24fd
--- /dev/null
+++ b/tests/unit/integrations/test_ag_ui.py
@@ -0,0 +1,1231 @@
+"""Tests for AG-UI integration."""
+
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from ragas.messages import AIMessage, HumanMessage, ToolMessage
+
+# Check if ag_ui is available
+try:
+ from ag_ui.core import (
+ AssistantMessage,
+ EventType,
+ MessagesSnapshotEvent,
+ RunFinishedEvent,
+ RunStartedEvent,
+ StepFinishedEvent,
+ StepStartedEvent,
+ TextMessageChunkEvent,
+ TextMessageContentEvent,
+ TextMessageEndEvent,
+ TextMessageStartEvent,
+ ToolCallArgsEvent,
+ ToolCallChunkEvent,
+ ToolCallEndEvent,
+ ToolCallResultEvent,
+ ToolCallStartEvent,
+ UserMessage,
+ )
+
+ AG_UI_AVAILABLE = True
+except ImportError:
+ AG_UI_AVAILABLE = False
+
+pytestmark = pytest.mark.skipif(
+ not AG_UI_AVAILABLE, reason="ag-ui-protocol not installed"
+)
+
+
+# Mock event class for non-message events
+class MockEvent:
+ """Simple mock for non-message events like STATE_SNAPSHOT."""
+
+ def __init__(self, event_type: str, **kwargs):
+ self.type = event_type
+ self.timestamp = kwargs.get("timestamp", 1234567890)
+ for key, value in kwargs.items():
+ setattr(self, key, value)
+
+
+@pytest.fixture
+def basic_text_message_events():
+ """Create a basic streaming text message event sequence."""
+ return [
+ RunStartedEvent(run_id="run-123", thread_id="thread-456"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Hello"),
+ TextMessageContentEvent(message_id="msg-1", delta=" world"),
+ TextMessageEndEvent(message_id="msg-1"),
+ TextMessageStartEvent(message_id="msg-2", role="assistant"),
+ TextMessageContentEvent(message_id="msg-2", delta="Hi"),
+ TextMessageContentEvent(message_id="msg-2", delta=" there!"),
+ TextMessageEndEvent(message_id="msg-2"),
+ ]
+
+
+@pytest.fixture
+def tool_call_events():
+ """Create events with tool calls."""
+ return [
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Let me check the weather"),
+ TextMessageEndEvent(message_id="msg-1"),
+ ToolCallStartEvent(
+ tool_call_id="tc-1", tool_call_name="get_weather", parent_message_id="msg-1"
+ ),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"city": "San Francisco"'),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta=', "units": "fahrenheit"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ ToolCallResultEvent(
+ tool_call_id="tc-1",
+ message_id="result-1",
+ content="Temperature: 72°F, Conditions: Sunny",
+ ),
+ TextMessageStartEvent(message_id="msg-2", role="assistant"),
+ TextMessageContentEvent(
+ message_id="msg-2", delta="It's sunny and 72°F in San Francisco"
+ ),
+ TextMessageEndEvent(message_id="msg-2"),
+ ]
+
+
+def test_import_error_without_ag_ui_protocol():
+ """Test that appropriate error is raised without ag-ui-protocol package."""
+ from ragas.integrations.ag_ui import _import_ag_ui_core
+
+ # Mock the actual ag_ui import
+ with patch.dict("sys.modules", {"ag_ui": None, "ag_ui.core": None}):
+ with pytest.raises(
+ ImportError, match="AG-UI integration requires the ag-ui-protocol package"
+ ):
+ _import_ag_ui_core()
+
+
+def test_basic_text_message_conversion(basic_text_message_events):
+ """Test converting basic streaming text messages."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ messages = convert_to_ragas_messages(basic_text_message_events)
+
+ assert len(messages) == 2
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].content == "Hello world"
+ assert isinstance(messages[1], AIMessage)
+ assert messages[1].content == "Hi there!"
+
+
+def test_message_with_metadata(basic_text_message_events):
+ """Test that metadata is included when requested."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ messages = convert_to_ragas_messages(basic_text_message_events, metadata=True)
+
+ assert len(messages) == 2
+ assert messages[0].metadata is not None
+ assert "message_id" in messages[0].metadata
+ assert messages[0].metadata["message_id"] == "msg-1"
+ assert "run_id" in messages[0].metadata
+ assert messages[0].metadata["run_id"] == "run-123"
+ assert "thread_id" in messages[0].metadata
+ assert messages[0].metadata["thread_id"] == "thread-456"
+
+
+def test_message_without_metadata(basic_text_message_events):
+ """Test that metadata is excluded when not requested."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ messages = convert_to_ragas_messages(basic_text_message_events, metadata=False)
+
+ assert len(messages) == 2
+ assert messages[0].metadata is None
+ assert messages[1].metadata is None
+
+
+def test_tool_call_conversion(tool_call_events):
+ """Test converting tool calls with arguments and results."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ messages = convert_to_ragas_messages(tool_call_events)
+
+ # Should have: AI message, Tool result, AI message
+ assert len(messages) == 3
+
+ # First message: AI initiating tool call
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].content == "Let me check the weather"
+
+ # Second message: Tool result
+ assert isinstance(messages[1], ToolMessage)
+ assert "72°F" in messages[1].content
+
+ # Third message: AI with response
+ assert isinstance(messages[2], AIMessage)
+ assert "sunny" in messages[2].content.lower()
+
+
+def test_tool_call_with_metadata(tool_call_events):
+ """Test that tool call metadata is preserved."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ messages = convert_to_ragas_messages(tool_call_events, metadata=True)
+
+ tool_message = next(msg for msg in messages if isinstance(msg, ToolMessage))
+ assert tool_message.metadata is not None
+ assert "tool_call_id" in tool_message.metadata
+ assert tool_message.metadata["tool_call_id"] == "tc-1"
+
+
+def test_step_context_in_metadata():
+ """Test that step context is included in metadata."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ StepStartedEvent(step_name="analyze_query"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Processing..."),
+ TextMessageEndEvent(message_id="msg-1"),
+ StepFinishedEvent(step_name="analyze_query"),
+ ]
+
+ messages = convert_to_ragas_messages(events, metadata=True)
+
+ assert len(messages) == 1
+ assert "step_name" in messages[0].metadata
+ assert messages[0].metadata["step_name"] == "analyze_query"
+
+
+def test_messages_snapshot_conversion():
+ """Test converting MessagesSnapshotEvent."""
+ from ragas.integrations.ag_ui import convert_messages_snapshot
+
+ snapshot = MessagesSnapshotEvent(
+ messages=[
+ UserMessage(id="msg-1", content="What's 2+2?"),
+ AssistantMessage(id="msg-2", content="4"),
+ UserMessage(id="msg-3", content="Thanks!"),
+ ]
+ )
+
+ messages = convert_messages_snapshot(snapshot)
+
+ assert len(messages) == 3
+ assert isinstance(messages[0], HumanMessage)
+ assert messages[0].content == "What's 2+2?"
+ assert isinstance(messages[1], AIMessage)
+ assert messages[1].content == "4"
+ assert isinstance(messages[2], HumanMessage)
+ assert messages[2].content == "Thanks!"
+
+
+def test_snapshot_with_metadata():
+ """Test that snapshot conversion includes metadata when requested."""
+ from ragas.integrations.ag_ui import convert_messages_snapshot
+
+ snapshot = MessagesSnapshotEvent(
+ messages=[UserMessage(id="msg-1", content="Hello")]
+ )
+
+ messages = convert_messages_snapshot(snapshot, metadata=True)
+
+ assert messages[0].metadata is not None
+ assert "message_id" in messages[0].metadata
+ assert messages[0].metadata["message_id"] == "msg-1"
+
+
+def test_non_message_events_filtered():
+ """Test that non-message events are silently filtered."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ MockEvent(EventType.STATE_SNAPSHOT, snapshot={"key": "value"}),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Hello"),
+ TextMessageEndEvent(message_id="msg-1"),
+ MockEvent("RUN_FINISHED", result="success"),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ # Should only have the text message, other events filtered
+ assert len(messages) == 1
+ assert messages[0].content == "Hello"
+
+
+def test_incomplete_message_stream(caplog):
+ """Test handling of incomplete message streams."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ # Message with content but no end event
+ events = [
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Hello"),
+ # Missing TextMessageEndEvent
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ # Should not create message without end event
+ assert len(messages) == 0
+
+
+def test_orphaned_content_event(caplog):
+ """Test handling of content event without corresponding start."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ # Content event without start
+ TextMessageContentEvent(message_id="msg-unknown", delta="Orphaned content"),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ assert len(messages) == 0
+
+
+def test_tool_call_argument_parsing_error(caplog):
+ """Test handling of invalid JSON in tool arguments."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Using tool"),
+ ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="broken_tool"),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta="{invalid json"),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ TextMessageEndEvent(message_id="msg-1"), # Message ends AFTER tool call
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ # Should still create message with tool call containing raw_args
+ assert len(messages) == 1
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].tool_calls is not None
+ assert len(messages[0].tool_calls) == 1
+ assert messages[0].tool_calls[0].name == "broken_tool"
+ # Invalid JSON should be stored in raw_args
+ assert "raw_args" in messages[0].tool_calls[0].args
+ assert messages[0].tool_calls[0].args["raw_args"] == "{invalid json"
+
+
+def test_tool_call_result_retroactive_attachment():
+ """
+ Tests that ToolCallResultEvent correctly finds the previous AIMessage
+ and attaches the tool call specification if it was missing.
+
+    In this event ordering (ToolCallEnd before TextMessageEnd) the completed
+    tool call is still pending when the AIMessage is finalized, so the
+    assertions below accept either the normal attachment or the retroactive
+    fallback in _handle_tool_call_result.
+ """
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+    # Scenario: TextMessageEnd arrives AFTER ToolCallEnd, so the completed tool
+    # call is still pending in _completed_tool_calls and gets attached when the
+    # AIMessage is finalized
+ events = [
+ # AI message starts
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Let me check that"),
+ # Tool call happens
+ ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="search_tool"),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"query": "weather"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ # Message ends AFTER tool call ends
+ TextMessageEndEvent(message_id="msg-1"),
+ # Tool result arrives
+ ToolCallResultEvent(
+ tool_call_id="tc-1", message_id="result-1", content="Sunny, 75F"
+ ),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ # Should have AI message with tool call, then Tool message
+ assert len(messages) == 2
+ assert isinstance(messages[0], AIMessage)
+ assert isinstance(messages[1], ToolMessage)
+
+ # The AIMessage should have the tool_calls attached (either from normal flow
+ # or retroactively attached by _handle_tool_call_result)
+ assert messages[0].tool_calls is not None
+ assert len(messages[0].tool_calls) >= 1
+ # At least one tool call should be present (could be synthetic if needed)
+ assert any(
+ tc.name in ["search_tool", "unknown_tool"] for tc in messages[0].tool_calls
+ )
+
+ # Tool message should contain the result
+ assert messages[1].content == "Sunny, 75F"
+
+
+def test_event_collector_reuse(basic_text_message_events):
+ """Test that AGUIEventCollector can be cleared and reused."""
+ from ragas.integrations.ag_ui import AGUIEventCollector
+
+ collector = AGUIEventCollector()
+
+ # Process first batch
+ for event in basic_text_message_events[:5]: # First message
+ collector.process_event(event)
+
+ messages1 = collector.get_messages()
+ assert len(messages1) == 1
+
+ # Clear and process second batch
+ collector.clear()
+ for event in basic_text_message_events[5:]: # Second message
+ collector.process_event(event)
+
+ messages2 = collector.get_messages()
+ assert len(messages2) == 1
+ assert messages2[0].content != messages1[0].content
+
+
+def test_multiple_tool_calls_in_sequence():
+ """Test handling multiple tool calls in sequence."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="tool1"),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"param": "value1"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ ToolCallStartEvent(tool_call_id="tc-2", tool_call_name="tool2"),
+ ToolCallArgsEvent(tool_call_id="tc-2", delta='{"param": "value2"}'),
+ ToolCallEndEvent(tool_call_id="tc-2"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Done"),
+ TextMessageEndEvent(message_id="msg-1"),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ # Should create AI message with both tool calls
+ assert len(messages) == 1
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].tool_calls is not None
+ assert len(messages[0].tool_calls) == 2
+ assert messages[0].tool_calls[0].name == "tool1"
+ assert messages[0].tool_calls[1].name == "tool2"
+
+
+def test_empty_event_list():
+ """Test handling of empty event list."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ messages = convert_to_ragas_messages([])
+ assert len(messages) == 0
+
+
+def test_wrong_snapshot_type_error():
+ """Test that convert_messages_snapshot validates input type."""
+ from ragas.integrations.ag_ui import convert_messages_snapshot
+
+ with pytest.raises(TypeError, match="Expected MessagesSnapshotEvent"):
+ convert_messages_snapshot(MockEvent("WRONG_TYPE"))
+
+
+def test_role_mapping():
+ """Test that different roles map correctly to Ragas message types."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ TextMessageStartEvent(message_id="msg-1", role="user"),
+ TextMessageContentEvent(message_id="msg-1", delta="User message"),
+ TextMessageEndEvent(message_id="msg-1"),
+ TextMessageStartEvent(message_id="msg-2", role="assistant"),
+ TextMessageContentEvent(message_id="msg-2", delta="Assistant message"),
+ TextMessageEndEvent(message_id="msg-2"),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ assert len(messages) == 2
+ assert isinstance(messages[0], HumanMessage)
+ assert messages[0].content == "User message"
+ assert isinstance(messages[1], AIMessage)
+ assert messages[1].content == "Assistant message"
+
+
+def test_complex_conversation_flow():
+ """Test a complex multi-turn conversation with tool calls."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ # User asks
+ TextMessageStartEvent(message_id="msg-1", role="user"),
+ TextMessageContentEvent(message_id="msg-1", delta="What's the weather?"),
+ TextMessageEndEvent(message_id="msg-1"),
+ # Assistant responds and calls tool
+ TextMessageStartEvent(message_id="msg-2", role="assistant"),
+ TextMessageContentEvent(message_id="msg-2", delta="Let me check"),
+ TextMessageEndEvent(message_id="msg-2"),
+ ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="weather_api"),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"location": "SF"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ # Tool returns result
+ ToolCallResultEvent(
+ tool_call_id="tc-1", message_id="result-1", content="Sunny, 70F"
+ ),
+ # Assistant responds with answer
+ TextMessageStartEvent(message_id="msg-3", role="assistant"),
+ TextMessageContentEvent(message_id="msg-3", delta="It's sunny and 70F"),
+ TextMessageEndEvent(message_id="msg-3"),
+ # User thanks
+ TextMessageStartEvent(message_id="msg-4", role="user"),
+ TextMessageContentEvent(message_id="msg-4", delta="Thanks!"),
+ TextMessageEndEvent(message_id="msg-4"),
+ ]
+
+ messages = convert_to_ragas_messages(events, metadata=True)
+
+ # Should have: Human, AI (with tool_calls), Tool, AI, Human
+ assert len(messages) == 5
+ assert isinstance(messages[0], HumanMessage)
+ assert isinstance(messages[1], AIMessage)
+ assert isinstance(messages[2], ToolMessage)
+ assert isinstance(messages[3], AIMessage)
+ assert isinstance(messages[4], HumanMessage)
+
+ # Check content
+ assert "weather" in messages[0].content.lower()
+ assert "check" in messages[1].content.lower()
+ assert "sunny" in messages[2].content.lower()
+ assert "sunny" in messages[3].content.lower()
+ assert "thanks" in messages[4].content.lower()
+
+ # Check metadata
+ assert all(msg.metadata is not None for msg in messages)
+ assert all("run_id" in msg.metadata for msg in messages)
+
+
+def test_text_message_chunk():
+ """Test TEXT_MESSAGE_CHUNK event handling."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ TextMessageChunkEvent(
+ message_id="msg-1", role="assistant", delta="Complete message"
+ ),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ assert len(messages) == 1
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].content == "Complete message"
+
+
+def test_tool_call_chunk():
+ """Test TOOL_CALL_CHUNK event handling."""
+ from ragas.integrations.ag_ui import convert_to_ragas_messages
+
+ events = [
+ ToolCallChunkEvent(
+ tool_call_id="tc-1", tool_call_name="search", delta='{"query": "test"}'
+ ),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Done"),
+ TextMessageEndEvent(message_id="msg-1"),
+ ]
+
+ messages = convert_to_ragas_messages(events)
+
+ assert len(messages) == 1
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].tool_calls is not None
+ assert len(messages[0].tool_calls) == 1
+ assert messages[0].tool_calls[0].name == "search"
+ assert messages[0].tool_calls[0].args == {"query": "test"}
+
+
+def test_tool_call_chunk_with_dict_delta():
+ """
+ Test that _handle_tool_call_chunk can handle delta as dict.
+
+ While the AG-UI protocol specifies delta as a string, the handler code
+ defensively handles dict deltas. We test this by directly calling the
+ handler with a mock event object.
+ """
+ from ragas.integrations.ag_ui import AGUIEventCollector
+
+ collector = AGUIEventCollector()
+
+ # Create a mock event with dict delta (bypassing Pydantic validation)
+ class MockToolCallChunkEvent:
+ type = "TOOL_CALL_CHUNK"
+ tool_call_id = "tc-1"
+ tool_call_name = "calculate"
+ delta = {"operation": "add", "values": [1, 2, 3]} # dict instead of string
+ timestamp = "2025-01-01T00:00:00Z"
+
+ # Process the mock event directly
+ collector._handle_tool_call_chunk(MockToolCallChunkEvent())
+
+ # Now add an AI message to pick up the tool call
+ from ag_ui.core import (
+ TextMessageContentEvent,
+ TextMessageEndEvent,
+ TextMessageStartEvent,
+ )
+
+ collector.process_event(TextMessageStartEvent(message_id="msg-1", role="assistant"))
+ collector.process_event(
+ TextMessageContentEvent(message_id="msg-1", delta="Result is 6")
+ )
+ collector.process_event(TextMessageEndEvent(message_id="msg-1"))
+
+ messages = collector.get_messages()
+
+ assert len(messages) == 1
+ assert isinstance(messages[0], AIMessage)
+ assert messages[0].tool_calls is not None
+ assert len(messages[0].tool_calls) == 1
+ assert messages[0].tool_calls[0].name == "calculate"
+ assert messages[0].tool_calls[0].args == {"operation": "add", "values": [1, 2, 3]}
+
+
+# ===== FastAPI Integration Tests =====
+
+
+# Helper to check if FastAPI dependencies are available
+def _has_fastapi_deps():
+ try:
+ import httpx # noqa: F401
+
+ return AG_UI_AVAILABLE
+ except ImportError:
+ return False
+
+
+@pytest.mark.skipif(
+ not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed"
+)
+@pytest.mark.asyncio
+async def test_call_ag_ui_endpoint():
+ """Test HTTP client helper for calling AG-UI endpoints."""
+ from unittest.mock import AsyncMock, MagicMock
+
+ from ragas.integrations.ag_ui import _call_ag_ui_endpoint
+
+ # Mock SSE response data
+ sse_lines = [
+ 'data: {"type": "RUN_STARTED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567890}',
+ "",
+ 'data: {"type": "TEXT_MESSAGE_START", "message_id": "msg-1", "role": "assistant", "timestamp": 1234567891}',
+ "",
+ 'data: {"type": "TEXT_MESSAGE_CONTENT", "message_id": "msg-1", "delta": "Hello!", "timestamp": 1234567892}',
+ "",
+ 'data: {"type": "TEXT_MESSAGE_END", "message_id": "msg-1", "timestamp": 1234567893}',
+ "",
+ 'data: {"type": "RUN_FINISHED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567894}',
+ "",
+ ]
+
+ # Create async iterator for SSE lines
+ async def mock_aiter_lines():
+ for line in sse_lines:
+ yield line
+
+ # Mock httpx response
+ mock_response = MagicMock()
+ mock_response.aiter_lines = mock_aiter_lines
+ mock_response.raise_for_status = MagicMock()
+
+ # Mock httpx client
+ mock_client = AsyncMock()
+ mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+ mock_client.__aexit__ = AsyncMock(return_value=None)
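+    # client.stream(...) is entered with "async with", so the object it returns
+    # needs its own __aenter__/__aexit__ wired to yield the mocked response.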
+ mock_client.stream = MagicMock()
+ mock_client.stream.return_value.__aenter__ = AsyncMock(return_value=mock_response)
+ mock_client.stream.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ events = await _call_ag_ui_endpoint(
+ endpoint_url="http://localhost:8000/agent",
+ user_input="Hello",
+ )
+
+ # Should have collected 5 events
+ assert len(events) == 5
+ assert events[0].type == "RUN_STARTED"
+ assert events[1].type == "TEXT_MESSAGE_START"
+ assert events[2].type == "TEXT_MESSAGE_CONTENT"
+ assert events[3].type == "TEXT_MESSAGE_END"
+ assert events[4].type == "RUN_FINISHED"
+
+
+@pytest.mark.skipif(
+ not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed"
+)
+@pytest.mark.asyncio
+async def test_call_ag_ui_endpoint_with_config():
+ """Test HTTP client with thread_id and agent_config."""
+ from unittest.mock import AsyncMock, MagicMock
+
+ from ragas.integrations.ag_ui import _call_ag_ui_endpoint
+
+ sse_lines = [
+ 'data: {"type": "RUN_STARTED", "run_id": "run-1", "thread_id": "my-thread", "timestamp": 1234567890}',
+ "",
+ 'data: {"type": "RUN_FINISHED", "run_id": "run-1", "thread_id": "my-thread", "timestamp": 1234567891}',
+ "",
+ ]
+
+ async def mock_aiter_lines():
+ for line in sse_lines:
+ yield line
+
+ mock_response = MagicMock()
+ mock_response.aiter_lines = mock_aiter_lines
+ mock_response.raise_for_status = MagicMock()
+
+ mock_client = AsyncMock()
+ mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+ mock_client.__aexit__ = AsyncMock(return_value=None)
+ mock_client.stream = MagicMock()
+ mock_client.stream.return_value.__aenter__ = AsyncMock(return_value=mock_response)
+ mock_client.stream.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ events = await _call_ag_ui_endpoint(
+ endpoint_url="http://localhost:8000/agent",
+ user_input="Test query",
+ thread_id="my-thread",
+ agent_config={"temperature": 0.7},
+ )
+
+ assert len(events) == 2
+ # Check that thread_id was passed through
+ assert events[0].thread_id == "my-thread"
+
+
+@pytest.mark.skipif(
+ not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed"
+)
+@pytest.mark.asyncio
+async def test_call_ag_ui_endpoint_malformed_json():
+ """Test HTTP client handles malformed JSON gracefully."""
+ from unittest.mock import AsyncMock, MagicMock
+
+ from ragas.integrations.ag_ui import _call_ag_ui_endpoint
+
+ sse_lines = [
+ 'data: {"type": "RUN_STARTED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567890}',
+ "",
+ "data: {invalid json}", # Malformed
+ "",
+ 'data: {"type": "RUN_FINISHED", "run_id": "run-1", "thread_id": "thread-1", "timestamp": 1234567891}',
+ "",
+ ]
+
+ async def mock_aiter_lines():
+ for line in sse_lines:
+ yield line
+
+ mock_response = MagicMock()
+ mock_response.aiter_lines = mock_aiter_lines
+ mock_response.raise_for_status = MagicMock()
+
+ mock_client = AsyncMock()
+ mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+ mock_client.__aexit__ = AsyncMock(return_value=None)
+ mock_client.stream = MagicMock()
+ mock_client.stream.return_value.__aenter__ = AsyncMock(return_value=mock_response)
+ mock_client.stream.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ events = await _call_ag_ui_endpoint(
+ endpoint_url="http://localhost:8000/agent",
+ user_input="Test",
+ )
+
+ # Should skip malformed event but collect valid ones
+ assert len(events) == 2
+ assert events[0].type == "RUN_STARTED"
+ assert events[1].type == "RUN_FINISHED"
+
+
+@pytest.mark.skipif(
+ not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed"
+)
+@pytest.mark.asyncio
+async def test_evaluate_ag_ui_agent():
+ """Test batch evaluation of AG-UI agent endpoint."""
+ from unittest.mock import MagicMock
+
+ from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+
+ # Create mock dataset
+ dataset = EvaluationDataset(
+ samples=[
+ SingleTurnSample(
+ user_input="What's the weather?",
+ reference="Check weather API",
+ ),
+ SingleTurnSample(
+ user_input="Tell me a joke",
+ reference="Respond with humor",
+ ),
+ ]
+ )
+
+ # Mock events for first query (weather)
+ weather_events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="It's sunny and 72F"),
+ TextMessageEndEvent(message_id="msg-1"),
+ RunFinishedEvent(run_id="run-1", thread_id="thread-1"),
+ ]
+
+ # Mock events for second query (joke)
+ joke_events = [
+ RunStartedEvent(run_id="run-2", thread_id="thread-2"),
+ TextMessageStartEvent(message_id="msg-2", role="assistant"),
+ TextMessageContentEvent(
+ message_id="msg-2", delta="Why don't scientists trust atoms?"
+ ),
+ TextMessageContentEvent(message_id="msg-2", delta=" They make up everything!"),
+ TextMessageEndEvent(message_id="msg-2"),
+ RunFinishedEvent(run_id="run-2", thread_id="thread-2"),
+ ]
+
+ # Mock _call_ag_ui_endpoint to return different events based on input
+ async def mock_call_endpoint(endpoint_url, user_input, **kwargs):
+ if "weather" in user_input.lower():
+ return weather_events
+ else:
+ return joke_events
+
+ # Mock ragas_evaluate to return a simple result
+ mock_result = MagicMock()
+ mock_result.to_pandas = MagicMock(return_value=MagicMock())
+
+ with (
+ patch(
+ "ragas.integrations.ag_ui._call_ag_ui_endpoint",
+ side_effect=mock_call_endpoint,
+ ),
+ patch(
+ "ragas.integrations.ag_ui.ragas_evaluate",
+ return_value=mock_result,
+ ),
+ ):
+ result = await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[], # Empty for testing
+ )
+
+ # Check that dataset was populated
+ assert dataset.samples[0].response == "It's sunny and 72F"
+ assert (
+ dataset.samples[1].response
+ == "Why don't scientists trust atoms? They make up everything!"
+ )
+
+ # Check that evaluation was called
+ assert result == mock_result
+
+
+@pytest.mark.skipif(
+ not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed"
+)
+@pytest.mark.asyncio
+async def test_evaluate_ag_ui_agent_with_tool_calls():
+ """Test evaluation with tool calls in response."""
+ from unittest.mock import MagicMock
+
+ from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+
+ dataset = EvaluationDataset(
+ samples=[
+ SingleTurnSample(
+ user_input="Search for Python tutorials",
+ ),
+ ]
+ )
+
+ # Mock events with tool call
+ search_events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Let me search for that"),
+ TextMessageEndEvent(message_id="msg-1"),
+ ToolCallStartEvent(tool_call_id="tc-1", tool_call_name="search"),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"query": "Python tutorials"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ ToolCallResultEvent(
+ tool_call_id="tc-1",
+ message_id="result-1",
+ content="Found: tutorial1.com, tutorial2.com",
+ ),
+ RunFinishedEvent(run_id="run-1", thread_id="thread-1"),
+ ]
+
+ async def mock_call_endpoint(endpoint_url, user_input, **kwargs):
+ return search_events
+
+ mock_result = MagicMock()
+
+ with (
+ patch(
+ "ragas.integrations.ag_ui._call_ag_ui_endpoint",
+ side_effect=mock_call_endpoint,
+ ),
+ patch(
+ "ragas.integrations.ag_ui.ragas_evaluate",
+ return_value=mock_result,
+ ),
+ ):
+ await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[],
+ )
+
+ # Check that response was extracted
+ assert dataset.samples[0].response == "Let me search for that"
+ # Check that tool results are in retrieved_contexts
+ assert dataset.samples[0].retrieved_contexts is not None
+ assert len(dataset.samples[0].retrieved_contexts) == 1
+ assert "tutorial1.com" in dataset.samples[0].retrieved_contexts[0]
+
+
+@pytest.mark.skipif(
+ not _has_fastapi_deps(), reason="httpx or ag-ui-protocol not installed"
+)
+@pytest.mark.asyncio
+async def test_evaluate_ag_ui_agent_handles_failures():
+ """Test evaluation handles HTTP failures gracefully."""
+ import math
+ from unittest.mock import MagicMock
+
+ from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+
+ dataset = EvaluationDataset(
+ samples=[
+ SingleTurnSample(user_input="Query 1"),
+ SingleTurnSample(user_input="Query 2"),
+ ]
+ )
+
+ # Mock events - first succeeds, second fails (returns NaN from executor)
+ success_events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Success response"),
+ TextMessageEndEvent(message_id="msg-1"),
+ RunFinishedEvent(run_id="run-1", thread_id="thread-1"),
+ ]
+
+ mock_result = MagicMock()
+
+    # Stub the Executor so no HTTP call is made; results() supplies the outcomes directly
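+    # The real Executor returns math.nan in place of a result for any job that
+    # raised, which is the failure shape the integration has to tolerate.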
+ class MockExecutor:
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def submit(self, func, *args, **kwargs):
+ pass
+
+ def results(self):
+ # First result succeeds, second is NaN (failed)
+ return [success_events, math.nan]
+
+ with (
+ patch(
+ "ragas.integrations.ag_ui.Executor",
+ MockExecutor,
+ ),
+ patch(
+ "ragas.integrations.ag_ui.ragas_evaluate",
+ return_value=mock_result,
+ ),
+ ):
+ await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[],
+ )
+
+ # First sample should have response, second should be empty string
+ assert dataset.samples[0].response == "Success response"
+ assert dataset.samples[1].response == ""
+ assert dataset.samples[1].retrieved_contexts == []
+
+
+# ============================================================================
+# Multi-turn evaluation tests
+# ============================================================================
+
+
+def test_convert_ragas_messages_to_ag_ui():
+ """Test converting Ragas messages to AG-UI format."""
+ from ragas.integrations.ag_ui import _convert_ragas_messages_to_ag_ui
+ from ragas.messages import ToolCall
+
+ messages = [
+ HumanMessage(content="What's the weather?"),
+ AIMessage(
+ content="Let me check",
+ tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})],
+ ),
+ HumanMessage(content="Thanks!"),
+ ]
+
+ ag_ui_messages = _convert_ragas_messages_to_ag_ui(messages)
+
+ assert len(ag_ui_messages) == 3
+
+ # Check UserMessage
+ assert ag_ui_messages[0].id == "1"
+ assert ag_ui_messages[0].content == "What's the weather?"
+
+ # Check AssistantMessage with tool calls
+ assert ag_ui_messages[1].id == "2"
+ assert ag_ui_messages[1].content == "Let me check"
+ assert ag_ui_messages[1].tool_calls is not None
+ assert len(ag_ui_messages[1].tool_calls) == 1
+ assert ag_ui_messages[1].tool_calls[0].function.name == "get-weather"
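+    # arguments is the JSON-serialized string form of the args dict, so check a
+    # substring rather than comparing dicts.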
+ assert '"location": "SF"' in ag_ui_messages[1].tool_calls[0].function.arguments
+
+ # Check second UserMessage
+ assert ag_ui_messages[2].id == "3"
+ assert ag_ui_messages[2].content == "Thanks!"
+
+
+@pytest.mark.asyncio
+async def test_evaluate_multi_turn_basic():
+ """Test basic multi-turn evaluation."""
+ from unittest.mock import MagicMock, patch
+
+ from ragas.dataset_schema import EvaluationDataset, MultiTurnSample
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+ from ragas.messages import ToolCall
+
+ # Create multi-turn sample
+ sample = MultiTurnSample(
+ user_input=[HumanMessage(content="What's the weather in SF?")],
+ reference_tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})],
+ )
+
+ dataset = EvaluationDataset(samples=[sample])
+
+    # Mock events the agent would return.
+    # Note: the tool call completes before the text message starts, so the
+    # collector attaches it to the next AIMessage.
+ agent_events = [
+ RunStartedEvent(run_id="run-1", thread_id="thread-1"),
+ ToolCallStartEvent(
+ tool_call_id="tc-1",
+ tool_call_name="get-weather",
+ parent_message_id="msg-1",
+ ),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"location": "SF"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Let me check the weather"),
+ TextMessageEndEvent(message_id="msg-1"),
+ ToolCallResultEvent(
+ tool_call_id="tc-1",
+ message_id="result-1",
+ content="Temperature: 72°F",
+ ),
+ RunFinishedEvent(run_id="run-1", thread_id="thread-1"),
+ ]
+
+ mock_result = MagicMock()
+
+ # Mock Executor
+ class MockExecutor:
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def submit(self, func, *args, **kwargs):
+ pass
+
+ def results(self):
+ return [agent_events]
+
+ with (
+ patch(
+ "ragas.integrations.ag_ui.Executor",
+ MockExecutor,
+ ),
+ patch(
+ "ragas.integrations.ag_ui.ragas_evaluate",
+ return_value=mock_result,
+ ),
+ ):
+ await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[],
+ )
+
+ # Verify that agent responses were appended to conversation
+ assert len(sample.user_input) > 1 # Should have original + agent responses
+
+ # Check that we have AIMessage and ToolMessage appended
+ ai_messages = [msg for msg in sample.user_input if isinstance(msg, AIMessage)]
+ tool_messages = [msg for msg in sample.user_input if isinstance(msg, ToolMessage)]
+
+ assert len(ai_messages) >= 1 # At least one AI message
+ assert len(tool_messages) >= 1 # At least one tool message
+
+    # Verify the tool calls on the AIMessage (they completed before the message, so they attach to it)
+ assert ai_messages[0].tool_calls is not None
+ assert len(ai_messages[0].tool_calls) > 0
+ assert ai_messages[0].tool_calls[0].name == "get-weather"
+
+
+@pytest.mark.asyncio
+async def test_evaluate_multi_turn_with_existing_conversation():
+ """Test multi-turn evaluation with pre-existing conversation."""
+ from unittest.mock import MagicMock, patch
+
+ from ragas.dataset_schema import EvaluationDataset, MultiTurnSample
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+ from ragas.messages import ToolCall
+
+ # Create sample with existing conversation
+ sample = MultiTurnSample(
+ user_input=[
+ HumanMessage(content="Hello"),
+ AIMessage(content="Hi there!"),
+ HumanMessage(content="What's the weather in SF?"),
+ ],
+ reference_tool_calls=[ToolCall(name="get-weather", args={"location": "SF"})],
+ )
+
+ original_length = len(sample.user_input)
+ dataset = EvaluationDataset(samples=[sample])
+
+ # Mock agent events
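+    # Unlike the previous test, the text message completes before the tool call
+    # starts; parent_message_id points the call back at msg-1.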
+ agent_events = [
+ TextMessageStartEvent(message_id="msg-1", role="assistant"),
+ TextMessageContentEvent(message_id="msg-1", delta="Let me check the weather"),
+ TextMessageEndEvent(message_id="msg-1"),
+ ToolCallStartEvent(
+ tool_call_id="tc-1",
+ tool_call_name="get-weather",
+ parent_message_id="msg-1",
+ ),
+ ToolCallArgsEvent(tool_call_id="tc-1", delta='{"location": "SF"}'),
+ ToolCallEndEvent(tool_call_id="tc-1"),
+ ]
+
+ mock_result = MagicMock()
+
+ class MockExecutor:
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def submit(self, func, *args, **kwargs):
+ pass
+
+ def results(self):
+ return [agent_events]
+
+ with (
+ patch(
+ "ragas.integrations.ag_ui.Executor",
+ MockExecutor,
+ ),
+ patch(
+ "ragas.integrations.ag_ui.ragas_evaluate",
+ return_value=mock_result,
+ ),
+ ):
+ await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[],
+ )
+
+ # Verify conversation was extended, not replaced
+ assert len(sample.user_input) > original_length
+
+ # First 3 messages should be unchanged
+ assert isinstance(sample.user_input[0], HumanMessage)
+ assert sample.user_input[0].content == "Hello"
+ assert isinstance(sample.user_input[1], AIMessage)
+ assert sample.user_input[1].content == "Hi there!"
+ assert isinstance(sample.user_input[2], HumanMessage)
+ assert sample.user_input[2].content == "What's the weather in SF?"
+
+ # New messages should be appended
+ new_messages = sample.user_input[original_length:]
+ assert len(new_messages) > 0
+ assert any(isinstance(msg, AIMessage) for msg in new_messages)
+
+
+@pytest.mark.asyncio
+async def test_evaluate_multi_turn_failed_query():
+ """Test multi-turn evaluation handles failed queries correctly."""
+ import math
+ from unittest.mock import MagicMock, patch
+
+ from ragas.dataset_schema import EvaluationDataset, MultiTurnSample
+ from ragas.integrations.ag_ui import evaluate_ag_ui_agent
+
+ # Create multi-turn sample
+ sample = MultiTurnSample(
+ user_input=[HumanMessage(content="Test query")],
+ reference_tool_calls=[],
+ )
+
+ original_length = len(sample.user_input)
+ dataset = EvaluationDataset(samples=[sample])
+
+ mock_result = MagicMock()
+
+ class MockExecutor:
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def submit(self, func, *args, **kwargs):
+ pass
+
+ def results(self):
+ # Return NaN to simulate failure
+ return [math.nan]
+
+ with (
+ patch(
+ "ragas.integrations.ag_ui.Executor",
+ MockExecutor,
+ ),
+ patch(
+ "ragas.integrations.ag_ui.ragas_evaluate",
+ return_value=mock_result,
+ ),
+ ):
+ await evaluate_ag_ui_agent(
+ endpoint_url="http://localhost:8000/agent",
+ dataset=dataset,
+ metrics=[],
+ )
+
+ # Conversation should remain unchanged after failure
+ assert len(sample.user_input) == original_length
+ assert isinstance(sample.user_input[0], HumanMessage)
+ assert sample.user_input[0].content == "Test query"