-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[automatic failover] Add example for automatic failover #3568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+166
−0
Merged
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
8c3c658
add DatabaseConfig.Builder
ggivo 2963f3f
healthCheckStrategySupplier now defaults to PingStrategy.DEFAULT in t…
ggivo af02161
HealthCheckStrategySupplier.NO_HEALTH_CHECK instead null
ggivo 2cdef7b
Remove DatabaseConfig constructors
ggivo 02408ac
remove redundant public modifiers
ggivo b3cb131
Builder for CircuitBreakerConfig
ggivo 459f498
enforce min window size of 2s
ggivo f2c2ab8
tracked exceptions should not be null
ggivo d19c932
add convenience methods for Tracked Exceptions
ggivo fe57256
remove option to configure per database clientOptions till #3572 is r…
ggivo afdd830
Disable health checks in test configs to isolate circuit breaker testing
ggivo 6089135
forma
ggivo e2795b2
clean up
ggivo 999511f
address review comments (Copilot)
ggivo e1f529d
Add example for automatic failover
uglide a5693a4
Use builders
uglide eb7afb1
Merge branch 'feature/automatic-failover-1' into im/aa-demo-example
ggivo 719f5d2
Merge branch 'feature/automatic-failover-1' into im/aa-demo-example
ggivo 04d3c24
shutdown primary instance
ggivo aa98066
remove unused imports
ggivo e9a07ac
Update src/test/java/io/lettuce/examples/AutomaticFailover.java
ggivo 85b4b64
revert accidentally disabled user timeout config
ggivo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
166 changes: 166 additions & 0 deletions
166
src/test/java/io/lettuce/examples/AutomaticFailover.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| package io.lettuce.examples; | ||
|
|
||
| import io.lettuce.core.ClientOptions; | ||
| import io.lettuce.core.RedisClient; | ||
| import io.lettuce.core.RedisConnectionException; | ||
| import io.lettuce.core.RedisURI; | ||
| import io.lettuce.core.SocketOptions; | ||
| import io.lettuce.core.api.StatefulRedisConnection; | ||
| import io.lettuce.core.api.reactive.RedisReactiveCommands; | ||
| import io.lettuce.core.failover.CircuitBreaker; | ||
| import io.lettuce.core.failover.DatabaseConfig; | ||
| import io.lettuce.core.failover.MultiDbClient; | ||
| import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection; | ||
| import io.lettuce.core.failover.health.PingStrategy; | ||
| import io.lettuce.test.Wait; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import reactor.core.Disposable; | ||
| import reactor.core.publisher.Flux; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.test.StepVerifier; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.*; | ||
| import java.util.concurrent.CopyOnWriteArrayList; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** | ||
| * Example of automatic failover using MultiDbClient. Automatic Failover API is subject to change since we are still in Beta and | ||
| * actively improving the API. | ||
| */ | ||
| public class AutomaticFailover { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(AutomaticFailover.class); | ||
|
|
||
| public static void main(String[] args) { | ||
| // Setup: Configure MultiDbClient | ||
| // Both endpoints should be available in the test environment. | ||
| // At this point multiDbClient lib does not check if the endpoints are actually available. | ||
| // This limitation will be addressed in the future. | ||
| // Local Redis instances can be started with: | ||
| // docker run -p 6380:6379 -it redis:8.2.1 | ||
| // docker run -p 6381:6379 -it redis:8.2.1 | ||
| List<String> endpoints = Arrays.asList("redis://localhost:6380", "redis://localhost:6381"); | ||
|
|
||
| // SocketOptions.TcpUserTimeoutOptions tcpUserTimeout = SocketOptions.TcpUserTimeoutOptions.builder() | ||
| // .tcpUserTimeout(Duration.ofSeconds(4)).enable().build(); | ||
|
|
||
| SocketOptions.KeepAliveOptions keepAliveOptions = SocketOptions.KeepAliveOptions.builder() | ||
| .interval(Duration.ofSeconds(1)).idle(Duration.ofSeconds(1)).count(3).enable().build(); | ||
|
|
||
| ClientOptions clientOptions = ClientOptions.builder().socketOptions(SocketOptions.builder() | ||
| // .tcpUserTimeout(tcpUserTimeout) | ||
ggivo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .keepAlive(keepAliveOptions).build()).build(); | ||
|
|
||
| List<DatabaseConfig> databaseConfigs = createDatabaseConfigs(endpoints); | ||
| MultiDbClient multiDbClient = MultiDbClient.create(databaseConfigs); | ||
|
|
||
| // Automatic failback are not supported in the current Beta release. | ||
| multiDbClient.setOptions(clientOptions); | ||
|
|
||
| // Connect to the MultiDbClient | ||
| StatefulRedisMultiDbConnection<String, String> connection = multiDbClient.connect(); | ||
| log.info("Connected to {}", connection.getCurrentEndpoint()); | ||
| log.info("Available Endpoints: {}", connection.getEndpoints()); | ||
|
|
||
| // Get reactive commands interface | ||
| RedisReactiveCommands<String, String> reactive = connection.reactive(); | ||
|
|
||
| String keyName = "multidb-counter"; | ||
|
|
||
| // Setup: Set initial counter value | ||
| StepVerifier.create(reactive.set(keyName, "0")).expectNext("OK").verifyComplete(); | ||
|
|
||
| AtomicLong commandsSubmitted = new AtomicLong(); | ||
| List<Throwable> capturedExceptions = new CopyOnWriteArrayList<>(); | ||
|
|
||
| // Start a flux that imitates an application using the multiDbClient | ||
| // This follows the exact pattern from ConnectionInterruptionReactiveTest#testWithReactiveCommands | ||
| Disposable subscription = Flux.interval(Duration.ofMillis(100)).flatMap(i -> reactive.incr(keyName) | ||
| // We should count all attempts, because Lettuce retransmits failed commands | ||
| .doFinally(value -> { | ||
| commandsSubmitted.incrementAndGet(); | ||
| log.info("Commands submitted {}", commandsSubmitted.get()); | ||
| }).onErrorResume(e -> { | ||
| capturedExceptions.add(e); | ||
| return Mono.empty(); | ||
| })).subscribe(); | ||
|
|
||
| // Wait for some commands to be executed before triggering the fault | ||
| Wait.untilTrue(() -> commandsSubmitted.get() > 10).waitOrTimeout(); | ||
|
|
||
| // Direct connection to the Redis instances used to trigger Primary Redis instance shutdown | ||
| // and verify that commands are executed on the secondary Redis instance | ||
| RedisClient directClient = RedisClient.create(); | ||
| RedisURI secondaryUri = RedisURI.create(endpoints.get(1)); | ||
| StatefulRedisConnection<String, String> secondary = directClient.connect(secondaryUri); | ||
|
|
||
| // Shutdown the current Redis instance to trigger failover | ||
| RedisURI currentEndpoint = connection.getCurrentEndpoint(); | ||
| log.info("Stoping Redis server [{}] to trigger failover", currentEndpoint); | ||
| shutdownRedisInstance(directClient, currentEndpoint); | ||
|
|
||
| // Wait for Commands to start being executed on the secondary Redis instances | ||
| log.info("Waiting for commands to be executed on [{}]", secondaryUri); | ||
| Wait.untilTrue(() -> (getMulitDbCounter(secondary)) > 20).during(Duration.ofSeconds(10)).waitOrTimeout(); | ||
ggivo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Stop the command execution | ||
| subscription.dispose(); | ||
|
|
||
| log.info("Commands submitted total: {}", commandsSubmitted.get()); | ||
| log.info("Commands executed on secondary [{}]: {}", secondaryUri, getMulitDbCounter(secondary)); | ||
| log.info("Captured exceptions: {}", capturedExceptions); | ||
|
|
||
| // Cleanup | ||
| directClient.shutdown(); | ||
| multiDbClient.shutdown(); | ||
| } | ||
|
|
||
| private static int getMulitDbCounter(StatefulRedisConnection<String, String> connection) { | ||
| String count = connection.sync().get("multidb-counter"); | ||
| return count == null ? 0 : Integer.parseInt(count); | ||
| } | ||
|
|
||
| private static void shutdownRedisInstance(RedisClient redis, RedisURI redisUri) { | ||
| log.info("Shutting down Redis instance [{}]", redisUri); | ||
| try (StatefulRedisConnection<String, String> redis1 = redis.connect(redisUri)) { | ||
| redis1.sync().shutdown(true); | ||
|
|
||
| Wait.untilTrue(() -> { | ||
| try { | ||
| redis.connect(redisUri); | ||
| } catch (Exception e) { | ||
| return e instanceof RedisConnectionException; | ||
| } | ||
| return false; | ||
| }).waitOrTimeout(); | ||
|
|
||
| log.info("Redis instance [{}] is down", redisUri); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @return list of DatabaseConfig instances | ||
| */ | ||
| private static List<DatabaseConfig> createDatabaseConfigs(List<String> endpoints) { | ||
| List<DatabaseConfig> configs = new ArrayList<>(); | ||
|
|
||
| CircuitBreaker.CircuitBreakerConfig circuitBreakerConfig = CircuitBreaker.CircuitBreakerConfig.builder() | ||
| .failureRateThreshold(10.0f).minimumNumberOfFailures(5).metricsWindowSize(5).build(); | ||
|
|
||
| // Create a DatabaseConfig for each endpoint | ||
| float weight = 1.0f; | ||
| for (String endpointUri : endpoints) { | ||
| configs.add(DatabaseConfig.builder(RedisURI.create(endpointUri)).weight(weight) | ||
| .circuitBreakerConfig(circuitBreakerConfig).healthCheckStrategySupplier(PingStrategy.DEFAULT).build()); | ||
|
|
||
| weight /= 2; // Decrease weight for subsequent endpoints | ||
| } | ||
|
|
||
| log.info("Created {} DatabaseConfig(s) from endpoint", configs.size()); | ||
| return configs; | ||
| } | ||
|
|
||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.