Add local timestamps to request and response models - include provider timestamp in provider_details
#3598
base: main
Changes from 10 commits
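In summary, `ModelRequest` and `ModelResponse` now always carry a local, timezone-aware timestamp taken via `now_utc()`, and any timestamp reported by the provider itself (an OpenAI-style `created` field, Gemini's `create_time`, etc.) is preserved separately under `provider_details['timestamp']`. As a rough illustration of how the two can be read after a run (a sketch using the existing `Agent` / `all_messages()` API; the model name is just an example and is not specific to this PR):

```python
from pydantic_ai import Agent
from pydantic_ai.messages import ModelRequest, ModelResponse

agent = Agent('openai:gpt-4o')  # example model, not part of this change
result = agent.run_sync('What is the capital of France?')

for message in result.all_messages():
    if isinstance(message, ModelRequest):
        # Local wall-clock time at which the request was sent to the model.
        print('request sent at:   ', message.timestamp)
    elif isinstance(message, ModelResponse):
        # Local wall-clock time at which the response was received.
        print('response received: ', message.timestamp)
        # Provider-reported timestamp, when the provider supplies one.
        provider_ts = (message.provider_details or {}).get('timestamp')
        print('provider timestamp:', provider_ts)
```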
@@ -19,7 +19,7 @@
  from pydantic_ai._function_schema import _takes_ctx as is_takes_ctx  # type: ignore
  from pydantic_ai._instrumentation import DEFAULT_INSTRUMENTATION_VERSION
  from pydantic_ai._tool_manager import ToolManager
- from pydantic_ai._utils import dataclasses_no_defaults_repr, get_union_args, is_async_callable, run_in_executor
+ from pydantic_ai._utils import dataclasses_no_defaults_repr, get_union_args, is_async_callable, now_utc, run_in_executor
  from pydantic_ai.builtin_tools import AbstractBuiltinTool
  from pydantic_graph import BaseNode, GraphRunContext
  from pydantic_graph.beta import Graph, GraphBuilder

@@ -437,6 +437,7 @@ async def stream(
  assert not self._did_stream, 'stream() should only be called once per node'

  model_settings, model_request_parameters, message_history, run_context = await self._prepare_request(ctx)
+ self.request.timestamp = now_utc()
  async with ctx.deps.model.request_stream(
      message_history, model_settings, model_request_parameters, run_context
  ) as streamed_response:

@@ -469,6 +470,7 @@ async def _make_request(
  return self._result  # pragma: no cover

  model_settings, model_request_parameters, message_history, _ = await self._prepare_request(ctx)
+ self.request.timestamp = now_utc()
  model_response = await ctx.deps.model.request(message_history, model_settings, model_request_parameters)
  ctx.state.usage.requests += 1
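The two hunks above re-stamp `self.request.timestamp` with `now_utc()` immediately before the model is actually called, so the value reflects dispatch time rather than the moment the `ModelRequest` object was constructed. A minimal standalone sketch of that pattern (the `SimpleRequest` / `send` names are stand-ins for illustration, not the real agent-graph internals):

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone


def now_utc() -> datetime:
    """Local equivalent of pydantic_ai._utils.now_utc: an aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


@dataclass
class SimpleRequest:
    """Stand-in for ModelRequest: the timestamp defaults to construction time."""
    prompt: str
    timestamp: datetime = field(default_factory=now_utc)


async def send(request: SimpleRequest) -> None:
    # Preparation may take time; re-stamp just before dispatch so the
    # timestamp records when the request was actually sent.
    await asyncio.sleep(0.1)  # stands in for _prepare_request(...)
    request.timestamp = now_utc()
    # ... the real code then awaits ctx.deps.model.request(...) ...


asyncio.run(send(SimpleRequest(prompt='hello')))
```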
@@ -994,6 +994,9 @@ class ModelRequest:

  _: KW_ONLY

+ timestamp: datetime = field(default_factory=_now_utc)
+ """The timestamp when the request was sent to the model."""
+
  instructions: str | None = None
  """The instructions for the model."""

@@ -1235,9 +1238,10 @@ class ModelResponse:
  """The name of the model that generated the response."""

  timestamp: datetime = field(default_factory=_now_utc)
- """The timestamp of the response.
+ """The timestamp when the response was received locally.

- If the model provides a timestamp in the response (as OpenAI does) that will be used.
+ This is always a high-precision local datetime. Provider-specific timestamps
+ (if available) are stored in `provider_details['timestamp']`.
  """

  kind: Literal['response'] = 'response'
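With these fields in place, `ModelRequest.timestamp` and `ModelResponse.timestamp` default to the local clock, and a provider-reported time only ever appears in `provider_details`. A small sketch of what that looks like when constructing a response by hand (the concrete field values are made up for illustration):

```python
from datetime import datetime, timezone

from pydantic_ai.messages import ModelResponse, TextPart

# timestamp defaults to the local receive time (field(default_factory=_now_utc));
# a provider-reported time, if any, is carried in provider_details['timestamp'].
response = ModelResponse(
    parts=[TextPart(content='Paris')],
    provider_details={'timestamp': datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)},
)

assert response.timestamp.tzinfo is not None           # local, timezone-aware
provider_ts = response.provider_details['timestamp']   # the provider's own clock
print('local vs provider skew:', response.timestamp - provider_ts)
```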
@@ -491,6 +491,8 @@ def _process_response(self, response: GenerateContentResponse) -> ModelResponse:
  raw_finish_reason = candidate.finish_reason
  if raw_finish_reason:  # pragma: no branch
      vendor_details = {'finish_reason': raw_finish_reason.value}
+     if response.create_time is not None:
+         vendor_details['timestamp'] = response.create_time
      finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)

  if candidate.content is None or candidate.content.parts is None:

@@ -528,9 +530,10 @@ async def _process_streamed_response(
  model_request_parameters=model_request_parameters,
  _model_name=first_chunk.model_version or self._model_name,
  _response=peekable_response,
- _timestamp=first_chunk.create_time or _utils.now_utc(),
+ _timestamp=_utils.now_utc(),
  _provider_name=self._provider.name,
  _provider_url=self._provider.base_url,
+ _provider_timestamp=first_chunk.create_time,
  )

  async def _map_messages(

@@ -655,6 +658,7 @@ class GeminiStreamedResponse(StreamedResponse):
  _timestamp: datetime
  _provider_name: str
  _provider_url: str
+ _provider_timestamp: datetime | None = None

  async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:  # noqa: C901
      code_execution_tool_call_id: str | None = None

@@ -670,9 +674,15 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
  self.provider_response_id = chunk.response_id

  raw_finish_reason = candidate.finish_reason
+ provider_details_dict: dict[str, Any] = {}
  if raw_finish_reason:
-     self.provider_details = {'finish_reason': raw_finish_reason.value}
+     provider_details_dict['finish_reason'] = raw_finish_reason.value
      self.finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)
+ if self._provider_timestamp is not None:
+     # _provider_timestamp is always None in Google streaming cassettes
+     provider_details_dict['timestamp'] = self._provider_timestamp  # pragma: no cover
+ if provider_details_dict:
+     self.provider_details = provider_details_dict

  # Google streams the grounding metadata (including the web search queries and results)
  # _after_ the text that was generated using it, so it would show up out of order in the stream,
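For Google/Gemini, the non-streamed path copies `response.create_time` into `vendor_details['timestamp']`, and the streaming path threads `first_chunk.create_time` through the new `_provider_timestamp` field so it can be merged into `provider_details` together with the finish reason. A standalone sketch of that merge logic, assuming only the behaviour visible in the diff (the helper name below is made up):

```python
from datetime import datetime, timezone
from typing import Any


def build_provider_details(
    raw_finish_reason: str | None,
    provider_timestamp: datetime | None,
) -> dict[str, Any] | None:
    """Sketch of how the streamed-response hunk assembles provider_details:
    it is only set when there is at least one entry to record."""
    details: dict[str, Any] = {}
    if raw_finish_reason:
        details['finish_reason'] = raw_finish_reason
    if provider_timestamp is not None:
        details['timestamp'] = provider_timestamp
    return details or None


# A chunk carrying both a finish reason and a create_time:
print(build_provider_details('STOP', datetime(2025, 1, 1, tzinfo=timezone.utc)))
# A chunk carrying neither: provider_details stays unset.
print(build_provider_details(None, None))
```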
@@ -320,7 +320,7 @@ async def _completions_create(

  def _process_response(self, response: chat.ChatCompletion) -> ModelResponse:
      """Process a non-streamed response, and prepare a message to return."""
-     timestamp = number_to_datetime(response.created)
+     timestamp = _utils.now_utc()
      choice = response.choices[0]
      items: list[ModelResponsePart] = []
      if choice.message.reasoning is not None:

@@ -340,7 +340,9 @@ def _process_response(self, response: chat.ChatCompletion) -> ModelResponse:
  items.append(ToolCallPart(tool_name=c.function.name, args=c.function.arguments, tool_call_id=c.id))

  raw_finish_reason = choice.finish_reason
- provider_details = {'finish_reason': raw_finish_reason}
+ provider_details: dict[str, Any] = {'finish_reason': raw_finish_reason}
+ if response.created:  # pragma: no branch
+     provider_details['timestamp'] = number_to_datetime(response.created)
  finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)
  return ModelResponse(
      parts=items,

@@ -369,8 +371,9 @@ async def _process_streamed_response(
  _response=peekable_response,
  _model_name=first_chunk.model,
  _model_profile=self.profile,
- _timestamp=number_to_datetime(first_chunk.created),
+ _timestamp=_utils.now_utc(),
  _provider_name=self._provider.name,
+ _provider_timestamp=first_chunk.created,
  )

  def _get_tools(self, model_request_parameters: ModelRequestParameters) -> list[chat.ChatCompletionToolParam]:

@@ -524,6 +527,7 @@ class GroqStreamedResponse(StreamedResponse):
  _response: AsyncIterable[chat.ChatCompletionChunk]
  _timestamp: datetime
  _provider_name: str
+ _provider_timestamp: int | None = None

  async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:  # noqa: C901
      try:

@@ -541,9 +545,14 @@ async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
  except IndexError:
      continue

+ provider_details_dict: dict[str, Any] = {}
  if raw_finish_reason := choice.finish_reason:
-     self.provider_details = {'finish_reason': raw_finish_reason}
+     provider_details_dict['finish_reason'] = raw_finish_reason
      self.finish_reason = _FINISH_REASON_MAP.get(raw_finish_reason)
+ if self._provider_timestamp is not None:  # pragma: no branch
+     provider_details_dict['timestamp'] = number_to_datetime(self._provider_timestamp)
+ if provider_details_dict:  # pragma: no branch
+     self.provider_details = provider_details_dict

  if choice.delta.reasoning is not None:
      if not reasoning:
Review comment: Remove this from this PR please ;)
Reply: this will go away when I update the branch
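On the Groq side, `response.created` / `first_chunk.created` is a Unix timestamp in seconds, so it is run through `number_to_datetime` before being stored in `provider_details['timestamp']`, while `ModelResponse.timestamp` itself now always comes from `_utils.now_utc()`. A quick illustration of that split, using `datetime.fromtimestamp` as an assumed stand-in for `number_to_datetime`:

```python
from datetime import datetime, timezone

# Groq (like OpenAI-style chat APIs) reports `created` as Unix seconds.
created = 1735689600  # example value, not taken from a real response

# Assumed equivalent of pydantic_ai's number_to_datetime helper:
provider_timestamp = datetime.fromtimestamp(created, tz=timezone.utc)

# What now_utc() would record as the local receive time:
local_timestamp = datetime.now(tz=timezone.utc)

print('provider clock:', provider_timestamp)  # goes to provider_details['timestamp']
print('local clock:   ', local_timestamp)     # goes to ModelResponse.timestamp
```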