From 7d5878007b0435e768f2b4737f728aa8fedeef57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Tue, 9 Dec 2025 11:33:54 +0100 Subject: [PATCH 1/2] update elastic client companion using atomic reference --- build.sbt | 2 +- .../client/ElasticClientCompanion.scala | 55 +++++++------------ .../client/java/JavaClientCompanion.scala | 34 +++++------- .../client/java/JavaClientCompanion.scala | 34 +++++------- 4 files changed, 47 insertions(+), 78 deletions(-) diff --git a/build.sbt b/build.sbt index 359dc2b7..056a2bd6 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork" name := "softclient4es" -ThisBuild / version := "0.14.2" +ThisBuild / version := "0.14.3" ThisBuild / scalaVersion := scala213 diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientCompanion.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientCompanion.scala index cedea059..c89536c5 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientCompanion.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientCompanion.scala @@ -23,6 +23,7 @@ import org.slf4j.Logger import java.io.Closeable import java.net.URI import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference import scala.language.reflectiveCalls import scala.util.{Failure, Success, Try} @@ -32,18 +33,9 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def private val failures = new AtomicInteger(0) - /** Thread-safe client instance using double-checked locking pattern - * @volatile - * ensures visibility across threads - */ - @volatile private var client: Option[T] = None - - /** Lock object for synchronized initialization - */ - private val lock = new Object() + private val ref = new AtomicReference[Option[T]](None) - /** Get or create Elastic Client instance (thread-safe, lazy initialization) Uses double-checked - * locking for optimal performance + /** Get or create Elastic Client instance (thread-safe) using atomic reference * * @return * Elastic Client instance @@ -51,21 +43,16 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def * if client creation fails */ def apply(): T = { - // First check (no locking) - fast path for already initialized client - client match { + ref.get() match { case Some(c) => c - case None => - // Second check with lock - slow path for initialization - lock.synchronized { - client match { - case Some(c) => - c // Another thread initialized while we were waiting - case None => - val c = createClient() - client = Some(c) - logger.info(s"Elasticsearch Client initialized for ${elasticConfig.credentials.url}") - c - } + case None => + val c = createClient() + if (ref.compareAndSet(None, Some(c))) { + logger.info(s"Elasticsearch Client initialized for ${elasticConfig.credentials.url}") + c + } else { + // Another thread initialized while we were waiting + ref.get().get } } } @@ -141,22 +128,20 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def /** Check if client is initialized and connected */ - override def isInitialized: Boolean = client.isDefined + override def isInitialized: Boolean = ref.get().isDefined /** Close the client and release resources Idempotent - safe to call multiple times */ override def close(): Unit = { - lock.synchronized { - client.foreach { c => - Try { - c.close() - logger.info("Elasticsearch Client closed successfully") - }.recover { case ex: Exception => - logger.warn(s"Error closing Elasticsearch Client: ${ex.getMessage}", ex) - } - client = None + ref.get().foreach { c => + Try { + c.close() + logger.info("Elasticsearch Client closed successfully") + }.recover { case ex: Exception => + logger.warn(s"Error closing Elasticsearch Client: ${ex.getMessage}", ex) } } + ref.set(None) } /** Reset client (force reconnection on next access) Useful for connection recovery scenarios diff --git a/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala b/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala index 0d733502..e1841ff2 100644 --- a/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala +++ b/es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala @@ -29,38 +29,30 @@ import org.elasticsearch.client.{RestClient, RestClientBuilder} import org.slf4j.{Logger, LoggerFactory} import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.{Future, Promise} trait JavaClientCompanion extends ElasticClientCompanion[ElasticsearchClient] { val logger: Logger = LoggerFactory getLogger getClass.getName - @volatile private var asyncClient: Option[ElasticsearchAsyncClient] = None - - /** Lock object for synchronized initialization - */ - private val lock = new Object() + private val asyncRef = new AtomicReference[Option[ElasticsearchAsyncClient]](None) lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions def async(): ElasticsearchAsyncClient = { - // First check (no locking) - fast path for already initialized client - asyncClient match { + asyncRef.get() match { case Some(c) => c - case None => - // Second check with lock - slow path for initialization - lock.synchronized { - asyncClient match { - case Some(c) => - c // Another thread initialized while we were waiting - case None => - val c = createAsyncClient() - asyncClient = Some(c) - logger.info( - s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}" - ) - c - } + case None => + val c = createAsyncClient() + if (asyncRef.compareAndSet(None, Some(c))) { + logger.info( + s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}" + ) + c + } else { + // Another thread initialized while we were waiting + asyncRef.get().get } } } diff --git a/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala b/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala index 0d733502..e1841ff2 100644 --- a/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala +++ b/es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala @@ -29,38 +29,30 @@ import org.elasticsearch.client.{RestClient, RestClientBuilder} import org.slf4j.{Logger, LoggerFactory} import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.{Future, Promise} trait JavaClientCompanion extends ElasticClientCompanion[ElasticsearchClient] { val logger: Logger = LoggerFactory getLogger getClass.getName - @volatile private var asyncClient: Option[ElasticsearchAsyncClient] = None - - /** Lock object for synchronized initialization - */ - private val lock = new Object() + private val asyncRef = new AtomicReference[Option[ElasticsearchAsyncClient]](None) lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions def async(): ElasticsearchAsyncClient = { - // First check (no locking) - fast path for already initialized client - asyncClient match { + asyncRef.get() match { case Some(c) => c - case None => - // Second check with lock - slow path for initialization - lock.synchronized { - asyncClient match { - case Some(c) => - c // Another thread initialized while we were waiting - case None => - val c = createAsyncClient() - asyncClient = Some(c) - logger.info( - s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}" - ) - c - } + case None => + val c = createAsyncClient() + if (asyncRef.compareAndSet(None, Some(c))) { + logger.info( + s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}" + ) + c + } else { + // Another thread initialized while we were waiting + asyncRef.get().get } } } From e4a3e900118e72e3f525fd8c12c13837117eedc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Tue, 9 Dec 2025 11:47:11 +0100 Subject: [PATCH 2/2] update version api cache using atomic reference --- .../app/softnetwork/elastic/client/VersionApi.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/VersionApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/VersionApi.scala index 2526c93c..c54b5261 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/VersionApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/VersionApi.scala @@ -18,6 +18,8 @@ package app.softnetwork.elastic.client import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess} +import java.util.concurrent.atomic.AtomicReference + trait VersionApi extends ElasticClientHelpers { _: SerializationApi => // ======================================================================== @@ -25,22 +27,22 @@ trait VersionApi extends ElasticClientHelpers { _: SerializationApi => // ======================================================================== // Cache ES version (avoids calling it every time) - @volatile private var cachedVersion: Option[String] = None + private val cachedVersion = new AtomicReference[Option[String]](None) /** Get Elasticsearch version. * @return * the Elasticsearch version */ def version: ElasticResult[String] = { - cachedVersion match { + cachedVersion.get match { case Some(version) => ElasticSuccess(version) case None => executeVersion() match { - case success @ ElasticSuccess(version) => + case ElasticSuccess(version) => logger.info(s"✅ Elasticsearch version: $version") - cachedVersion = Some(version) - success + cachedVersion.compareAndSet(None, Some(version)) + ElasticSuccess(cachedVersion.get.getOrElse(version)) case failure @ ElasticFailure(error) => logger.error(s"❌ Failed to get Elasticsearch version: ${error.message}") failure