diff --git a/packages/sample-app/sample_app/experiment/run_research_experiment.py b/packages/sample-app/sample_app/experiment/run_research_experiment.py
new file mode 100644
index 0000000000..974f690d34
--- /dev/null
+++ b/packages/sample-app/sample_app/experiment/run_research_experiment.py
@@ -0,0 +1,83 @@
+"""
+Example experiment script for CI/CD using run_in_github
+
+This script:
+1. Executes tasks locally on the dataset
+2. Sends task results to the backend
+3. Backend runs evaluators and posts PR comment with results
+"""
+
+import asyncio
+import os
+from openai import AsyncOpenAI
+from traceloop.sdk import Traceloop
+from traceloop.sdk.experiment.model import RunInGithubResponse
+
+# Initialize Traceloop client
+client = Traceloop.init(
+    app_name="research-experiment-ci-cd",
+    api_key=os.getenv("TRACELOOP_API_KEY"),
+    api_endpoint=os.getenv("TRACELOOP_BASE_URL"),
+)
+
+
+async def generate_research_response(question: str) -> str:
+    """Generate a research response using OpenAI"""
+    openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
+
+    response = await openai_client.chat.completions.create(
+        model="gpt-4",
+        messages=[
+            {
+                "role": "system",
+                "content": "You are a helpful research assistant. Provide accurate, well-researched answers.",
+            },
+            {"role": "user", "content": question},
+        ],
+        temperature=0.7,
+        max_tokens=500,
+    )
+
+    return response.choices[0].message.content
+
+
+async def research_task(row):
+    """Task function that processes each dataset row"""
+    query = row.get("query", "")
+    answer = await generate_research_response(query)
+
+    return {
+        "completion": answer,
+        "question": query,
+        "sentence": answer
+    }
+
+
+async def main():
+    """Run experiment in GitHub context"""
+    print("šŸš€ Running research experiment in GitHub CI/CD...")
+
+    # Execute tasks locally and send results to backend
+    response = await client.experiment.run(
+        task=research_task,
+        dataset_slug="research-queries",
+        dataset_version="v2",
+        evaluators=["research-relevancy", "categories", "research-facts-counter"],
+        experiment_slug="research-exp",
+    )
+
+    # Print response
+    print("\nāœ… Experiment completed and submitted!")
+
+    if isinstance(response, RunInGithubResponse):
+        print(f"Experiment Slug: {response.experiment_slug}")
+        print(f"Run ID: {response.run_id}")
+    else:
+        print(f"Results: {response}")
+
+    print("\nšŸ“ The backend will run evaluators and post results to your PR.")
+    print("   Check your GitHub PR for the results comment.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py b/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
index 9041d8ebe3..453c6f7e8d 100644
--- a/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
+++ b/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
@@ -1,7 +1,8 @@
 import cuid
 import asyncio
 import json
-from typing import Any, List, Callable, Optional, Tuple, Dict
+import os
+from typing import Any, List, Callable, Optional, Tuple, Dict, Awaitable, Union
 from traceloop.sdk.client.http import HTTPClient
 from traceloop.sdk.datasets.datasets import Datasets
 from traceloop.sdk.evaluator.evaluator import Evaluator
@@ -12,6 +13,10 @@
     CreateTaskResponse,
     EvaluatorDetails,
     TaskResponse,
+    RunInGithubRequest,
+    RunInGithubResponse,
+    TaskResult,
+    GithubContext,
 )
 
 import httpx
@@ -31,11 +36,65 @@ def __init__(self, http_client: HTTPClient, async_http_client: httpx.AsyncClient
 
     async def run(
         self,
-        task: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
+        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
         dataset_slug: Optional[str] = None,
         dataset_version: Optional[str] = None,
         evaluators: Optional[List[EvaluatorDetails]] = None,
         experiment_slug: Optional[str] = None,
+        experiment_metadata: Optional[Dict[str, Any]] = None,
+        related_ref: Optional[Dict[str, str]] = None,
+        aux: Optional[Dict[str, str]] = None,
+        stop_on_error: bool = False,
+        wait_for_results: bool = True,
+    ) -> Tuple[List[TaskResponse], List[str]] | RunInGithubResponse:
+        """Run an experiment with the given task and evaluators
+
+        Args:
+            task: Async function to run on each dataset row
+            dataset_slug: Slug of the dataset to use
+            dataset_version: Version of the dataset to use
+            evaluators: List of evaluator slugs to run
+            experiment_slug: Slug for this experiment run
+            experiment_metadata: Metadata for this experiment (an experiment holds all the experiment runs)
+            related_ref: Related reference for this experiment run
+            aux: Auxiliary information for this experiment run
+            stop_on_error: Whether to stop on first error (default: False)
+            wait_for_results: Whether to wait for async tasks to complete (default: True)
+        Returns:
+            Tuple of (results, errors). Returns ([], []) if wait_for_results is False
+        """
+        if os.getenv("GITHUB_ACTIONS"):
+            return await self._run_in_github(
+                task=task,
+                dataset_slug=dataset_slug,
+                dataset_version=dataset_version,
+                evaluators=evaluators,
+                experiment_slug=experiment_slug,
+                related_ref=related_ref,
+                aux=aux,
+            )
+        else:
+            return await self._run_locally(
+                task=task,
+                dataset_slug=dataset_slug,
+                dataset_version=dataset_version,
+                evaluators=evaluators,
+                experiment_slug=experiment_slug,
+                experiment_metadata=experiment_metadata,
+                related_ref=related_ref,
+                aux=aux,
+                stop_on_error=stop_on_error,
+                wait_for_results=wait_for_results,
+            )
+
+    async def _run_locally(
+        self,
+        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
+        dataset_slug: Optional[str] = None,
+        dataset_version: Optional[str] = None,
+        evaluators: Optional[List[EvaluatorDetails]] = None,
+        experiment_slug: Optional[str] = None,
+        experiment_metadata: Optional[Dict[str, Any]] = None,
         related_ref: Optional[Dict[str, str]] = None,
         aux: Optional[Dict[str, str]] = None,
         stop_on_error: bool = False,
@@ -45,9 +104,10 @@ async def run(
 
         Args:
             dataset_slug: Slug of the dataset to use
-            task: Function to run on each dataset row
+            task: Async function to run on each dataset row
            evaluators: List of evaluator slugs to run
             experiment_slug: Slug for this experiment run
+            experiment_metadata: Metadata for this experiment (an experiment holds all the experiment runs)
             related_ref: Related reference for this experiment run
             aux: Auxiliary information for this experiment run
             stop_on_error: Whether to stop on first error (default: False)
@@ -82,6 +142,7 @@ async def run(
             evaluator_slugs=[slug for slug, _ in evaluator_details]
             if evaluator_details
             else None,
+            experiment_metadata=experiment_metadata,
             experiment_run_metadata=experiment_run_metadata,
         )
 
@@ -97,17 +158,15 @@ async def run(
 
         async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResponse:
             try:
-                # TODO: Fix type annotation - task should return Awaitable, not dict
-                task_result = await task(row)  # type: ignore[misc]
-                # TODO: Fix type - task_input should accept Optional[Dict]
+                task_result = await task(row)
                 task_id = self._create_task(
                     experiment_slug=experiment_slug,
                     experiment_run_id=run_id,
-                    task_input=row,  # type: ignore[arg-type]
+                    task_input=row,
                     task_output=task_result,
                 ).id
 
-                eval_results = {}
+                eval_results: Dict[str, Union[Dict[str, Any], str]] = {}
                 if evaluator_details:
                     for evaluator_slug, evaluator_version in evaluator_details:
                         try:
@@ -134,13 +193,11 @@ async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResponse:
                                 input=task_result,
                             )
 
-                            # TODO: Fix type - eval_results should accept Union[Dict, str]
                             msg = f"Triggered execution of {evaluator_slug}"
-                            eval_results[evaluator_slug] = msg  # type: ignore[assignment]
+                            eval_results[evaluator_slug] = msg
 
                         except Exception as e:
-                            # TODO: Fix type - eval_results should accept Union[Dict, str]
-                            eval_results[evaluator_slug] = f"Error: {str(e)}"  # type: ignore[assignment]
+                            eval_results[evaluator_slug] = f"Error: {str(e)}"
 
                 return TaskResponse(
                     task_result=task_result,
@@ -180,6 +237,138 @@ async def run_with_semaphore(row: Optional[Dict[str, Any]]) -> TaskResponse:
 
         return results, errors
 
+    async def _run_in_github(
+        self,
+        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
+        dataset_slug: Optional[str] = None,
+        dataset_version: Optional[str] = None,
+        evaluators: Optional[List[EvaluatorDetails]] = None,
+        experiment_slug: Optional[str] = None,
+        experiment_metadata: Optional[Dict[str, Any]] = None,
+        related_ref: Optional[Dict[str, str]] = None,
+        aux: Optional[Dict[str, str]] = None,
+    ) -> RunInGithubResponse:
+        """Execute tasks locally and submit results to backend for GitHub CI/CD
+
+        This method:
+        1. Fetches the dataset
+        2. Executes all tasks locally
+        3. Sends task results to backend
+        4. Backend runs evaluators and posts PR comment
+
+        Args:
+            task: Async function to run on each dataset row
+            dataset_slug: Slug of the dataset to use
+            dataset_version: Version of the dataset
+            evaluators: List of evaluator slugs or (slug, version) tuples to run
+            experiment_slug: Slug for this experiment run
+            experiment_metadata: Metadata for this experiment (an experiment holds all the experiment runs)
+            related_ref: Additional reference information for this experiment run
+            aux: Auxiliary information for this experiment run
+
+        Returns:
+            RunInGithubResponse with experiment_id, run_id, and status
+
+        Raises:
+            RuntimeError: If not running in GitHub Actions environment
+            Exception: If the API request fails
+        """
+
+        # Check if running in GitHub Actions
+        if not os.getenv("GITHUB_ACTIONS"):
+            raise RuntimeError(
+                "run_in_github() can only be used in GitHub Actions CI/CD environment. "
+                "To run experiments locally, use the run() method instead."
+            )
+
+        if not experiment_slug:
+            experiment_slug = self._experiment_slug or "exp-" + str(cuid.cuid())[:11]
+
+        # Fetch dataset rows
+        rows = []
+        if dataset_slug and dataset_version:
+            jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
+            rows = self._parse_jsonl_to_rows(jsonl_data)
+
+        task_results = await self._execute_tasks(rows, task)
+
+        # Construct GitHub context
+        repository = os.getenv("GITHUB_REPOSITORY")
+        server_url = os.getenv("GITHUB_SERVER_URL", "https://github.com")
+        github_event_name = os.getenv("GITHUB_EVENT_NAME", "")
+
+        # Verify this is running in a pull request context
+        if github_event_name != "pull_request":
+            raise RuntimeError(
+                f"run_in_github() can only be used in pull_request workflow. "
+                f"Current event: {github_event_name}. "
+                "To run experiments locally, use the run() method instead."
+            )
+
+        # Extract PR number from GITHUB_REF (format: "refs/pull/123/merge")
+        github_ref = os.getenv("GITHUB_REF", "")
+        pr_number = None
+        if github_ref.startswith("refs/pull/"):
+            pr_number = github_ref.split("/")[2]
+
+        if not repository or not github_ref or not pr_number:
+            raise RuntimeError(
+                "GITHUB_REPOSITORY and GITHUB_REF must be set in the environment. "
+                "To run experiments locally, use the run() method instead."
+            )
+
+        pr_url = f"{server_url}/{repository}/pull/{pr_number}"
+
+        github_context = GithubContext(
+            repository=repository,
+            pr_url=pr_url,
+            commit_hash=os.getenv("GITHUB_SHA", ""),
+            actor=os.getenv("GITHUB_ACTOR", ""),
+        )
+
+        experiment_metadata = dict(
+            experiment_metadata or {},
+            created_from="github"
+        )
+
+        experiment_run_metadata = {
+            key: value
+            for key, value in [("related_ref", related_ref), ("aux", aux)]
+            if value is not None
+        }
+
+        # Extract evaluator slugs
+        evaluator_slugs = None
+        if evaluators:
+            evaluator_slugs = [
+                slug if isinstance(slug, str) else slug[0]
+                for slug in evaluators
+            ]
+
+        # Prepare request payload
+        request_body = RunInGithubRequest(
+            experiment_slug=experiment_slug,
+            dataset_slug=dataset_slug,
+            dataset_version=dataset_version,
+            evaluator_slugs=evaluator_slugs,
+            task_results=task_results,
+            github_context=github_context,
+            experiment_metadata=experiment_metadata,
+            experiment_run_metadata=experiment_run_metadata,
+        )
+
+        response = self._http_client.post(
+            "/experiments/run-in-github",
+            request_body.model_dump(mode="json", exclude_none=True),
+        )
+
+        if response is None:
+            raise Exception(
+                f"Failed to submit experiment '{experiment_slug}' for GitHub execution. "
+            )
+
+        return RunInGithubResponse(**response)
+
     def _init_experiment(
         self,
         experiment_slug: str,
@@ -211,7 +400,7 @@ def _create_task(
         self,
         experiment_slug: str,
         experiment_run_id: str,
-        task_input: Dict[str, Any],
+        task_input: Optional[Dict[str, Any]],
         task_output: Dict[str, Any],
     ) -> CreateTaskResponse:
         body = CreateTaskRequest(
@@ -242,3 +431,47 @@ def _parse_jsonl_to_rows(self, jsonl_data: str) -> List[Dict[str, Any]]:
                 continue
 
         return rows
+
+    async def _execute_tasks(
+        self,
+        rows: List[Dict[str, Any]],
+        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
+    ) -> List[TaskResult]:
+        """Execute tasks locally with concurrency control
+
+        Args:
+            rows: List of dataset rows to process
+            task: Function to run on each row
+
+        Returns:
+            List of TaskResult objects with inputs, outputs, and errors
+        """
+        task_results: List[TaskResult] = []
+
+        async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResult:
+            try:
+                task_output = await task(row)
+                return TaskResult(
+                    input=row,
+                    output=task_output,
+                )
+            except Exception as e:
+                return TaskResult(
+                    input=row,
+                    error=str(e),
+                )
+
+        # Execute tasks with concurrency control
+        semaphore = asyncio.Semaphore(50)
+
+        async def run_with_semaphore(row: Dict[str, Any]) -> TaskResult:
+            async with semaphore:
+                return await run_single_row(row)
+
+        tasks = [asyncio.create_task(run_with_semaphore(row)) for row in rows]
+
+        for completed_task in asyncio.as_completed(tasks):
+            result = await completed_task
+            task_results.append(result)
+
+        return task_results
diff --git a/packages/traceloop-sdk/traceloop/sdk/experiment/model.py b/packages/traceloop-sdk/traceloop/sdk/experiment/model.py
index 887bcd5743..f55ed0b487 100644
--- a/packages/traceloop-sdk/traceloop/sdk/experiment/model.py
+++ b/packages/traceloop-sdk/traceloop/sdk/experiment/model.py
@@ -66,3 +66,41 @@ class CreateTaskResponse(BaseModel):
     """Model for create task response"""
 
     id: str
+
+
+class TaskResult(BaseModel):
+    """Model for a single task result"""
+
+    input: Dict[str, Any]
+    output: Optional[Dict[str, Any]] = None
+    error: Optional[str] = None
+
+
+class GithubContext(BaseModel):
+    """Model for GitHub context"""
+
+    repository: str
+    pr_url: str
+    commit_hash: str
+    actor: str
+
+
+class RunInGithubRequest(BaseModel):
+    """Model for bulk GitHub experiment execution request"""
+
+    experiment_slug: str
+    dataset_slug: Optional[str] = None
+    dataset_version: Optional[str] = None
+    evaluator_slugs: Optional[List[str]] = None
+    task_results: List[TaskResult]
+    github_context: GithubContext
+    experiment_metadata: Optional[Dict[str, Any]] = None
+    experiment_run_metadata: Optional[Dict[str, Any]] = None
+
+
+class RunInGithubResponse(BaseModel):
+    """Model for bulk GitHub experiment execution response"""
+
+    experiment_id: str
+    experiment_slug: str
+    run_id: str
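
Note for reviewers: below is a minimal, self-contained sketch of the JSON payload the new _run_in_github path POSTs to /experiments/run-in-github, built from the models this diff adds to model.py. It assumes pydantic v2 (model_dump) and this branch of the SDK installed; every slug, repository, commit, and literal value in it is illustrative, not taken from a real run.

# Illustrative sketch only: hypothetical slugs/repository values, assuming pydantic v2.
from traceloop.sdk.experiment.model import (
    GithubContext,
    RunInGithubRequest,
    TaskResult,
)

request_body = RunInGithubRequest(
    experiment_slug="research-exp",
    dataset_slug="research-queries",
    dataset_version="v2",
    evaluator_slugs=["research-relevancy"],
    task_results=[
        # A row that succeeded: output is set, error stays None
        TaskResult(
            input={"query": "What is retrieval-augmented generation?"},
            output={"completion": "A short answer."},
        ),
        # A row that failed: error is set instead of output
        TaskResult(input={"query": "Second question"}, error="task raised: timeout"),
    ],
    github_context=GithubContext(
        repository="example-org/example-repo",  # hypothetical repository
        pr_url="https://github.com/example-org/example-repo/pull/123",
        commit_hash="0123abc",
        actor="octocat",
    ),
    experiment_metadata={"created_from": "github"},
)

# Same dump call the SDK uses; exclude_none drops unset optional fields
# (e.g. output on failed rows, error on successful rows, experiment_run_metadata).
print(request_body.model_dump(mode="json", exclude_none=True))

This mirrors what _execute_tasks produces per row (input plus either output or error) and the shape the backend consumes before running evaluators and posting the PR comment.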