Skip to content

Commit 8d1796f

Browse files
uglideggivoatakavci
authored
[automatic failover] Add example for automatic failover (#3568)
* add DatabaseConfig.Builder * healthCheckStrategySupplier now defaults to PingStrategy.DEFAULT in the builder - When using the builder without setting healthCheckStrategySupplier: Health checks will use PingStrategy.DEFAULT - When explicitly setting to null: Health checks will be disabled (as documented) - When setting to a custom supplier: Uses the custom health check strategy Example Usage: // Uses PingStrategy.DEFAULT for health checks DatabaseConfig config1 = DatabaseConfig.builder(uri) .weight(1.0f) .build(); // Explicitly disables health checks DatabaseConfig config2 = DatabaseConfig.builder(uri) .healthCheckStrategySupplier(null) .build(); // Uses custom health check strategy DatabaseConfig config3 = DatabaseConfig.builder(uri) .healthCheckStrategySupplier(customSupplier) .build(); * HealthCheckStrategySupplier.NO_HEALTH_CHECK instead null * Remove DatabaseConfig constructors // To create DatabaseConfig use provided builder DatabaseConfig config = DatabaseConfig.builder(redisURI) .weight(1.5f) .clientOptions(options) .circuitBreakerConfig(cbConfig) .healthCheckStrategySupplier(supplier) .build(); * remove redundant public modifiers * Builder for CircuitBreakerConfig // Minimal configuration with defaults CircuitBreakerConfig config = CircuitBreakerConfig.builder().build(); // Custom configuration CircuitBreakerConfig config = CircuitBreakerConfig.builder() .failureRateThreshold(25.0f) .minimumNumberOfFailures(500) .metricsWindowSize(5) .build(); // With custom tracked exceptions Set<Class<? extends Throwable>> customExceptions = new HashSet<>(); customExceptions.add(RuntimeException.class); CircuitBreakerConfig config = CircuitBreakerConfig.builder() .failureRateThreshold(15.5f) .minimumNumberOfFailures(200) .trackedExceptions(customExceptions) .metricsWindowSize(3) .build(); * enforce min window size of 2s * tracked exceptions should not be null * add convenience methods for Tracked Exceptions //Combine add and remove CircuitBreakerConfig config = CircuitBreakerConfig.builder() .addTrackedExceptions(MyCustomException.class) .removeTrackedExceptions(TimeoutException.class) .build(); // Replace all tracked exceptions Set<Class<? extends Throwable>> customExceptions = new HashSet<>(); customExceptions.add(RuntimeException.class); customExceptions.add(IOException.class); CircuitBreakerConfig config = CircuitBreakerConfig.builder() .trackedExceptions(customExceptions) .build(); * remove option to configure per database clientOptions till #3572 is resolved * Disable health checks in test configs to isolate circuit breaker testing Configure DB1, DB2, and DB3 with NO_HEALTH_CHECK to prevent health check interference when testing circuit breaker failure detection. * forma * clean up * address review comments (Copilot) * Add example for automatic failover * Use builders * shutdown primary instance * remove unused imports * Update src/test/java/io/lettuce/examples/AutomaticFailover.java Co-authored-by: atakavci <a_takavci@yahoo.com> * revert accidentally disabled user timeout config --------- Co-authored-by: ggivo <ivo.gaydazhiev@redis.com> Co-authored-by: atakavci <a_takavci@yahoo.com>
1 parent bad065d commit 8d1796f

File tree

1 file changed

+166
-0
lines changed

1 file changed

+166
-0
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package io.lettuce.examples;
2+
3+
import io.lettuce.core.ClientOptions;
4+
import io.lettuce.core.RedisClient;
5+
import io.lettuce.core.RedisConnectionException;
6+
import io.lettuce.core.RedisURI;
7+
import io.lettuce.core.SocketOptions;
8+
import io.lettuce.core.api.StatefulRedisConnection;
9+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
10+
import io.lettuce.core.failover.CircuitBreaker;
11+
import io.lettuce.core.failover.DatabaseConfig;
12+
import io.lettuce.core.failover.MultiDbClient;
13+
import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection;
14+
import io.lettuce.core.failover.health.PingStrategy;
15+
import io.lettuce.test.Wait;
16+
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
import reactor.core.Disposable;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.Mono;
22+
import reactor.test.StepVerifier;
23+
24+
import java.time.Duration;
25+
import java.util.*;
26+
import java.util.concurrent.CopyOnWriteArrayList;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
29+
/**
30+
* Example of automatic failover using MultiDbClient. Automatic Failover API is subject to change since we are still in Beta and
31+
* actively improving the API.
32+
*/
33+
public class AutomaticFailover {
34+
35+
private static final Logger log = LoggerFactory.getLogger(AutomaticFailover.class);
36+
37+
public static void main(String[] args) {
38+
// Setup: Configure MultiDbClient
39+
// Both endpoints should be available in the test environment.
40+
// At this point multiDbClient lib does not check if the endpoints are actually available.
41+
// This limitation will be addressed in the future.
42+
// Local Redis instances can be started with:
43+
// docker run -p 6380:6379 -it redis:8.2.1
44+
// docker run -p 6381:6379 -it redis:8.2.1
45+
List<String> endpoints = Arrays.asList("redis://localhost:6380", "redis://localhost:6381");
46+
47+
SocketOptions.TcpUserTimeoutOptions tcpUserTimeout = SocketOptions.TcpUserTimeoutOptions.builder()
48+
.tcpUserTimeout(Duration.ofSeconds(4)).enable().build();
49+
50+
SocketOptions.KeepAliveOptions keepAliveOptions = SocketOptions.KeepAliveOptions.builder()
51+
.interval(Duration.ofSeconds(1)).idle(Duration.ofSeconds(1)).count(3).enable().build();
52+
53+
ClientOptions clientOptions = ClientOptions.builder()
54+
.socketOptions(SocketOptions.builder().tcpUserTimeout(tcpUserTimeout).keepAlive(keepAliveOptions).build())
55+
.build();
56+
57+
List<DatabaseConfig> databaseConfigs = createDatabaseConfigs(endpoints);
58+
MultiDbClient multiDbClient = MultiDbClient.create(databaseConfigs);
59+
60+
// Automatic failback are not supported in the current Beta release.
61+
multiDbClient.setOptions(clientOptions);
62+
63+
// Connect to the MultiDbClient
64+
StatefulRedisMultiDbConnection<String, String> connection = multiDbClient.connect();
65+
log.info("Connected to {}", connection.getCurrentEndpoint());
66+
log.info("Available Endpoints: {}", connection.getEndpoints());
67+
68+
// Get reactive commands interface
69+
RedisReactiveCommands<String, String> reactive = connection.reactive();
70+
71+
String keyName = "multidb-counter";
72+
73+
// Setup: Set initial counter value
74+
StepVerifier.create(reactive.set(keyName, "0")).expectNext("OK").verifyComplete();
75+
76+
AtomicLong commandsSubmitted = new AtomicLong();
77+
List<Throwable> capturedExceptions = new CopyOnWriteArrayList<>();
78+
79+
// Start a flux that imitates an application using the multiDbClient
80+
// This follows the exact pattern from ConnectionInterruptionReactiveTest#testWithReactiveCommands
81+
Disposable subscription = Flux.interval(Duration.ofMillis(100)).flatMap(i -> reactive.incr(keyName)
82+
// We should count all attempts, because Lettuce retransmits failed commands
83+
.doFinally(value -> {
84+
commandsSubmitted.incrementAndGet();
85+
log.info("Commands submitted {}", commandsSubmitted.get());
86+
}).onErrorResume(e -> {
87+
capturedExceptions.add(e);
88+
return Mono.empty();
89+
})).subscribe();
90+
91+
// Wait for some commands to be executed before triggering the fault
92+
Wait.untilTrue(() -> commandsSubmitted.get() > 10).waitOrTimeout();
93+
94+
// Direct connection to the Redis instances used to trigger Primary Redis instance shutdown
95+
// and verify that commands are executed on the secondary Redis instance
96+
RedisClient directClient = RedisClient.create();
97+
98+
// Shutdown the current Redis instance to trigger failover
99+
RedisURI currentEndpoint = connection.getCurrentEndpoint();
100+
log.info("Stoping Redis server [{}] to trigger failover", currentEndpoint);
101+
shutdownRedisInstance(directClient, currentEndpoint);
102+
103+
// Wait for Commands to start being executed on the secondary Redis instances
104+
RedisURI secondaryUri = RedisURI.create(endpoints.get(1));
105+
StatefulRedisConnection<String, String> secondary = directClient.connect(secondaryUri);
106+
log.info("Waiting for commands to be executed on [{}]", secondaryUri);
107+
Wait.untilTrue(() -> (getMulitDbCounter(secondary)) > 20).during(Duration.ofSeconds(10)).waitOrTimeout();
108+
109+
// Stop the command execution
110+
subscription.dispose();
111+
112+
log.info("Commands submitted total: {}", commandsSubmitted.get());
113+
log.info("Commands executed on secondary [{}]: {}", secondaryUri, getMulitDbCounter(secondary));
114+
log.info("Captured exceptions: {}", capturedExceptions);
115+
116+
// Cleanup
117+
directClient.shutdown();
118+
multiDbClient.shutdown();
119+
}
120+
121+
private static int getMulitDbCounter(StatefulRedisConnection<String, String> connection) {
122+
String count = connection.sync().get("multidb-counter");
123+
return count == null ? 0 : Integer.parseInt(count);
124+
}
125+
126+
private static void shutdownRedisInstance(RedisClient redis, RedisURI redisUri) {
127+
log.info("Shutting down Redis instance [{}]", redisUri);
128+
try (StatefulRedisConnection<String, String> redis1 = redis.connect(redisUri)) {
129+
redis1.sync().shutdown(true);
130+
131+
Wait.untilTrue(() -> {
132+
try {
133+
redis.connect(redisUri);
134+
} catch (Exception e) {
135+
return e instanceof RedisConnectionException;
136+
}
137+
return false;
138+
}).waitOrTimeout();
139+
140+
log.info("Redis instance [{}] is down", redisUri);
141+
}
142+
}
143+
144+
/**
145+
* @return list of DatabaseConfig instances
146+
*/
147+
private static List<DatabaseConfig> createDatabaseConfigs(List<String> endpoints) {
148+
List<DatabaseConfig> configs = new ArrayList<>();
149+
150+
CircuitBreaker.CircuitBreakerConfig circuitBreakerConfig = CircuitBreaker.CircuitBreakerConfig.builder()
151+
.failureRateThreshold(10.0f).minimumNumberOfFailures(5).metricsWindowSize(5).build();
152+
153+
// Create a DatabaseConfig for each endpoint
154+
float weight = 1.0f;
155+
for (String endpointUri : endpoints) {
156+
configs.add(DatabaseConfig.builder(RedisURI.create(endpointUri)).weight(weight)
157+
.circuitBreakerConfig(circuitBreakerConfig).healthCheckStrategySupplier(PingStrategy.DEFAULT).build());
158+
159+
weight /= 2; // Decrease weight for subsequent endpoints
160+
}
161+
162+
log.info("Created {} DatabaseConfig(s) from endpoint", configs.size());
163+
return configs;
164+
}
165+
166+
}

0 commit comments

Comments
 (0)