Skip to content

Commit 7d58780

Browse files
committed
update elastic client companion using atomic reference
1 parent d6eafbe commit 7d58780

File tree

4 files changed

+47
-78
lines changed

4 files changed

+47
-78
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"
1919

2020
name := "softclient4es"
2121

22-
ThisBuild / version := "0.14.2"
22+
ThisBuild / version := "0.14.3"
2323

2424
ThisBuild / scalaVersion := scala213
2525

core/src/main/scala/app/softnetwork/elastic/client/ElasticClientCompanion.scala

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.slf4j.Logger
2323
import java.io.Closeable
2424
import java.net.URI
2525
import java.util.concurrent.atomic.AtomicInteger
26+
import java.util.concurrent.atomic.AtomicReference
2627
import scala.language.reflectiveCalls
2728
import scala.util.{Failure, Success, Try}
2829

@@ -32,40 +33,26 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def
3233

3334
private val failures = new AtomicInteger(0)
3435

35-
/** Thread-safe client instance using double-checked locking pattern
36-
* @volatile
37-
* ensures visibility across threads
38-
*/
39-
@volatile private var client: Option[T] = None
40-
41-
/** Lock object for synchronized initialization
42-
*/
43-
private val lock = new Object()
36+
private val ref = new AtomicReference[Option[T]](None)
4437

45-
/** Get or create Elastic Client instance (thread-safe, lazy initialization) Uses double-checked
46-
* locking for optimal performance
38+
/** Get or create Elastic Client instance (thread-safe) using atomic reference
4739
*
4840
* @return
4941
* Elastic Client instance
5042
* @throws IllegalStateException
5143
* if client creation fails
5244
*/
5345
def apply(): T = {
54-
// First check (no locking) - fast path for already initialized client
55-
client match {
46+
ref.get() match {
5647
case Some(c) => c
57-
case None =>
58-
// Second check with lock - slow path for initialization
59-
lock.synchronized {
60-
client match {
61-
case Some(c) =>
62-
c // Another thread initialized while we were waiting
63-
case None =>
64-
val c = createClient()
65-
client = Some(c)
66-
logger.info(s"Elasticsearch Client initialized for ${elasticConfig.credentials.url}")
67-
c
68-
}
48+
case None =>
49+
val c = createClient()
50+
if (ref.compareAndSet(None, Some(c))) {
51+
logger.info(s"Elasticsearch Client initialized for ${elasticConfig.credentials.url}")
52+
c
53+
} else {
54+
// Another thread initialized while we were waiting
55+
ref.get().get
6956
}
7057
}
7158
}
@@ -141,22 +128,20 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def
141128

142129
/** Check if client is initialized and connected
143130
*/
144-
override def isInitialized: Boolean = client.isDefined
131+
override def isInitialized: Boolean = ref.get().isDefined
145132

146133
/** Close the client and release resources Idempotent - safe to call multiple times
147134
*/
148135
override def close(): Unit = {
149-
lock.synchronized {
150-
client.foreach { c =>
151-
Try {
152-
c.close()
153-
logger.info("Elasticsearch Client closed successfully")
154-
}.recover { case ex: Exception =>
155-
logger.warn(s"Error closing Elasticsearch Client: ${ex.getMessage}", ex)
156-
}
157-
client = None
136+
ref.get().foreach { c =>
137+
Try {
138+
c.close()
139+
logger.info("Elasticsearch Client closed successfully")
140+
}.recover { case ex: Exception =>
141+
logger.warn(s"Error closing Elasticsearch Client: ${ex.getMessage}", ex)
158142
}
159143
}
144+
ref.set(None)
160145
}
161146

162147
/** Reset client (force reconnection on next access) Useful for connection recovery scenarios

es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,38 +29,30 @@ import org.elasticsearch.client.{RestClient, RestClientBuilder}
2929
import org.slf4j.{Logger, LoggerFactory}
3030

3131
import java.util.concurrent.CompletableFuture
32+
import java.util.concurrent.atomic.AtomicReference
3233
import scala.concurrent.{Future, Promise}
3334

3435
trait JavaClientCompanion extends ElasticClientCompanion[ElasticsearchClient] {
3536

3637
val logger: Logger = LoggerFactory getLogger getClass.getName
3738

38-
@volatile private var asyncClient: Option[ElasticsearchAsyncClient] = None
39-
40-
/** Lock object for synchronized initialization
41-
*/
42-
private val lock = new Object()
39+
private val asyncRef = new AtomicReference[Option[ElasticsearchAsyncClient]](None)
4340

4441
lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions
4542

4643
def async(): ElasticsearchAsyncClient = {
47-
// First check (no locking) - fast path for already initialized client
48-
asyncClient match {
44+
asyncRef.get() match {
4945
case Some(c) => c
50-
case None =>
51-
// Second check with lock - slow path for initialization
52-
lock.synchronized {
53-
asyncClient match {
54-
case Some(c) =>
55-
c // Another thread initialized while we were waiting
56-
case None =>
57-
val c = createAsyncClient()
58-
asyncClient = Some(c)
59-
logger.info(
60-
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
61-
)
62-
c
63-
}
46+
case None =>
47+
val c = createAsyncClient()
48+
if (asyncRef.compareAndSet(None, Some(c))) {
49+
logger.info(
50+
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
51+
)
52+
c
53+
} else {
54+
// Another thread initialized while we were waiting
55+
asyncRef.get().get
6456
}
6557
}
6658
}

es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientCompanion.scala

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,38 +29,30 @@ import org.elasticsearch.client.{RestClient, RestClientBuilder}
2929
import org.slf4j.{Logger, LoggerFactory}
3030

3131
import java.util.concurrent.CompletableFuture
32+
import java.util.concurrent.atomic.AtomicReference
3233
import scala.concurrent.{Future, Promise}
3334

3435
trait JavaClientCompanion extends ElasticClientCompanion[ElasticsearchClient] {
3536

3637
val logger: Logger = LoggerFactory getLogger getClass.getName
3738

38-
@volatile private var asyncClient: Option[ElasticsearchAsyncClient] = None
39-
40-
/** Lock object for synchronized initialization
41-
*/
42-
private val lock = new Object()
39+
private val asyncRef = new AtomicReference[Option[ElasticsearchAsyncClient]](None)
4340

4441
lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions
4542

4643
def async(): ElasticsearchAsyncClient = {
47-
// First check (no locking) - fast path for already initialized client
48-
asyncClient match {
44+
asyncRef.get() match {
4945
case Some(c) => c
50-
case None =>
51-
// Second check with lock - slow path for initialization
52-
lock.synchronized {
53-
asyncClient match {
54-
case Some(c) =>
55-
c // Another thread initialized while we were waiting
56-
case None =>
57-
val c = createAsyncClient()
58-
asyncClient = Some(c)
59-
logger.info(
60-
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
61-
)
62-
c
63-
}
46+
case None =>
47+
val c = createAsyncClient()
48+
if (asyncRef.compareAndSet(None, Some(c))) {
49+
logger.info(
50+
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
51+
)
52+
c
53+
} else {
54+
// Another thread initialized while we were waiting
55+
asyncRef.get().get
6456
}
6557
}
6658
}

0 commit comments

Comments
 (0)