
Commit e4b0e40

Merge pull request #27 from SOFTNETWORK-APP/feature/bulkFromSourceFile
Add support to bulk from multiple data sources:

#### Data Sources

| Source Type      | Format        | Description                           |
|------------------|---------------|---------------------------------------|
| **In-Memory**    | Scala objects | Direct streaming from collections     |
| **JSON**         | Text          | Newline-delimited JSON (NDJSON)       |
| **JSON Array**   | Text          | JSON array with nested structures     |
| **Parquet**      | Binary        | Columnar storage format               |
| **Delta Lake**   | Directory     | ACID transactional data lake          |

**Examples:**

```scala
// High-performance file indexing
implicit val options: BulkOptions = BulkOptions(
  defaultIndex = "products",
  maxBulkSize = 10000,
  balance = 16,
  disableRefresh = true
)

implicit val hadoopConf: Configuration = new Configuration()

// Load from Parquet
client.bulkFromFile(
  filePath = "/data/products.parquet",
  format = Parquet,
  idKey = Some("id")
).foreach { result =>
  result.indices.foreach(client.refresh)
  println(s"Indexed ${result.successCount} docs at ${result.metrics.throughput} docs/sec")
}

// Load from Delta Lake
client.bulkFromFile(
  filePath = "/data/delta-products",
  format = Delta,
  idKey = Some("id"),
  update = Some(true)
).foreach { result =>
  println(s"Updated ${result.successCount} products from Delta Lake")
}

// Load JSON Array with nested objects
client.bulkFromFile(
  filePath = "/data/persons.json",
  format = JsonArray,
  idKey = Some("uuid")
).foreach { result =>
  println(s"Indexed ${result.successCount} persons with nested structures")
}
```
2 parents fc309ff + 92e8c1e commit e4b0e40

16 files changed: 3268 additions & 984 deletions


README.md

Lines changed: 61 additions & 15 deletions

````diff
@@ -1201,25 +1201,71 @@ The **BulkApi** leverages Akka Streams for efficient, backpressure-aware bulk op
 #### **Features**
 
 **Streaming Architecture**: Process millions of documents without memory issues
+**Multiple data sources**: In-memory, File-based (JSON, JSON Array, Parquet, Delta Lake)
 **Backpressure Handling**: Automatic flow control based on Elasticsearch capacity
 **Error Recovery**: Configurable retry strategies
 **Batching**: Automatic batching for optimal performance
 **Parallel Processing**: Concurrent bulk requests with configurable parallelism
 
-**Example:**
+#### Data Sources
 
-```scala
-import akka.stream.scaladsl._
+| Source Type      | Format        | Description                           |
+|------------------|---------------|---------------------------------------|
+| **In-Memory**    | Scala objects | Direct streaming from collections     |
+| **JSON**         | Text          | Newline-delimited JSON (NDJSON)       |
+| **JSON Array**   | Text          | JSON array with nested structures     |
+| **Parquet**      | Binary        | Columnar storage format               |
+| **Delta Lake**   | Directory     | ACID transactional data lake          |
+
+#### Operation Types
+
+| Operation  | Action         | Behavior                                   |
+|------------|----------------|--------------------------------------------|
+| **INDEX**  | Insert/Replace | Creates or replaces entire document        |
+| **UPDATE** | Upsert         | Updates existing or creates new (partial)  |
+| **DELETE** | Remove         | Deletes document by ID                     |
 
-// Stream-based bulk indexing
-val documents: List[String] = List(
-  """{"id":"user-1","name":"Alice","age":30}""",
-  """{"id":"user-2","name":"Bob","age":25}""",
-  """{"id":"user-3","name":"Charlie","age":35}"""
+**Examples:**
+
+```scala
+// High-performance file indexing
+implicit val options: BulkOptions = BulkOptions(
+  defaultIndex = "products",
+  maxBulkSize = 10000,
+  balance = 16,
+  disableRefresh = true
 )
 
-implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "users")
-client.bulkSource(Source.fromIterator(() => documents), identity, indexKey=Some("id"))
+implicit val hadoopConf: Configuration = new Configuration()
+
+// Load from Parquet
+client.bulkFromFile(
+  filePath = "/data/products.parquet",
+  format = Parquet,
+  idKey = Some("id")
+).foreach { result =>
+  result.indices.foreach(client.refresh)
+  println(s"Indexed ${result.successCount} docs at ${result.metrics.throughput} docs/sec")
+}
+
+// Load from Delta Lake
+client.bulkFromFile(
+  filePath = "/data/delta-products",
+  format = Delta,
+  idKey = Some("id"),
+  update = Some(true)
+).foreach { result =>
+  println(s"Updated ${result.successCount} products from Delta Lake")
+}
+
+// Load JSON Array with nested objects
+client.bulkFromFile(
+  filePath = "/data/persons.json",
+  format = JsonArray,
+  idKey = Some("uuid")
+).foreach { result =>
+  println(s"Indexed ${result.successCount} persons with nested structures")
+}
 ```
 
 ---
@@ -1471,18 +1517,18 @@ ThisBuild / resolvers ++= Seq(
 
 // For Elasticsearch 6
 // Using Jest client
-libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-jest-client" % 0.14.0
+libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-jest-client" % 0.14.1
 // Or using Rest High Level client
-libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-rest-client" % 0.14.0
+libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-rest-client" % 0.14.1
 
 // For Elasticsearch 7
-libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es7-rest-client" % 0.14.0
+libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es7-rest-client" % 0.14.1
 
 // For Elasticsearch 8
-libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es8-java-client" % 0.14.0
+libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es8-java-client" % 0.14.1
 
 // For Elasticsearch 9
-libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es9-java-client" % 0.14.0
+libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es9-java-client" % 0.14.1
 ```
 
 ### **Quick Example**
````
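The Operation Types table introduced above also lists **DELETE**, which the README examples do not exercise. As a supplement to the diff, here is a minimal sketch of a delete pass built on the same `bulkFromFile` entry point, assuming (per the `bulkFromFile` scaladoc added in BulkApi.scala below) that `delete = Some(true)` turns each source document into a delete-by-ID request, and assuming the same implicit `ActorSystem`/execution context setup as the examples above; the index name and file path are purely illustrative:

```scala
// Hypothetical delete pass (supplementary sketch, not part of this commit):
// every document in the NDJSON file is resolved to its "id" and removed.
implicit val deleteOptions: BulkOptions = BulkOptions(defaultIndex = "products")

client.bulkFromFile(
  filePath = "/data/obsolete-products.jsonl", // illustrative path; format auto-detected
  idKey = Some("id"),
  delete = Some(true)
).foreach { result =>
  println(s"Deleted ${result.successCount} of ${result.totalCount} documents")
}
```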

build.sbt

Lines changed: 1 addition & 1 deletion

```diff
@@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork"
 
 name := "softclient4es"
 
-ThisBuild / version := "0.14.0"
+ThisBuild / version := "0.14.1"
 
 ThisBuild / scalaVersion := scala213
 
```

core/build.sbt

Lines changed: 11 additions & 4 deletions

```diff
@@ -30,8 +30,15 @@ val mockito = Seq(
   "org.mockito" %% "mockito-scala" % "1.17.12" % Test
 )
 
-libraryDependencies ++= akka ++ typesafeConfig ++ http ++
-  json4s ++ mockito :+ "com.google.code.gson" % "gson" % Versions.gson :+
-  "com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+
-  "org.scalatest" %% "scalatest" % Versions.scalatest % Test
+// Parquet & Avro
+val avro = Seq(
+  "org.apache.parquet" % "parquet-avro" % "1.15.2" excludeAll (excludeSlf4jAndLog4j *),
+  "org.apache.avro" % "avro" % "1.11.4" excludeAll (excludeSlf4jAndLog4j *),
+  "org.apache.hadoop" % "hadoop-client" % "3.4.2" excludeAll (excludeSlf4jAndLog4j *)
+)
 
+libraryDependencies ++= akka ++ typesafeConfig ++ http ++
+  json4s ++ mockito ++ avro :+ "com.google.code.gson" % "gson" % Versions.gson :+
+  "com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+
+  "io.delta" %% "delta-standalone" % "0.6.0" :+
+  "org.scalatest" %% "scalatest" % Versions.scalatest % Test
```

core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala

Lines changed: 140 additions & 0 deletions

```diff
@@ -21,7 +21,11 @@ import akka.actor.ActorSystem
 import akka.stream.{FlowShape, Materializer}
 import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
 import app.softnetwork.elastic.client.bulk._
+import app.softnetwork.elastic.client.file._
 import app.softnetwork.elastic.client.result.{ElasticResult, ElasticSuccess}
+
+import org.apache.hadoop.conf.Configuration
+
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
 
@@ -41,6 +45,142 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
   // PUBLIC METHODS
   // ========================================================================
 
+  /** Bulk from a Parquet or JSON file with automatic detection
+    *
+    * @param filePath
+    *   path to the file (.parquet, .json, .jsonl)
+    * @param indexKey
+    *   JSON key to extract the index
+    * @param idKey
+    *   JSON key to extract the ID
+    * @param suffixDateKey
+    *   JSON key to append a date to the index
+    * @param suffixDatePattern
+    *   date formatting pattern
+    * @param update
+    *   true for upsert, false for index
+    * @param delete
+    *   true for delete
+    * @param parentIdKey
+    *   JSON key for the parent
+    * @param callbacks
+    *   callbacks for events
+    * @param bufferSize
+    *   read buffer size
+    * @param validateJson
+    *   validate each JSON line
+    * @param skipInvalid
+    *   ignore invalid JSON lines
+    * @param format
+    *   file format (auto-detection if Unknown)
+    * @param hadoopConf
+    *   custom Hadoop configuration
+    * @param bulkOptions
+    *   configuration options
+    * @return
+    *   Future with the detailed result
+    */
+  def bulkFromFile(
+    filePath: String,
+    indexKey: Option[String] = None,
+    idKey: Option[String] = None,
+    suffixDateKey: Option[String] = None,
+    suffixDatePattern: Option[String] = None,
+    update: Option[Boolean] = None,
+    delete: Option[Boolean] = None,
+    parentIdKey: Option[String] = None,
+    callbacks: BulkCallbacks = BulkCallbacks.default,
+    bufferSize: Int = 500,
+    validateJson: Boolean = true,
+    skipInvalid: Boolean = true,
+    format: FileFormat = Unknown,
+    hadoopConf: Option[Configuration] = None
+  )(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {
+
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)
+
+    logger.info(s"📁 Starting bulk from file: $filePath")
+
+    val source: Source[String, NotUsed] = if (validateJson) {
+      FileSourceFactory.fromFileValidated(filePath, bufferSize, skipInvalid, format)
+    } else {
+      FileSourceFactory.fromFile(filePath, bufferSize, format)
+    }
+
+    // Use the existing API with the file source
+    bulkWithResult[String](
+      items = source,
+      toDocument = identity, // The document is already in JSON format.
+      indexKey = indexKey,
+      idKey = idKey,
+      suffixDateKey = suffixDateKey,
+      suffixDatePattern = suffixDatePattern,
+      update = update,
+      delete = delete,
+      parentIdKey = parentIdKey,
+      callbacks = callbacks
+    )
+  }
+
+  /** Bulk from a Parquet file specifically
+    */
+  def bulkFromParquet(
+    filePath: String,
+    indexKey: Option[String] = None,
+    idKey: Option[String] = None,
+    callbacks: BulkCallbacks = BulkCallbacks.default,
+    bufferSize: Int = 500,
+    hadoopConf: Option[Configuration] = None
+  )(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {
+
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)
+
+    logger.info(s"📁 Starting bulk from Parquet file: $filePath")
+
+    bulkWithResult[String](
+      items = ParquetFileSource.fromFile(filePath, bufferSize),
+      toDocument = identity,
+      indexKey = indexKey,
+      idKey = idKey,
+      callbacks = callbacks
+    )
+  }
+
+  /** Bulk from a specific JSON file
+    */
+  def bulkFromJson(
+    filePath: String,
+    indexKey: Option[String] = None,
+    idKey: Option[String] = None,
+    callbacks: BulkCallbacks = BulkCallbacks.default,
+    bufferSize: Int = 500,
+    validateJson: Boolean = true,
+    skipInvalid: Boolean = true,
+    hadoopConf: Option[Configuration] = None
+  )(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {
+
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)
+
+    logger.info(s"📁 Starting bulk from JSON file: $filePath")
+
+    val source = if (validateJson) {
+      JsonFileSource.fromFileValidated(filePath, bufferSize, skipInvalid)
+    } else {
+      JsonFileSource.fromFile(filePath, bufferSize)
+    }
+
+    bulkWithResult[String](
+      items = source,
+      toDocument = identity,
+      indexKey = indexKey,
+      idKey = idKey,
+      callbacks = callbacks
+    )
+  }
+
   /** Bulk with detailed results (successes + failures).
     *
     * This method provides:
```
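The diff above only shows the new method definitions. For orientation, here is a sketch of how a caller might wire them up; the `ActorSystem`, the `client` instance, the index name and the file paths are illustrative assumptions, while the signatures and defaults come from the diff itself:

```scala
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext
import app.softnetwork.elastic.client.bulk._

// Illustrative caller setup (names and paths are assumptions, not part of the diff);
// `client` is assumed to be an Elastic client instance that mixes in BulkApi.
implicit val system: ActorSystem = ActorSystem("bulk-import")
implicit val ec: ExecutionContext = system.dispatcher
implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "events")

// NDJSON file; validateJson = true and skipInvalid = true are the signature defaults
val jsonImport = client.bulkFromJson(
  filePath = "/data/events.jsonl",
  idKey = Some("eventId")
)

// Parquet file; hadoopConf = None falls back to the default Hadoop configuration
val parquetImport = client.bulkFromParquet(
  filePath = "/data/events.parquet",
  idKey = Some("eventId")
)

for {
  json    <- jsonImport
  parquet <- parquetImport
} println(s"Indexed ${json.successCount + parquet.successCount} documents in total")
```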

core/src/main/scala/app/softnetwork/elastic/client/bulk/package.scala

Lines changed: 2 additions & 0 deletions

```diff
@@ -59,6 +59,8 @@ package object bulk {
       else 0.0
 
     def hasFailures: Boolean = failedCount > 0
+
+    def totalCount: Int = successCount + failedCount
   }
 
   sealed trait DocumentResult {
```
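The new `totalCount` accessor complements the existing `successCount`, `failedCount` and `hasFailures` members. A small sketch of how it might be used when reporting on a completed run, assuming these accessors live on the `BulkResult` returned by the bulk API (as the README examples suggest); the helper itself is illustrative:

```scala
// Illustrative reporting helper built on the accessors visible in this diff
def report(result: BulkResult): String = {
  val rate =
    if (result.totalCount > 0) 100.0 * result.successCount / result.totalCount
    else 100.0
  if (result.hasFailures)
    f"${result.failedCount} of ${result.totalCount} documents failed (${rate}%.1f%% ok)"
  else
    s"all ${result.totalCount} documents indexed"
}
```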
