The **BulkApi** leverages Akka Streams for efficient, backpressure-aware bulk operations.

#### **Features**

✅ **Streaming Architecture**: Process millions of documents without memory issues
✅ **Multiple data sources**: In-memory, File-based (JSON, JSON Array, Parquet, Delta Lake)
✅ **Backpressure Handling**: Automatic flow control based on Elasticsearch capacity
✅ **Error Recovery**: Configurable retry strategies
✅ **Batching**: Automatic batching for optimal performance
✅ **Parallel Processing**: Concurrent bulk requests with configurable parallelism
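
As a rough illustration of the streaming and backpressure points above, the sketch below feeds `bulkSource` (used in the Examples section further down) with a lazily generated stream, so documents are produced on demand instead of being buffered up front; the generator and field names here are made up for illustration.

```scala
import akka.stream.scaladsl.Source

// One million documents generated lazily: nothing is materialized up front,
// and Akka Streams backpressure paces generation to Elasticsearch's capacity.
val lazyDocs: Source[String, akka.NotUsed] =
  Source
    .fromIterator(() => Iterator.range(1, 1000001))
    .map(i => s"""{"id":"user-$i","name":"user $i"}""")

implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "users")
client.bulkSource(lazyDocs, identity, indexKey = Some("id"))
```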
#### Data Sources

| Source Type    | Format        | Description                        |
|----------------|---------------|------------------------------------|
| **In-Memory**  | Scala objects | Direct streaming from collections  |
| **JSON**       | Text          | Newline-delimited JSON (NDJSON)    |
| **JSON Array** | Text          | JSON array with nested structures  |
| **Parquet**    | Binary        | Columnar storage format            |
| **Delta Lake** | Directory     | ACID transactional data lake       |
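
The `bulkFromFile` examples below cover the JSON Array, Parquet and Delta Lake sources. For plain newline-delimited JSON, one option is to stream the file by hand and feed it to `bulkSource`; the sketch below does that with Akka Streams `FileIO` and `Framing`, assuming `bulkSource` accepts a `Source[String, NotUsed]` as in the in-memory example further down (the file path and index name are illustrative).

```scala
import java.nio.file.Paths
import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Framing, Source}
import akka.util.ByteString

// Stream an NDJSON file line by line: each line is one JSON document
val ndjsonDocs: Source[String, NotUsed] =
  FileIO
    .fromPath(Paths.get("/data/users.ndjson"))
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024 * 1024, allowTruncation = true))
    .map(_.utf8String)
    .mapMaterializedValue(_ => NotUsed)

implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "users")
client.bulkSource(ndjsonDocs, identity, indexKey = Some("id"))
```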
#### Operation Types

| Operation  | Action         | Behavior                                                    |
|------------|----------------|-------------------------------------------------------------|
| **INDEX**  | Insert/Replace | Creates or replaces the entire document                     |
| **UPDATE** | Upsert         | Partially updates an existing document, or creates it if missing |
| **DELETE** | Remove         | Deletes a document by ID                                    |
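
To make these behaviors concrete, the sketch below shows how each operation type translates into Elasticsearch bulk action lines (the NDJSON sent to the `_bulk` endpoint). The types and the `toBulkPayload` helper are purely illustrative, not part of the library's API; UPDATE relies on `doc_as_upsert` to get upsert semantics.

```scala
// Illustrative mapping from operation types to Elasticsearch bulk action lines
sealed trait BulkAction
case class Index(index: String, id: String, doc: String)  extends BulkAction
case class Update(index: String, id: String, doc: String) extends BulkAction
case class Delete(index: String, id: String)              extends BulkAction

def toBulkPayload(action: BulkAction): String = action match {
  case Index(index, id, doc) =>
    // INDEX: action line + full document source (creates or replaces the document)
    s"""{"index":{"_index":"$index","_id":"$id"}}""" + "\n" + doc
  case Update(index, id, doc) =>
    // UPDATE: partial document; doc_as_upsert creates the document if it does not exist
    s"""{"update":{"_index":"$index","_id":"$id"}}""" + "\n" +
      s"""{"doc":$doc,"doc_as_upsert":true}"""
  case Delete(index, id) =>
    // DELETE: action line only, no source
    s"""{"delete":{"_index":"$index","_id":"$id"}}"""
}

// e.g. toBulkPayload(Update("products", "42", """{"price":899}"""))
```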
**Examples:**

```scala
import akka.stream.scaladsl._

// Stream-based bulk indexing from an in-memory collection
val documents: List[String] = List(
  """{"id":"user-1","name":"Alice","age":30}""",
  """{"id":"user-2","name":"Bob","age":25}""",
  """{"id":"user-3","name":"Charlie","age":35}"""
)

implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "users")
client.bulkSource(Source.fromIterator(() => documents.iterator), identity, indexKey = Some("id"))
```

```scala
// High-performance file indexing
implicit val options: BulkOptions = BulkOptions(
  defaultIndex = "products",
  maxBulkSize = 10000,   // documents per bulk request
  balance = 16,          // number of concurrent bulk requests
  disableRefresh = true  // skip index refresh while ingesting
)

implicit val hadoopConf: Configuration = new Configuration()

// Load from Parquet
client.bulkFromFile(
  filePath = "/data/products.parquet",
  format = Parquet,
  idKey = Some("id")
).foreach { result =>
  result.indices.foreach(client.refresh)
  println(s"Indexed ${result.successCount} docs at ${result.metrics.throughput} docs/sec")
}

// Load from Delta Lake
client.bulkFromFile(
  filePath = "/data/delta-products",
  format = Delta,
  idKey = Some("id"),
  update = Some(true)
).foreach { result =>
  println(s"Updated ${result.successCount} products from Delta Lake")
}

// Load a JSON array with nested objects
client.bulkFromFile(
  filePath = "/data/persons.json",
  format = JsonArray,
  idKey = Some("uuid")
).foreach { result =>
  println(s"Indexed ${result.successCount} persons with nested structures")
}
```

---