Skip to content

Commit 750f3bf

Browse files
added retry method in base circuit breaker (#1)
* added retry method in base circuit breaker * PR fixes * split retry function from circuit breaker * split retry function from circuit breaker * RP fixes
1 parent 0c311b7 commit 750f3bf

15 files changed

+288
-26
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Python Circuit Breaker Box
22

3-
A Python implementation of the Circuit Breaker pattern with Redis and in-memory storage support.
3+
A Python implementation of the Circuit Breaker pattern.
44

55
## Features
66

@@ -10,8 +10,8 @@ A Python implementation of the Circuit Breaker pattern with Redis and in-memory
1010
- [![Python](https://img.shields.io/badge/Python-3776AB?style=for-the-badge&logo=python&logoColor=FFD43B)](https://python.org) 3.10-3.13 support.
1111
- ⚡ Asynchronous API
1212
- 🔧 Configurable parameters
13+
- 🔄 Retries by [tenacity](https://tenacity.readthedocs.io/en/latest/)
1314
- 🛠️ FastAPI integration through custom exceptions
14-
- 🔄 Exponential backoff by [tenacity](https://tenacity.readthedocs.io/en/latest/) with jitter for retries
1515

1616
## Installation
1717
```bash

circuit_breaker_box/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
from circuit_breaker_box.circuit_breaker_base import BaseCircuitBreaker
22
from circuit_breaker_box.circuit_breaker_in_memory import CircuitBreakerInMemory
33
from circuit_breaker_box.circuit_breaker_redis import CircuitBreakerRedis
4+
from circuit_breaker_box.common_types import ResponseType
45
from circuit_breaker_box.errors import BaseCircuitBreakerError, HostUnavailableError
6+
from circuit_breaker_box.retryer_base import BaseRetrier
7+
from circuit_breaker_box.retryers import Retrier, RetrierCircuitBreaker
58

69

710
__all__ = [
811
"BaseCircuitBreaker",
912
"BaseCircuitBreakerError",
13+
"BaseRetrier",
1014
"CircuitBreakerInMemory",
1115
"CircuitBreakerRedis",
1216
"HostUnavailableError",
17+
"ResponseType",
18+
"Retrier",
19+
"RetrierCircuitBreaker",
1320
]

circuit_breaker_box/circuit_breaker_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
@dataclasses.dataclass(kw_only=True, slots=True)
7-
class BaseCircuitBreaker:
7+
class BaseCircuitBreaker(abc.ABC):
88
reset_timeout_in_seconds: int
99
max_failure_count: int
1010

circuit_breaker_box/circuit_breaker_in_memory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13-
@dataclasses.dataclass(kw_only=True, slots=True)
13+
@dataclasses.dataclass(kw_only=True)
1414
class CircuitBreakerInMemory(BaseCircuitBreaker):
1515
max_cache_size: int
1616
cache_hosts_with_errors: TTLCache[typing.Any, typing.Any] = dataclasses.field(init=False)

circuit_breaker_box/circuit_breaker_redis.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def _log_attempt(retry_state: tenacity.RetryCallState) -> None:
1717
logger.info("Attempt redis_reconnect: %s", retry_state)
1818

1919

20-
@dataclasses.dataclass(kw_only=True, slots=True)
20+
@dataclasses.dataclass(kw_only=True)
2121
class CircuitBreakerRedis(BaseCircuitBreaker):
2222
redis_connection: "aioredis.Redis[str]"
2323

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import typing
2+
3+
4+
ResponseType = typing.TypeVar("ResponseType")
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import abc
2+
import dataclasses
3+
import logging
4+
import typing
5+
6+
import tenacity
7+
8+
from circuit_breaker_box import ResponseType
9+
10+
11+
logger = logging.getLogger(__name__)
12+
13+
P = typing.ParamSpec("P")
14+
15+
16+
@dataclasses.dataclass(kw_only=True)
17+
class BaseRetrier(abc.ABC, typing.Generic[ResponseType]):
18+
max_retries: int
19+
reraise: bool = True
20+
exceptions_to_retry: tuple[type[Exception]]
21+
stop: tenacity.stop.stop_after_attempt = dataclasses.field(init=False)
22+
wait_strategy: tenacity.wait.wait_exponential_jitter = dataclasses.field(init=False)
23+
retry_cause: tenacity.retry_if_exception_type = dataclasses.field(init=False)
24+
25+
def __post_init__(self) -> None:
26+
self.stop = tenacity.stop_after_attempt(self.max_retries)
27+
self.wait_strategy = tenacity.wait_exponential_jitter()
28+
self.retry_cause = tenacity.retry_if_exception_type(self.exceptions_to_retry)
29+
30+
@abc.abstractmethod
31+
async def retry(
32+
self,
33+
coroutine: typing.Callable[P, typing.Awaitable[ResponseType]],
34+
*args: P.args,
35+
**kwargs: P.kwargs,
36+
) -> ResponseType: ...
37+
38+
@staticmethod
39+
def _log_attempts(retry_state: tenacity.RetryCallState) -> None:
40+
logger.info(
41+
"Attempt: attempt_number: %s, outcome_timestamp: %s",
42+
retry_state.attempt_number,
43+
retry_state.outcome_timestamp,
44+
)

circuit_breaker_box/retryers.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import dataclasses
2+
import logging
3+
import typing
4+
5+
import tenacity
6+
7+
from circuit_breaker_box import BaseCircuitBreaker, BaseRetrier, ResponseType
8+
9+
10+
logger = logging.getLogger(__name__)
11+
12+
P = typing.ParamSpec("P")
13+
14+
15+
@dataclasses.dataclass(kw_only=True)
16+
class Retrier(BaseRetrier[ResponseType]):
17+
async def retry( # type: ignore[return]
18+
self,
19+
coroutine: typing.Callable[P, typing.Awaitable[ResponseType]],
20+
*args: P.args,
21+
**kwargs: P.kwargs,
22+
) -> ResponseType:
23+
for attempt in tenacity.Retrying( # noqa: RET503
24+
stop=self.stop,
25+
wait=self.wait_strategy,
26+
retry=self.retry_cause,
27+
reraise=self.reraise,
28+
before=self._log_attempts,
29+
):
30+
with attempt:
31+
return await coroutine(*args, **kwargs)
32+
33+
34+
@dataclasses.dataclass(kw_only=True)
35+
class RetrierCircuitBreaker(BaseRetrier[ResponseType]):
36+
circuit_breaker: BaseCircuitBreaker
37+
38+
async def retry( # type: ignore[return]
39+
self,
40+
coroutine: typing.Callable[P, typing.Awaitable[ResponseType]],
41+
*args: P.args,
42+
**kwargs: P.kwargs,
43+
) -> ResponseType:
44+
if not (host := str(kwargs.get("host", ""))):
45+
msg = "'host' argument should be defined"
46+
raise ValueError(msg)
47+
48+
for attempt in tenacity.Retrying( # noqa: RET503
49+
stop=self.stop,
50+
wait=self.wait_strategy,
51+
retry=self.retry_cause,
52+
reraise=self.reraise,
53+
before=self._log_attempts,
54+
):
55+
with attempt:
56+
if not await self.circuit_breaker.is_host_available(host):
57+
await self.circuit_breaker.raise_host_unavailable_error(host)
58+
59+
if attempt.retry_state.attempt_number > 1:
60+
await self.circuit_breaker.increment_failures_count(host)
61+
62+
return await coroutine(*args, **kwargs)

examples/example_circuit_breaker.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,27 @@
11
import asyncio
22
import logging
3-
import typing
4-
5-
import fastapi
63

74
from circuit_breaker_box import CircuitBreakerInMemory
85

96

10-
HTTP_MAX_TRIES = 4
7+
MAX_RETRIES = 4
118
MAX_CACHE_SIZE = 256
129
CIRCUIT_BREAKER_MAX_FAILURE_COUNT = 1
1310
RESET_TIMEOUT_IN_SECONDS = 10
1411
SOME_HOST = "http://example.com/"
1512

1613

17-
class CustomCircuitBreakerInMemory(CircuitBreakerInMemory):
18-
async def raise_host_unavailable_error(self, host: str) -> typing.NoReturn:
19-
raise fastapi.HTTPException(status_code=500, detail=f"Host: {host} is unavailable")
20-
21-
2214
async def main() -> None:
2315
logging.basicConfig(level=logging.DEBUG)
24-
circuit_breaker = CustomCircuitBreakerInMemory(
16+
circuit_breaker = CircuitBreakerInMemory(
2517
reset_timeout_in_seconds=RESET_TIMEOUT_IN_SECONDS,
2618
max_failure_count=CIRCUIT_BREAKER_MAX_FAILURE_COUNT,
2719
max_cache_size=MAX_CACHE_SIZE,
2820
)
2921

3022
assert await circuit_breaker.is_host_available(host=SOME_HOST)
3123

32-
for _ in range(HTTP_MAX_TRIES):
24+
for _ in range(MAX_RETRIES):
3325
await circuit_breaker.increment_failures_count(host=SOME_HOST)
3426

3527
assert await circuit_breaker.is_host_available(host=SOME_HOST) is False

examples/example_retry.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import asyncio
2+
import logging
3+
4+
import httpx
5+
6+
from circuit_breaker_box.retryers import Retrier
7+
8+
9+
MAX_RETRIES = 4
10+
MAX_CACHE_SIZE = 256
11+
CIRCUIT_BREAKER_MAX_FAILURE_COUNT = 3
12+
RESET_TIMEOUT_IN_SECONDS = 10
13+
SOME_HOST = "http://example.com/"
14+
15+
16+
async def main() -> None:
17+
logging.basicConfig(level=logging.DEBUG)
18+
retryer = Retrier[httpx.Response](
19+
max_retries=MAX_RETRIES,
20+
exceptions_to_retry=(ZeroDivisionError,),
21+
)
22+
example_request = httpx.Request("GET", httpx.URL("http://example.com"))
23+
24+
async def foo(request: httpx.Request, host: str) -> httpx.Response: # noqa: ARG001
25+
raise ZeroDivisionError
26+
27+
await retryer.retry(coroutine=foo, request=example_request, host=example_request.url.host)
28+
29+
30+
if __name__ == "__main__":
31+
asyncio.run(main())

0 commit comments

Comments
 (0)