Skip to content

Commit 5f3060f

Browse files
authored
[automatic-failover] Implement weighted endpoint selection (#3519)
* - drop RedisDatabaseConfig - add HealthStatus * - set default healthStatus.Healthy * - review from @ggivo, improve readability * review from @tishun - rename predicate function - add javadoc * - format * - fix java docs * - format
1 parent e49708b commit 5f3060f

File tree

4 files changed

+90
-37
lines changed

4 files changed

+90
-37
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2011-Present, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*
7+
* This file contains contributions from third-party contributors
8+
* licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* https://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
package io.lettuce.core.failover;
21+
22+
/**
23+
* Enumeration representing the health status of a database endpoint as determined by the health check service.
24+
* <p>
25+
* This status reflects the results from active health checks performed by the health check service. Health checks periodically
26+
* probes endpoints to verify their availability and responsiveness, updating this status accordingly.
27+
* <p>
28+
* Used by the {@link RedisDatabase} to track and report health check results from the health check API.
29+
*
30+
* @author Ali Takavci
31+
* @since 7.1
32+
*/
33+
public enum HealthStatus {
34+
35+
/**
36+
* Health status is unknown. This is the initial state before any health checks have been performed by the health check
37+
* service.
38+
*/
39+
UNKNOWN,
40+
41+
/**
42+
* The endpoint is healthy and operating normally according to health check probes. Health checks are passing and the
43+
* endpoint is responding as expected.
44+
*/
45+
HEALTHY,
46+
47+
/**
48+
* The endpoint is unhealthy according to health check probes. Health checks are failing, indicating the endpoint is not
49+
* responding or not meeting health criteria.
50+
*/
51+
UNHEALTHY
52+
53+
}

src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,7 @@ private <K, V> RedisDatabase<StatefulRedisConnection<K, V>> createRedisDatabase(
9696
RedisURI uri = config.getRedisURI();
9797
StatefulRedisConnection<K, V> connection = connect(codec, uri);
9898
DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection);
99-
RedisDatabase<StatefulRedisConnection<K, V>> database = new RedisDatabase<>(
100-
new RedisDatabase.RedisDatabaseConfig(uri, config.getWeight(), config.getCircuitBreakerConfig()), connection,
101-
databaseEndpoint);
99+
RedisDatabase<StatefulRedisConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint);
102100

103101
return database;
104102
}
@@ -138,9 +136,7 @@ private <K, V> RedisDatabase<StatefulRedisPubSubConnection<K, V>> createRedisDat
138136
RedisURI uri = config.getRedisURI();
139137
StatefulRedisPubSubConnection<K, V> connection = connectPubSub(codec, uri);
140138
DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection);
141-
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = new RedisDatabase<>(
142-
new RedisDatabase.RedisDatabaseConfig(uri, config.getWeight(), config.getCircuitBreakerConfig()), connection,
143-
databaseEndpoint);
139+
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint);
144140
return database;
145141
}
146142

src/main/java/io/lettuce/core/failover/RedisDatabase.java

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import io.lettuce.core.RedisURI;
66
import io.lettuce.core.api.StatefulRedisConnection;
7-
import io.lettuce.core.failover.CircuitBreaker.CircuitBreakerConfig;
87

98
/**
109
* Represents a Redis database with a weight and a connection.
@@ -26,12 +25,14 @@ public class RedisDatabase<C extends StatefulRedisConnection<?, ?>> implements C
2625

2726
private final CircuitBreaker circuitBreaker;
2827

29-
public RedisDatabase(RedisDatabaseConfig config, C connection, DatabaseEndpoint databaseEndpoint) {
30-
this.redisURI = config.redisURI;
31-
this.weight = config.weight;
28+
private HealthStatus healthStatus = HealthStatus.HEALTHY;
29+
30+
public RedisDatabase(DatabaseConfig config, C connection, DatabaseEndpoint databaseEndpoint) {
31+
this.redisURI = config.getRedisURI();
32+
this.weight = config.getWeight();
3233
this.connection = connection;
3334
this.databaseEndpoint = databaseEndpoint;
34-
this.circuitBreaker = new CircuitBreaker(config.circuitBreakerConfig);
35+
this.circuitBreaker = new CircuitBreaker(config.getCircuitBreakerConfig());
3536
databaseEndpoint.setCircuitBreaker(circuitBreaker);
3637
}
3738

@@ -61,26 +62,8 @@ public void close() {
6162
circuitBreaker.close();
6263
}
6364

64-
public static class RedisDatabaseConfig {
65-
66-
private RedisURI redisURI;
67-
68-
private float weight;
69-
70-
private CircuitBreakerConfig circuitBreakerConfig;
71-
72-
public RedisDatabaseConfig(RedisURI redisURI, float weight) {
73-
this.redisURI = redisURI;
74-
this.weight = weight;
75-
this.circuitBreakerConfig = CircuitBreakerConfig.DEFAULT;
76-
}
77-
78-
public RedisDatabaseConfig(RedisURI redisURI, float weight, CircuitBreakerConfig circuitBreakerConfig) {
79-
this.redisURI = redisURI;
80-
this.weight = weight;
81-
this.circuitBreakerConfig = circuitBreakerConfig;
82-
}
83-
65+
public HealthStatus getHealthStatus() {
66+
return healthStatus;
8467
}
8568

8669
}

src/main/java/io/lettuce/core/failover/StatefulRedisMultiDbConnectionImpl.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.locks.Lock;
1212
import java.util.concurrent.locks.ReadWriteLock;
1313
import java.util.concurrent.locks.ReentrantReadWriteLock;
14+
import java.util.function.Predicate;
1415
import java.util.function.Supplier;
1516

1617
import io.lettuce.core.AbstractRedisClient;
@@ -46,6 +47,7 @@ public class StatefulRedisMultiDbConnectionImpl<C extends StatefulRedisConnectio
4647

4748
protected final Map<RedisURI, RedisDatabase<C>> databases;
4849

50+
// this should not be null ever after succesfull initialization
4951
protected RedisDatabase<C> current;
5052

5153
protected final RedisCommands<K, V> sync;
@@ -79,7 +81,11 @@ public StatefulRedisMultiDbConnectionImpl(Map<RedisURI, RedisDatabase<C>> connec
7981
this.codec = codec;
8082
this.parser = parser;
8183
this.connectionFactory = connectionFactory;
82-
this.current = connections.values().stream().max(Comparator.comparingDouble(RedisDatabase::getWeight)).get();
84+
// TODO: Current implementation forces all database connections to be created and established (at least once before this
85+
// constructor called).
86+
// This is suboptimal and should be replaced with a logic that uses async connection creation and state management,
87+
// which safely starts with at least one healthy connection.
88+
this.current = getNextHealthyDatabase(null);
8389

8490
this.async = newRedisAsyncCommandsImpl();
8591
this.sync = newRedisSyncCommandsImpl();
@@ -95,7 +101,7 @@ private void onCircuitBreakerStateChange(CircuitBreakerStateChangeEvent event) {
95101
}
96102

97103
private void failoverFrom(RedisDatabase<C> fromDb) {
98-
RedisDatabase<C> healthyDatabase = getHealthyDatabase(fromDb);
104+
RedisDatabase<C> healthyDatabase = getNextHealthyDatabase(fromDb);
99105
if (healthyDatabase != null) {
100106
switchToDatabase(healthyDatabase.getRedisURI());
101107
} else {
@@ -104,10 +110,25 @@ private void failoverFrom(RedisDatabase<C> fromDb) {
104110
}
105111
}
106112

107-
private RedisDatabase<C> getHealthyDatabase(RedisDatabase<C> current) {
108-
return databases.values().stream().filter(db -> db != current)
109-
.filter(db -> db.getCircuitBreaker().getCurrentState() == CircuitBreaker.State.CLOSED)
110-
.max(Comparator.comparingDouble(RedisDatabase::getWeight)).get();
113+
private RedisDatabase<C> getNextHealthyDatabase(RedisDatabase<C> dbToExclude) {
114+
return databases.values().stream().filter(DatabasePredicates.isHealthy).filter(DatabasePredicates.isNot(dbToExclude))
115+
.max(DatabaseComparators.byWeight).orElse(null);
116+
}
117+
118+
static class DatabaseComparators {
119+
120+
public static final Comparator<RedisDatabase<?>> byWeight = Comparator.comparingDouble(RedisDatabase::getWeight);
121+
122+
}
123+
124+
static class DatabasePredicates {
125+
126+
public static final Predicate<RedisDatabase<?>> isHealthy = db -> db.getHealthStatus() == HealthStatus.HEALTHY;
127+
128+
public static Predicate<RedisDatabase<?>> isNot(RedisDatabase<?> dbInstance) {
129+
return db -> !db.equals(dbInstance);
130+
}
131+
111132
}
112133

113134
@Override

0 commit comments

Comments
 (0)