Skip to content

Commit 9f7a2fb

Browse files
authored
absolute timeout fix (#42652)
1 parent 5a9f158 commit 9f7a2fb

24 files changed

+813
-102
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#### Breaking Changes
99

1010
#### Bugs Fixed
11+
* Fixed bug where client timeout/read_timeout values were not properly enforced[PR 42652](https://github.com/Azure/azure-sdk-for-python/pull/42652).
1112
* Fixed bug when passing in None for some option in `query_items` would cause unexpected errors. See [PR 44098](https://github.com/Azure/azure-sdk-for-python/pull/44098)
1213

1314
#### Other Changes

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"""
2424

2525
import base64
26+
import time
2627
from email.utils import formatdate
2728
import json
2829
import uuid
@@ -113,6 +114,13 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
113114
for key, value in _COMMON_OPTIONS.items():
114115
if key in kwargs:
115116
options[value] = kwargs.pop(key)
117+
if 'read_timeout' in kwargs:
118+
options['read_timeout'] = kwargs['read_timeout']
119+
if 'timeout' in kwargs:
120+
options['timeout'] = kwargs['timeout']
121+
122+
123+
options[Constants.OperationStartTime] = time.time()
116124
if_match, if_none_match = _get_match_headers(kwargs)
117125
if if_match:
118126
options['accessCondition'] = {'type': 'IfMatch', 'condition': if_match}

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,21 @@
2626
from typing_extensions import Literal
2727
# cspell:ignore PPAF
2828

29-
# cspell:ignore reranker
29+
class TimeoutScope:
30+
"""Defines the scope of timeout application"""
31+
OPERATION: Literal["operation"] = "operation" # Apply timeout to entire logical operation
32+
PAGE: Literal["page"] = "page" # Apply timeout to individual page requests
3033

34+
# cspell:ignore reranker
3135

3236
class _Constants:
3337
"""Constants used in the azure-cosmos package"""
3438

3539
UserConsistencyPolicy: Literal["userConsistencyPolicy"] = "userConsistencyPolicy"
3640
DefaultConsistencyLevel: Literal["defaultConsistencyLevel"] = "defaultConsistencyLevel"
41+
OperationStartTime: Literal["operationStartTime"] = "operationStartTime"
42+
# whether to apply timeout to the whole logical operation or just a page request
43+
TimeoutScope: Literal["timeoutScope"] = "timeoutScope"
3744

3845
# GlobalDB related constants
3946
WritableLocations: Literal["writableLocations"] = "writableLocations"

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3172,6 +3172,18 @@ def __QueryFeed( # pylint: disable=too-many-locals, too-many-statements, too-ma
31723172
"""
31733173
if options is None:
31743174
options = {}
3175+
read_timeout = options.get("read_timeout")
3176+
if read_timeout is not None:
3177+
# we currently have a gap where kwargs are not getting passed correctly down the pipeline. In order to make
3178+
# absolute time out work, we are passing read_timeout via kwargs as a temporary fix
3179+
kwargs.setdefault("read_timeout", read_timeout)
3180+
3181+
operation_start_time = options.get(Constants.OperationStartTime)
3182+
if operation_start_time is not None:
3183+
kwargs.setdefault(Constants.OperationStartTime, operation_start_time)
3184+
timeout = options.get("timeout")
3185+
if timeout is not None:
3186+
kwargs.setdefault("timeout", timeout)
31753187

31763188
if query:
31773189
__GetBodiesFromQueryResult = result_fn

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,17 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
131131
return fetched_items
132132

133133
async def _fetch_items_helper_with_retries(self, fetch_function):
134-
async def callback():
134+
# TODO: Properly propagate kwargs from retry utility to fetch function
135+
# the callback keep the **kwargs parameter to maintain compatibility with the retry utility's execution pattern.
136+
# ExecuteAsync passes retry context parameters (timeout, operation start time, logger, etc.)
137+
# The callback need to accept these parameters even if unused
138+
# Removing **kwargs results in a TypeError when ExecuteAsync tries to pass these parameters
139+
async def callback(**kwargs): # pylint: disable=unused-argument
135140
return await self._fetch_items_helper_no_retries(fetch_function)
136141

137-
return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)
142+
return await _retry_utility_async.ExecuteAsync(
143+
self._client, self._client._global_endpoint_manager, callback, **self._options
144+
)
138145

139146

140147
class _DefaultQueryExecutionContext(_QueryExecutionContextBase):

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,19 @@ def __init__(self, client, resource_link, query, options, fetch_function,
6666
async def _create_execution_context_with_query_plan(self):
6767
self._fetched_query_plan = True
6868
query_to_use = self._query if self._query is not None else "Select * from root r"
69-
query_execution_info = _PartitionedQueryExecutionInfo(await self._client._GetQueryPlanThroughGateway
70-
(query_to_use, self._resource_link, self._options.get('excludedLocations')))
69+
query_plan = await self._client._GetQueryPlanThroughGateway(
70+
query_to_use,
71+
self._resource_link,
72+
self._options.get('excludedLocations'),
73+
read_timeout=self._options.get('read_timeout')
74+
)
75+
query_execution_info = _PartitionedQueryExecutionInfo(query_plan)
7176
qe_info = getattr(query_execution_info, "_query_execution_info", None)
7277
if isinstance(qe_info, dict) and isinstance(query_to_use, dict):
7378
params = query_to_use.get("parameters")
7479
if params is not None:
7580
query_execution_info._query_execution_info['parameters'] = params
81+
7682
self._execution_context = await self._create_pipelined_execution_context(query_execution_info)
7783

7884
async def __anext__(self):

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,15 @@ def _fetch_items_helper_no_retries(self, fetch_function):
129129
return fetched_items
130130

131131
def _fetch_items_helper_with_retries(self, fetch_function):
132-
def callback():
132+
# TODO: Properly propagate kwargs from retry utility to fetch function
133+
# the callback keep the **kwargs parameter to maintain compatibility with the retry utility's execution pattern.
134+
# ExecuteAsync passes retry context parameters (timeout, operation start time, logger, etc.)
135+
# The callback need to accept these parameters even if unused
136+
# Removing **kwargs results in a TypeError when ExecuteAsync tries to pass these parameters
137+
def callback(**kwargs): # pylint: disable=unused-argument
133138
return self._fetch_items_helper_no_retries(fetch_function)
134139

135-
return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)
140+
return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback, **self._options)
136141

137142
next = __next__ # Python 2 compatibility.
138143

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/execution_dispatcher.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,19 @@ def __init__(self, client, resource_link, query, options, fetch_function, respon
9696
def _create_execution_context_with_query_plan(self):
9797
self._fetched_query_plan = True
9898
query_to_use = self._query if self._query is not None else "Select * from root r"
99-
query_execution_info = _PartitionedQueryExecutionInfo(self._client._GetQueryPlanThroughGateway
100-
(query_to_use, self._resource_link, self._options.get('excludedLocations')))
101-
99+
query_plan = self._client._GetQueryPlanThroughGateway(
100+
query_to_use,
101+
self._resource_link,
102+
self._options.get('excludedLocations'),
103+
read_timeout=self._options.get('read_timeout')
104+
)
105+
query_execution_info = _PartitionedQueryExecutionInfo(query_plan)
102106
qe_info = getattr(query_execution_info, "_query_execution_info", None)
103107
if isinstance(qe_info, dict) and isinstance(query_to_use, dict):
104108
params = query_to_use.get("parameters")
105109
if params is not None:
106110
query_execution_info._query_execution_info['parameters'] = params
111+
107112
self._execution_context = self._create_pipelined_execution_context(query_execution_info)
108113

109114
def __next__(self):

sdk/cosmos/azure-cosmos/azure/cosmos/_query_iterable.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121

2222
"""Iterable query results in the Azure Cosmos database service.
2323
"""
24+
import time
2425
from azure.core.paging import PageIterator # type: ignore
26+
from azure.cosmos._constants import _Constants, TimeoutScope
2527
from azure.cosmos._execution_context import execution_dispatcher
28+
from azure.cosmos import exceptions
2629

2730
# pylint: disable=protected-access
2831

@@ -99,6 +102,17 @@ def _fetch_next(self, *args): # pylint: disable=unused-argument
99102
:return: List of results.
100103
:rtype: list
101104
"""
105+
timeout = self._options.get('timeout')
106+
# reset the operation start time if it's a paged request
107+
if timeout and self._options.get(_Constants.TimeoutScope) != TimeoutScope.OPERATION:
108+
self._options[_Constants.OperationStartTime] = time.time()
109+
110+
# Check timeout before fetching next block
111+
if timeout:
112+
elapsed = time.time() - self._options.get(_Constants.OperationStartTime)
113+
if elapsed >= timeout:
114+
raise exceptions.CosmosClientTimeoutError()
115+
102116
block = self._ex_context.fetch_next_block()
103117
if not block:
104118
raise StopIteration

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@
3939
from . import _session_retry_policy
4040
from . import _timeout_failover_retry_policy
4141
from . import exceptions
42+
from ._constants import _Constants
4243
from .documents import _OperationType
4344
from .exceptions import CosmosHttpResponseError
4445
from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes, ResourceType
4546
from ._cosmos_http_logging_policy import _log_diagnostics_error
4647

47-
4848
# pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches
4949
# cspell:ignore PPAF,ppaf,ppcb
5050

@@ -65,6 +65,13 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
6565
:returns: the result of running the passed in function as a (result, headers) tuple
6666
:rtype: tuple of (dict, dict)
6767
"""
68+
# Capture the client timeout and start time at the beginning
69+
timeout = kwargs.get('timeout')
70+
operation_start_time = kwargs.get(_Constants.OperationStartTime, time.time())
71+
72+
# Track the last error for chaining
73+
last_error = None
74+
6875
pk_range_wrapper = None
6976
if args and (global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0]) or
7077
global_endpoint_manager.is_circuit_breaker_applicable(args[0])):
@@ -115,14 +122,25 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
115122
client, client._container_properties_cache, None, *args)
116123

117124
while True:
118-
client_timeout = kwargs.get('timeout')
119125
start_time = time.time()
126+
# Check timeout before executing function
127+
if timeout:
128+
elapsed = time.time() - operation_start_time
129+
if elapsed >= timeout:
130+
raise exceptions.CosmosClientTimeoutError(error=last_error)
131+
120132
try:
121133
if args:
122134
result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs)
123135
global_endpoint_manager.record_success(args[0], pk_range_wrapper)
124136
else:
125137
result = ExecuteFunction(function, *args, **kwargs)
138+
# Check timeout after successful execution
139+
if timeout:
140+
elapsed = time.time() - operation_start_time
141+
if elapsed >= timeout:
142+
raise exceptions.CosmosClientTimeoutError(error=last_error)
143+
126144
if not client.last_response_headers:
127145
client.last_response_headers = {}
128146

@@ -163,6 +181,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
163181

164182
return result
165183
except exceptions.CosmosHttpResponseError as e:
184+
last_error = e
166185
if request:
167186
# update session token for relevant operations
168187
client._UpdateSessionIfRequired(request.headers, {}, e.headers)
@@ -236,12 +255,13 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
236255
client.session.clear_session_token(client.last_response_headers)
237256
raise
238257

258+
# Now check timeout before retrying
259+
if timeout:
260+
elapsed = time.time() - operation_start_time
261+
if elapsed >= timeout:
262+
raise exceptions.CosmosClientTimeoutError(error=last_error)
239263
# Wait for retry_after_in_milliseconds time before the next retry
240264
time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0)
241-
if client_timeout:
242-
kwargs['timeout'] = client_timeout - (time.time() - start_time)
243-
if kwargs['timeout'] <= 0:
244-
raise exceptions.CosmosClientTimeoutError()
245265

246266
except ServiceRequestError as e:
247267
if request and _has_database_account_header(request.headers):
@@ -270,6 +290,7 @@ def ExecuteFunction(function, *args, **kwargs):
270290
"""
271291
return function(*args, **kwargs)
272292

293+
273294
def _has_read_retryable_headers(request_headers):
274295
if _OperationType.IsReadOnlyOperation(request_headers.get(HttpHeaders.ThinClientProxyOperationType)):
275296
return True
@@ -345,6 +366,7 @@ def send(self, request):
345366
:raises ~azure.cosmos.exceptions.CosmosClientTimeoutError: Specified timeout exceeded.
346367
:raises ~azure.core.exceptions.ClientAuthenticationError: Authentication failed.
347368
"""
369+
348370
absolute_timeout = request.context.options.pop('timeout', None)
349371
per_request_timeout = request.context.options.pop('connection_timeout', 0)
350372
request_params = request.context.options.pop('request_params', None)
@@ -397,6 +419,7 @@ def send(self, request):
397419
if retry_active:
398420
self.sleep(retry_settings, request.context.transport)
399421
continue
422+
400423
raise err
401424
except CosmosHttpResponseError as err:
402425
raise err

0 commit comments

Comments
 (0)