Skip to content

Commit c2890be

Browse files
authored
[automatic-failover] Support double-threshold logic with circuitbreaker (#3522)
* - simplfy tracking exceptions check - add metrics evaluation tests for double-threshold - add more tests on CB evaluates metrics and state transition, including edge cases * - tune number of success/failures in test case * - Add recordResult(Throwable), recordSuccess(), and recordFailure() public methods to CircuitBreaker - Add getSnapshot() public method to expose metrics directly - Change getMetrics() to package-private (internal use only) - Simplify handleFailure() in endpoint implementations to use recordResult() - Update all tests to use new public API - Drop repeating test case shouldOpenImmediatelyWhenMinimumCountReachedAndRateIsZero * - fix test cases; drop unnecessary calls to evaluateMetrics when there is call to recordFailure
1 parent b6fbc4e commit c2890be

File tree

6 files changed

+545
-53
lines changed

6 files changed

+545
-53
lines changed

src/main/java/io/lettuce/core/failover/CircuitBreaker.java

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.lettuce.core.failover.api.CircuitBreakerStateListener;
1919
import io.lettuce.core.failover.metrics.CircuitBreakerMetrics;
2020
import io.lettuce.core.failover.metrics.CircuitBreakerMetricsImpl;
21+
import io.lettuce.core.failover.metrics.MetricsSnapshot;
2122

2223
/**
2324
* Circuit breaker for tracking command metrics and managing circuit breaker state. Wraps CircuitBreakerMetrics and exposes it
@@ -36,55 +37,80 @@ public class CircuitBreaker implements Closeable {
3637

3738
private volatile State currentState = State.CLOSED;
3839

39-
private Predicate<Throwable> exceptionsPredicate;
40-
4140
private final Set<CircuitBreakerStateListener> listeners = ConcurrentHashMap.newKeySet();
4241

42+
private final Set<Class<? extends Throwable>> trackedExceptions;
43+
4344
/**
4445
* Create a circuit breaker instance.
4546
*/
4647
public CircuitBreaker(CircuitBreakerConfig config) {
4748
this.metrics = new CircuitBreakerMetricsImpl();
4849
this.config = config;
49-
this.exceptionsPredicate = createExceptionsPredicate(config.trackedExceptions);
50+
this.trackedExceptions = new HashSet<>(config.trackedExceptions);
5051
}
5152

5253
/**
5354
* Get the metrics tracked by this circuit breaker.
54-
*
55+
* <p>
56+
* This is only for internal use and testing purposes.
57+
*
5558
* @return the circuit breaker metrics
5659
*/
57-
public CircuitBreakerMetrics getMetrics() {
60+
CircuitBreakerMetrics getMetrics() {
5861
return metrics;
5962
}
6063

64+
/**
65+
* Get a snapshot of the current metrics within the time window. Use the snapshot to access success count, failure count,
66+
* total count, and failure rate.
67+
*
68+
* @return an immutable snapshot of current metrics
69+
*/
70+
public MetricsSnapshot getSnapshot() {
71+
return metrics.getSnapshot();
72+
}
73+
6174
@Override
6275
public String toString() {
6376
return "CircuitBreaker{" + "metrics=" + metrics + ", config=" + config + '}';
6477
}
6578

66-
public boolean isCircuitBreakerTrackedException(Throwable error) {
67-
return exceptionsPredicate.test(error);
79+
public boolean isCircuitBreakerTrackedException(Throwable throwable) {
80+
Class<? extends Throwable> errorClass = throwable.getClass();
81+
for (Class<? extends Throwable> trackedException : trackedExceptions) {
82+
if (trackedException.isAssignableFrom(errorClass)) {
83+
return true;
84+
}
85+
}
86+
return false;
6887
}
6988

70-
private static Predicate<Throwable> createExceptionsPredicate(Set<Class<? extends Throwable>> trackedExceptions) {
71-
return throwable -> {
72-
Class<? extends Throwable> errorClass = throwable.getClass();
73-
for (Class<? extends Throwable> trackedException : trackedExceptions) {
74-
if (trackedException.isAssignableFrom(errorClass)) {
75-
return true;
76-
}
77-
}
78-
return false;
79-
};
89+
public void recordResult(Throwable error) {
90+
if (error != null && isCircuitBreakerTrackedException(error)) {
91+
recordFailure();
92+
} else {
93+
recordSuccess();
94+
}
95+
}
96+
97+
public void recordFailure() {
98+
metrics.recordFailure();
99+
evaluateMetrics();
100+
}
101+
102+
public void recordSuccess() {
103+
metrics.recordSuccess();
80104
}
81105

82-
public void evaluateMetrics() {
83-
boolean evaluationResult = metrics.getSnapshot().getFailureRate() >= config.getFailureRateThreshold()
84-
&& metrics.getSnapshot().getFailureCount() >= config.getMinimumNumberOfFailures();
106+
public MetricsSnapshot evaluateMetrics() {
107+
MetricsSnapshot snapshot = metrics.getSnapshot();
108+
boolean evaluationResult = snapshot.getFailureRate() >= config.getFailureRateThreshold()
109+
&& snapshot.getFailureCount() >= config.getMinimumNumberOfFailures();
85110
if (evaluationResult) {
86111
stateTransitionTo(State.OPEN);
87112
}
113+
return snapshot;
88114
}
89115

90116
private void stateTransitionTo(State newState) {

src/main/java/io/lettuce/core/failover/DatabaseEndpointImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,7 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
7777
}
7878

7979
private void handleFailure(Object output, Throwable error) {
80-
if (error != null && circuitBreaker.isCircuitBreakerTrackedException(error)) {
81-
circuitBreaker.getMetrics().recordFailure();
82-
circuitBreaker.evaluateMetrics();
83-
} else {
84-
circuitBreaker.getMetrics().recordSuccess();
85-
}
80+
circuitBreaker.recordResult(error);
8681
}
8782

8883
@Override

src/main/java/io/lettuce/core/failover/DatabasePubSubEndpointImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,7 @@ public <K1, V1, T> RedisCommand<K1, V1, T> write(RedisCommand<K1, V1, T> command
7878
}
7979

8080
private void handleFailure(Object output, Throwable error) {
81-
if (error != null && circuitBreaker.isCircuitBreakerTrackedException(error)) {
82-
circuitBreaker.getMetrics().recordFailure();
83-
circuitBreaker.evaluateMetrics();
84-
} else {
85-
circuitBreaker.getMetrics().recordSuccess();
86-
}
81+
circuitBreaker.recordResult(error);
8782
}
8883

8984
@Override

src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ void shouldTrackSuccessfulCommands() {
4444
// Get metrics
4545
CircuitBreaker cb = connection.getCircuitBreaker(endpoint);
4646
assertNotNull(cb);
47-
assertThat(cb.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1);
48-
assertThat(cb.getMetrics().getSnapshot().getFailureCount()).isEqualTo(0);
47+
assertThat(cb.getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1);
48+
assertThat(cb.getSnapshot().getFailureCount()).isEqualTo(0);
4949

5050
connection.close();
5151
}
@@ -62,8 +62,8 @@ void shouldTrackMultipleCommands() {
6262

6363
// Get metrics
6464
CircuitBreaker cb = connection.getCircuitBreaker(endpoint);
65-
assertThat(cb.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(3);
66-
assertThat(cb.getMetrics().getSnapshot().getFailureCount()).isEqualTo(0);
65+
assertThat(cb.getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(3);
66+
assertThat(cb.getSnapshot().getFailureCount()).isEqualTo(0);
6767

6868
connection.close();
6969
}
@@ -91,8 +91,8 @@ void shouldIsolatMetricsPerEndpoint() {
9191
CircuitBreaker cb2 = connection.getCircuitBreaker(secondEndpoint);
9292

9393
// Verify isolation - each endpoint has its own metrics
94-
assertThat(cb1.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1);
95-
assertThat(cb2.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1);
94+
assertThat(cb1.getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1);
95+
assertThat(cb2.getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(1);
9696

9797
connection.close();
9898
}
@@ -117,7 +117,7 @@ void shouldMaintainMetricsAfterSwitch() {
117117
// Execute command on first endpoint
118118
connection.sync().set("key1", "value1");
119119
CircuitBreaker cb1Before = connection.getCircuitBreaker(firstEndpoint);
120-
long successes1Before = cb1Before.getMetrics().getSnapshot().getSuccessCount();
120+
long successes1Before = cb1Before.getSnapshot().getSuccessCount();
121121

122122
// Switch to second endpoint
123123
List<RedisURI> endpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false)
@@ -134,7 +134,7 @@ void shouldMaintainMetricsAfterSwitch() {
134134

135135
// Verify metrics for first endpoint are unchanged
136136
CircuitBreaker cb1After = connection.getCircuitBreaker(firstEndpoint);
137-
assertThat(cb1After.getMetrics().getSnapshot().getSuccessCount()).isEqualTo(successes1Before);
137+
assertThat(cb1After.getSnapshot().getSuccessCount()).isEqualTo(successes1Before);
138138

139139
connection.close();
140140
}
@@ -151,8 +151,8 @@ void shouldExposeMetricsViaCircuitBreaker() {
151151
// Get circuit breaker and verify metrics are accessible
152152
CircuitBreaker cb = connection.getCircuitBreaker(endpoint);
153153
assertNotNull(cb);
154-
assertNotNull(cb.getMetrics());
155-
assertThat(cb.getMetrics().getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(2);
154+
assertNotNull(cb.getSnapshot());
155+
assertThat(cb.getSnapshot().getSuccessCount()).isGreaterThanOrEqualTo(2);
156156

157157
connection.close();
158158
}

src/test/java/io/lettuce/core/failover/CircuitBreakerStateListenerTests.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@ void shouldNotifyListenerOnStateChange() {
3838

3939
// When - trigger state change by recording failures
4040
for (int i = 0; i < 10; i++) {
41-
circuitBreaker.getMetrics().recordFailure();
41+
circuitBreaker.recordFailure();
4242
}
43-
circuitBreaker.evaluateMetrics();
4443

4544
// Then
4645
assertThat(listener.events).hasSize(1);
@@ -61,9 +60,8 @@ void shouldNotifyMultipleListeners() {
6160

6261
// When - trigger state change
6362
for (int i = 0; i < 10; i++) {
64-
circuitBreaker.getMetrics().recordFailure();
63+
circuitBreaker.recordFailure();
6564
}
66-
circuitBreaker.evaluateMetrics();
6765

6866
// Then
6967
assertThat(listener1.events).hasSize(1);
@@ -79,9 +77,8 @@ void shouldNotNotifyRemovedListener() {
7977

8078
// When - trigger state change
8179
for (int i = 0; i < 10; i++) {
82-
circuitBreaker.getMetrics().recordFailure();
80+
circuitBreaker.recordFailure();
8381
}
84-
circuitBreaker.evaluateMetrics();
8582

8683
// Then
8784
assertThat(listener.events).isEmpty();
@@ -94,7 +91,7 @@ void shouldNotNotifyIfStateDoesNotChange() {
9491
circuitBreaker.addListener(listener);
9592

9693
// When - evaluate without enough failures
97-
circuitBreaker.getMetrics().recordSuccess();
94+
circuitBreaker.recordSuccess();
9895
circuitBreaker.evaluateMetrics();
9996

10097
// Then
@@ -111,9 +108,8 @@ void shouldHandleListenerExceptionsGracefully() {
111108

112109
// When - trigger state change
113110
for (int i = 0; i < 10; i++) {
114-
circuitBreaker.getMetrics().recordFailure();
111+
circuitBreaker.recordFailure();
115112
}
116-
circuitBreaker.evaluateMetrics();
117113

118114
// Then - normal listener should still receive the event
119115
assertThat(normalListener.events).hasSize(1);
@@ -129,9 +125,8 @@ void shouldIncludeTimestampInEvent() throws InterruptedException {
129125
// When
130126
Thread.sleep(10); // Small delay to ensure timestamp difference
131127
for (int i = 0; i < 10; i++) {
132-
circuitBreaker.getMetrics().recordFailure();
128+
circuitBreaker.recordFailure();
133129
}
134-
circuitBreaker.evaluateMetrics();
135130
long afterTimestamp = System.currentTimeMillis();
136131

137132
// Then

0 commit comments

Comments
 (0)