From 79f550107310f99a9c7ea1f8b766f325ac1aec36 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 23 Nov 2025 21:12:21 +0000 Subject: [PATCH 1/2] feat: Add fake LlamaCloud server for testing extract API Co-authored-by: adrian --- .../testing_utils/__init__.py | 17 + .../testing_utils/llama_cloud/__init__.py | 14 + .../testing_utils/llama_cloud/extract.py | 497 ++++++++++++++++++ .../testing_utils/llama_cloud/matchers.py | 64 +++ .../testing_utils/llama_cloud/server.py | 185 +++++++ .../testing_utils/test_fake_extract.py | 90 ++++ 6 files changed, 867 insertions(+) create mode 100644 py/llama_cloud_services/testing_utils/__init__.py create mode 100644 py/llama_cloud_services/testing_utils/llama_cloud/__init__.py create mode 100644 py/llama_cloud_services/testing_utils/llama_cloud/extract.py create mode 100644 py/llama_cloud_services/testing_utils/llama_cloud/matchers.py create mode 100644 py/llama_cloud_services/testing_utils/llama_cloud/server.py create mode 100644 py/unit_tests/testing_utils/test_fake_extract.py diff --git a/py/llama_cloud_services/testing_utils/__init__.py b/py/llama_cloud_services/testing_utils/__init__.py new file mode 100644 index 00000000..4b642a59 --- /dev/null +++ b/py/llama_cloud_services/testing_utils/__init__.py @@ -0,0 +1,17 @@ +"""Testing utilities for exercising the LlamaCloud SDK offline.""" + +from llama_cloud_services.testing_utils.llama_cloud import ( + FakeLlamaCloudServer, + FileMatcher, + RequestMatcher, + SchemaMatcher, + attach_extract_api, +) + +__all__ = [ + "FakeLlamaCloudServer", + "attach_extract_api", + "FileMatcher", + "RequestMatcher", + "SchemaMatcher", +] diff --git a/py/llama_cloud_services/testing_utils/llama_cloud/__init__.py b/py/llama_cloud_services/testing_utils/llama_cloud/__init__.py new file mode 100644 index 00000000..1492526c --- /dev/null +++ b/py/llama_cloud_services/testing_utils/llama_cloud/__init__.py @@ -0,0 +1,14 @@ +"""LlamaCloud-specific fakes and helpers.""" + +from .extract import ExtractTestingApi, attach_extract_api +from .matchers import FileMatcher, RequestMatcher, SchemaMatcher +from .server import FakeLlamaCloudServer + +__all__ = [ + "ExtractTestingApi", + "FakeLlamaCloudServer", + "FileMatcher", + "RequestMatcher", + "SchemaMatcher", + "attach_extract_api", +] diff --git a/py/llama_cloud_services/testing_utils/llama_cloud/extract.py b/py/llama_cloud_services/testing_utils/llama_cloud/extract.py new file mode 100644 index 00000000..16ae5e5c --- /dev/null +++ b/py/llama_cloud_services/testing_utils/llama_cloud/extract.py @@ -0,0 +1,497 @@ +"""Extract namespace fake server implementation.""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import random +import uuid +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, List, Mapping, Optional + +from llama_cloud import ( + ExtractAgent, + ExtractConfig, + ExtractJob, + ExtractMode, + ExtractRun, + ExtractTarget, + File, + StatusEnum, +) +from llama_cloud.types import ( + ExtractSchemaValidateResponse, + FileIdPresignedUrl, + PaginatedExtractRunsResponse, +) + +from llama_cloud_services.testing_utils.llama_cloud.matchers import ( + MatcherContext, + RequestMatcher, +) +from llama_cloud_services.testing_utils.llama_cloud.server import FakeLlamaCloudServer, FakeRequest + +DEFAULT_CONFIG = ExtractConfig( + extraction_target=ExtractTarget.PER_DOC, + extraction_mode=ExtractMode.MULTIMODAL, +) + +OverrideDataProvider = Callable[[MatcherContext], Mapping[str, 
Any]] + + +@dataclass(slots=True) +class _ExtractOverride: + matcher: RequestMatcher + provider: OverrideDataProvider + run_status: StatusEnum + job_status: StatusEnum + + +@dataclass(slots=True) +class _FileRecord: + file: File + content: bytes + fingerprint: str + mime_type: str + + +@dataclass(slots=True) +class _JobRecord: + job: ExtractJob + run_id: str + polls: int = 0 + target_status: StatusEnum = StatusEnum.SUCCESS + success_after: int = 1 + + +class ExtractState: + def __init__(self) -> None: + self.agents: Dict[str, ExtractAgent] = {} + self.files: Dict[str, _FileRecord] = {} + self.jobs: Dict[str, _JobRecord] = {} + self.runs: Dict[str, ExtractRun] = {} + self.overrides: List[_ExtractOverride] = [] + self.default_project_id = "00000000-0000-0000-0000-000000000000" + + def next_id(self, prefix: str) -> str: + return f"{prefix}-{uuid.uuid4()}" + + @staticmethod + def now() -> datetime: + return datetime.now(timezone.utc) + + def register_override(self, override: _ExtractOverride) -> None: + self.overrides.append(override) + + def resolve_override(self, ctx: MatcherContext) -> Optional[_ExtractOverride]: + for override in reversed(self.overrides): + if override.matcher.matches(ctx): + return override + return None + + def register_file_shell(self, name: str, external_file_id: str, mime_type: str) -> FileIdPresignedUrl: + file_id = self.next_id("file") + now = self.now() + file = File( + id=file_id, + name=name, + external_file_id=external_file_id, + file_size=0, + file_type=mime_type, + created_at=now, + updated_at=now, + last_modified_at=now, + project_id=self.default_project_id, + data_source_id=self.next_id("ds"), + permission_info={"access": "private"}, + resource_info={"provider": "fake"}, + ) + self.files[file_id] = _FileRecord(file=file, content=b"", fingerprint="", mime_type=mime_type) + return FileIdPresignedUrl( + url="", + expires_at=now + timedelta(minutes=5), + form_fields={}, + file_id=file_id, + ) + + def finalize_file(self, file_id: str, data: bytes) -> None: + record = self.files[file_id] + record.content = data + record.fingerprint = hashlib.sha256(data).hexdigest() + record.file = record.file.copy( + update={ + "file_size": len(data), + "updated_at": self.now(), + "last_modified_at": self.now(), + } + ) + + def ensure_inline_file(self, name: str, data: bytes, mime_type: str) -> _FileRecord: + presigned = self.register_file_shell(name=name, external_file_id=name, mime_type=mime_type) + upload_id = presigned.file_id + self.finalize_file(upload_id, data) + return self.files[upload_id] + + def compute_schema_hash(self, schema: Mapping[str, Any]) -> str: + payload = json.dumps(schema, sort_keys=True) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + def list_runs_for_agent(self, agent_id: str) -> List[ExtractRun]: + return [run for run in self.runs.values() if run.extraction_agent_id == agent_id] + + +class ExtractTestingApi: + """Entry point for configuring and registering the fake extract endpoints.""" + + def __init__(self, server: FakeLlamaCloudServer): + self.server = server + self.state = ExtractState() + self.server.register_namespace("extract", self) + self._register_routes() + + def stub_run( + self, + *, + matcher: Optional[RequestMatcher] = None, + data: Mapping[str, Any] | OverrideDataProvider, + run_status: StatusEnum = StatusEnum.SUCCESS, + job_status: StatusEnum = StatusEnum.SUCCESS, + ) -> None: + def _provider(ctx: MatcherContext) -> Mapping[str, Any]: + return data(ctx) if callable(data) else data + + self.state.register_override( + 
_ExtractOverride( + matcher=matcher or RequestMatcher(), + provider=_provider, + run_status=run_status, + job_status=job_status, + ) + ) + + def _register_routes(self) -> None: + self.server.add_handler("PUT", "/api/v1/files")(self._handle_generate_presigned_url) + self.server.add_handler("PUT", "/_fake/uploads/{file_id}")(self._handle_file_upload) + self.server.add_handler("GET", "/api/v1/files/{file_id}")(self._handle_get_file) + + self.server.add_handler("POST", "/api/v1/extraction/extraction-agents")(self._handle_create_agent) + self.server.add_handler("GET", "/api/v1/extraction/extraction-agents")(self._handle_list_agents) + self.server.add_handler( + "GET", "/api/v1/extraction/extraction-agents/{extraction_agent_id}" + )(self._handle_get_agent) + self.server.add_handler( + "DELETE", "/api/v1/extraction/extraction-agents/{extraction_agent_id}" + )(self._handle_delete_agent) + self.server.add_handler( + "PUT", "/api/v1/extraction/extraction-agents/{extraction_agent_id}" + )(self._handle_update_agent) + self.server.add_handler( + "GET", "/api/v1/extraction/extraction-agents/by-name/{name}" + )(self._handle_get_agent_by_name) + self.server.add_handler( + "POST", "/api/v1/extraction/extraction-agents/schema/validation" + )(self._handle_validate_schema) + + self.server.add_handler("POST", "/api/v1/extraction/jobs")(self._handle_run_job) + self.server.add_handler("GET", "/api/v1/extraction/jobs/{job_id}")(self._handle_get_job) + self.server.add_handler( + "GET", "/api/v1/extraction/runs/by-job/{job_id}" + )(self._handle_get_run_by_job_id) + self.server.add_handler("GET", "/api/v1/extraction/runs")(self._handle_list_runs) + self.server.add_handler("DELETE", "/api/v1/extraction/runs/{run_id}")(self._handle_delete_run) + self.server.add_handler("POST", "/api/v1/extraction/run")(self._handle_stateless_run) + + def _handle_generate_presigned_url(self, request: FakeRequest) -> tuple[int, Any]: + payload = request.json or {} + name = payload.get("name") or payload.get("external_file_id") or "file" + mime = self._guess_mime_type(name) + presigned = self.state.register_file_shell( + name=name, + external_file_id=payload.get("external_file_id", name), + mime_type=mime, + ) + upload_url = request.url.copy_with(path=f"/_fake/uploads/{presigned.file_id}", query=None) + presigned = presigned.copy(update={"url": str(upload_url)}) + return 200, presigned + + def _handle_file_upload(self, request: FakeRequest) -> tuple[int, Any]: + file_id = request.path_params["file_id"] + self.state.finalize_file(file_id, request.body) + return 200, {"uploaded": True} + + def _handle_get_file(self, request: FakeRequest) -> tuple[int, Any]: + file_id = request.path_params["file_id"] + record = self.state.files[file_id] + return 200, record.file + + def _handle_create_agent(self, request: FakeRequest) -> tuple[int, Any]: + payload = request.json or {} + config = self._parse_config(payload.get("config")) + schema = payload.get("data_schema") or {} + agent_id = self.state.next_id("agent") + now = self.state.now() + agent = ExtractAgent( + id=agent_id, + name=payload["name"], + data_schema=schema, + config=config, + project_id=request.query.get("project_id") or self.state.default_project_id, + custom_configuration=None, + created_at=now, + updated_at=now, + ) + self.state.agents[agent_id] = agent + return 200, agent + + def _handle_list_agents(self, request: FakeRequest) -> tuple[int, Any]: + return 200, list(self.state.agents.values()) + + def _handle_get_agent(self, request: FakeRequest) -> tuple[int, Any]: + agent = 
self.state.agents[request.path_params["extraction_agent_id"]] + return 200, agent + + def _handle_get_agent_by_name(self, request: FakeRequest) -> tuple[int, Any]: + name = request.path_params["name"] + for agent in self.state.agents.values(): + if agent.name == name: + return 200, agent + return 404, {"detail": "Agent not found"} + + def _handle_delete_agent(self, request: FakeRequest) -> tuple[int, Any]: + self.state.agents.pop(request.path_params["extraction_agent_id"], None) + return 200, {} + + def _handle_update_agent(self, request: FakeRequest) -> tuple[int, Any]: + agent_id = request.path_params["extraction_agent_id"] + payload = request.json or {} + agent = self.state.agents[agent_id] + updated = agent.copy( + update={ + "data_schema": payload.get("data_schema") or agent.data_schema, + "config": self._parse_config(payload.get("config")) or agent.config, + "updated_at": self.state.now(), + } + ) + self.state.agents[agent_id] = updated + return 200, updated + + def _handle_validate_schema(self, request: FakeRequest) -> tuple[int, Any]: + payload = request.json or {} + schema = payload.get("data_schema") or {} + return 200, ExtractSchemaValidateResponse(data_schema=schema) + + def _handle_run_job(self, request: FakeRequest) -> tuple[int, Any]: + payload = request.json or {} + agent_id = payload["extraction_agent_id"] + file_id = payload["file_id"] + agent = self.state.agents[agent_id] + file_record = self.state.files[file_id] + schema = payload.get("data_schema_override") or agent.data_schema + config = self._parse_config(payload.get("config_override")) or agent.config + context = self._build_context(route="/api/v1/extraction/jobs", schema=schema, file=file_record) + job, _ = self._create_job_and_run(agent, file_record, schema, config, context) + return 200, job + + def _handle_get_job(self, request: FakeRequest) -> tuple[int, Any]: + job_id = request.path_params["job_id"] + record = self.state.jobs[job_id] + if record.job.status != record.target_status: + record.polls += 1 + if record.polls >= record.success_after: + record.job = record.job.copy(update={"status": record.target_status, "error": None}) + return 200, record.job + + def _handle_get_run_by_job_id(self, request: FakeRequest) -> tuple[int, Any]: + job_id = request.path_params["job_id"] + record = self.state.jobs[job_id] + run = self.state.runs[record.run_id] + return 200, run + + def _handle_list_runs(self, request: FakeRequest) -> tuple[int, Any]: + agent_id = request.query.get("extraction_agent_id") + skip = int(request.query.get("skip", 0)) + limit = int(request.query.get("limit", 50)) + runs = self.state.list_runs_for_agent(agent_id) if agent_id else list(self.state.runs.values()) + sliced = runs[skip : skip + limit] + response = PaginatedExtractRunsResponse(items=sliced, skip=skip, limit=limit, total=len(runs)) + return 200, response + + def _handle_delete_run(self, request: FakeRequest) -> tuple[int, Any]: + run_id = request.path_params["run_id"] + self.state.runs.pop(run_id, None) + return 200, {} + + def _handle_stateless_run(self, request: FakeRequest) -> tuple[int, Any]: + payload = request.json or {} + schema = payload.get("data_schema") or {} + config = self._parse_config(payload.get("config")) or DEFAULT_CONFIG + text = payload.get("text") + file_record: _FileRecord + if "file_id" in payload: + file_record = self.state.files[payload["file_id"]] + elif "file" in payload: + file_payload = payload["file"] + data = base64.b64decode(file_payload["data"]) + mime = file_payload.get("mime_type", 
"application/octet-stream") + file_record = self.state.ensure_inline_file(name="stateless-upload", data=data, mime_type=mime) + elif text is not None: + file_record = self.state.ensure_inline_file( + name="stateless-text", + data=text.encode("utf-8"), + mime_type="text/plain", + ) + else: + raise ValueError("stateless run requires file_id, file, or text") + context = self._build_context( + route="/api/v1/extraction/run", schema=schema, file=file_record, text=text + ) + agent = self._ensure_stateless_agent(schema, config) + job, _ = self._create_job_and_run(agent, file_record, schema, config, context) + return 200, job + + def _create_job_and_run( + self, + agent: ExtractAgent, + file_record: _FileRecord, + schema: Mapping[str, Any], + config: ExtractConfig, + context: MatcherContext, + ) -> tuple[ExtractJob, ExtractRun]: + job_id = self.state.next_id("job") + now = self.state.now() + override = self.state.resolve_override(context) + data = override.provider(context) if override else self._generate_data(schema, file_record, context) + run_status = override.run_status if override else StatusEnum.SUCCESS + target_status = override.job_status if override else StatusEnum.SUCCESS + run_id = self.state.next_id("run") + run = ExtractRun( + id=run_id, + job_id=job_id, + extraction_agent_id=agent.id, + data=data, + data_schema=schema, + config=config, + status=run_status, + file=file_record.file, + extraction_metadata={"source": "fake"}, + project_id=agent.project_id, + created_at=now, + updated_at=now, + error=None, + from_ui=False, + ) + job = ExtractJob( + id=job_id, + status=StatusEnum.PENDING, + file=file_record.file, + extraction_agent=agent, + error=None, + ) + self.state.runs[run_id] = run + self.state.jobs[job_id] = _JobRecord( + job=job, + run_id=run_id, + target_status=target_status, + success_after=1 if target_status == StatusEnum.SUCCESS else 2, + ) + return job, run + + def _build_context( + self, + *, + route: str, + schema: Mapping[str, Any], + file: _FileRecord, + text: Optional[str] = None, + ) -> MatcherContext: + return MatcherContext( + route=route, + schema_hash=self.state.compute_schema_hash(schema), + file_id=file.file.id, + file_name=file.file.name, + file_sha256=file.fingerprint or hashlib.sha256(file.content).hexdigest(), + mime_type=file.mime_type, + text_preview=(text or "")[:64] if text else None, + ) + + def _parse_config(self, config: Optional[Mapping[str, Any]]) -> ExtractConfig: + if config is None: + return DEFAULT_CONFIG + if isinstance(config, ExtractConfig): + return config + return ExtractConfig.parse_obj(config) + + def _ensure_stateless_agent(self, schema: Mapping[str, Any], config: ExtractConfig) -> ExtractAgent: + key = self.state.compute_schema_hash(schema) + existing = next((agent for agent in self.state.agents.values() if agent.name == f"stateless-{key}"), None) + if existing: + return existing + now = self.state.now() + agent = ExtractAgent( + id=self.state.next_id("agent"), + name=f"stateless-{key}", + data_schema=schema, + config=config, + project_id=self.state.default_project_id, + custom_configuration=None, + created_at=now, + updated_at=now, + ) + self.state.agents[agent.id] = agent + return agent + + def _generate_data( + self, schema: Mapping[str, Any], file_record: _FileRecord, context: MatcherContext + ) -> Mapping[str, Any]: + fingerprint = context.file_sha256 or hashlib.sha256(file_record.content).hexdigest() + seed = int( + hashlib.sha256(f"{context.schema_hash}:{fingerprint}".encode("utf-8")).hexdigest()[:16], + 16, + ) + rng = 
random.Random(seed) + return self._render_schema(schema, rng) + + def _render_schema(self, schema: Mapping[str, Any], rng: random.Random) -> Any: + if "enum" in schema: + return schema["enum"][rng.randint(0, len(schema["enum"]) - 1)] + schema_type = schema.get("type") + if schema_type == "object": + properties = schema.get("properties", {}) + return {key: self._render_schema(value, rng) for key, value in properties.items()} + if schema_type == "array": + items = schema.get("items", {"type": "string"}) + length = max(1, min(3, schema.get("minItems", 1))) + return [self._render_schema(items, rng) for _ in range(length)] + if schema_type == "number": + return round(rng.uniform(-100, 100), 2) + if schema_type == "integer": + return rng.randint(0, 100) + if schema_type == "boolean": + return rng.choice([True, False]) + return self._render_string(rng) + + @staticmethod + def _render_string(rng: random.Random) -> str: + letters = "abcdefghijklmnopqrstuvwxyz" + size = rng.randint(5, 10) + return "".join(rng.choice(letters) for _ in range(size)) + + @staticmethod + def _guess_mime_type(name: str) -> str: + extension = name.lower().split(".")[-1] if "." in name else "" + return { + "pdf": "application/pdf", + "txt": "text/plain", + "json": "application/json", + "html": "text/html", + "png": "image/png", + "jpg": "image/jpeg", + }.get(extension, "application/octet-stream") + + +def attach_extract_api(server: FakeLlamaCloudServer) -> ExtractTestingApi: + """Register extract routes on the provided fake server.""" + + return ExtractTestingApi(server) diff --git a/py/llama_cloud_services/testing_utils/llama_cloud/matchers.py b/py/llama_cloud_services/testing_utils/llama_cloud/matchers.py new file mode 100644 index 00000000..753bb28d --- /dev/null +++ b/py/llama_cloud_services/testing_utils/llama_cloud/matchers.py @@ -0,0 +1,64 @@ +"""Composable request matcher objects used by the fake server overrides.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable, Optional + + +@dataclass(slots=True) +class MatcherContext: + """Relevant bits of a request needed for matcher evaluation.""" + + route: str + schema_hash: Optional[str] + file_id: Optional[str] + file_name: Optional[str] + file_sha256: Optional[str] + mime_type: Optional[str] + text_preview: Optional[str] + + +@dataclass(slots=True) +class FileMatcher: + filename: Optional[str] = None + file_id: Optional[str] = None + sha256: Optional[str] = None + mime_type: Optional[str] = None + + def matches(self, ctx: MatcherContext) -> bool: + if self.file_id is not None and ctx.file_id != self.file_id: + return False + if self.filename is not None and ctx.file_name != self.filename: + return False + if self.sha256 is not None and ctx.file_sha256 != self.sha256: + return False + if self.mime_type is not None and ctx.mime_type != self.mime_type: + return False + return True + + +@dataclass(slots=True) +class SchemaMatcher: + hash: Optional[str] = None + + def matches(self, ctx: MatcherContext) -> bool: + if self.hash is None: + return True + return ctx.schema_hash == self.hash + + +@dataclass(slots=True) +class RequestMatcher: + file: Optional[FileMatcher] = None + schema: Optional[SchemaMatcher] = None + predicate: Optional[Callable[[MatcherContext], bool]] = None + + def matches(self, ctx: MatcherContext) -> bool: + if self.file and not self.file.matches(ctx): + return False + if self.schema and not self.schema.matches(ctx): + return False + if self.predicate and not self.predicate(ctx): + return False + 
return True diff --git a/py/llama_cloud_services/testing_utils/llama_cloud/server.py b/py/llama_cloud_services/testing_utils/llama_cloud/server.py new file mode 100644 index 00000000..6c57d9f2 --- /dev/null +++ b/py/llama_cloud_services/testing_utils/llama_cloud/server.py @@ -0,0 +1,185 @@ +"""Shared fake HTTP infrastructure for simulating the LlamaCloud REST API.""" + +from __future__ import annotations + +import inspect +import json +import re +from dataclasses import dataclass, field +from datetime import date, datetime +from typing import Any, Awaitable, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional + +import httpx +import respx + +DEFAULT_SAAS_URL = "https://api.cloud.llamaindex.ai" + +try: # pragma: no cover - exercised indirectly via tests + from pydantic.v1 import BaseModel as _PydanticBaseModel +except ImportError: # pragma: no cover + from pydantic import BaseModel as _PydanticBaseModel # type: ignore + +HandlerReturn = tuple[int, Any] | httpx.Response +Handler = Callable[["FakeRequest"], HandlerReturn | Awaitable[HandlerReturn]] + + +@dataclass(slots=True) +class FakeRequest: + """Simplified view of an httpx request for handler consumption.""" + + method: str + url: httpx.URL + headers: Mapping[str, str] + query: Dict[str, str] + json: Any + body: bytes + path_params: Dict[str, str] = field(default_factory=dict) + + def get_header(self, key: str, default: Optional[str] = None) -> Optional[str]: + return self.headers.get(key.lower(), default) + + +@dataclass(slots=True) +class _RouteDescriptor: + method: str + path_template: str + handler: Handler + + def parse_path_params(self, path: str) -> Dict[str, str]: + template_segments = [segment for segment in self.path_template.strip("/").split("/") if segment] + path_segments = [segment for segment in path.strip("/").split("/") if segment] + if len(template_segments) != len(path_segments): + return {} + params: Dict[str, str] = {} + for template_segment, path_segment in zip(template_segments, path_segments): + if template_segment.startswith("{") and template_segment.endswith("}"): + params[template_segment[1:-1]] = path_segment + elif template_segment != path_segment: + return {} + return params + + +class FakeLlamaCloudServer: + """Context-managed respx router populated with fake LlamaCloud routes.""" + + def __init__(self, base_urls: Iterable[str] | None = None): + provided = [self._normalize(url) for url in base_urls or [] if url] + if not provided: + provided = [DEFAULT_SAAS_URL] + ordered: List[str] = [] + for url in [*provided, DEFAULT_SAAS_URL]: + if url not in ordered: + ordered.append(url) + self.primary_base_url: str = ordered[0] + self.base_urls: tuple[str, ...] 
= tuple(ordered) + self.router = respx.MockRouter(assert_all_called=False) + self._routes: List[_RouteDescriptor] = [] + self.namespaces: Dict[str, Any] = {} + + def __enter__(self) -> "FakeLlamaCloudServer": + self.router.__enter__() + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.router.__exit__(exc_type, exc, tb) + + def register_namespace(self, name: str, namespace: Any) -> None: + self.namespaces[name] = namespace + + def add_handler( + self, method: str, path_template: str, handler: Optional[Handler] = None + ) -> Callable[[Handler], Handler]: + """Register a handler for every configured base URL.""" + + def decorator(func: Handler) -> Handler: + descriptor = _RouteDescriptor(method=method.upper(), path_template=path_template, handler=func) + self._routes.append(descriptor) + for base in self.base_urls: + pattern = self._compile_regex(base, path_template) + self.router.route(method=method.upper(), url__regex=pattern).mock( + side_effect=lambda request, desc=descriptor: self._invoke(desc, request) + ) + return func + + if handler is not None: + return decorator(handler) + return decorator + + async def _invoke(self, descriptor: _RouteDescriptor, request: httpx.Request) -> httpx.Response: + body = await request.aread() + headers = {k.lower(): v for k, v in request.headers.items()} + fake_request = FakeRequest( + method=request.method, + url=request.url, + headers=headers, + query={k: v for k, v in request.url.params.multi_items()}, + json=self._maybe_parse_json(body, headers.get("content-type")), + body=body, + path_params=descriptor.parse_path_params(request.url.path), + ) + result = descriptor.handler(fake_request) + if inspect.isawaitable(result): + result = await result # type: ignore[assignment] + return self._to_response(request, result) + + @staticmethod + def _maybe_parse_json(body: bytes, content_type: Optional[str]) -> Any: + if not body: + return None + if content_type and "application/json" not in content_type: + try: + return json.loads(body.decode("utf-8")) + except json.JSONDecodeError: + return None + try: + return json.loads(body.decode("utf-8")) + except json.JSONDecodeError: + return None + + @staticmethod + def _normalize(url: str) -> str: + return url.rstrip("/") + + @staticmethod + def _compile_regex(base: str, template: str) -> re.Pattern[str]: + base_clean = base.rstrip("/") + template_segments = [segment for segment in template.strip("/").split("/") if segment] + regex_segments: List[str] = [] + for segment in template_segments: + if segment.startswith("{") and segment.endswith("}"): + regex_segments.append(r"[^/]+") + else: + regex_segments.append(re.escape(segment)) + pattern = f"^{re.escape(base_clean)}/" + "/".join(regex_segments) + "$" + return re.compile(pattern) + + @staticmethod + def _to_response(request: httpx.Request, result: HandlerReturn) -> httpx.Response: + if isinstance(result, httpx.Response): + return result + status, payload = result + if isinstance(payload, httpx.Response): + return payload + content: bytes + headers: MutableMapping[str, str] = {} + if payload is None: + content = b"" + elif isinstance(payload, bytes): + content = payload + elif isinstance(payload, str): + content = payload.encode("utf-8") + headers["content-type"] = "text/plain; charset=utf-8" + else: + content = json.dumps(payload, default=FakeLlamaCloudServer._json_default).encode("utf-8") + headers["content-type"] = "application/json" + return httpx.Response(status_code=status, content=content, headers=headers, request=request) + + @staticmethod + 
def _json_default(value: Any) -> Any: + if isinstance(value, (datetime, date)): + return value.isoformat() + if isinstance(value, _PydanticBaseModel): + return value.dict() + if hasattr(value, "value"): + return getattr(value, "value") + raise TypeError(f"Object of type {type(value)} is not JSON serializable") diff --git a/py/unit_tests/testing_utils/test_fake_extract.py b/py/unit_tests/testing_utils/test_fake_extract.py new file mode 100644 index 00000000..28917362 --- /dev/null +++ b/py/unit_tests/testing_utils/test_fake_extract.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import hashlib +from pathlib import Path + +from llama_cloud import ExtractConfig, ExtractMode, ExtractTarget, StatusEnum + +from llama_cloud_services.extract import LlamaExtract +from llama_cloud_services.testing_utils import ( + FakeLlamaCloudServer, + FileMatcher, + RequestMatcher, + attach_extract_api, +) + + +def _write_pdf(tmp_path: Path, name: str, contents: bytes) -> Path: + target = tmp_path / name + target.write_bytes(contents) + return target + + +def _schema() -> dict[str, object]: + return { + "type": "object", + "properties": { + "title": {"type": "string"}, + "total": {"type": "number"}, + "paid": {"type": "boolean"}, + }, + } + + +def _config() -> ExtractConfig: + return ExtractConfig( + extraction_mode=ExtractMode.FAST, + extraction_target=ExtractTarget.PER_DOC, + ) + + +def test_stateless_extract_is_deterministic(tmp_path: Path) -> None: + pdf = _write_pdf(tmp_path, "receipt.pdf", b"fake pdf bytes") + schema = _schema() + config = _config() + + with FakeLlamaCloudServer() as server: + attach_extract_api(server) + extractor = LlamaExtract(api_key="test-key", base_url=server.primary_base_url, verify=False) + + run_one = extractor.extract(schema, config, pdf) + run_two = extractor.extract(schema, config, pdf) + + assert run_one.data == run_two.data + assert run_one.status == run_two.status == StatusEnum.SUCCESS + + +def test_stateless_extract_can_be_stubbed(tmp_path: Path) -> None: + pdf = _write_pdf(tmp_path, "noisebridge.pdf", b"noisebridge receipt") + schema = _schema() + config = _config() + expected_sha = hashlib.sha256(pdf.read_bytes()).hexdigest() + + with FakeLlamaCloudServer() as server: + extract_api = attach_extract_api(server) + extract_api.stub_run( + matcher=RequestMatcher(file=FileMatcher(sha256=expected_sha)), + data={"title": "Noisebridge", "total": 42.0, "paid": True}, + ) + extractor = LlamaExtract(api_key="test-key", base_url=server.primary_base_url, verify=False) + + run = extractor.extract(schema, config, pdf) + + assert run.data["title"] == "Noisebridge" + assert run.data["total"] == 42.0 + assert run.data["paid"] is True + + +def test_agent_based_flow_uses_fake_server(tmp_path: Path) -> None: + pdf = _write_pdf(tmp_path, "invoice.pdf", b"agent file") + schema = _schema() + + with FakeLlamaCloudServer() as server: + attach_extract_api(server) + extractor = LlamaExtract(api_key="test-key", base_url=server.primary_base_url, verify=False) + + agent = extractor.create_agent("unit-agent", schema) + run = agent.extract(pdf) + + assert run.file.name.endswith("invoice.pdf") + assert "title" in run.data and "total" in run.data From 238d4ba45ce013ac87078f6d16307b891558475b Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 24 Nov 2025 00:30:42 +0000 Subject: [PATCH 2/2] feat: Add fake LlamaCloud server for local testing Co-authored-by: adrian --- py/README.md | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/py/README.md 
b/py/README.md
index d07c5807..60e794a7 100644
--- a/py/README.md
+++ b/py/README.md
@@ -72,6 +72,80 @@ index = LlamaCloudIndex(
 
 You can see complete SDK and API documentation for each service on [our official docs](https://docs.cloud.llamaindex.ai/).
 
+## Local Testing Utilities
+
+The Python package now ships a lightweight fake LlamaCloud server that lets you run
+offline tests without touching the real SaaS environment. The utilities live under
+`llama_cloud_services.testing_utils` and are powered by [`respx`](https://respx.dev),
+so any `httpx` client constructed by the SDK is seamlessly intercepted.
+
+### Quick start
+
+```python
+import os
+from pathlib import Path
+from llama_cloud_services.extract import LlamaExtract, ExtractConfig, ExtractTarget
+from llama_cloud_services.testing_utils import FakeLlamaCloudServer, attach_extract_api
+
+schema = {
+    "type": "object",
+    "properties": {"title": {"type": "string"}, "total": {"type": "number"}},
+}
+config = ExtractConfig(extraction_target=ExtractTarget.PER_DOC)
+pdf_path = Path("tests/fixtures/receipt.pdf")
+
+with FakeLlamaCloudServer(base_urls=[os.environ.get("LLAMA_CLOUD_BASE_URL", "")]) as fake:
+    attach_extract_api(fake)  # registers the extract routes; more namespaces coming soon
+    extractor = LlamaExtract(api_key="test-key", base_url=fake.primary_base_url, verify=False)
+    run = extractor.extract(schema, config, pdf_path)
+    assert run.status.value == "SUCCESS"
+```
+
+Use the same server for stateful agent tests, reusing `schema` and `pdf_path` from the quick start:
+
+```python
+from llama_cloud_services.testing_utils import FileMatcher, RequestMatcher
+
+with FakeLlamaCloudServer() as fake:
+    extract_api = attach_extract_api(fake)
+    extract_api.stub_run(
+        matcher=RequestMatcher(file=FileMatcher(filename="receipt.pdf")),
+        data={"title": "Noisebridge", "total": 42.0},
+    )
+    extractor = LlamaExtract(api_key="test", base_url=fake.primary_base_url, verify=False)
+    agent = extractor.create_agent("unit-agent", schema)
+    run = agent.extract(pdf_path)
+    assert run.data["title"] == "Noisebridge"
+```
+
+### Reference
+
+- `FakeLlamaCloudServer(base_urls: Iterable[str] | None = None)`
+  - Context manager that installs a shared `respx.MockRouter`. The first non-empty
+    base URL becomes `primary_base_url` for convenience.
+  - Use `register_namespace(name, obj)` to store helpers on the server if needed.
+  - The `add_handler(method, path_template)` decorator registers HTTP handlers for all
+    configured base URLs (e.g., both the SaaS default and a custom `LLAMA_CLOUD_BASE_URL`).
+
+- `attach_extract_api(server: FakeLlamaCloudServer) -> ExtractTestingApi`
+  - Registers the `/api/v1/files`, `/api/v1/extraction/*`, and stateless run routes.
+  - Manages in-memory state for files, agents, jobs, and runs so the public SDK surface
+    works exactly as it would against production.
+
+- `ExtractTestingApi.stub_run(...)`
+  - Allows fine-grained overrides of generated extraction results. Accepts an optional
+    `RequestMatcher` plus either a dict payload or a callable that receives a
+    `MatcherContext`.
+  - You can also override the job/run statuses via the `run_status`/`job_status`
+    keyword arguments to simulate failures (see the sketch below).
+
+- Matchers (`RequestMatcher`, `FileMatcher`, `SchemaMatcher`)
+  - Compose match conditions across file metadata (filename, SHA256, MIME type),
+    schema hash, or custom predicates to pin overrides to specific requests.
+
+See `unit_tests/testing_utils/test_fake_extract.py` for more end-to-end examples that
+cover deterministic stateless extraction, custom overrides, and agent-based flows.
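+
+### Simulating failures
+
+The override hooks compose: a `SchemaMatcher` can pin a stub to one schema, a callable
+payload can shape the response from the matched request's `MatcherContext`, and the
+status keywords turn the run into a failure. A minimal sketch, reusing `schema` from
+the quick start; it assumes `StatusEnum.ERROR` is the failure member of
+`llama_cloud.StatusEnum`, and it reaches into the fake's `state` helper to reuse the
+server's own schema-hash computation:
+
+```python
+from llama_cloud import StatusEnum
+
+from llama_cloud_services.extract import LlamaExtract
+from llama_cloud_services.testing_utils import (
+    FakeLlamaCloudServer,
+    RequestMatcher,
+    SchemaMatcher,
+    attach_extract_api,
+)
+
+with FakeLlamaCloudServer() as fake:
+    extract_api = attach_extract_api(fake)
+
+    # Pin the override to requests that carry exactly this schema, hashed the
+    # same way the fake server hashes it internally.
+    schema_hash = extract_api.state.compute_schema_hash(schema)
+
+    extract_api.stub_run(
+        matcher=RequestMatcher(schema=SchemaMatcher(hash=schema_hash)),
+        # A callable payload receives the MatcherContext of the matched request.
+        data=lambda ctx: {"title": ctx.file_name, "total": 0.0},
+        run_status=StatusEnum.ERROR,  # the stored ExtractRun reports ERROR
+        job_status=StatusEnum.ERROR,  # job polling also resolves to ERROR
+    )
+
+    extractor = LlamaExtract(api_key="test", base_url=fake.primary_base_url, verify=False)
+    # extractor.extract(schema, config, pdf_path) would now surface the stubbed failure.
+```
+
+Because the fake flips a non-success `job_status` only after a second poll, clients
+observe a `PENDING` job before the terminal status, mirroring real polling behavior.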
+ ## Terms of Service See the [Terms of Service Here](../TOS.pdf).