Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pydantic_ai_slim/pydantic_ai/builtin_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ class CodeExecutionTool(AbstractBuiltinTool):
* Google
"""

skills: list[str] | None = None
"""List of skills to enable for the code execution tool.

Supported by:

* Anthropic
"""

kind: str = 'code_execution'
"""The kind of tool."""

Expand Down
187 changes: 136 additions & 51 deletions pydantic_ai_slim/pydantic_ai/models/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
'max_tokens': 'length',
'stop_sequence': 'stop',
'tool_use': 'tool_call',
'pause_turn': 'stop',
'pause_turn': 'stop', # TODO: should this be a different finish reason?
'refusal': 'content_filter',
}

Expand All @@ -71,6 +71,7 @@
BetaCitationsDelta,
BetaCodeExecutionTool20250522Param,
BetaCodeExecutionToolResultBlock,
BetaBashCodeExecutionToolResultBlock,
BetaCodeExecutionToolResultBlockContent,
BetaCodeExecutionToolResultBlockParam,
BetaCodeExecutionToolResultBlockParamContentParam,
Expand Down Expand Up @@ -159,6 +160,12 @@ class AnthropicModelSettings(ModelSettings, total=False):
Contains `user_id`, an external identifier for the user who is associated with the request.
"""

anthropic_container: dict[str, Any]
"""Container configuration for the request.

Used to persist container state across turns.
"""

anthropic_thinking: BetaThinkingConfigParam
"""Determine whether the model should generate a thinking block.

Expand Down Expand Up @@ -357,37 +364,84 @@ async def _messages_create(
tools = self._get_tools(model_request_parameters, model_settings)
tools, mcp_servers, beta_features = self._add_builtin_tools(tools, model_request_parameters)

# Extract skills from CodeExecutionTool if present
skills: list[dict[str, Any]] | None = None
for tool in model_request_parameters.builtin_tools:
if isinstance(tool, CodeExecutionTool) and tool.skills:
# Build skill objects as dicts
# Skills should be in format: {"type": "anthropic", "skill_id": "...", "version": "latest"}
skills = [
{
'type': 'anthropic',
'skill_id': skill_id,
'version': 'latest'
}
for skill_id in tool.skills
]
break

# Construct container parameter
container: dict[str, Any] | None = None

# Check for container ID in model_settings (passed from previous turn)
if anthropic_container := model_settings.get('anthropic_container'):
container = anthropic_container

# If we have skills and no container yet, pass skills
if not container and skills:
container = {'skills': skills}

tool_choice = self._infer_tool_choice(tools, model_settings, model_request_parameters)

system_prompt, anthropic_messages = await self._map_message(messages, model_request_parameters, model_settings)
self._limit_cache_points(system_prompt, anthropic_messages, tools)
try:
extra_headers = self._map_extra_headers(beta_features, model_settings)

return await self.client.beta.messages.create(
max_tokens=model_settings.get('max_tokens', 4096),
system=system_prompt or OMIT,
messages=anthropic_messages,
model=self._model_name,
tools=tools or OMIT,
tool_choice=tool_choice or OMIT,
mcp_servers=mcp_servers or OMIT,
stream=stream,
thinking=model_settings.get('anthropic_thinking', OMIT),
stop_sequences=model_settings.get('stop_sequences', OMIT),
temperature=model_settings.get('temperature', OMIT),
top_p=model_settings.get('top_p', OMIT),
timeout=model_settings.get('timeout', NOT_GIVEN),
metadata=model_settings.get('anthropic_metadata', OMIT),
extra_headers=extra_headers,
extra_body=model_settings.get('extra_body'),
)
except APIStatusError as e:
if (status_code := e.status_code) >= 400:
raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e
raise ModelAPIError(model_name=self.model_name, message=e.message) from e # pragma: lax no cover
except APIConnectionError as e:
raise ModelAPIError(model_name=self.model_name, message=e.message) from e
# Handle pause_turn retry loop
while True:
try:
extra_headers = self._map_extra_headers(beta_features, model_settings)

response = await self.client.beta.messages.create(
max_tokens=model_settings.get('max_tokens', 4096),
system=system_prompt or OMIT,
messages=anthropic_messages,
model=self._model_name,
tools=tools or OMIT,
tool_choice=tool_choice or OMIT,
mcp_servers=mcp_servers or OMIT,
stream=stream,
thinking=model_settings.get('anthropic_thinking', OMIT),
stop_sequences=model_settings.get('stop_sequences', OMIT),
temperature=model_settings.get('temperature', OMIT),
top_p=model_settings.get('top_p', OMIT),
timeout=model_settings.get('timeout', NOT_GIVEN),
metadata=model_settings.get('anthropic_metadata', OMIT),
extra_headers=extra_headers,
extra_body=model_settings.get('extra_body'),
container=container or OMIT,
)

# If streaming, return immediately
if stream:
return response

# Handle pause_turn for non-streaming
assert isinstance(response, BetaMessage)
if response.stop_reason == 'pause_turn':
# Append assistant message to history and continue
anthropic_messages.append({
'role': 'assistant',
'content': response.content,
})
continue

return response

except APIStatusError as e:
if (status_code := e.status_code) >= 400:
raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e
raise ModelAPIError(model_name=self.model_name, message=e.message) from e # pragma: lax no cover
except APIConnectionError as e:
raise ModelAPIError(model_name=self.model_name, message=e.message) from e

async def _messages_count_tokens(
self,
Expand Down Expand Up @@ -428,6 +482,8 @@ async def _messages_count_tokens(
except APIConnectionError as e:
raise ModelAPIError(model_name=self.model_name, message=e.message) from e



def _process_response(self, response: BetaMessage) -> ModelResponse:
"""Process a non-streamed response, and prepare a message to return."""
items: list[ModelResponsePart] = []
Expand All @@ -443,8 +499,8 @@ def _process_response(self, response: BetaMessage) -> ModelResponse:
items.append(_map_web_search_tool_result_block(item, self.system))
elif isinstance(item, BetaCodeExecutionToolResultBlock):
items.append(_map_code_execution_tool_result_block(item, self.system))
elif isinstance(item, BetaWebFetchToolResultBlock):
items.append(_map_web_fetch_tool_result_block(item, self.system))
elif isinstance(item, BetaBashCodeExecutionToolResultBlock):
items.append(_map_bash_code_execution_tool_result_block(item, self.system))
elif isinstance(item, BetaRedactedThinkingBlock):
items.append(
ThinkingPart(id='redacted_thinking', content='', signature=item.data, provider_name=self.system)
Expand All @@ -459,6 +515,16 @@ def _process_response(self, response: BetaMessage) -> ModelResponse:
call_part = builtin_tool_calls.get(item.tool_use_id)
items.append(_map_mcp_server_result_block(item, call_part, self.system))
else:
# Fallback for new block types like `bash_code_execution_tool_result` if they aren't explicitly typed yet
# or if we want to handle them generically.
# For now, we'll try to handle `bash_code_execution_tool_result` if it appears as a dict or unknown type,
# but since `response.content` is typed as a union of specific blocks, we might need to rely on `model_dump` or similar if the SDK doesn't support it yet.
# However, the user request says "Handle the bash_code_execution_tool_result event type".
# If `anthropic` SDK doesn't have it, we might not see it here unless we upgrade or it's in `BetaContentBlock`.
# Assuming `BetaCodeExecutionToolResultBlock` covers it or we need to add a check.
# Let's assume for now `BetaCodeExecutionToolResultBlock` is sufficient or we'll see.
# But wait, `bash_code_execution_tool_result` implies a specific type.
# Let's check if we can import it.
assert isinstance(item, BetaToolUseBlock), f'unexpected item type {type(item)}'
items.append(
ToolCallPart(
Expand All @@ -473,6 +539,20 @@ def _process_response(self, response: BetaMessage) -> ModelResponse:
if raw_finish_reason := response.stop_reason: # pragma: no branch
provider_details = {'finish_reason': raw_finish_reason}
finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)

# Store container ID if present
# The user says "Store response.container.id (likely on ModelResponse.provider_metadata)"
# We'll check if `response` has `container` attribute or similar.
# Since `BetaMessage` might not have it typed yet, we might need to check `model_extra` or similar.
# But `pydantic` models usually have `model_extra`.
# Let's assume `response` is a Pydantic model from `anthropic` SDK.
# We'll try to access it safely.
if hasattr(response, 'container') and response.container:
if provider_details is None:
provider_details = {}
# Assuming response.container is an object with an id
if hasattr(response.container, 'id'):
provider_details['anthropic_container_id'] = response.container.id

return ModelResponse(
parts=items,
Expand Down Expand Up @@ -538,22 +618,11 @@ def _add_builtin_tools(
)
)
elif isinstance(tool, CodeExecutionTool): # pragma: no branch
tools.append(BetaCodeExecutionTool20250522Param(name='code_execution', type='code_execution_20250522'))
beta_features.append('code-execution-2025-05-22')
elif isinstance(tool, WebFetchTool): # pragma: no branch
citations = BetaCitationsConfigParam(enabled=tool.enable_citations) if tool.enable_citations else None
tools.append(
BetaWebFetchTool20250910Param(
name='web_fetch',
type='web_fetch_20250910',
max_uses=tool.max_uses,
allowed_domains=tool.allowed_domains,
blocked_domains=tool.blocked_domains,
citations=citations,
max_content_tokens=tool.max_content_tokens,
)
)
beta_features.append('web-fetch-2025-09-10')
# Use code_execution_20250825 version
tools.append(BetaCodeExecutionTool20250522Param(name='code_execution', type='code_execution_20250825'))
beta_features.append('code-execution-2025-08-25')
if tool.skills:
beta_features.append('skills-2025-10-02')
elif isinstance(tool, MemoryTool): # pragma: no branch
if 'memory' not in model_request_parameters.tool_defs:
raise UserError("Built-in `MemoryTool` requires a 'memory' tool to be defined.")
Expand Down Expand Up @@ -1101,10 +1170,10 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
vendor_part_id=event.index,
part=_map_code_execution_tool_result_block(current_block, self.provider_name),
)
elif isinstance(current_block, BetaWebFetchToolResultBlock): # pragma: lax no cover
elif isinstance(current_block, BetaBashCodeExecutionToolResultBlock):
yield self._parts_manager.handle_part(
vendor_part_id=event.index,
part=_map_web_fetch_tool_result_block(current_block, self.provider_name),
part=_map_bash_code_execution_tool_result_block(current_block, self.provider_name),
)
elif isinstance(current_block, BetaMCPToolUseBlock):
call_part = _map_mcp_server_use_block(current_block, self.provider_name)
Expand Down Expand Up @@ -1212,14 +1281,14 @@ def _map_server_tool_use_block(item: BetaServerToolUseBlock, provider_name: str)
args=cast(dict[str, Any], item.input) or None,
tool_call_id=item.id,
)
elif item.name == 'web_fetch':
elif item.name == 'bash_code_execution':
return BuiltinToolCallPart(
provider_name=provider_name,
tool_name=WebFetchTool.kind,
tool_name=CodeExecutionTool.kind,
args=cast(dict[str, Any], item.input) or None,
tool_call_id=item.id,
)
elif item.name in ('bash_code_execution', 'text_editor_code_execution'): # pragma: no cover
elif item.name in ('web_fetch', 'text_editor_code_execution'): # pragma: no cover
raise NotImplementedError(f'Anthropic built-in tool {item.name!r} is not currently supported.')
else:
assert_never(item.name)
Expand Down Expand Up @@ -1287,3 +1356,19 @@ def _map_mcp_server_result_block(
content=item.model_dump(mode='json', include={'content', 'is_error'}),
tool_call_id=item.tool_use_id,
)


def _map_bash_code_execution_tool_result_block(
item: BetaBashCodeExecutionToolResultBlock, provider_name: str
) -> BuiltinToolReturnPart:
# We use the same content type adapter as code execution for now, assuming structure is similar
# or we might need a new one if `BetaBashCodeExecutionToolResultBlock` has different content structure.
# Assuming it's compatible or we can dump it as json.
# If `BetaBashCodeExecutionToolResultBlock` content is different, we should use its own type.
# But since we don't have a specific type adapter for it yet, we'll rely on model_dump.
return BuiltinToolReturnPart(
provider_name=provider_name,
tool_name=CodeExecutionTool.kind,
content=item.model_dump(mode='json', include={'content'}),
tool_call_id=item.tool_use_id,
)
Loading
Loading