Skip to content

Commit 97342ef

Browse files
committed
Add example for automatic failover
1 parent 20ea67a commit 97342ef

File tree

1 file changed

+131
-0
lines changed

1 file changed

+131
-0
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package io.lettuce.examples;
2+
3+
import io.lettuce.core.ClientOptions;
4+
import io.lettuce.core.RedisURI;
5+
import io.lettuce.core.SocketOptions;
6+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
7+
import io.lettuce.core.failover.CircuitBreaker;
8+
import io.lettuce.core.failover.DatabaseConfig;
9+
import io.lettuce.core.failover.MultiDbClient;
10+
import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection;
11+
import io.lettuce.core.failover.health.HealthCheckStrategy;
12+
import io.lettuce.core.failover.health.HealthCheckStrategySupplier;
13+
import io.lettuce.core.failover.health.PingStrategy;
14+
import io.lettuce.test.Wait;
15+
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
import reactor.core.Disposable;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
import reactor.test.StepVerifier;
22+
23+
import java.time.Duration;
24+
import java.util.*;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
28+
/**
29+
* Example of automatic failover using MultiDbClient. Automatic Failover API is subject to change since we are still in Beta and
30+
* actively improving the API.*
31+
*/
32+
public class AutomaticFailover {
33+
34+
private static final Logger log = LoggerFactory.getLogger(AutomaticFailover.class);
35+
36+
public static void main(String[] args) {
37+
// Setup: Configure MultiDbClient
38+
// Both endpoints should be available in the test environment.
39+
// At this point client lib does not check if the endpoints are actually available.
40+
// This limitation will be addressed in the future.
41+
// Local Redis instances can be started with:
42+
// docker run -p 6380:6379 -it redis:8.2.1
43+
// docker run -p 6381:6379 -it redis:8.2.1
44+
List<String> endpoints = Arrays.asList("redis://localhost:6380", "redis://localhost:6381");
45+
46+
SocketOptions.TcpUserTimeoutOptions tcpUserTimeout = SocketOptions.TcpUserTimeoutOptions.builder()
47+
.tcpUserTimeout(Duration.ofSeconds(4)).enable().build();
48+
49+
SocketOptions.KeepAliveOptions keepAliveOptions = SocketOptions.KeepAliveOptions.builder()
50+
.interval(Duration.ofSeconds(1)).idle(Duration.ofSeconds(1)).count(3).enable().build();
51+
52+
ClientOptions clientOptions = ClientOptions.builder().autoReconnect(false)
53+
.socketOptions(SocketOptions.builder().tcpUserTimeout(tcpUserTimeout).keepAlive(keepAliveOptions).build())
54+
.build();
55+
56+
List<DatabaseConfig> databaseConfigs = createDatabaseConfigs(endpoints, clientOptions);
57+
MultiDbClient client = MultiDbClient.create(databaseConfigs);
58+
59+
// Auto-reconnect and automatic failback are not supported in the current Beta release.
60+
client.setOptions(clientOptions);
61+
62+
// Connect to the MultiDbClient
63+
StatefulRedisMultiDbConnection<String, String> connection = client.connect();
64+
65+
// Get reactive commands interface
66+
RedisReactiveCommands<String, String> reactive = connection.reactive();
67+
68+
String keyName = "multidb-counter";
69+
70+
// Setup: Set initial counter value
71+
StepVerifier.create(reactive.set(keyName, "0")).expectNext("OK").verifyComplete();
72+
73+
AtomicLong commandsSubmitted = new AtomicLong();
74+
List<Throwable> capturedExceptions = new CopyOnWriteArrayList<>();
75+
76+
// Start a flux that imitates an application using the client
77+
// This follows the exact pattern from ConnectionInterruptionReactiveTest#testWithReactiveCommands
78+
Disposable subscription = Flux.interval(Duration.ofMillis(100)).flatMap(i -> reactive.incr(keyName)
79+
// We should count all attempts, because Lettuce retransmits failed commands
80+
.doFinally(value -> {
81+
commandsSubmitted.incrementAndGet();
82+
log.info("Commands submitted {}", commandsSubmitted.get());
83+
}).onErrorResume(e -> {
84+
capturedExceptions.add(e);
85+
return Mono.empty();
86+
})).subscribe();
87+
88+
// Wait for some commands to be executed before triggering the fault
89+
Wait.untilTrue(() -> commandsSubmitted.get() > 10).waitOrTimeout();
90+
91+
log.info("Executing commands. Stop the first Redis server to trigger failover");
92+
93+
// Wait a bit more to allow recovery
94+
Wait.untilTrue(() -> commandsSubmitted.get() > 10000).during(Duration.ofSeconds(120)).waitOrTimeout();
95+
96+
log.info("Captured exceptions: {}", capturedExceptions);
97+
98+
// Stop the command execution
99+
subscription.dispose();
100+
connection.close();
101+
client.shutdown();
102+
}
103+
104+
/**
105+
* @param clientOptions the client options to use for all database configs
106+
* @return list of DatabaseConfig instances
107+
*/
108+
private static List<DatabaseConfig> createDatabaseConfigs(List<String> endpoints,
109+
io.lettuce.core.ClientOptions clientOptions) {
110+
List<DatabaseConfig> configs = new ArrayList<>();
111+
112+
CircuitBreaker.CircuitBreakerConfig circuitBreakerConfig = new CircuitBreaker.CircuitBreakerConfig(5.0f, 5,
113+
CircuitBreaker.CircuitBreakerConfig.DEFAULT.getTrackedExceptions(), 2);
114+
115+
HealthCheckStrategySupplier pingSupplier = (uri, options) -> new PingStrategy(uri, options,
116+
HealthCheckStrategy.Config.builder().interval(500).timeout(1000).numProbes(1).build());
117+
118+
// Create a DatabaseConfig for each endpoint
119+
float weight = 1.0f;
120+
for (String endpointUri : endpoints) {
121+
RedisURI uri = RedisURI.builder(RedisURI.create(endpointUri)).withTimeout(Duration.ofSeconds(5)).build();
122+
123+
configs.add(new DatabaseConfig(uri, weight, clientOptions, circuitBreakerConfig, pingSupplier));
124+
weight /= 2; // Decrease weight for subsequent endpoints
125+
}
126+
127+
log.info("Created {} DatabaseConfig(s) from endpoint", configs.size());
128+
return configs;
129+
}
130+
131+
}

0 commit comments

Comments
 (0)