76 changes: 61 additions & 15 deletions README.md
@@ -1201,25 +1201,71 @@ The **BulkApi** leverages Akka Streams for efficient, backpressure-aware bulk op
#### **Features**

✅ **Streaming Architecture**: Process millions of documents without memory issues
✅ **Multiple data sources**: In-memory, File-based (JSON, JSON Array, Parquet, Delta Lake)
✅ **Backpressure Handling**: Automatic flow control based on Elasticsearch capacity
✅ **Error Recovery**: Configurable retry strategies
✅ **Batching**: Automatic batching for optimal performance
✅ **Parallel Processing**: Concurrent bulk requests with configurable parallelism

#### Data Sources

| Source Type    | Format        | Description                        |
|----------------|---------------|------------------------------------|
| **In-Memory**  | Scala objects | Direct streaming from collections  |
| **JSON**       | Text          | Newline-delimited JSON (NDJSON)    |
| **JSON Array** | Text          | JSON array with nested structures  |
| **Parquet**    | Binary        | Columnar storage format            |
| **Delta Lake** | Directory     | ACID transactional data lake       |

#### Operation Types

| Operation  | Action         | Behavior                                   |
|------------|----------------|--------------------------------------------|
| **INDEX**  | Insert/Replace | Creates or replaces entire document        |
| **UPDATE** | Upsert         | Updates existing or creates new (partial)  |
| **DELETE** | Remove         | Deletes document by ID                     |

**Examples:**

```scala
// High-performance file indexing
implicit val options: BulkOptions = BulkOptions(
  defaultIndex = "products",
  maxBulkSize = 10000,
  balance = 16,
  disableRefresh = true
)

implicit val hadoopConf: Configuration = new Configuration()

// Load from Parquet
client.bulkFromFile(
  filePath = "/data/products.parquet",
  format = Parquet,
  idKey = Some("id")
).foreach { result =>
  result.indices.foreach(client.refresh)
  println(s"Indexed ${result.successCount} docs at ${result.metrics.throughput} docs/sec")
}

// Load from Delta Lake
client.bulkFromFile(
  filePath = "/data/delta-products",
  format = Delta,
  idKey = Some("id"),
  update = Some(true)
).foreach { result =>
  println(s"Updated ${result.successCount} products from Delta Lake")
}

// Load JSON Array with nested objects
client.bulkFromFile(
  filePath = "/data/persons.json",
  format = JsonArray,
  idKey = Some("uuid")
).foreach { result =>
  println(s"Indexed ${result.successCount} persons with nested structures")
}
```
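
The data-source table above also lists in-memory collections, and the operation table a DELETE action, but neither appears in the file-based examples. The sketch below illustrates both through the `bulkWithResult` and `bulkFromFile` APIs, assuming the same implicit `ActorSystem` and execution context as the snippets above; the index name, document fields, and file path are illustrative, and the NDJSON file's format is left to auto-detection:

```scala
import akka.stream.scaladsl.Source

// Stream documents directly from an in-memory collection (illustrative index and fields)
implicit val userOptions: BulkOptions = BulkOptions(defaultIndex = "users")

val users = List(
  """{"id":"user-1","name":"Alice","age":30}""",
  """{"id":"user-2","name":"Bob","age":25}"""
)

client.bulkWithResult[String](
  items = Source(users),
  toDocument = identity, // documents are already JSON strings
  idKey = Some("id")
).foreach { result =>
  println(s"Indexed ${result.successCount} of ${result.totalCount} users")
}

// Delete the documents whose IDs are listed in an NDJSON file (hypothetical path, format auto-detected)
client.bulkFromFile(
  filePath = "/data/obsolete-users.jsonl",
  idKey = Some("id"),
  delete = Some(true)
).foreach { result =>
  if (result.hasFailures)
    println(s"${result.failedCount} of ${result.totalCount} deletions failed")
}
```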

---
@@ -1471,18 +1517,18 @@ ThisBuild / resolvers ++= Seq(

// For Elasticsearch 6
// Using Jest client
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es6-jest-client" % "0.14.1"
// Or using Rest High Level client
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es6-rest-client" % "0.14.1"

// For Elasticsearch 7
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es7-rest-client" % "0.14.1"

// For Elasticsearch 8
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.14.1"

// For Elasticsearch 9
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es9-java-client" % "0.14.1"
```

### **Quick Example**
2 changes: 1 addition & 1 deletion build.sbt
@@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"

name := "softclient4es"

ThisBuild / version := "0.14.1"

ThisBuild / scalaVersion := scala213

15 changes: 11 additions & 4 deletions core/build.sbt
@@ -30,8 +30,15 @@ val mockito = Seq(
"org.mockito" %% "mockito-scala" % "1.17.12" % Test
)

// Parquet & Avro
val avro = Seq(
  "org.apache.parquet" % "parquet-avro" % "1.15.2" excludeAll (excludeSlf4jAndLog4j *),
  "org.apache.avro" % "avro" % "1.11.4" excludeAll (excludeSlf4jAndLog4j *),
  "org.apache.hadoop" % "hadoop-client" % "3.4.2" excludeAll (excludeSlf4jAndLog4j *)
)

libraryDependencies ++= akka ++ typesafeConfig ++ http ++
  json4s ++ mockito ++ avro :+ "com.google.code.gson" % "gson" % Versions.gson :+
  "com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+
  "io.delta" %% "delta-standalone" % "0.6.0" :+
  "org.scalatest" %% "scalatest" % Versions.scalatest % Test
140 changes: 140 additions & 0 deletions core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala
@@ -21,7 +21,11 @@ import akka.actor.ActorSystem
import akka.stream.{FlowShape, Materializer}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import app.softnetwork.elastic.client.bulk._
import app.softnetwork.elastic.client.file._
import app.softnetwork.elastic.client.result.{ElasticResult, ElasticSuccess}

import org.apache.hadoop.conf.Configuration

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

@@ -41,6 +45,142 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
// PUBLIC METHODS
// ========================================================================

/** Bulk from a Parquet or JSON file with automatic detection
*
* @param filePath
* path to the file (.parquet, .json, .jsonl)
* @param indexKey
* JSON key to extract the index
* @param idKey
* JSON key to extract the ID
* @param suffixDateKey
* JSON key to append a date to the index
* @param suffixDatePattern
* date formatting pattern
* @param update
* true for upsert, false for index
* @param delete
* true for delete
* @param parentIdKey
* JSON key for the parent
* @param callbacks
* callbacks for events
* @param bufferSize
* read buffer size
* @param validateJson
* validate each JSON line
* @param skipInvalid
* ignore invalid JSON lines
* @param format
* file format (auto-detection if Unknown)
* @param hadoopConf
* custom Hadoop configuration
* @param bulkOptions
* configuration options
* @return
* Future with the detailed result
*/
def bulkFromFile(
filePath: String,
indexKey: Option[String] = None,
idKey: Option[String] = None,
suffixDateKey: Option[String] = None,
suffixDatePattern: Option[String] = None,
update: Option[Boolean] = None,
delete: Option[Boolean] = None,
parentIdKey: Option[String] = None,
callbacks: BulkCallbacks = BulkCallbacks.default,
bufferSize: Int = 500,
validateJson: Boolean = true,
skipInvalid: Boolean = true,
format: FileFormat = Unknown,
hadoopConf: Option[Configuration] = None
)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {

implicit val ec: ExecutionContext = system.dispatcher
implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)

logger.info(s"📁 Starting bulk from file: $filePath")

val source: Source[String, NotUsed] = if (validateJson) {
FileSourceFactory.fromFileValidated(filePath, bufferSize, skipInvalid, format)
} else {
FileSourceFactory.fromFile(filePath, bufferSize, format)
}

// Use the existing API with the file source
bulkWithResult[String](
items = source,
toDocument = identity, // The document is already in JSON format.
indexKey = indexKey,
idKey = idKey,
suffixDateKey = suffixDateKey,
suffixDatePattern = suffixDatePattern,
update = update,
delete = delete,
parentIdKey = parentIdKey,
callbacks = callbacks
)
}

/** Bulk from a Parquet file specifically
*/
def bulkFromParquet(
filePath: String,
indexKey: Option[String] = None,
idKey: Option[String] = None,
callbacks: BulkCallbacks = BulkCallbacks.default,
bufferSize: Int = 500,
hadoopConf: Option[Configuration] = None
)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {

implicit val ec: ExecutionContext = system.dispatcher
implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)

logger.info(s"📁 Starting bulk from Parquet file: $filePath")

bulkWithResult[String](
items = ParquetFileSource.fromFile(filePath, bufferSize),
toDocument = identity,
indexKey = indexKey,
idKey = idKey,
callbacks = callbacks
)
}

/** Bulk from a specific JSON file
*/
def bulkFromJson(
filePath: String,
indexKey: Option[String] = None,
idKey: Option[String] = None,
callbacks: BulkCallbacks = BulkCallbacks.default,
bufferSize: Int = 500,
validateJson: Boolean = true,
skipInvalid: Boolean = true,
hadoopConf: Option[Configuration] = None
)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {

implicit val ec: ExecutionContext = system.dispatcher
implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)

logger.info(s"📁 Starting bulk from JSON file: $filePath")

val source = if (validateJson) {
JsonFileSource.fromFileValidated(filePath, bufferSize, skipInvalid)
} else {
JsonFileSource.fromFile(filePath, bufferSize)
}

bulkWithResult[String](
items = source,
toDocument = identity,
indexKey = indexKey,
idKey = idKey,
callbacks = callbacks
)
}

/** Bulk with detailed results (successes + failures).
*
* This method provides:
@@ -59,6 +59,8 @@ package object bulk {
else 0.0

def hasFailures: Boolean = failedCount > 0

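    /** Total number of documents processed: successes plus failures. */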
def totalCount: Int = successCount + failedCount
}

sealed trait DocumentResult {