Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Example experiment script for CI/CD using run_in_github
This script:
1. Executes tasks locally on the dataset
2. Sends task results to the backend
3. Backend runs evaluators and posts PR comment with results
"""

import asyncio
import os
from openai import AsyncOpenAI
from traceloop.sdk import Traceloop

# Initialize Traceloop client once at import time; the experiment API below
# is reached through this client.
# NOTE(review): assumes TRACELOOP_API_KEY / TRACELOOP_BASE_URL are exported
# by the CI workflow -- confirm the GitHub Actions secrets are wired up.
client = Traceloop.init(
    app_name="research-experiment-ci-cd",
    api_key=os.getenv("TRACELOOP_API_KEY"),
    api_endpoint=os.getenv("TRACELOOP_BASE_URL"),
)


async def generate_research_response(question: str) -> str:
    """Answer *question* with the OpenAI chat API, acting as a research assistant."""
    api = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    system_prompt = (
        "You are a helpful research assistant. "
        "Provide accurate, well-researched answers."
    )
    completion = await api.chat.completions.create(
        model="gpt-4",
        temperature=0.7,
        max_tokens=500,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question},
        ],
    )

    return completion.choices[0].message.content


async def research_task(row):
    """Task callback run on each dataset row.

    Args:
        row: Dataset row dict; expected to contain a "query" key
            (defaults to "" when absent).

    Returns:
        Dict exposing the generated answer under the field names the
        configured evaluators read.
    """
    query = row.get("query", "")
    answer = await generate_research_response(query)

    return {
        "completion": answer,
        "question": query,
        # Bug fix: key was misspelled "sentenece", which would silently hand
        # evaluators a field name they do not recognize.
        "sentence": answer,
    }


async def main():
    """Entry point: execute the experiment through the GitHub CI/CD flow."""
    print("🚀 Running research experiment in GitHub CI/CD...")

    # Tasks run locally; results are submitted to the backend, which runs
    # the evaluators and comments on the PR.
    result = await client.experiment.run_in_github(
        task=research_task,
        dataset_slug="research-queries",
        dataset_version="v2",
        evaluators=["research-relevancy", "categories", "research-facts-counter"],
        experiment_slug="research-exp",
    )

    print("\n✅ Experiment completed and submitted!")
    print(f"Experiment Slug: {result.experiment_slug}")
    print(f"Run ID: {result.run_id}")

    print("\n📝 The backend will run evaluators and post results to your PR.")
    print(" Check your GitHub PR for the results comment.")


# Allow running this file directly as the CI job's script.
if __name__ == "__main__":
    asyncio.run(main())
172 changes: 172 additions & 0 deletions packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import cuid
import asyncio
import json
import os
from typing import Any, List, Callable, Optional, Tuple, Dict
from traceloop.sdk.client.http import HTTPClient
from traceloop.sdk.datasets.datasets import Datasets
Expand All @@ -12,6 +13,10 @@
CreateTaskResponse,
EvaluatorDetails,
TaskResponse,
RunInGithubRequest,
RunInGithubResponse,
TaskResult,
GithubContext,
)
import httpx

Expand All @@ -36,6 +41,7 @@ async def run(
dataset_version: Optional[str] = None,
evaluators: Optional[List[EvaluatorDetails]] = None,
experiment_slug: Optional[str] = None,
experiment_metadata: Optional[Dict[str, Any]] = None,
related_ref: Optional[Dict[str, str]] = None,
aux: Optional[Dict[str, str]] = None,
stop_on_error: bool = False,
Expand All @@ -48,6 +54,7 @@ async def run(
task: Function to run on each dataset row
evaluators: List of evaluator slugs to run
experiment_slug: Slug for this experiment run
experiment_metadata: Metadata for this experiment (an experiment holds all the experiment runs)
related_ref: Related reference for this experiment run
aux: Auxiliary information for this experiment run
stop_on_error: Whether to stop on first error (default: False)
Expand Down Expand Up @@ -82,6 +89,7 @@ async def run(
evaluator_slugs=[slug for slug, _ in evaluator_details]
if evaluator_details
else None,
experiment_metadata=experiment_metadata,
experiment_run_metadata=experiment_run_metadata,
)

Expand Down Expand Up @@ -177,6 +185,118 @@ async def run_with_semaphore(row) -> TaskResponse:

return results, errors

async def run_in_github(
    # NOTE(review): PR feedback suggests making this private, with run()
    # dispatching to it based on env vars -- confirm before release.
    self,
    task: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
    dataset_slug: Optional[str] = None,
    dataset_version: Optional[str] = None,
    evaluators: Optional[List[EvaluatorDetails]] = None,
    experiment_slug: Optional[str] = None,
    related_ref: Optional[Dict[str, str]] = None,
) -> RunInGithubResponse:
    """Execute tasks locally and submit results to backend for GitHub CI/CD

    This method:
    1. Fetches the dataset
    2. Executes all tasks locally
    3. Sends task results to backend
    4. Backend runs evaluators and posts PR comment

    Args:
        task: Function to run on each dataset row
        dataset_slug: Slug of the dataset to use
        dataset_version: Version of the dataset
        evaluators: List of evaluator slugs or (slug, version) tuples to run
        experiment_slug: Slug for this experiment run
        related_ref: Additional reference information for this experiment run

    Returns:
        RunInGithubResponse with experiment_id, run_id, and status

    Raises:
        RuntimeError: If not running in GitHub Actions environment
        Exception: If the API request fails
    """

    # Guard: this flow depends entirely on GitHub Actions env vars.
    if not os.getenv("GITHUB_ACTIONS"):
        raise RuntimeError(
            "run_in_github() can only be used in GitHub Actions CI/CD environment. "
            "To run experiments locally, use the run() method instead."
        )

    # Fall back to the instance-level slug, else mint a short unique one.
    if not experiment_slug:
        experiment_slug = self._experiment_slug or "exp-" + str(cuid.cuid())[:11]

    # Fetch dataset rows only when both slug and version are given;
    # otherwise the task list stays empty.
    rows = []
    if dataset_slug and dataset_version:
        jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
        rows = self._parse_jsonl_to_rows(jsonl_data)

    # Run every task locally (concurrency-limited) before contacting the backend.
    task_results = await self._execute_tasks(rows, task)

    # Construct GitHub context from standard Actions environment variables.
    repository = os.getenv("GITHUB_REPOSITORY")
    server_url = os.getenv("GITHUB_SERVER_URL", "https://github.com")

    # Extract PR number from GITHUB_REF (format: "refs/pull/123/merge")
    github_ref = os.getenv("GITHUB_REF", "")

    if not repository or not github_ref:
        raise RuntimeError(
            "GITHUB_REPOSITORY and GITHUB_REF must be set in the environment. "
            "To run experiments locally, use the run() method instead."
        )

    pr_number = None
    if github_ref.startswith("refs/pull/"):
        pr_number = github_ref.split("/")[2]
    # NOTE(review): pr_url is None for non-PR refs (e.g. push builds), but
    # GithubContext.pr_url is declared as a required str in model.py -- that
    # will fail model validation outside pull requests; confirm intent.
    pr_url = f"{server_url}/{repository}/pull/{pr_number}" if pr_number and repository else None

    github_context = GithubContext(
        repository=repository,
        pr_url=pr_url,
        commit_hash=os.getenv("GITHUB_SHA", ""),
        actor=os.getenv("GITHUB_ACTOR", ""),
    )

    # Marks the experiment as CI-created so the backend knows to post a PR comment.
    experiment_metadata = {
        "created_from": "github",
    }

    # Extract evaluator slugs: accept either plain slugs or (slug, version) tuples.
    evaluator_slugs = None
    if evaluators:
        evaluator_slugs = [
            slug if isinstance(slug, str) else slug[0]
            for slug in evaluators
        ]

    # Prepare request payload
    request_body = RunInGithubRequest(
        experiment_slug=experiment_slug,
        dataset_slug=dataset_slug,
        dataset_version=dataset_version,
        evaluator_slugs=evaluator_slugs,
        task_results=task_results,
        github_context=github_context,
        experiment_metadata=experiment_metadata,
        experiment_run_metadata=related_ref,
    )

    response = self._http_client.post(
        "/experiments/run-in-github",
        request_body.model_dump(mode="json", exclude_none=True),
    )

    # NOTE(review): presumably _http_client.post returns the parsed JSON body
    # and None on failure -- confirm; also the trailing space in this error
    # message looks accidental.
    if response is None:
        raise Exception(
            f"Failed to submit experiment '{experiment_slug}' for GitHub execution. "
        )

    return RunInGithubResponse(**response)

def _init_experiment(
self,
experiment_slug: str,
Expand Down Expand Up @@ -239,3 +359,55 @@ def _parse_jsonl_to_rows(self, jsonl_data: str) -> List[Dict[str, Any]]:
continue

return rows

async def _execute_tasks(
    self,
    rows: List[Dict[str, Any]],
    task: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
) -> List[TaskResult]:
    """Execute tasks locally with concurrency control.

    Each row is processed independently: a failing task produces a
    TaskResult carrying the error string instead of aborting the batch.

    Args:
        rows: List of dataset rows to process
        task: Async function to run on each row

    Returns:
        List of TaskResult objects with inputs, outputs, and errors,
        in the same order as *rows*.
    """
    # Cap concurrent task executions at 50.
    semaphore = asyncio.Semaphore(50)

    async def run_single_row(row: Dict[str, Any]) -> TaskResult:
        # Per-row error isolation: convert any task exception into an
        # error-bearing TaskResult so one bad row can't sink the batch.
        async with semaphore:
            try:
                task_output = await task(row)
                return TaskResult(input=row, output=task_output)
            except Exception as e:
                return TaskResult(input=row, error=str(e))

    # run_single_row never raises for ordinary exceptions, so gather needs
    # no outer handler. (The previous as_completed loop referenced a
    # nonexistent `completed_task.task_input` attribute in its except
    # branch, which would have raised AttributeError if ever reached.)
    results = await asyncio.gather(*(run_single_row(row) for row in rows))
    return list(results)
38 changes: 38 additions & 0 deletions packages/traceloop-sdk/traceloop/sdk/experiment/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,41 @@ class CreateTaskResponse(BaseModel):
"""Model for create task response"""

id: str


class TaskResult(BaseModel):
    """Model for a single task result"""

    # The dataset row the task was invoked with.
    input: Dict[str, Any]
    # Task output dict; None when the task raised.
    output: Optional[Dict[str, Any]] = None
    # Stringified exception; None when the task succeeded.
    error: Optional[str] = None


class GithubContext(BaseModel):
    """Model for GitHub context (built from GitHub Actions env vars)"""

    # "owner/repo" from GITHUB_REPOSITORY.
    repository: str
    # Bug fix: made optional -- callers pass None when GITHUB_REF is not a
    # pull-request ref (e.g. push builds); a required str would fail
    # validation there. Passing a str still works, so this is backward
    # compatible.
    pr_url: Optional[str] = None
    # From GITHUB_SHA; may be empty if unset.
    commit_hash: str
    # From GITHUB_ACTOR; may be empty if unset.
    actor: str


class RunInGithubRequest(BaseModel):
    """Model for bulk GitHub experiment execution request"""

    experiment_slug: str
    dataset_slug: Optional[str] = None
    dataset_version: Optional[str] = None
    # Evaluator slugs the backend should run over the task results.
    evaluator_slugs: Optional[List[str]] = None
    # Locally-computed task outputs/errors, one per dataset row.
    task_results: List[TaskResult]
    # CI context the backend uses to post the PR comment.
    github_context: GithubContext
    experiment_metadata: Optional[Dict[str, Any]] = None
    experiment_run_metadata: Optional[Dict[str, Any]] = None


class RunInGithubResponse(BaseModel):
    """Model for bulk GitHub experiment execution response"""

    # Backend identifier of the parent experiment.
    experiment_id: str
    experiment_slug: str
    # Identifier of this particular experiment run.
    run_id: str