Skip to content

Commit e49708b

Browse files
ggivo and Copilot
authored
[automatic failover] CAE-1861: Atomic lock-free metrics reset on CircuitBreaker state transitions (#3527)
* abstract clock for easy testing * Improve LockFreeSlidingWindowMetrics: fix bugs and add tests Bug Fixes: - Fix: Ensure snapshot metrics remain accurate after a full window rotation - Fix: events recorded exactly at bucket boundaries were miscounted - Enforce window size % bucket size == 0 - Move LockFreeSlidingWindowMetricsUnitTests to correct package (io.lettuce.core.failover.metrics) * remove unused reset methods * extract interface for MetricsSnapshot - remove snapshotTime - not used & not correctly calculated - remove reset metrics - unused as of now * add LockFreeSlidingWindowMetrics benchmark test * performance tests moved to metrics package * replace with port from resilience4j * update copyrights * format * clean up javadocs * clean up - fix incorrect javadoc - fix failing benchmark * [automatic failover] Hide failover metrics implementation - CircuitBreakerMetrics, MetricsSnapshot - public - metrics implementation details stay inside io.lettuce.core.failover.metrics - Update CircuitBreaker to obtain its metrics via CircuitBreakerMetricsFactory.createLockFree() * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * rename createLockFree -> createDefaultMetrics * address review comments by @atakavci - remove CircuitBreakerMetrics, CircuitBreakerMetricsImpl - rename SlidingWindowMetrics -> CircuitBreakerMetrics * format * Enforce min-window size of 2 buckets. The current implementation requires a window of at least 2 buckets. With windowSize=1, only one node is created, with next=null. When updateWindow() advances the window it sets HEAD to headNext, which is null for a single-node window. On the next call, updateWindow() tries to access head.next but head is now null, causing: NullPointerException: Cannot read field "next" because "head" is null * Clean-up benchmark - benchmark matrix threads (1,4) window_size ("2", "30", "180") - performs 1_000_000 ops in simulated 5min test window - benchmark record events - 
benchmark record & read snapshot * remove MetricsPerformanceTests.java - no reliable way to assert on performance, instead added basic benchmark test to benchmark recording/snapshot reading average times - gc benchmarks are available for local testing * reset method removed * reset circuit breaker metrics on state transition * fix test : shouldMaintainMetricsAfterSwitch() CB metrics are updated async on command completion, meaning waiting on command completion threads might proceed before metrics snapshot is updated. * format * evaluateMetrics - javadocs & make it package private * format --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent f7051be commit e49708b

File tree

4 files changed

+188
-29
lines changed

4 files changed

+188
-29
lines changed

src/main/java/io/lettuce/core/failover/CircuitBreaker.java

Lines changed: 91 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import java.util.Set;
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.TimeoutException;
11-
import java.util.function.Predicate;
11+
import java.util.concurrent.atomic.AtomicReference;
1212

1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
@@ -23,6 +23,9 @@
2323
/**
2424
* Circuit breaker for tracking command metrics and managing circuit breaker state. Wraps CircuitBreakerMetrics and exposes it
2525
* via {@link #getMetrics()}.
26+
* <p>
27+
* State transitions and metrics replacement are atomic and lock-free using {@link AtomicReference}. When the circuit breaker
28+
* transitions to a new state, a fresh metrics instance is created atomically.
2629
*
2730
* @author Ali Takavci
2831
* @since 7.1
@@ -31,11 +34,9 @@ public class CircuitBreaker implements Closeable {
3134

3235
private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class);
3336

34-
private final CircuitBreakerMetrics metrics;
35-
3637
private final CircuitBreakerConfig config;
3738

38-
private volatile State currentState = State.CLOSED;
39+
private final AtomicReference<CircuitBreakerStateHolder> stateRef;
3940

4041
private final Set<CircuitBreakerStateListener> listeners = ConcurrentHashMap.newKeySet();
4142

@@ -45,9 +46,10 @@ public class CircuitBreaker implements Closeable {
4546
* Create a circuit breaker instance.
4647
*/
4748
public CircuitBreaker(CircuitBreakerConfig config) {
48-
this.metrics = MetricsFactory.createDefaultMetrics();
4949
this.config = config;
5050
this.trackedExceptions = new HashSet<>(config.trackedExceptions);
51+
this.stateRef = new AtomicReference<>(
52+
new CircuitBreakerStateHolder(State.CLOSED, MetricsFactory.createDefaultMetrics()));
5153
}
5254

5355
/**
@@ -58,7 +60,7 @@ public CircuitBreaker(CircuitBreakerConfig config) {
5860
* @return the circuit breaker metrics
5961
*/
6062
CircuitBreakerMetrics getMetrics() {
61-
return metrics;
63+
return stateRef.get().metrics;
6264
}
6365

6466
/**
@@ -68,12 +70,13 @@ CircuitBreakerMetrics getMetrics() {
6870
* @return an immutable snapshot of current metrics
6971
*/
7072
public MetricsSnapshot getSnapshot() {
71-
return metrics.getSnapshot();
73+
return stateRef.get().metrics.getSnapshot();
7274
}
7375

7476
@Override
7577
public String toString() {
76-
return "CircuitBreaker{" + "metrics=" + metrics + ", config=" + config + '}';
78+
CircuitBreakerStateHolder current = stateRef.get();
79+
return "CircuitBreaker{" + "state=" + current.state + ", metrics=" + current.metrics + ", config=" + config + '}';
7780
}
7881

7982
public boolean isCircuitBreakerTrackedException(Throwable throwable) {
@@ -95,16 +98,27 @@ public void recordResult(Throwable error) {
9598
}
9699

97100
public void recordFailure() {
98-
metrics.recordFailure();
101+
stateRef.get().metrics.recordFailure();
99102
evaluateMetrics();
100103
}
101104

102105
public void recordSuccess() {
103-
metrics.recordSuccess();
106+
stateRef.get().metrics.recordSuccess();
104107
}
105108

106-
public MetricsSnapshot evaluateMetrics() {
107-
MetricsSnapshot snapshot = metrics.getSnapshot();
109+
/**
110+
* Evaluate the current metrics to determine if the circuit breaker should transition to a new state.
111+
*
112+
* <p>
113+
* This method checks the failure rate and failure count against the configured thresholds. If the thresholds are met, the
114+
* circuit breaker transitions to the OPEN state. Metrics are reset when the state changes.
115+
* </p>
116+
*
117+
* @return an immutable snapshot of current metrics
118+
*/
119+
MetricsSnapshot evaluateMetrics() {
120+
CircuitBreakerStateHolder current = stateRef.get();
121+
MetricsSnapshot snapshot = current.metrics.getSnapshot();
108122
boolean evaluationResult = snapshot.getFailureRate() >= config.getFailureRateThreshold()
109123
&& snapshot.getFailureCount() >= config.getMinimumNumberOfFailures();
110124
if (evaluationResult) {
@@ -113,16 +127,56 @@ public MetricsSnapshot evaluateMetrics() {
113127
return snapshot;
114128
}
115129

130+
/**
131+
* Switch the circuit breaker to the specified state. This method is used to force the circuit breaker to a specific state.
132+
*
133+
* <p>
134+
* This method does not evaluate the metrics to determine if the state transition is valid. It simply transitions to the
135+
* specified state. Metrics are reset when the state changes.
136+
* </p>
137+
*
138+
* @param newState the target state
139+
*/
140+
public void transitionTo(State newState) {
141+
stateTransitionTo(newState);
142+
}
143+
144+
/**
145+
* Atomically transition to a new state with fresh metrics.
146+
* <p>
147+
* This method uses lock-free CAS to ensure that state change and metrics reset happen atomically. Whenever the circuit
148+
* breaker transitions to a new state, a fresh metrics instance is created, providing a clean slate for tracking metrics in
149+
* the new state.
150+
* <p>
151+
* If the state is already the target state, no transition occurs.
152+
*
153+
* @param newState the target state
154+
*/
116155
private void stateTransitionTo(State newState) {
117-
State previousState = this.currentState;
118-
if (previousState != newState) {
119-
this.currentState = newState;
120-
fireStateChanged(previousState, newState);
156+
while (true) {
157+
CircuitBreakerStateHolder current = stateRef.get();
158+
159+
// No transition needed if already in target state
160+
if (current.state == newState) {
161+
return;
162+
}
163+
164+
// Always create fresh metrics on state transition
165+
CircuitBreakerMetrics nextMetrics = MetricsFactory.createDefaultMetrics();
166+
167+
CircuitBreakerStateHolder next = new CircuitBreakerStateHolder(newState, nextMetrics);
168+
169+
// Atomically swap if current state hasn't changed
170+
if (stateRef.compareAndSet(current, next)) {
171+
fireStateChanged(current.state, newState);
172+
return;
173+
}
174+
// CAS failed, retry with updated current state
121175
}
122176
}
123177

124178
public State getCurrentState() {
125-
return currentState;
179+
return stateRef.get().state;
126180
}
127181

128182
/**
@@ -166,10 +220,29 @@ public void close() {
166220
listeners.clear();
167221
}
168222

169-
public static enum State {
223+
public enum State {
170224
CLOSED, OPEN
171225
}
172226

227+
/**
228+
* Immutable holder for circuit breaker state and metrics.
229+
* <p>
230+
* This class enables atomic updates of both state and metrics using {@link AtomicReference}. When a state transition
231+
* occurs, a new instance is created with fresh metrics.
232+
*/
233+
private static final class CircuitBreakerStateHolder {
234+
235+
final State state;
236+
237+
final CircuitBreakerMetrics metrics;
238+
239+
CircuitBreakerStateHolder(State state, CircuitBreakerMetrics metrics) {
240+
this.state = state;
241+
this.metrics = metrics;
242+
}
243+
244+
}
245+
173246
public static class CircuitBreakerConfig {
174247

175248
private final static float DEFAULT_FAILURE_RATE_THRESHOLD = 10;

src/main/java/io/lettuce/core/failover/metrics/MetricsSnapshotImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,21 @@ public String toString() {
100100
+ ", failureRate=" + String.format("%.2f", getFailureRate()) + "%" + '}';
101101
}
102102

103+
@Override
104+
public boolean equals(Object o) {
105+
if (this == o) {
106+
return true;
107+
}
108+
if (o == null || getClass() != o.getClass()) {
109+
return false;
110+
}
111+
112+
MetricsSnapshotImpl that = (MetricsSnapshotImpl) o;
113+
114+
if (successCount != that.successCount) {
115+
return false;
116+
}
117+
return failureCount == that.failureCount;
118+
}
119+
103120
}

src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5+
import static org.awaitility.Awaitility.await;
56
import static org.junit.jupiter.api.Assertions.assertNotNull;
67

78
import java.util.List;
@@ -10,6 +11,7 @@
1011

1112
import javax.inject.Inject;
1213

14+
import io.lettuce.core.failover.metrics.MetricsSnapshot;
1315
import org.junit.jupiter.api.Tag;
1416
import org.junit.jupiter.api.Test;
1517
import org.junit.jupiter.api.extension.ExtendWith;
@@ -111,34 +113,54 @@ void shouldThrowExceptionForUnknownEndpoint() {
111113

112114
@Test
113115
void shouldMaintainMetricsAfterSwitch() {
116+
// Given: Connection with multiple endpoints
114117
StatefulRedisMultiDbConnection<String, String> connection = multiDbClient.connect();
115118
RedisURI firstEndpoint = connection.getCurrentEndpoint();
116119

117-
// Execute command on first endpoint
118-
connection.sync().set("key1", "value1");
119-
CircuitBreaker cb1Before = connection.getCircuitBreaker(firstEndpoint);
120-
long successes1Before = cb1Before.getSnapshot().getSuccessCount();
120+
// When: Record successful command on first endpoint
121+
MetricsSnapshot metricsBefore = recordSuccessfulCommand(connection, "key1", "value1");
121122

122-
// Switch to second endpoint
123+
// When: Switch to second endpoint
123124
List<RedisURI> endpoints = StreamSupport.stream(connection.getEndpoints().spliterator(), false)
124125
.collect(Collectors.toList());
125126
RedisURI secondEndpoint = endpoints.stream().filter(uri -> !uri.equals(firstEndpoint)).findFirst()
126127
.orElseThrow(() -> new IllegalStateException("No second endpoint found"));
127128
connection.switchToDatabase(secondEndpoint);
128129

129-
// Execute command on second endpoint
130-
connection.sync().set("key2", "value2");
130+
// When: Record successful commands on second endpoint
131+
recordSuccessfulCommand(connection, "key2", "value2");
132+
recordSuccessfulCommand(connection, "key3", "value3");
131133

132-
// Switch back to first endpoint
134+
// When: Switch back to first endpoint
133135
connection.switchToDatabase(firstEndpoint);
134136

135-
// Verify metrics for first endpoint are unchanged
137+
// Then: Circuit breaker metrics on first endpoint should be maintained
136138
CircuitBreaker cb1After = connection.getCircuitBreaker(firstEndpoint);
137-
assertThat(cb1After.getSnapshot().getSuccessCount()).isEqualTo(successes1Before);
139+
assertThat(cb1After.getSnapshot()).isEqualTo(metricsBefore);
138140

139141
connection.close();
140142
}
141143

144+
/**
145+
* Helper method to record a successful command and wait for metrics to update.
146+
*
147+
* <p>
148+
* Metrics are updated asynchronously after command completion, so we need to wait for the metrics to update before proceeding.
149+
* </p>
150+
*
151+
* @param connection
152+
* @param key
153+
* @param value
154+
* @return the metrics snapshot observed once the success count has increased
155+
*/
156+
private MetricsSnapshot recordSuccessfulCommand(StatefulRedisMultiDbConnection<String, String> connection, String key,
157+
String value) {
158+
CircuitBreaker cb = connection.getCircuitBreaker(connection.getCurrentEndpoint());
159+
MetricsSnapshot metrics = cb.getSnapshot();
160+
connection.sync().set(key, value);
161+
return await().until(cb::getSnapshot, snapshot -> snapshot.getSuccessCount() > metrics.getSuccessCount());
162+
}
163+
142164
@Test
143165
void shouldExposeMetricsViaCircuitBreaker() {
144166
StatefulRedisMultiDbConnection<String, String> connection = multiDbClient.connect();

src/test/java/io/lettuce/core/failover/CircuitBreakerUnitTests.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* Comprehensive unit tests for {@link CircuitBreaker} functionality.
2424
*
2525
* @author Ali Takavci
26+
* @author Ivo Gaydajiev
2627
* @since 7.1
2728
*/
2829
@Tag(UNIT_TEST)
@@ -238,6 +239,50 @@ void shouldNotTransitionWhenAlreadyInTargetState() {
238239
assertThat(circuitBreaker.getCurrentState()).isEqualTo(CircuitBreaker.State.OPEN);
239240
}
240241

242+
@Test
243+
@DisplayName("Should reset metrics on forceful state transition")
244+
void shouldResetMetricsOnStateTransition() {
245+
// Given: some recorded events
246+
assertThat(circuitBreaker.getCurrentState()).isEqualTo(CircuitBreaker.State.CLOSED);
247+
circuitBreaker.recordSuccess();
248+
circuitBreaker.recordFailure();
249+
250+
// When: force transition to OPEN
251+
circuitBreaker.transitionTo(CircuitBreaker.State.OPEN);
252+
assertThat(circuitBreaker.getCurrentState()).isEqualTo(CircuitBreaker.State.OPEN);
253+
254+
// Then: metrics should be reset
255+
assertThat(circuitBreaker.getSnapshot().getSuccessCount()).isEqualTo(0);
256+
assertThat(circuitBreaker.getSnapshot().getFailureCount()).isEqualTo(0);
257+
258+
// When: record some more successes and failures
259+
circuitBreaker.recordSuccess();
260+
circuitBreaker.recordFailure();
261+
262+
// Then: metrics should reflect the new events
263+
assertThat(circuitBreaker.getSnapshot().getSuccessCount()).isEqualTo(1);
264+
assertThat(circuitBreaker.getSnapshot().getFailureCount()).isEqualTo(1);
265+
}
266+
267+
@Test
268+
@DisplayName("Should reset metrics on automatic state transition")
269+
void shouldResetMetricsOnAutomaticStateTransition() {
270+
// Given: some recorded events
271+
for (int i = 0; i < 4; i++) {
272+
circuitBreaker.recordSuccess();
273+
circuitBreaker.recordFailure();
274+
}
275+
assertThat(circuitBreaker.getCurrentState()).isEqualTo(CircuitBreaker.State.CLOSED);
276+
277+
// When: record 1 more failure to meet the minimumNumberOfFailures threshold
278+
circuitBreaker.recordFailure();
279+
assertThat(circuitBreaker.getCurrentState()).isEqualTo(CircuitBreaker.State.OPEN);
280+
281+
// Then: metrics should be reset
282+
assertThat(circuitBreaker.getSnapshot().getSuccessCount()).isEqualTo(0);
283+
assertThat(circuitBreaker.getSnapshot().getFailureCount()).isEqualTo(0);
284+
}
285+
241286
}
242287

243288
@Nested
@@ -452,7 +497,9 @@ void shouldHandleLargeNumberOfOperations() {
452497

453498
// Should open because failure count >= 1000 and failure rate >= 1.0%
454499
assertThat(circuitBreaker.getCurrentState()).isEqualTo(CircuitBreaker.State.OPEN);
455-
assertThat(circuitBreaker.getSnapshot().getFailureCount()).isGreaterThanOrEqualTo(1000);
500+
// After state transition, metrics are reset to fresh instance
501+
assertThat(circuitBreaker.getSnapshot().getFailureCount()).isEqualTo(0);
502+
assertThat(circuitBreaker.getSnapshot().getSuccessCount()).isEqualTo(0);
456503
}
457504

458505
}

0 commit comments

Comments
 (0)