Skip to content
Merged
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8c3c658
add DatabaseConfig.Builder
ggivo Dec 10, 2025
2963f3f
healthCheckStrategySupplier now defaults to PingStrategy.DEFAULT in t…
ggivo Dec 10, 2025
af02161
HealthCheckStrategySupplier.NO_HEALTH_CHECK instead null
ggivo Dec 10, 2025
2cdef7b
Remove DatabaseConfig constructors
ggivo Dec 10, 2025
02408ac
remove redundant public modifiers
ggivo Dec 10, 2025
b3cb131
Builder for CircuitBreakerConfig
ggivo Dec 10, 2025
459f498
enforce min window size of 2s
ggivo Dec 10, 2025
f2c2ab8
tracked exceptions should not be null
ggivo Dec 10, 2025
d19c932
add convenience methods for Tracked Exceptions
ggivo Dec 10, 2025
fe57256
remove option to configure per database clientOptions till #3572 is r…
ggivo Dec 10, 2025
afdd830
Disable health checks in test configs to isolate circuit breaker testing
ggivo Dec 10, 2025
6089135
forma
ggivo Dec 10, 2025
e2795b2
clean up
ggivo Dec 10, 2025
999511f
address review comments (Copilot)
ggivo Dec 10, 2025
e1f529d
Add example for automatic failover
uglide Dec 9, 2025
a5693a4
Use builders
uglide Dec 10, 2025
eb7afb1
Merge branch 'feature/automatic-failover-1' into im/aa-demo-example
ggivo Dec 11, 2025
719f5d2
Merge branch 'feature/automatic-failover-1' into im/aa-demo-example
ggivo Dec 11, 2025
04d3c24
shutdown primary instance
ggivo Dec 11, 2025
aa98066
remove unused imports
ggivo Dec 11, 2025
e9a07ac
Update src/test/java/io/lettuce/examples/AutomaticFailover.java
ggivo Dec 11, 2025
85b4b64
revert accidentally disabled user timeout config
ggivo Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions src/test/java/io/lettuce/examples/AutomaticFailover.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package io.lettuce.examples;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.failover.CircuitBreaker;
import io.lettuce.core.failover.DatabaseConfig;
import io.lettuce.core.failover.MultiDbClient;
import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection;
import io.lettuce.core.failover.health.PingStrategy;
import io.lettuce.test.Wait;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/**
* Example of automatic failover using MultiDbClient. Automatic Failover API is subject to change since we are still in Beta and
* actively improving the API.
*/
public class AutomaticFailover {

private static final Logger log = LoggerFactory.getLogger(AutomaticFailover.class);

public static void main(String[] args) {
// Setup: Configure MultiDbClient
// Both endpoints should be available in the test environment.
// At this point multiDbClient lib does not check if the endpoints are actually available.
// This limitation will be addressed in the future.
// Local Redis instances can be started with:
// docker run -p 6380:6379 -it redis:8.2.1
// docker run -p 6381:6379 -it redis:8.2.1
List<String> endpoints = Arrays.asList("redis://localhost:6380", "redis://localhost:6381");

// SocketOptions.TcpUserTimeoutOptions tcpUserTimeout = SocketOptions.TcpUserTimeoutOptions.builder()
// .tcpUserTimeout(Duration.ofSeconds(4)).enable().build();

SocketOptions.KeepAliveOptions keepAliveOptions = SocketOptions.KeepAliveOptions.builder()
.interval(Duration.ofSeconds(1)).idle(Duration.ofSeconds(1)).count(3).enable().build();

ClientOptions clientOptions = ClientOptions.builder().socketOptions(SocketOptions.builder()
// .tcpUserTimeout(tcpUserTimeout)
.keepAlive(keepAliveOptions).build()).build();

List<DatabaseConfig> databaseConfigs = createDatabaseConfigs(endpoints);
MultiDbClient multiDbClient = MultiDbClient.create(databaseConfigs);

// Automatic failback are not supported in the current Beta release.
multiDbClient.setOptions(clientOptions);

// Connect to the MultiDbClient
StatefulRedisMultiDbConnection<String, String> connection = multiDbClient.connect();
log.info("Connected to {}", connection.getCurrentEndpoint());
log.info("Available Endpoints: {}", connection.getEndpoints());

// Get reactive commands interface
RedisReactiveCommands<String, String> reactive = connection.reactive();

String keyName = "multidb-counter";

// Setup: Set initial counter value
StepVerifier.create(reactive.set(keyName, "0")).expectNext("OK").verifyComplete();

AtomicLong commandsSubmitted = new AtomicLong();
List<Throwable> capturedExceptions = new CopyOnWriteArrayList<>();

// Start a flux that imitates an application using the multiDbClient
// This follows the exact pattern from ConnectionInterruptionReactiveTest#testWithReactiveCommands
Disposable subscription = Flux.interval(Duration.ofMillis(100)).flatMap(i -> reactive.incr(keyName)
// We should count all attempts, because Lettuce retransmits failed commands
.doFinally(value -> {
commandsSubmitted.incrementAndGet();
log.info("Commands submitted {}", commandsSubmitted.get());
}).onErrorResume(e -> {
capturedExceptions.add(e);
return Mono.empty();
})).subscribe();

// Wait for some commands to be executed before triggering the fault
Wait.untilTrue(() -> commandsSubmitted.get() > 10).waitOrTimeout();

// Direct connection to the Redis instances used to trigger Primary Redis instance shutdown
// and verify that commands are executed on the secondary Redis instance
RedisClient directClient = RedisClient.create();
RedisURI secondaryUri = RedisURI.create(endpoints.get(1));
StatefulRedisConnection<String, String> secondary = directClient.connect(secondaryUri);

// Shutdown the current Redis instance to trigger failover
RedisURI currentEndpoint = connection.getCurrentEndpoint();
log.info("Stoping Redis server [{}] to trigger failover", currentEndpoint);
shutdownRedisInstance(directClient, currentEndpoint);

// Wait for Commands to start being executed on the secondary Redis instances
log.info("Waiting for commands to be executed on [{}]", secondaryUri);
Wait.untilTrue(() -> (getMulitDbCounter(secondary)) > 20).during(Duration.ofSeconds(10)).waitOrTimeout();

// Stop the command execution
subscription.dispose();

log.info("Commands submitted total: {}", commandsSubmitted.get());
log.info("Commands executed on secondary [{}]: {}", secondaryUri, getMulitDbCounter(secondary));
log.info("Captured exceptions: {}", capturedExceptions);

// Cleanup
directClient.shutdown();
multiDbClient.shutdown();
}

private static int getMulitDbCounter(StatefulRedisConnection<String, String> connection) {
String count = connection.sync().get("multidb-counter");
return count == null ? 0 : Integer.parseInt(count);
}

private static void shutdownRedisInstance(RedisClient redis, RedisURI redisUri) {
log.info("Shutting down Redis instance [{}]", redisUri);
try (StatefulRedisConnection<String, String> redis1 = redis.connect(redisUri)) {
redis1.sync().shutdown(true);

Wait.untilTrue(() -> {
try {
redis.connect(redisUri);
} catch (Exception e) {
return e instanceof RedisConnectionException;
}
return false;
}).waitOrTimeout();

log.info("Redis instance [{}] is down", redisUri);
}
}

/**
* @return list of DatabaseConfig instances
*/
private static List<DatabaseConfig> createDatabaseConfigs(List<String> endpoints) {
List<DatabaseConfig> configs = new ArrayList<>();

CircuitBreaker.CircuitBreakerConfig circuitBreakerConfig = CircuitBreaker.CircuitBreakerConfig.builder()
.failureRateThreshold(10.0f).minimumNumberOfFailures(5).metricsWindowSize(5).build();

// Create a DatabaseConfig for each endpoint
float weight = 1.0f;
for (String endpointUri : endpoints) {
configs.add(DatabaseConfig.builder(RedisURI.create(endpointUri)).weight(weight)
.circuitBreakerConfig(circuitBreakerConfig).healthCheckStrategySupplier(PingStrategy.DEFAULT).build());

weight /= 2; // Decrease weight for subsequent endpoints
}

log.info("Created {} DatabaseConfig(s) from endpoint", configs.size());
return configs;
}

}
Loading