Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"

name := "softclient4es"

ThisBuild / version := "0.14.2"
ThisBuild / version := "0.14.3"

ThisBuild / scalaVersion := scala213

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.slf4j.Logger
import java.io.Closeable
import java.net.URI
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import scala.language.reflectiveCalls
import scala.util.{Failure, Success, Try}

Expand All @@ -32,40 +33,26 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def

private val failures = new AtomicInteger(0)

/** Thread-safe client instance using double-checked locking pattern
* @volatile
* ensures visibility across threads
*/
@volatile private var client: Option[T] = None

/** Lock object for synchronized initialization
*/
private val lock = new Object()
private val ref = new AtomicReference[Option[T]](None)

/** Get or create Elastic Client instance (thread-safe, lazy initialization) Uses double-checked
* locking for optimal performance
/** Get or create Elastic Client instance (thread-safe) using atomic reference
*
* @return
* Elastic Client instance
* @throws IllegalStateException
* if client creation fails
*/
def apply(): T = {
// First check (no locking) - fast path for already initialized client
client match {
ref.get() match {
case Some(c) => c
case None =>
// Second check with lock - slow path for initialization
lock.synchronized {
client match {
case Some(c) =>
c // Another thread initialized while we were waiting
case None =>
val c = createClient()
client = Some(c)
logger.info(s"Elasticsearch Client initialized for ${elasticConfig.credentials.url}")
c
}
case None =>
val c = createClient()
if (ref.compareAndSet(None, Some(c))) {
logger.info(s"Elasticsearch Client initialized for ${elasticConfig.credentials.url}")
c
} else {
// Another thread initialized while we were waiting
ref.get().get
}
}
}
Expand Down Expand Up @@ -141,22 +128,20 @@ trait ElasticClientCompanion[T <: Closeable] extends ClientCompanion { _: { def

/** Check if client is initialized and connected
*/
override def isInitialized: Boolean = client.isDefined
override def isInitialized: Boolean = ref.get().isDefined

/** Close the client and release resources Idempotent - safe to call multiple times
*/
override def close(): Unit = {
lock.synchronized {
client.foreach { c =>
Try {
c.close()
logger.info("Elasticsearch Client closed successfully")
}.recover { case ex: Exception =>
logger.warn(s"Error closing Elasticsearch Client: ${ex.getMessage}", ex)
}
client = None
ref.get().foreach { c =>
Try {
c.close()
logger.info("Elasticsearch Client closed successfully")
}.recover { case ex: Exception =>
logger.warn(s"Error closing Elasticsearch Client: ${ex.getMessage}", ex)
}
}
ref.set(None)
}

/** Reset client (force reconnection on next access) Useful for connection recovery scenarios
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,31 @@ package app.softnetwork.elastic.client

import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess}

import java.util.concurrent.atomic.AtomicReference

trait VersionApi extends ElasticClientHelpers { _: SerializationApi =>

// ========================================================================
// PUBLIC METHODS
// ========================================================================

// Cache ES version (avoids calling it every time)
@volatile private var cachedVersion: Option[String] = None
private val cachedVersion = new AtomicReference[Option[String]](None)

/** Get Elasticsearch version.
* @return
* the Elasticsearch version
*/
def version: ElasticResult[String] = {
cachedVersion match {
cachedVersion.get match {
case Some(version) =>
ElasticSuccess(version)
case None =>
executeVersion() match {
case success @ ElasticSuccess(version) =>
case ElasticSuccess(version) =>
logger.info(s"✅ Elasticsearch version: $version")
cachedVersion = Some(version)
success
cachedVersion.compareAndSet(None, Some(version))
ElasticSuccess(cachedVersion.get.getOrElse(version))
case failure @ ElasticFailure(error) =>
logger.error(s"❌ Failed to get Elasticsearch version: ${error.message}")
failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,30 @@ import org.elasticsearch.client.{RestClient, RestClientBuilder}
import org.slf4j.{Logger, LoggerFactory}

import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, Promise}

trait JavaClientCompanion extends ElasticClientCompanion[ElasticsearchClient] {

val logger: Logger = LoggerFactory getLogger getClass.getName

@volatile private var asyncClient: Option[ElasticsearchAsyncClient] = None

/** Lock object for synchronized initialization
*/
private val lock = new Object()
private val asyncRef = new AtomicReference[Option[ElasticsearchAsyncClient]](None)

lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions

def async(): ElasticsearchAsyncClient = {
// First check (no locking) - fast path for already initialized client
asyncClient match {
asyncRef.get() match {
case Some(c) => c
case None =>
// Second check with lock - slow path for initialization
lock.synchronized {
asyncClient match {
case Some(c) =>
c // Another thread initialized while we were waiting
case None =>
val c = createAsyncClient()
asyncClient = Some(c)
logger.info(
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
)
c
}
case None =>
val c = createAsyncClient()
if (asyncRef.compareAndSet(None, Some(c))) {
logger.info(
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
)
c
} else {
// Another thread initialized while we were waiting
asyncRef.get().get
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,30 @@ import org.elasticsearch.client.{RestClient, RestClientBuilder}
import org.slf4j.{Logger, LoggerFactory}

import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, Promise}

trait JavaClientCompanion extends ElasticClientCompanion[ElasticsearchClient] {

val logger: Logger = LoggerFactory getLogger getClass.getName

@volatile private var asyncClient: Option[ElasticsearchAsyncClient] = None

/** Lock object for synchronized initialization
*/
private val lock = new Object()
private val asyncRef = new AtomicReference[Option[ElasticsearchAsyncClient]](None)

lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions

def async(): ElasticsearchAsyncClient = {
// First check (no locking) - fast path for already initialized client
asyncClient match {
asyncRef.get() match {
case Some(c) => c
case None =>
// Second check with lock - slow path for initialization
lock.synchronized {
asyncClient match {
case Some(c) =>
c // Another thread initialized while we were waiting
case None =>
val c = createAsyncClient()
asyncClient = Some(c)
logger.info(
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
)
c
}
case None =>
val c = createAsyncClient()
if (asyncRef.compareAndSet(None, Some(c))) {
logger.info(
s"Elasticsearch async Client initialized for ${elasticConfig.credentials.url}"
)
c
} else {
// Another thread initialized while we were waiting
asyncRef.get().get
}
}
}
Expand Down