Skip to content

Commit 97ce0dd

Browse files
simorenohtvaron3jeet1995
authored
[Cosmos] Per-Partition Automatic Failover (#41588)
* sync PPAF * async changes * Update test_per_partition_automatic_failover_async.py * CI fixes * changelog * broken link * Update test_location_cache.py * change PPAF detection logic * Update _global_partition_endpoint_manager_circuit_breaker_core.py * Update _global_partition_endpoint_manager_circuit_breaker_core.py * fix tests and remove environment variable * fix tests * revert excluded locations change * fix analyze * test excluded locations * Add different error handling for 503 and 408s, update README * mypy, cspell, pylint * remove tag from tests since config is service based * add threshold-based retries for 408, 5xx errors * update constant use, rollback session token PR change * threshold based retries * Update _base.py * cspell, test fixes * Update _service_unavailable_retry_policy.py * mypy, pylint * 503 behavior change, use regional contexts * mypy, pylint, tests * special-casing 503s * small fix * exclude region tests * session retry tests * pylint, cspell * change errors since 503 is now retried directly * Update sdk/cosmos/azure-cosmos/README.md Co-authored-by: Abhijeet Mohanty <mabhijeet1995@gmail.com> * address comments update changelog, update docs, add typehints and documentation * Update _service_unavailable_retry_policy.py * small test updates for 503 behavior * further comments * Update test_per_partition_circuit_breaker_sm_mrr.py * test fixes * Update test_excluded_locations.py * small improvement to region-finding * pylint * Update _global_partition_endpoint_manager_per_partition_automatic_failover.py * address comments, add threshold lock * add more comments * edge cases * changes from testing * pylint * fixes pylint/mypy * mypy complaining about assigning str to none * testing changes - will roll back later * Update _endpoint_discovery_retry_policy.py * Update _asynchronous_request.py * add user agent feature flags * Update test_per_partition_automatic_failover_async.py * move user agent logic * sync and async match, remove print statements * leftover timer * Update _retry_utility.py * use constants * pylint * Update CHANGELOG.md * react to comments * Update _retry_utility.py * mypy pylint * test fixes * add lock to failure additions --------- Co-authored-by: tvaron3 <tomas.varon1802@gmail.com> Co-authored-by: Abhijeet Mohanty <mabhijeet1995@gmail.com>
1 parent acf59b6 commit 97ce0dd

File tree

46 files changed

+1709
-166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1709
-166
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
### 4.14.3 (Unreleased)
44

55
#### Features Added
6+
* Added support for Per Partition Automatic Failover. To enable this feature, you must follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover). See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588).
67

78
#### Breaking Changes
89

910
#### Bugs Fixed
1011

1112
#### Other Changes
13+
* Added cross-regional retries for 503 (Service Unavailable) errors. See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588).
1214

1315
### 4.14.2 (2025-11-14)
1416

sdk/cosmos/azure-cosmos/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,11 @@ requests to another region:
940940
- `AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED`: Default is a `90` percent failure rate.
941941
- After a partition reaches a 90 percent failure rate for all requests, the SDK will send requests routed to that partition to another region.
942942

943+
### Per Partition Automatic Failover (Public Preview)
944+
Per partition automatic failover enables the SDK to automatically redirect write requests at the partition level to another region based on service-side signals. This feature is available
945+
only for single write region accounts that have at least one read-only region. When per partition automatic failover is enabled, per partition circuit breaker and cross-region hedging is enabled by default, meaning
946+
all its configurable options also apply to per partition automatic failover. To enable this feature, follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover).
947+
943948
## Troubleshooting
944949

945950
### General

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@
4646
if TYPE_CHECKING:
4747
from ._cosmos_client_connection import CosmosClientConnection
4848
from .aio._cosmos_client_connection_async import CosmosClientConnection as AsyncClientConnection
49+
from ._global_partition_endpoint_manager_per_partition_automatic_failover import (
50+
_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover)
4951
from ._request_object import RequestObject
52+
from ._routing.routing_range import PartitionKeyRangeWrapper
5053

5154
# pylint: disable=protected-access
55+
#cspell:ignore PPAF, ppaf
5256

5357
_COMMON_OPTIONS = {
5458
'initial_headers': 'initialHeaders',

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@
2222
"""Class for defining internal constants in the Azure Cosmos database service.
2323
"""
2424

25-
25+
from enum import IntEnum
2626
from typing_extensions import Literal
27+
# cspell:ignore PPAF
2728

2829
# cspell:ignore reranker
2930

@@ -40,6 +41,7 @@ class _Constants:
4041
Name: Literal["name"] = "name"
4142
DatabaseAccountEndpoint: Literal["databaseAccountEndpoint"] = "databaseAccountEndpoint"
4243
DefaultEndpointsRefreshTime: int = 5 * 60 * 1000 # milliseconds
44+
EnablePerPartitionFailoverBehavior: Literal["enablePerPartitionFailoverBehavior"] = "enablePerPartitionFailoverBehavior" #pylint: disable=line-too-long
4345

4446
# ServiceDocument Resource
4547
EnableMultipleWritableLocations: Literal["enableMultipleWriteLocations"] = "enableMultipleWriteLocations"
@@ -74,6 +76,10 @@ class _Constants:
7476
FAILURE_PERCENTAGE_TOLERATED = "AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED"
7577
FAILURE_PERCENTAGE_TOLERATED_DEFAULT: int = 90
7678
# -------------------------------------------------------------------------
79+
# Only applicable when per partition automatic failover is enabled --------
80+
TIMEOUT_ERROR_THRESHOLD_PPAF = "AZURE_COSMOS_TIMEOUT_ERROR_THRESHOLD_FOR_PPAF"
81+
TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT: int = 10
82+
# -------------------------------------------------------------------------
7783

7884
# Error code translations
7985
ERROR_TRANSLATIONS: dict[int, str] = {
@@ -99,3 +105,22 @@ class Kwargs:
99105
"""Whether to retry write operations if they fail. Used either at client level or request level."""
100106

101107
EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations"
108+
109+
class UserAgentFeatureFlags(IntEnum):
110+
"""
111+
User agent feature flags.
112+
Each flag represents a bit in a number to encode what features are enabled. Therefore, the first feature flag
113+
will be 1, the second 2, the third 4, etc. When constructing the user agent suffix, the feature flags will be
114+
used to encode a unique number representing the features enabled. This number will be converted into a hex
115+
string following the prefix "F" to save space in the user agent as it is limited and appended to the user agent
116+
suffix. This number will then be used to determine what features are enabled by decoding the hex string back
117+
to a number and checking what bits are set.
118+
119+
Features being developed should align with the .NET SDK as a source of truth for feature flag assignments:
120+
https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos/src/Diagnostics/UserAgentFeatureFlags.cs
121+
122+
Example:
123+
If the user agent suffix has "F3", this means that flags 1 and 2.
124+
"""
125+
PER_PARTITION_AUTOMATIC_FAILOVER = 1
126+
PER_PARTITION_CIRCUIT_BREAKER = 2

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
HttpResponse # pylint: disable=no-legacy-azure-core-http-response-import
5050

5151
from . import _base as base
52-
from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker
52+
from ._global_partition_endpoint_manager_per_partition_automatic_failover import _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover # pylint: disable=line-too-long
5353
from . import _query_iterable as query_iterable
5454
from . import _runtime_constants as runtime_constants
5555
from . import _session
@@ -176,7 +176,7 @@ def __init__( # pylint: disable=too-many-statements
176176
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
177177

178178
self.UseMultipleWriteLocations = False
179-
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForCircuitBreaker(self)
179+
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(self)
180180

181181
retry_policy = None
182182
if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy):
@@ -2688,12 +2688,15 @@ def GetDatabaseAccount(
26882688
database_account._ReadableLocations = result[Constants.ReadableLocations]
26892689
if Constants.EnableMultipleWritableLocations in result:
26902690
database_account._EnableMultipleWritableLocations = result[
2691-
Constants.EnableMultipleWritableLocations
2692-
]
2691+
Constants.EnableMultipleWritableLocations]
26932692

26942693
self.UseMultipleWriteLocations = (
26952694
self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations
26962695
)
2696+
2697+
if Constants.EnablePerPartitionFailoverBehavior in result:
2698+
database_account._EnablePerPartitionFailoverBehavior = result[Constants.EnablePerPartitionFailoverBehavior]
2699+
26972700
if response_hook:
26982701
response_hook(last_response_headers, result)
26992702
return database_account

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def _get_client_settings(global_endpoint_manager: Optional[_GlobalEndpointManage
180180
gem_client = global_endpoint_manager.client
181181
if gem_client and gem_client.connection_policy:
182182
connection_policy: ConnectionPolicy = gem_client.connection_policy
183-
client_preferred_regions = connection_policy.PreferredLocations
183+
client_preferred_regions = global_endpoint_manager.location_cache.effective_preferred_locations
184184
client_excluded_regions = connection_policy.ExcludedLocations
185185

186186
if global_endpoint_manager.location_cache:

sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,9 @@
2323
Azure Cosmos database service.
2424
"""
2525

26-
import logging
27-
from azure.cosmos.documents import _OperationType
28-
29-
logger = logging.getLogger(__name__)
30-
logger.setLevel(logging.INFO)
31-
log_formatter = logging.Formatter("%(levelname)s:%(message)s")
32-
log_handler = logging.StreamHandler()
33-
log_handler.setFormatter(log_formatter)
34-
logger.addHandler(log_handler)
26+
# cspell:ignore PPAF
3527

28+
from azure.cosmos.documents import _OperationType
3629

3730
class EndpointDiscoveryRetryPolicy(object):
3831
"""The endpoint discovery retry policy class used for geo-replicated database accounts
@@ -43,8 +36,9 @@ class EndpointDiscoveryRetryPolicy(object):
4336
Max_retry_attempt_count = 120
4437
Retry_after_in_milliseconds = 1000
4538

46-
def __init__(self, connection_policy, global_endpoint_manager, *args):
39+
def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, *args):
4740
self.global_endpoint_manager = global_endpoint_manager
41+
self.pk_range_wrapper = pk_range_wrapper
4842
self._max_retry_attempt_count = EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
4943
self.failover_retry_count = 0
5044
self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
@@ -70,6 +64,22 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
7064

7165
self.failover_retry_count += 1
7266

67+
# set the refresh_needed flag to ensure that endpoint list is
68+
# refreshed with new writable and readable locations
69+
self.global_endpoint_manager.refresh_needed = True
70+
71+
# If per partition automatic failover is applicable, we mark the current endpoint as unavailable
72+
# and resolve the service endpoint for the partition range - otherwise, continue the default retry logic
73+
if self.global_endpoint_manager.is_per_partition_automatic_failover_applicable(self.request):
74+
partition_level_info = self.global_endpoint_manager.partition_range_to_failover_info[self.pk_range_wrapper]
75+
location = self.global_endpoint_manager.location_cache.get_location_from_endpoint(
76+
str(self.request.location_endpoint_to_route))
77+
regional_endpoint = (self.global_endpoint_manager.location_cache.
78+
account_read_regional_routing_contexts_by_location.get(location))
79+
partition_level_info.unavailable_regional_endpoints[location] = regional_endpoint
80+
self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper)
81+
return True
82+
7383
if self.request.location_endpoint_to_route:
7484
context = self.__class__.__name__
7585
if _OperationType.IsReadOnlyOperation(self.request.operation_type):
@@ -82,16 +92,11 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
8292
self.request.location_endpoint_to_route,
8393
True, context)
8494

85-
# set the refresh_needed flag to ensure that endpoint list is
86-
# refreshed with new writable and readable locations
87-
self.global_endpoint_manager.refresh_needed = True
88-
8995
# clear previous location-based routing directive
9096
self.request.clear_route_to_location()
9197

9298
# set location-based routing directive based on retry count
93-
# simulating single master writes by ensuring usePreferredLocations
94-
# is set to false
99+
# simulating single master writes by ensuring usePreferredLocations is set to false
95100
# reasoning being that 403.3 is only expected for write region failover in single writer account
96101
# and we must rely on account locations as they are the source of truth
97102
self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False)

sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
if TYPE_CHECKING:
3737
from azure.cosmos._cosmos_client_connection import CosmosClientConnection
3838

39+
#cspell:ignore ppcb
40+
3941
class _GlobalPartitionEndpointManagerForCircuitBreaker(_GlobalEndpointManager):
4042
"""
4143
This internal class implements the logic for partition endpoint management for
@@ -93,16 +95,17 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK
9395

9496
return PartitionKeyRangeWrapper(partition_range, container_rid)
9597

96-
def record_failure(
98+
def record_ppcb_failure(
9799
self,
98-
request: RequestObject
99-
) -> None:
100+
request: RequestObject,
101+
pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None)-> None:
100102
if self.is_circuit_breaker_applicable(request):
101-
pk_range_wrapper = self.create_pk_range_wrapper(request)
103+
if pk_range_wrapper is None:
104+
pk_range_wrapper = self.create_pk_range_wrapper(request)
102105
if pk_range_wrapper:
103106
self.global_partition_endpoint_manager_core.record_failure(request, pk_range_wrapper)
104107

105-
def resolve_service_endpoint_for_partition(
108+
def _resolve_service_endpoint_for_partition_circuit_breaker(
106109
self,
107110
request: RequestObject,
108111
pk_range_wrapper: Optional[PartitionKeyRangeWrapper]
@@ -113,11 +116,12 @@ def resolve_service_endpoint_for_partition(
113116
pk_range_wrapper)
114117
return self._resolve_service_endpoint(request)
115118

116-
def record_success(
119+
def record_ppcb_success(
117120
self,
118-
request: RequestObject
119-
) -> None:
120-
if self.global_partition_endpoint_manager_core.is_circuit_breaker_applicable(request):
121-
pk_range_wrapper = self.create_pk_range_wrapper(request)
121+
request: RequestObject,
122+
pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None:
123+
if self.is_circuit_breaker_applicable(request):
124+
if pk_range_wrapper is None:
125+
pk_range_wrapper = self.create_pk_range_wrapper(request)
122126
if pk_range_wrapper:
123127
self.global_partition_endpoint_manager_core.record_success(request, pk_range_wrapper)

sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker_core.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2020
# SOFTWARE.
2121

22+
# pylint: disable=protected-access
23+
2224
"""Internal class for global endpoint manager for circuit breaker.
2325
"""
2426
import logging
@@ -60,7 +62,10 @@ def is_circuit_breaker_applicable(self, request: RequestObject) -> bool:
6062
return False
6163

6264
circuit_breaker_enabled = os.environ.get(Constants.CIRCUIT_BREAKER_ENABLED_CONFIG,
63-
Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT) == "True"
65+
Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT).lower() == "true"
66+
if not circuit_breaker_enabled and self.client._global_endpoint_manager is not None:
67+
if self.client._global_endpoint_manager._database_account_cache is not None:
68+
circuit_breaker_enabled = self.client._global_endpoint_manager._database_account_cache._EnablePerPartitionFailoverBehavior is True # pylint: disable=line-too-long
6469
if not circuit_breaker_enabled:
6570
return False
6671

0 commit comments

Comments
 (0)