88import java .util .Set ;
99import java .util .concurrent .CompletableFuture ;
1010import java .util .concurrent .ConcurrentHashMap ;
11+ import java .util .concurrent .locks .Lock ;
12+ import java .util .concurrent .locks .ReadWriteLock ;
13+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
1114import java .util .function .Supplier ;
1215
1316import io .lettuce .core .AbstractRedisClient ;
@@ -61,6 +64,12 @@ public class StatefulRedisMultiDbConnectionImpl<C extends StatefulRedisConnectio
6164
6265 protected final DatabaseConnectionFactory <C , K , V > connectionFactory ;
6366
67+ private final ReadWriteLock multiDbLock = new ReentrantReadWriteLock ();
68+
69+ private final Lock readLock = multiDbLock .readLock ();
70+
71+ private final Lock writeLock = multiDbLock .writeLock ();
72+
6473 public StatefulRedisMultiDbConnectionImpl (Map <RedisURI , RedisDatabase <C >> connections , ClientResources resources ,
6574 RedisCodec <K , V > codec , Supplier <JsonParser > parser , DatabaseConnectionFactory <C , K , V > connectionFactory ) {
6675 if (connections == null || connections .isEmpty ()) {
@@ -151,14 +160,18 @@ public RedisCommands<K, V> sync() {
151160
152161 @ Override
153162 public void addListener (RedisConnectionStateListener listener ) {
154- connectionStateListeners .add (listener );
155- current .getConnection ().addListener (listener );
163+ doBySharedLock (() -> {
164+ connectionStateListeners .add (listener );
165+ current .getConnection ().addListener (listener );
166+ });
156167 }
157168
158169 @ Override
159170 public void removeListener (RedisConnectionStateListener listener ) {
160- connectionStateListeners .remove (listener );
161- current .getConnection ().removeListener (listener );
171+ doBySharedLock (() -> {
172+ connectionStateListeners .remove (listener );
173+ current .getConnection ().removeListener (listener );
174+ });
162175 }
163176
164177 @ Override
@@ -224,14 +237,18 @@ public boolean isMulti() {
224237
225238 @ Override
226239 public void addListener (PushListener listener ) {
227- pushListeners .add (listener );
228- current .getConnection ().addListener (listener );
240+ doBySharedLock (() -> {
241+ pushListeners .add (listener );
242+ current .getConnection ().addListener (listener );
243+ });
229244 }
230245
231246 @ Override
232247 public void removeListener (PushListener listener ) {
233- pushListeners .remove (listener );
234- current .getConnection ().removeListener (listener );
248+ doBySharedLock (() -> {
249+ pushListeners .remove (listener );
250+ current .getConnection ().removeListener (listener );
251+ });
235252 }
236253
237254 @ Override
@@ -251,21 +268,45 @@ public Iterable<RedisURI> getEndpoints() {
251268
252269 @ Override
253270 public void switchToDatabase (RedisURI redisURI ) {
254- RedisDatabase <C > fromDb = current ;
255- RedisDatabase <C > toDb = databases .get (redisURI );
256- if (fromDb == null || toDb == null ) {
257- throw new UnsupportedOperationException ("Cannot initiate switch without a current and target database!" );
258- }
259- current = toDb ;
260- connectionStateListeners .forEach (listener -> {
261- toDb .getConnection ().addListener (listener );
262- fromDb .getConnection ().removeListener (listener );
263- });
264- pushListeners .forEach (listener -> {
265- toDb .getConnection ().addListener (listener );
266- fromDb .getConnection ().removeListener (listener );
271+ doByExclusiveLock (() -> {
272+ RedisDatabase <C > fromDb = current ;
273+ RedisDatabase <C > toDb = databases .get (redisURI );
274+ if (fromDb == null || toDb == null ) {
275+ throw new UnsupportedOperationException (
276+ "Unable to switch between endpoints - the driver was not able to locate the source or destination endpoint." );
277+ }
278+ if (fromDb .equals (toDb )) {
279+ return ;
280+ }
281+ current = toDb ;
282+ connectionStateListeners .forEach (listener -> {
283+ toDb .getConnection ().addListener (listener );
284+ fromDb .getConnection ().removeListener (listener );
285+ });
286+ pushListeners .forEach (listener -> {
287+ toDb .getConnection ().addListener (listener );
288+ fromDb .getConnection ().removeListener (listener );
289+ });
290+ fromDb .getDatabaseEndpoint ().handOverCommandQueue (toDb .getDatabaseEndpoint ());
267291 });
268- fromDb .getDatabaseEndpoint ().handOverCommandQueue (toDb .getDatabaseEndpoint ());
292+ }
293+
294+ protected void doBySharedLock (Runnable operation ) {
295+ readLock .lock ();
296+ try {
297+ operation .run ();
298+ } finally {
299+ readLock .unlock ();
300+ }
301+ }
302+
303+ protected void doByExclusiveLock (Runnable operation ) {
304+ writeLock .lock ();
305+ try {
306+ operation .run ();
307+ } finally {
308+ writeLock .unlock ();
309+ }
269310 }
270311
271312 @ Override
@@ -294,39 +335,44 @@ public void addDatabase(DatabaseConfig databaseConfig) {
294335 }
295336
296337 RedisURI redisURI = databaseConfig .getRedisURI ();
297- if (databases .containsKey (redisURI )) {
298- throw new IllegalArgumentException ("Database already exists: " + redisURI );
299- }
300338
301- // Create new database connection using the factory
302- RedisDatabase <C > database = connectionFactory .createDatabase (databaseConfig , codec );
339+ doByExclusiveLock (() -> {
340+ if (databases .containsKey (redisURI )) {
341+ throw new IllegalArgumentException ("Database already exists: " + redisURI );
342+ }
343+
344+ // Create new database connection using the factory
345+ RedisDatabase <C > database = connectionFactory .createDatabase (databaseConfig , codec );
303346
304- // Add listeners to the new connection if it's the current one
305- // (though it won't be current initially since we're just adding it)
306- databases .put (redisURI , database );
347+ // Add listeners to the new connection if it's the current one
348+ // (though it won't be current initially since we're just adding it)
349+ databases .put (redisURI , database );
350+
351+ database .getCircuitBreaker ().addListener (this ::onCircuitBreakerStateChange );
352+ });
307353
308- database .getCircuitBreaker ().addListener (this ::onCircuitBreakerStateChange );
309354 }
310355
311356 @ Override
312357 public void removeDatabase (RedisURI redisURI ) {
313358 if (redisURI == null ) {
314359 throw new IllegalArgumentException ("RedisURI must not be null" );
315360 }
316-
317- RedisDatabase <C > database = databases .get (redisURI );
318- if (database == null ) {
319- throw new IllegalArgumentException ("Database not found: " + redisURI );
320- }
321-
322- if (current .getRedisURI ().equals (redisURI )) {
323- throw new UnsupportedOperationException ("Cannot remove the currently active database: " + redisURI );
324- }
325-
326- // Remove the database and close its connection
327- databases .remove (redisURI );
328- database .getConnection ().close ();
329- database .getCircuitBreaker ().removeListener (this ::onCircuitBreakerStateChange );
361+ doByExclusiveLock (() -> {
362+ RedisDatabase <C > database = null ;
363+ database = databases .get (redisURI );
364+ if (database == null ) {
365+ throw new IllegalArgumentException ("Database not found: " + redisURI );
366+ }
367+
368+ if (current .getRedisURI ().equals (redisURI )) {
369+ throw new UnsupportedOperationException ("Cannot remove the currently active database: " + redisURI );
370+ }
371+
372+ // Remove the database and close its connection
373+ databases .remove (redisURI );
374+ database .close ();
375+ });
330376 }
331377
332378}
0 commit comments