Skip to content

Commit f152db6

Browse files
authored
[automatic failover] Integrate circuitbreaker into each DefaultEndpoint/PubSubEndpoint (#3543)
* - move CB creation responsibility from RedisDatabase to client * - introduce interface for CB * - add CircuitBreaker interface - introduce 'CircuitBreakerGeneration' to track CB state changes and issue 'recordResult' on correct stateholder - apply command rejections whenCB is not CLOSED * - fix typo * - add metricsWindowSize to CircuitBreakerConfig - renaming DatabaseEndpoint.bind - add java docs - add tests for Command.onComplete callbacsk for registered in DatabaseEndpoint - introduce toxiproxy - add circuitbreaker test to veify metrics collection and failover triggers * - fix test * - fix failing test due to order of listeners in CB state change events * on feedbacks from @ggivo - drop record functions from CB interface - revisit exposed functions on CB impl - handle and record exception in databaseendpoint.write - fix tests - get rid of thread.sleep's in tests * - remove thread.sleep from test * - format * - limit visibility - improve metrics objects for testability - drop use of thread.sleep in DatabaseEndpointCallbackTests * - revisit the tests to provide the assertions they claim in comments. * - test to check commands failing after endpoint switch * - formatting * - change accesibility of CircuitBreakerGeneration - drop metricsFactory instance approach - fix naming typo - drop TestMetricsFActory - improve reflectinTestUtils * feedback from @ggivo - drop recordFailure/recordsuccess from CircuitBreakerImpl * feedback from @ggivo - revisit CircuitBreakerGeneration interface
1 parent be6a0d1 commit f152db6

25 files changed

+2986
-442
lines changed

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,14 @@
565565
<scope>test</scope>
566566
</dependency>
567567

568+
<!-- Toxiproxy for network failure simulation -->
569+
<dependency>
570+
<groupId>eu.rekawek.toxiproxy</groupId>
571+
<artifactId>toxiproxy-java</artifactId>
572+
<version>2.1.11</version>
573+
<scope>test</scope>
574+
</dependency>
575+
568576
<!-- JMH -->
569577

570578
<dependency>

src/main/java/io/lettuce/core/failover/CircuitBreaker.java

Lines changed: 33 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,12 @@
66
import java.util.Arrays;
77
import java.util.HashSet;
88
import java.util.Set;
9-
import java.util.concurrent.ConcurrentHashMap;
109
import java.util.concurrent.TimeoutException;
1110
import java.util.concurrent.atomic.AtomicReference;
1211

13-
import io.lettuce.core.internal.LettuceAssert;
14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
1712
import io.lettuce.core.RedisCommandTimeoutException;
1813
import io.lettuce.core.RedisConnectionException;
1914
import io.lettuce.core.failover.api.CircuitBreakerStateListener;
20-
import io.lettuce.core.failover.metrics.CircuitBreakerMetrics;
21-
import io.lettuce.core.failover.metrics.MetricsFactory;
2215
import io.lettuce.core.failover.metrics.MetricsSnapshot;
2316

2417
/**
@@ -31,227 +24,67 @@
3124
* @author Ali Takavci
3225
* @since 7.1
3326
*/
34-
public class CircuitBreaker implements Closeable {
35-
36-
private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class);
37-
38-
private final CircuitBreakerConfig config;
39-
40-
private final AtomicReference<CircuitBreakerStateHolder> stateRef;
41-
42-
private final Set<CircuitBreakerStateListener> listeners = ConcurrentHashMap.newKeySet();
43-
44-
private final Set<Class<? extends Throwable>> trackedExceptions;
45-
46-
/**
47-
* Create a circuit breaker instance.
48-
*/
49-
public CircuitBreaker(CircuitBreakerConfig config) {
50-
LettuceAssert.notNull(config, "CircuitBreakerConfig must not be null");
51-
52-
this.config = config;
53-
this.trackedExceptions = new HashSet<>(config.trackedExceptions);
54-
this.stateRef = new AtomicReference<>(
55-
new CircuitBreakerStateHolder(State.CLOSED, MetricsFactory.createDefaultMetrics()));
56-
}
57-
58-
/**
59-
* Get the metrics tracked by this circuit breaker.
60-
* <p>
61-
* This is only for internal use and testing purposes.
62-
*
63-
* @return the circuit breaker metrics
64-
*/
65-
CircuitBreakerMetrics getMetrics() {
66-
return stateRef.get().metrics;
67-
}
27+
public interface CircuitBreaker extends Closeable {
6828

6929
/**
7030
* Get a snapshot of the current metrics within the time window. Use the snapshot to access success count, failure count,
7131
* total count, and failure rate.
7232
*
7333
* @return an immutable snapshot of current metrics
7434
*/
75-
public MetricsSnapshot getSnapshot() {
76-
return stateRef.get().metrics.getSnapshot();
77-
}
78-
79-
@Override
80-
public String toString() {
81-
CircuitBreakerStateHolder current = stateRef.get();
82-
return "CircuitBreaker{" + "state=" + current.state + ", metrics=" + current.metrics + ", config=" + config + '}';
83-
}
84-
85-
public boolean isCircuitBreakerTrackedException(Throwable throwable) {
86-
Class<? extends Throwable> errorClass = throwable.getClass();
87-
for (Class<? extends Throwable> trackedException : trackedExceptions) {
88-
if (trackedException.isAssignableFrom(errorClass)) {
89-
return true;
90-
}
91-
}
92-
return false;
93-
}
94-
95-
public void recordResult(Throwable error) {
96-
if (error != null && isCircuitBreakerTrackedException(error)) {
97-
recordFailure();
98-
} else {
99-
recordSuccess();
100-
}
101-
}
102-
103-
public void recordFailure() {
104-
stateRef.get().metrics.recordFailure();
105-
evaluateMetrics();
106-
}
107-
108-
public void recordSuccess() {
109-
stateRef.get().metrics.recordSuccess();
110-
}
35+
public MetricsSnapshot getSnapshot();
11136

11237
/**
113-
* Evaluate the current metrics to determine if the circuit breaker should transition to a new state.
38+
* Get the current generation of the circuit breaker. This is used to track the state and metrics of the circuit breaker at
39+
* the time of command execution.
11440
*
115-
* <p>
116-
* This method checks the failure rate and failure count against the configured thresholds. If the thresholds are met, the
117-
* circuit breaker transitions to the OPEN state. Metrics are reset when the state changes.
118-
* </p>
119-
*
120-
* @return an immutable snapshot of current metrics
41+
* @return the current generation of the circuit breaker
12142
*/
122-
MetricsSnapshot evaluateMetrics() {
123-
CircuitBreakerStateHolder current = stateRef.get();
124-
MetricsSnapshot snapshot = current.metrics.getSnapshot();
125-
boolean evaluationResult = snapshot.getFailureRate() >= config.getFailureRateThreshold()
126-
&& snapshot.getFailureCount() >= config.getMinimumNumberOfFailures();
127-
if (evaluationResult) {
128-
stateTransitionTo(State.OPEN);
129-
}
130-
return snapshot;
131-
}
43+
public CircuitBreakerGeneration getGeneration();
13244

13345
/**
134-
* Switch the circuit breaker to the specified state. This method is used to force the circuit breaker to a specific state.
46+
* Get the current state of the circuit breaker.
13547
*
136-
* <p>
137-
* This method does not evaluate the metrics to determine if the state transition is valid. It simply transitions to the
138-
* specified state. Metrics are reset when the state changes.
139-
* </p>
140-
*
141-
* @param newState the target state
48+
* @return the current state
14249
*/
143-
public void transitionTo(State newState) {
144-
stateTransitionTo(newState);
145-
}
50+
public State getCurrentState();
14651

14752
/**
148-
* Atomically transition to a new state with fresh metrics.
149-
* <p>
150-
* This method uses lock-free CAS to ensure that state change and metrics reset happen atomically. Whenever the circuit
151-
* breaker transitions to a new state, a fresh metrics instance is created, providing a clean slate for tracking metrics in
152-
* the new state.
153-
* <p>
154-
* If the state is already the target state, no transition occurs.
53+
* Check if the circuit breaker is in the closed state.
15554
*
156-
* @param newState the target state
55+
* @return {@code true} if the circuit breaker is in the closed state
15756
*/
158-
private void stateTransitionTo(State newState) {
159-
while (true) {
160-
CircuitBreakerStateHolder current = stateRef.get();
161-
162-
// No transition needed if already in target state
163-
if (current.state == newState) {
164-
return;
165-
}
166-
167-
// Always create fresh metrics on state transition
168-
CircuitBreakerMetrics nextMetrics = MetricsFactory.createDefaultMetrics();
169-
170-
CircuitBreakerStateHolder next = new CircuitBreakerStateHolder(newState, nextMetrics);
171-
172-
// Atomically swap if current state hasn't changed
173-
if (stateRef.compareAndSet(current, next)) {
174-
fireStateChanged(current.state, newState);
175-
return;
176-
}
177-
// CAS failed, retry with updated current state
178-
}
179-
}
180-
181-
public State getCurrentState() {
182-
return stateRef.get().state;
183-
}
57+
public boolean isClosed();
18458

18559
/**
18660
* Add a listener for circuit breaker state change events.
18761
*
18862
* @param listener the listener to add, must not be {@code null}
18963
*/
190-
public void addListener(CircuitBreakerStateListener listener) {
191-
listeners.add(listener);
192-
}
64+
public void addListener(CircuitBreakerStateListener listener);
19365

19466
/**
19567
* Remove a listener for circuit breaker state change events.
19668
*
19769
* @param listener the listener to remove, must not be {@code null}
19870
*/
199-
public void removeListener(CircuitBreakerStateListener listener) {
200-
listeners.remove(listener);
201-
}
202-
203-
/**
204-
* Fire a state change event to all registered listeners.
205-
*
206-
* @param previousState the previous state
207-
* @param newState the new state
208-
*/
209-
private void fireStateChanged(State previousState, State newState) {
210-
CircuitBreakerStateChangeEvent event = new CircuitBreakerStateChangeEvent(this, previousState, newState);
211-
for (CircuitBreakerStateListener listener : listeners) {
212-
try {
213-
listener.onCircuitBreakerStateChange(event);
214-
} catch (Exception e) {
215-
// Ignore listener exceptions to prevent one bad listener from affecting others
216-
log.error("Error notifying listener " + listener + " of state change " + event, e);
217-
}
218-
}
219-
}
71+
public void removeListener(CircuitBreakerStateListener listener);
22072

22173
@Override
222-
public void close() {
223-
listeners.clear();
224-
}
74+
public void close();
22575

22676
public enum State {
22777
CLOSED, OPEN
22878
}
22979

230-
/**
231-
* Immutable holder for circuit breaker state and metrics.
232-
* <p>
233-
* This class enables atomic updates of both state and metrics using {@link AtomicReference}. When a state transition
234-
* occurs, a new instance is created with fresh metrics.
235-
*/
236-
private static final class CircuitBreakerStateHolder {
237-
238-
final State state;
239-
240-
final CircuitBreakerMetrics metrics;
241-
242-
CircuitBreakerStateHolder(State state, CircuitBreakerMetrics metrics) {
243-
this.state = state;
244-
this.metrics = metrics;
245-
}
246-
247-
}
248-
24980
public static class CircuitBreakerConfig {
25081

25182
private final static float DEFAULT_FAILURE_RATE_THRESHOLD = 10;
25283

25384
private final static int DEFAULT_MINIMUM_NUMBER_OF_FAILURES = 1000;
25485

86+
private final static int DEFAULT_METRICS_WINDOW_SIZE = 2;
87+
25588
private final static Set<Class<? extends Throwable>> DEFAULT_TRACKED_EXCEPTIONS = new HashSet<>(Arrays.asList(
25689

25790
// Connection failures
@@ -273,15 +106,19 @@ public static class CircuitBreakerConfig {
273106

274107
private final int minimumNumberOfFailures;
275108

109+
private final int metricsWindowSize;
110+
276111
private CircuitBreakerConfig() {
277-
this(DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_MINIMUM_NUMBER_OF_FAILURES, DEFAULT_TRACKED_EXCEPTIONS);
112+
this(DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_MINIMUM_NUMBER_OF_FAILURES, DEFAULT_TRACKED_EXCEPTIONS,
113+
DEFAULT_METRICS_WINDOW_SIZE);
278114
}
279115

280116
public CircuitBreakerConfig(float failureThreshold, int minimumNumberOfFailures,
281-
Set<Class<? extends Throwable>> trackedExceptions) {
117+
Set<Class<? extends Throwable>> trackedExceptions, int metricsWindowSize) {
282118
this.trackedExceptions = trackedExceptions;
283119
this.failureThreshold = failureThreshold;
284120
this.minimumNumberOfFailures = minimumNumberOfFailures;
121+
this.metricsWindowSize = metricsWindowSize;
285122
}
286123

287124
public Set<Class<? extends Throwable>> getTrackedExceptions() {
@@ -296,6 +133,16 @@ public int getMinimumNumberOfFailures() {
296133
return minimumNumberOfFailures;
297134
}
298135

136+
public int getMetricsWindowSize() {
137+
return metricsWindowSize;
138+
}
139+
140+
@Override
141+
public String toString() {
142+
return "CircuitBreakerConfig{" + "trackedExceptions=" + trackedExceptions + ", failureThreshold=" + failureThreshold
143+
+ ", minimumNumberOfFailures=" + minimumNumberOfFailures + ", metricsWindowSize=" + metricsWindowSize + '}';
144+
}
145+
299146
}
300147

301148
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.lettuce.core.failover;
2+
3+
/**
4+
* Represents a specific generation of a circuit breaker. This interface provides methods to record the result of a command
5+
* execution for the metrics tracking of that specific generation.
6+
*
7+
* @author Ali Takavci
8+
* @since 7.1
9+
*/
10+
public interface CircuitBreakerGeneration {
11+
12+
/**
13+
* Record the result of a command execution for the metrics tracking of this generation. * @param error the error, if any
14+
*/
15+
void recordResult(Throwable error);
16+
17+
}

0 commit comments

Comments
 (0)