diff --git a/README.md b/README.md index 786d079b..0ceb2134 100644 --- a/README.md +++ b/README.md @@ -1201,25 +1201,71 @@ The **BulkApi** leverages Akka Streams for efficient, backpressure-aware bulk op #### **Features** ✅ **Streaming Architecture**: Process millions of documents without memory issues +✅ **Multiple data sources**: In-memory, File-based (JSON, JSON Array, Parquet, Delta Lake) ✅ **Backpressure Handling**: Automatic flow control based on Elasticsearch capacity ✅ **Error Recovery**: Configurable retry strategies ✅ **Batching**: Automatic batching for optimal performance ✅ **Parallel Processing**: Concurrent bulk requests with configurable parallelism -**Example:** +#### Data Sources -```scala -import akka.stream.scaladsl._ +| Source Type | Format | Description | +|------------------|---------------|---------------------------------------| +| **In-Memory** | Scala objects | Direct streaming from collections | +| **JSON** | Text | Newline-delimited JSON (NDJSON) | +| **JSON Array** | Text | JSON array with nested structures | +| **Parquet** | Binary | Columnar storage format | +| **Delta Lake** | Directory | ACID transactional data lake | + +#### Operation Types + +| Operation | Action | Behavior | +|------------|----------------|-------------------------------------------| +| **INDEX** | Insert/Replace | Creates or replaces entire document | +| **UPDATE** | Upsert | Updates existing or creates new (partial) | +| **DELETE** | Remove | Deletes document by ID | -// Stream-based bulk indexing -val documents: List[String] = List( - """{"id":"user-1","name":"Alice","age":30}""", - """{"id":"user-2","name":"Bob","age":25}""", - """{"id":"user-3","name":"Charlie","age":35}""" +**Examples:** + +```scala +// High-performance file indexing +implicit val options: BulkOptions = BulkOptions( + defaultIndex = "products", + maxBulkSize = 10000, + balance = 16, + disableRefresh = true ) -implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "users") -client.bulkSource(Source.fromIterator(() => documents), identity, indexKey=Some("id")) +implicit val hadoopConf: Configuration = new Configuration() + +// Load from Parquet +client.bulkFromFile( + filePath = "/data/products.parquet", + format = Parquet, + idKey = Some("id") +).foreach { result => + result.indices.foreach(client.refresh) + println(s"Indexed ${result.successCount} docs at ${result.metrics.throughput} docs/sec") +} + +// Load from Delta Lake +client.bulkFromFile( + filePath = "/data/delta-products", + format = Delta, + idKey = Some("id"), + update = Some(true) +).foreach { result => + println(s"Updated ${result.successCount} products from Delta Lake") +} + +// Load JSON Array with nested objects +client.bulkFromFile( + filePath = "/data/persons.json", + format = JsonArray, + idKey = Some("uuid") +).foreach { result => + println(s"Indexed ${result.successCount} persons with nested structures") +} ``` --- @@ -1471,18 +1517,18 @@ ThisBuild / resolvers ++= Seq( // For Elasticsearch 6 // Using Jest client -libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-jest-client" % 0.14.0 +libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-jest-client" % 0.14.1 // Or using Rest High Level client -libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-rest-client" % 0.14.0 +libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es6-rest-client" % 0.14.1 // For Elasticsearch 7 -libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es7-rest-client" % 0.14.0 
+libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es7-rest-client" % 0.14.1 // For Elasticsearch 8 -libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es8-java-client" % 0.14.0 +libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es8-java-client" % 0.14.1 // For Elasticsearch 9 -libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es9-java-client" % 0.14.0 +libraryDependencies += "app.softnetwork.elastic" %% s"softclient4es9-java-client" % 0.14.1 ``` ### **Quick Example** diff --git a/build.sbt b/build.sbt index b5877a91..9ff003d8 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ ThisBuild / organization := "app.softnetwork" name := "softclient4es" -ThisBuild / version := "0.14.0" +ThisBuild / version := "0.14.1" ThisBuild / scalaVersion := scala213 diff --git a/core/build.sbt b/core/build.sbt index 58a8a719..ea7f0cdb 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -30,8 +30,15 @@ val mockito = Seq( "org.mockito" %% "mockito-scala" % "1.17.12" % Test ) -libraryDependencies ++= akka ++ typesafeConfig ++ http ++ -json4s ++ mockito :+ "com.google.code.gson" % "gson" % Versions.gson :+ - "com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+ - "org.scalatest" %% "scalatest" % Versions.scalatest % Test +// Parquet & Avro +val avro = Seq( + "org.apache.parquet" % "parquet-avro" % "1.15.2" excludeAll (excludeSlf4jAndLog4j *), + "org.apache.avro" % "avro" % "1.11.4" excludeAll (excludeSlf4jAndLog4j *), + "org.apache.hadoop" % "hadoop-client" % "3.4.2" excludeAll (excludeSlf4jAndLog4j *) +) +libraryDependencies ++= akka ++ typesafeConfig ++ http ++ +json4s ++ mockito ++ avro :+ "com.google.code.gson" % "gson" % Versions.gson :+ +"com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+ +"io.delta" %% "delta-standalone" % "0.6.0" :+ +"org.scalatest" %% "scalatest" % Versions.scalatest % Test diff --git a/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala index 3516fd34..df9714f4 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala @@ -21,7 +21,11 @@ import akka.actor.ActorSystem import akka.stream.{FlowShape, Materializer} import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source} import app.softnetwork.elastic.client.bulk._ +import app.softnetwork.elastic.client.file._ import app.softnetwork.elastic.client.result.{ElasticResult, ElasticSuccess} + +import org.apache.hadoop.conf.Configuration + import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -41,6 +45,142 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers { // PUBLIC METHODS // ======================================================================== + /** Bulk from a Parquet or JSON file with automatic detection + * + * @param filePath + * path to the file (.parquet, .json, .jsonl) + * @param indexKey + * JSON key to extract the index + * @param idKey + * JSON key to extract the ID + * @param suffixDateKey + * JSON key to append a date to the index + * @param suffixDatePattern + * date formatting pattern + * @param update + * true for upsert, false for index + * @param delete + * true for delete + * @param parentIdKey + * JSON key for the parent + * @param callbacks + * callbacks for events + * @param bufferSize + * read buffer size + * @param validateJson + * validate each JSON line + * @param skipInvalid + * ignore 
invalid JSON lines + * @param format + * file format (auto-detection if Unknown) + * @param hadoopConf + * custom Hadoop configuration + * @param bulkOptions + * configuration options + * @return + * Future with the detailed result + */ + def bulkFromFile( + filePath: String, + indexKey: Option[String] = None, + idKey: Option[String] = None, + suffixDateKey: Option[String] = None, + suffixDatePattern: Option[String] = None, + update: Option[Boolean] = None, + delete: Option[Boolean] = None, + parentIdKey: Option[String] = None, + callbacks: BulkCallbacks = BulkCallbacks.default, + bufferSize: Int = 500, + validateJson: Boolean = true, + skipInvalid: Boolean = true, + format: FileFormat = Unknown, + hadoopConf: Option[Configuration] = None + )(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration) + + logger.info(s"📁 Starting bulk from file: $filePath") + + val source: Source[String, NotUsed] = if (validateJson) { + FileSourceFactory.fromFileValidated(filePath, bufferSize, skipInvalid, format) + } else { + FileSourceFactory.fromFile(filePath, bufferSize, format) + } + + // Use the existing API with the file source + bulkWithResult[String]( + items = source, + toDocument = identity, // The document is already in JSON format. + indexKey = indexKey, + idKey = idKey, + suffixDateKey = suffixDateKey, + suffixDatePattern = suffixDatePattern, + update = update, + delete = delete, + parentIdKey = parentIdKey, + callbacks = callbacks + ) + } + + /** Bulk from a Parquet file specifically + */ + def bulkFromParquet( + filePath: String, + indexKey: Option[String] = None, + idKey: Option[String] = None, + callbacks: BulkCallbacks = BulkCallbacks.default, + bufferSize: Int = 500, + hadoopConf: Option[Configuration] = None + )(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration) + + logger.info(s"📁 Starting bulk from Parquet file: $filePath") + + bulkWithResult[String]( + items = ParquetFileSource.fromFile(filePath, bufferSize), + toDocument = identity, + indexKey = indexKey, + idKey = idKey, + callbacks = callbacks + ) + } + + /** Bulk from a specific JSON file + */ + def bulkFromJson( + filePath: String, + indexKey: Option[String] = None, + idKey: Option[String] = None, + callbacks: BulkCallbacks = BulkCallbacks.default, + bufferSize: Int = 500, + validateJson: Boolean = true, + skipInvalid: Boolean = true, + hadoopConf: Option[Configuration] = None + )(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration) + + logger.info(s"📁 Starting bulk from JSON file: $filePath") + + val source = if (validateJson) { + JsonFileSource.fromFileValidated(filePath, bufferSize, skipInvalid) + } else { + JsonFileSource.fromFile(filePath, bufferSize) + } + + bulkWithResult[String]( + items = source, + toDocument = identity, + indexKey = indexKey, + idKey = idKey, + callbacks = callbacks + ) + } + /** Bulk with detailed results (successes + failures). 
* * This method provides: diff --git a/core/src/main/scala/app/softnetwork/elastic/client/bulk/package.scala b/core/src/main/scala/app/softnetwork/elastic/client/bulk/package.scala index 1ef6ed7f..3116b795 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/bulk/package.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/bulk/package.scala @@ -59,6 +59,8 @@ package object bulk { else 0.0 def hasFailures: Boolean = failedCount > 0 + + def totalCount: Int = successCount + failedCount } sealed trait DocumentResult { diff --git a/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala b/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala new file mode 100644 index 00000000..d7aef09e --- /dev/null +++ b/core/src/main/scala/app/softnetwork/elastic/client/file/package.scala @@ -0,0 +1,1266 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.client + +import akka.NotUsed +import akka.stream.scaladsl.Source +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken} +import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.databind.{ + DeserializationFeature, + JsonNode, + ObjectMapper, + SerializationFeature +} +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetReader} +import org.apache.parquet.hadoop.util.HadoopInputFile +import io.delta.standalone.DeltaLog +import io.delta.standalone.data.{CloseableIterator, RowRecord} +import io.delta.standalone.types._ +import org.apache.parquet.io.SeekableInputStream +import org.slf4j.{Logger, LoggerFactory} + +import java.io.{BufferedReader, InputStream, InputStreamReader} +import scala.concurrent.{blocking, ExecutionContext, Future} +import scala.io.{Source => IoSource} +import scala.util.{Failure, Success, Try} +import scala.jdk.CollectionConverters._ + +package object file { + + private val logger: Logger = LoggerFactory.getLogger("FileSource") + + sealed trait FileFormat { + def name: String + } + + case object Parquet extends FileFormat { + override def name: String = "Parquet" + } + + case object Json extends FileFormat { + override def name: String = "JSON" + } + + case object JsonArray extends FileFormat { + override def name: String = "JSON Array" + } + + case object Delta extends FileFormat { + override def name: String = "Delta Lake" + } + + case object Unknown extends FileFormat { + override def name: String = "Unknown" + } + + /** Hadoop configuration with optimizations for local file system */ + def hadoopConfiguration: Configuration = { + val conf = new Configuration() + conf.set("fs.file.impl", 
"org.apache.hadoop.fs.LocalFileSystem") + conf.setBoolean("parquet.avro.readInt96AsFixed", true) + // Optimizations + conf.setInt("io.file.buffer.size", 65536) // 64KB buffer + conf.setBoolean("fs.automatic.close", true) + conf + } + + /** Jackson ObjectMapper configuration */ + object JacksonConfig { + lazy val objectMapper: ObjectMapper = { + val mapper = new ObjectMapper() + + // Scala module for native support of Scala types + mapper.registerModule(DefaultScalaModule) + + // Java Time module for java.time.Instant, LocalDateTime, etc. + mapper.registerModule(new JavaTimeModule()) + + // Setup for performance + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) + + // Ignores null values in serialization + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + + // Optimizations + mapper.configure(SerializationFeature.INDENT_OUTPUT, false) // No pretty print + mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, false) + + mapper + } + } + + /** Base trait for file sources */ + sealed trait FileSource { + + protected def logger: Logger = LoggerFactory.getLogger(getClass) + + def format: FileFormat + + /** Reads a file and returns a Source of JSON strings + * + * @param filePath + * path to the file + * @param bufferSize + * buffer size for backpressure + * @param ec + * execution context + * @param conf + * Hadoop configuration (optional) + * @return + * Source of JSON strings + */ + def fromFile( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] + + lazy val mapper: ObjectMapper = JacksonConfig.objectMapper + + /** Version with Jackson JsonNode (more efficient) + */ + def fromFileAsJsonNode( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[JsonNode, NotUsed] = { + + fromFile(filePath, bufferSize).map { jsonString => + mapper.readTree(jsonString) + } + } + + /** Version with JSON validation and error logging + */ + def fromFileValidated( + filePath: String, + bufferSize: Int = 500, + skipInvalid: Boolean = true + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + var invalidCount = 0L + var validCount = 0L + + fromFile(filePath, bufferSize).mapConcat { line => + Try(mapper.readTree(line)) match { + case Success(_) => + validCount += 1 + if (validCount % 10000 == 0) { + logger.info(s"✅ Validated $validCount documents (${invalidCount} invalid)") + } + List(line) + + case Failure(ex) => + invalidCount += 1 + logger.warn( + s"⚠️ Invalid JSON line skipped (error: ${ex.getMessage}): ${line.take(100)}..." 
+ ) + if (skipInvalid) List.empty else throw ex + } + } + } + + /** Validates that the file exists and is readable + * + * @param filePath + * Path to validate + * @param checkIsFile + * If true, validates it's a file; if false, validates it's a directory + * @param conf + * Hadoop configuration + */ + protected def validateFile( + filePath: String, + checkIsFile: Boolean = true + )(implicit conf: Configuration): Unit = { + val path = new Path(filePath) + val fs = FileSystem.get(conf) + + if (!fs.exists(path)) { + throw new IllegalArgumentException(s"File does not exist: $filePath") + } + + val status = fs.getFileStatus(path) + + if (checkIsFile && !status.isFile) { + throw new IllegalArgumentException(s"Path is not a file: $filePath") + } + + if (!checkIsFile && !status.isDirectory) { + throw new IllegalArgumentException(s"Path is not a directory: $filePath") + } + + if (checkIsFile && status.getLen == 0) { + logger.warn(s"⚠️ File is empty: $filePath") + } + + val pathType = if (checkIsFile) "file" else "directory" + val sizeInfo = if (checkIsFile) s"(${status.getLen} bytes)" else "" + logger.info(s"📁 Loading $pathType: $filePath $sizeInfo") + } + } + + /** Source for Parquet files */ + object ParquetFileSource extends FileSource { + + override def format: FileFormat = Parquet + + /** Creates an Akka Streams source from a Parquet file. Converts each Avro record to JSON + * String. + */ + override def fromFile( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + fromFileWithCustomMapping(filePath, bufferSize, record => record.toString) + } + + /** Alternative: Read Parquet with custom Avro to JSON conversion + */ + def fromFileWithCustomMapping( + filePath: String, + bufferSize: Int = 500, + avroToJson: GenericRecord => String + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + // Validate file before processing + Try(validateFile(filePath)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for Parquet file: $filePath", ex) + return Source.failed(ex) + } + + var recordCount = 0L + val startTime = System.currentTimeMillis() + + Source + .unfoldResource[String, ParquetReader[GenericRecord]]( + // Create: Open the Parquet reader + create = () => { + logger.info(s"📂 Opening Parquet file: $filePath") + Try { + AvroParquetReader + .builder[GenericRecord](HadoopInputFile.fromPath(new Path(filePath), conf)) + .withConf(conf) + .build() + } match { + case Success(reader) => reader + case Failure(ex) => + logger.error(s"❌ Failed to open Parquet file: $filePath", ex) + throw ex + } + }, + + // Read: Reads the next record and converts it to JSON + read = reader => + blocking { + Try(Option(reader.read())) match { + case Success(Some(record)) => + recordCount += 1 + if (recordCount % 10000 == 0) { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + val throughput = recordCount / elapsed + logger.info( + f"📊 Read $recordCount records from Parquet ($throughput%.2f records/sec)" + ) + } + Some(avroToJson(record)) + + case Success(None) => + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + logger.info( + s"✅ Finished reading Parquet file: $recordCount records in ${elapsed}s" + ) + None + + case Failure(ex) => + logger.error(s"❌ Error reading Parquet record at position $recordCount", ex) + None + } + }, + + // Close: Close the reader properly + close = reader => 
{ + Try(reader.close()) match { + case Success(_) => logger.debug(s"🔒 Closed Parquet reader for: $filePath") + case Failure(ex) => + logger.warn(s"⚠️ Failed to close Parquet reader: ${ex.getMessage}") + } + } + ) + .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + + case class ParquetMetadata( + numRowGroups: Int, + numRows: Long, + schema: String + ) + + def getFileMetadata(filePath: String)(implicit conf: Configuration): ParquetMetadata = { + Try(validateFile(filePath, checkIsFile = true)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for Parquet file: $filePath", ex) + throw ex + } + + val path = new Path(filePath) + val inputFile = HadoopInputFile.fromPath(path, conf) + val reader = ParquetFileReader.open(inputFile) + + try { + val metadata = reader.getFooter.getFileMetaData + ParquetMetadata( + numRowGroups = reader.getRowGroups.size(), + numRows = reader.getRecordCount, + schema = metadata.getSchema.toString + ) + } finally { + reader.close() + } + } + } + + /** Source for JSON files (NDJSON or JSON Lines) */ + object JsonFileSource extends FileSource { + + override def format: FileFormat = Json + + /** Reads a JSON Lines file (one line = one JSON document). Format compatible with Elasticsearch + * Bulk. + */ + override def fromFile( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + // Validate file before processing + Try(validateFile(filePath)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for JSON file: $filePath", ex) + return Source.failed(ex) + } + + var lineCount = 0L + val startTime = System.currentTimeMillis() + + Source + .unfoldResource[String, BufferedReader]( + // Create: Open the file + create = () => { + logger.info(s"📂 Opening JSON file: $filePath") + Try { + val is: InputStream = HadoopInputFile.fromPath(new Path(filePath), conf).newStream() + new BufferedReader(new InputStreamReader(is, "UTF-8")) + } match { + case Success(reader) => reader + case Failure(ex) => + logger.error(s"❌ Failed to open JSON file: $filePath", ex) + throw ex + } + }, + + // Read: Read the next line + read = reader => { + blocking { + Try(Option(reader.readLine())) match { + case Success(Some(line)) => + lineCount += 1 + if (lineCount % 10000 == 0) { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + val throughput = lineCount / elapsed + logger.info(f"📊 Read $lineCount lines from JSON ($throughput%.2f lines/sec)") + } + val trimmed = line.trim + if (trimmed.nonEmpty) Some(trimmed) else None // Skip empty lines + + case Success(None) => + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + logger.info(s"✅ Finished reading JSON file: $lineCount lines in ${elapsed}s") + None + + case Failure(ex) => + logger.error(s"❌ Error reading JSON line at position $lineCount", ex) + None + } + } + }, + + // Close: Close the reader properly + close = reader => { + Try(reader.close()) match { + case Success(_) => logger.debug(s"🔒 Closed JSON reader for: $filePath") + case Failure(ex) => + logger.warn(s"⚠️ Failed to close JSON reader: ${ex.getMessage}") + } + } + ) + .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + + /** Alternative version with scala.io.Source (less control but simpler) + */ + def fromFileManaged( + filePath: String, + bufferSize: Int = 500 + ): Source[String, NotUsed] = { + + logger.info(s"📂 Opening JSON file 
(managed): $filePath") + + Source + .fromIterator(() => { + val source = IoSource.fromFile(filePath, "UTF-8") + source.getLines().filter(_.trim.nonEmpty) + }) + .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + } + + /** Source for JSON Array files (single array containing all documents) Format: + * [{"id":1},{"id":2},{"id":3}] + */ + object JsonArrayFileSource extends FileSource { + + override def format: FileFormat = JsonArray + + private val jsonFactory = new JsonFactory() + + override def fromFile( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + Try(validateFile(filePath)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for JSON Array file: $filePath", ex) + return Source.failed(ex) + } + + var elementCount = 0L + val startTime = System.currentTimeMillis() + + Source + .unfoldResource[String, (InputStream, JsonParser)]( + // Create: Open file via Hadoop and create JSON parser + create = () => { + logger.info(s"📂 Opening JSON Array file via Hadoop: $filePath") + Try { + val is: SeekableInputStream = + HadoopInputFile.fromPath(new Path(filePath), conf).newStream() + + // Create Jackson parser on top of Hadoop SeekableInputStream + val parser = jsonFactory.createParser(is) + + // Expect array start + val token = parser.nextToken() + if (token != JsonToken.START_ARRAY) { + is.close() + throw new IllegalArgumentException( + s"Expected JSON array, but found: ${token}. File: $filePath" + ) + } + + logger.info(s"📊 Started parsing JSON Array via Hadoop FS") + (is, parser) + } match { + case Success(result) => result + case Failure(ex) => + logger.error(s"❌ Failed to open JSON Array file: $filePath", ex) + throw ex + } + }, + + // Read: Parse next element from array + read = { case (_, parser) => + blocking { + Try { + val token = parser.nextToken() + + if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { + elementCount += 1 + + if (elementCount % 10000 == 0) { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + val throughput = elementCount / elapsed + logger.info(f"📊 Parsed $elementCount elements ($throughput%.2f elements/sec)") + } + + // Parse current element as JsonNode + val node: JsonNode = mapper.readTree(parser) + Some(mapper.writeValueAsString(node)) + + } else if (token == JsonToken.END_ARRAY || token == null) { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + logger.info( + s"✅ Finished reading JSON Array: $elementCount elements in ${elapsed}s" + ) + None + + } else { + // Skip unexpected tokens + logger.warn(s"⚠️ Unexpected token in JSON Array: $token") + None + } + } match { + case Success(result) => result + case Failure(ex) => + logger.error(s"❌ Error parsing JSON Array element at position $elementCount", ex) + None + } + } + }, + + // Close: Close parser and Hadoop input stream + close = { case (inputStream, parser) => + Try { + parser.close() // This also closes the underlying stream + } match { + case Success(_) => + logger.debug(s"🔒 Closed JSON Array parser for: $filePath") + case Failure(ex) => + logger.warn(s"⚠️ Failed to close JSON Array parser: ${ex.getMessage}") + } + + // Ensure Hadoop stream is closed + Try(inputStream.close()) match { + case Success(_) => + logger.debug(s"🔒 Closed Hadoop input stream for: $filePath") + case Failure(ex) => + logger.warn(s"⚠️ Failed to close Hadoop input stream: ${ex.getMessage}") + } + } + ) + 
.buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + + /** Alternative: Load entire array in memory (use for small files only!) + */ + def fromFileInMemory( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + Try(validateFile(filePath)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for JSON Array file: $filePath", ex) + return Source.failed(ex) + } + + logger.info(s"📂 Loading JSON Array file in memory: $filePath") + + Source + .future(Future { + blocking { + val is: InputStream = HadoopInputFile.fromPath(new Path(filePath), conf).newStream() + try { + val arrayNode = mapper.readTree(is) + if (!arrayNode.isArray) { + throw new IllegalArgumentException(s"File is not a JSON array: $filePath") + } + + arrayNode.elements().asScala.map(node => mapper.writeValueAsString(node)).toList + } finally { + is.close() + } + } + }) + .mapConcat(identity) + .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + + /** Get metadata about the JSON array + */ + case class JsonArrayMetadata( + elementCount: Int, + hasNestedArrays: Boolean, + hasNestedObjects: Boolean, + maxDepth: Int + ) + + def getMetadata(filePath: String)(implicit conf: Configuration): JsonArrayMetadata = { + Try(validateFile(filePath, checkIsFile = true)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for JSON Array file: $filePath", ex) + throw ex + } + + val is: InputStream = HadoopInputFile.fromPath(new Path(filePath), conf).newStream() + + try { + val arrayNode = mapper.readTree(is) + if (!arrayNode.isArray) { + throw new IllegalArgumentException(s"File is not a JSON array: $filePath") + } + + val elements = arrayNode.elements().asScala.toList + val hasNestedArrays = elements.exists(hasArrayField) + val hasNestedObjects = elements.exists(hasObjectField) + val maxDepth = elements.map(calculateDepth).reduceOption(_ max _).getOrElse(0) + + JsonArrayMetadata( + elementCount = elements.size, + hasNestedArrays = hasNestedArrays, + hasNestedObjects = hasNestedObjects, + maxDepth = maxDepth + ) + } finally { + is.close() + } + } + + private def hasArrayField(node: JsonNode): Boolean = { + if (node.isArray) return true + if (node.isObject) { + node.properties().asScala.exists(entry => hasArrayField(entry.getValue)) + } else { + false + } + } + + private def hasObjectField(node: JsonNode): Boolean = { + if (node.isObject) return true + if (node.isArray) { + node.elements().asScala.exists(hasObjectField) + } else { + false + } + } + + private def calculateDepth(node: JsonNode): Int = { + if (node.isArray) { + 1 + node.elements().asScala.map(calculateDepth).reduceOption(_ max _).getOrElse(0) + } else if (node.isObject) { + 1 + node + .properties() + .asScala + .map(e => calculateDepth(e.getValue)) + .reduceOption(_ max _) + .getOrElse(0) + } else { + 0 + } + } + } + + /** Source for Delta Lake files using Delta Standalone + */ + object DeltaFileSource extends FileSource { + + override def format: FileFormat = Delta + + override def fromFile( + filePath: String, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + Try(validateFile(filePath, checkIsFile = false)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for Delta Lake table: $filePath", ex) + return Source.failed(ex) + } + + var 
rowCount = 0L + val startTime = System.currentTimeMillis() + + Source + .unfoldResource[String, CloseableIterator[RowRecord]]( + create = () => { + logger.info(s"📂 Opening Delta Lake table: $filePath") + Try { + val deltaLog = DeltaLog.forTable(conf, filePath) + val snapshot = deltaLog.snapshot() + + logger.info( + s"📊 Delta table version: ${snapshot.getVersion}, " + + s"files: ${snapshot.getAllFiles.size()}" + ) + + snapshot.open() + } match { + case Success(result) => result + case Failure(ex) => + logger.error(s"❌ Failed to open Delta Lake table: $filePath", ex) + throw ex + } + }, + read = iterator => + blocking { + Try { + if (iterator.hasNext) { + rowCount += 1 + if (rowCount % 10000 == 0) { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + val throughput = rowCount / elapsed + logger.info(f"📊 Read $rowCount rows from Delta ($throughput%.2f rows/sec)") + } + + val row = iterator.next() + Some(rowRecordToJson(row)) + } else { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + logger.info(s"✅ Finished reading Delta table: $rowCount rows in ${elapsed}s") + None + } + } match { + case Success(result) => result + case Failure(ex) => + logger.error(s"❌ Error reading Delta row at position $rowCount", ex) + None + } + }, + close = iterator => + Try(iterator.close()) match { + case Success(_) => logger.debug(s"🔒 Closed Delta reader for: $filePath") + case Failure(ex) => + logger.warn(s"⚠️ Failed to close Delta reader: ${ex.getMessage}") + } + ) + .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + + /** Convert Delta Standalone RowRecord to JSON string. Uses the actual API: getList/getMap + * return java.util collections + */ + private def rowRecordToJson(row: RowRecord): String = { + import com.fasterxml.jackson.databind.node.ObjectNode + + val objectNode = mapper.createObjectNode() + val schema = row.getSchema + val fields = schema.getFields + + fields.foreach { field => + val fieldName = field.getName + val fieldType = field.getDataType + + if (row.isNullAt(fieldName)) { + objectNode.putNull(fieldName) + } else { + addValueToNode(objectNode, fieldName, row, fieldType) + } + } + + mapper.writeValueAsString(objectNode) + } + + /** Add typed value to Jackson ObjectNode + */ + private def addValueToNode( + node: ObjectNode, + fieldName: String, + row: RowRecord, + dataType: DataType + ): Unit = { + + Try { + dataType match { + case _: StringType => + node.put(fieldName, row.getString(fieldName)) + + case _: IntegerType => + node.put(fieldName, row.getInt(fieldName)) + + case _: ByteType => + node.put(fieldName, row.getByte(fieldName).toInt) + + case _: ShortType => + node.put(fieldName, row.getShort(fieldName).toInt) + + case _: LongType => + node.put(fieldName, row.getLong(fieldName)) + + case _: FloatType => + node.put(fieldName, row.getFloat(fieldName)) + + case _: DoubleType => + node.put(fieldName, row.getDouble(fieldName)) + + case _: BooleanType => + node.put(fieldName, row.getBoolean(fieldName)) + + case _: DecimalType => + node.put(fieldName, row.getBigDecimal(fieldName)) + + case _: DateType => + // Date is stored as an Int (days since epoch) + val dateValue = row.getInt(fieldName) + val date = java.time.LocalDate.ofEpochDay(dateValue.toLong) + node.put(fieldName, date.toString) + + case _: TimestampType => + // Timestamp is stored as a Long (microseconds since epoch) + val timestampMicros = row.getLong(fieldName) + val instant = java.time.Instant.ofEpochMilli(timestampMicros / 1000) + node.put(fieldName, instant.toString) + 
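+ // Complex types below (arrays, structs, maps) are converted recursively through addElementToArray, addValueToNode and addElementToObject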
+ case arrayType: ArrayType => + val arrayNode = node.putArray(fieldName) + // getList returns java.util.List[Nothing] - treat it as Object + val list = row.getList(fieldName).asInstanceOf[java.util.List[Any]] + + list.asScala.foreach { element => + if (element == null) { + arrayNode.addNull() + } else { + addElementToArray(arrayNode, element, arrayType.getElementType) + } + } + + case structType: StructType => + val nestedNode = node.putObject(fieldName) + val nestedRow = row.getRecord(fieldName) + val nestedFields = structType.getFields + + nestedFields.foreach { nestedField => + val nestedFieldName = nestedField.getName + val nestedFieldType = nestedField.getDataType + + if (nestedRow.isNullAt(nestedFieldName)) { + nestedNode.putNull(nestedFieldName) + } else { + addValueToNode(nestedNode, nestedFieldName, nestedRow, nestedFieldType) + } + } + + case mapType: MapType => + val mapNode = node.putObject(fieldName) + // getMap returns java.util.Map[Nothing, Nothing] - treat it as Object + val map = row.getMap(fieldName).asInstanceOf[java.util.Map[Any, Any]] + + map.asScala.foreach { case (key, value) => + val keyStr = key.toString + if (value == null) { + mapNode.putNull(keyStr) + } else { + addElementToObject(mapNode, keyStr, value, mapType.getValueType) + } + } + + case _: BinaryType => + val bytes = row.getBinary(fieldName) + node.put(fieldName, java.util.Base64.getEncoder.encodeToString(bytes)) + + case _ => + // Fallback: convert to string + logger.warn(s"Unsupported data type for field $fieldName: ${dataType.getTypeName}") + node.put(fieldName, "") + } + } match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"Error processing field $fieldName: ${ex.getMessage}", ex) + node.put(fieldName, "") + } + } + + /** Add element to JSON array node + */ + private def addElementToArray( + arrayNode: com.fasterxml.jackson.databind.node.ArrayNode, + element: Any, + elementType: DataType + ): Unit = { + + elementType match { + case _: StringType => + arrayNode.add(element.toString) + + case _: IntegerType | _: ByteType | _: ShortType => + arrayNode.add(element.asInstanceOf[Number].intValue()) + + case _: LongType => + arrayNode.add(element.asInstanceOf[Number].longValue()) + + case _: FloatType => + arrayNode.add(element.asInstanceOf[Number].floatValue()) + + case _: DoubleType => + arrayNode.add(element.asInstanceOf[Number].doubleValue()) + + case _: BooleanType => + arrayNode.add(element.asInstanceOf[Boolean]) + + case _: DecimalType => + arrayNode.add(element.asInstanceOf[java.math.BigDecimal]) + + case _: DateType => + val days = element.asInstanceOf[Number].intValue() + val date = java.time.LocalDate.ofEpochDay(days.toLong) + arrayNode.add(date.toString) + + case _: TimestampType => + val micros = element.asInstanceOf[Number].longValue() + val instant = java.time.Instant.ofEpochMilli(micros / 1000) + arrayNode.add(instant.toString) + + case structType: StructType => + val nestedNode = arrayNode.addObject() + val nestedRow = element.asInstanceOf[RowRecord] + val nestedFields = structType.getFields + + nestedFields.foreach { field => + val nestedFieldName = field.getName + if (!nestedRow.isNullAt(nestedFieldName)) { + addValueToNode(nestedNode, nestedFieldName, nestedRow, field.getDataType) + } else { + nestedNode.putNull(nestedFieldName) + } + } + + case arrayType: ArrayType => + // Nested array + val nestedArrayNode = arrayNode.addArray() + val nestedList = element.asInstanceOf[java.util.List[Any]] + nestedList.asScala.foreach { nestedElement => + if
(nestedElement == null) { + nestedArrayNode.addNull() + } else { + addElementToArray(nestedArrayNode, nestedElement, arrayType.getElementType) + } + } + + case _ => + arrayNode.add(element.toString) + } + } + + /** Add element to JSON object node + */ + private def addElementToObject( + objectNode: com.fasterxml.jackson.databind.node.ObjectNode, + key: String, + value: Any, + valueType: DataType + ): Unit = { + + valueType match { + case _: StringType => + objectNode.put(key, value.toString) + + case _: IntegerType | _: ByteType | _: ShortType => + objectNode.put(key, value.asInstanceOf[Number].intValue()) + + case _: LongType => + objectNode.put(key, value.asInstanceOf[Number].longValue()) + + case _: FloatType => + objectNode.put(key, value.asInstanceOf[Number].floatValue()) + + case _: DoubleType => + objectNode.put(key, value.asInstanceOf[Number].doubleValue()) + + case _: BooleanType => + objectNode.put(key, value.asInstanceOf[Boolean]) + + case _: DecimalType => + objectNode.put(key, value.asInstanceOf[java.math.BigDecimal]) + + case _ => + objectNode.put(key, value.toString) + } + } + + /** Read Delta with version (time travel) + */ + def fromFileAtVersion( + filePath: String, + version: Long, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + Try(validateFile(filePath, checkIsFile = false)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for Delta Lake table: $filePath", ex) + return Source.failed(ex) + } + + var rowCount = 0L + val startTime = System.currentTimeMillis() + + Source + .unfoldResource[String, CloseableIterator[RowRecord]]( + create = () => { + logger.info(s"📂 Opening Delta Lake table at version $version: $filePath") + val deltaLog = DeltaLog.forTable(conf, filePath) + val snapshot = deltaLog.getSnapshotForVersionAsOf(version) + + logger.info(s"📊 Delta table version $version, files: ${snapshot.getAllFiles.size()}") + + snapshot.open() + }, + read = iterator => + blocking { + Try { + if (iterator.hasNext) { + rowCount += 1 + if (rowCount % 10000 == 0) { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + val throughput = rowCount / elapsed + logger.info( + f"📊 Read $rowCount rows from Delta v$version ($throughput%.2f rows/sec)" + ) + } + Some(rowRecordToJson(iterator.next())) + } else { + val elapsed = (System.currentTimeMillis() - startTime) / 1000.0 + logger.info( + s"✅ Finished reading Delta table v$version: $rowCount rows in ${elapsed}s" + ) + None + } + } match { + case Success(result) => result + case Failure(ex) => + logger.error(s"❌ Error reading Delta row at position $rowCount", ex) + None + } + }, + close = iterator => Try(iterator.close()) + ) + .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure) + } + + /** Read Delta with timestamp (time travel) + */ + def fromFileAtTimestamp( + filePath: String, + timestampMillis: Long, + bufferSize: Int = 500 + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + + Try(validateFile(filePath, checkIsFile = false)) match { + case Success(_) => // OK + case Failure(ex) => + logger.error(s"❌ Validation failed for Delta Lake table: $filePath", ex) + return Source.failed(ex) + } + + logger.info(s"📂 Opening Delta Lake table at timestamp $timestampMillis: $filePath") + + val deltaLog = DeltaLog.forTable(conf, filePath) + val snapshot = deltaLog.getSnapshotForTimestampAsOf(timestampMillis) + + logger.info(s"📊 
Resolved to version: ${snapshot.getVersion}") + + fromFileAtVersion(filePath, snapshot.getVersion, bufferSize) + } + + /** Get Delta table metadata + */ + def getTableInfo( + filePath: String + )(implicit conf: Configuration = hadoopConfiguration): DeltaTableInfo = { + val deltaLog = DeltaLog.forTable(conf, filePath) + val snapshot = deltaLog.snapshot() + val metadata = snapshot.getMetadata + + DeltaTableInfo( + version = snapshot.getVersion, + numFiles = snapshot.getAllFiles.size(), + schema = metadata.getSchema.getTreeString, + partitionColumns = metadata.getPartitionColumns.asScala.toList, + createdTime = metadata.getCreatedTime.orElse(0L), + description = Option(metadata.getDescription) + ) + } + } + + /** Case class for Delta table information + */ + case class DeltaTableInfo( + version: Long, + numFiles: Int, + schema: String, + partitionColumns: List[String], + createdTime: Long, + description: Option[String] + ) + + /** Automatic file format detection */ + object FileFormatDetector { + + def detect(filePath: String)(implicit conf: Configuration = hadoopConfiguration): FileFormat = { + val lowerPath = filePath.toLowerCase + + if (lowerPath.endsWith(".parquet") || lowerPath.endsWith(".parq")) { + Parquet + } else if (lowerPath.endsWith(".jsonl") || lowerPath.endsWith(".ndjson")) { + Json + } else if (lowerPath.endsWith(".json")) { + // Distinguishing JSON Lines vs JSON Array by reading the first character + detectJsonType(filePath) + } else if (isDeltaTable(filePath)) { + Delta + } else { + Unknown + } + } + + /** Detect if it's a Delta Lake table (check for _delta_log directory) + */ + private def isDeltaTable( + filePath: String + )(implicit conf: Configuration = hadoopConfiguration): Boolean = { + Try { + val fs = FileSystem.get(conf) + val deltaLogPath = new Path(filePath, "_delta_log") + fs.exists(deltaLogPath) && fs.getFileStatus(deltaLogPath).isDirectory + }.getOrElse(false) + } + + /** Distinguish between JSON Lines and JSON Array + */ + private def detectJsonType( + filePath: String + )(implicit conf: Configuration = hadoopConfiguration): FileFormat = { + Try { + val is = HadoopInputFile.fromPath(new Path(filePath), conf).newStream() + try { + val reader = new BufferedReader(new InputStreamReader(is, "UTF-8")) + val firstChar = reader.read().toChar + reader.close() + + if (firstChar == '[') JsonArray else Json + } finally { + is.close() + } + }.getOrElse(Unknown) + } + + /** Detect with validation + */ + def detectOrThrow(filePath: String): FileFormat = { + detect(filePath) match { + case Unknown => + throw new IllegalArgumentException( + s"Unsupported file format: $filePath. Supported: .parquet, .parq, .json, .jsonl, .ndjson" + ) + case format => format + } + } + } + + /** Factory to get the appropriate FileSource based on file format + */ + object FileSourceFactory { + + private def apply( + filePath: String + )(implicit conf: Configuration): FileSource = { + FileFormatDetector.detect(filePath) match { + case Parquet => ParquetFileSource + case Json => JsonFileSource + case JsonArray => JsonArrayFileSource + case Delta => DeltaFileSource + case Unknown => + throw new IllegalArgumentException( + s"Cannot determine file format for: $filePath. 
Supported: .parquet, .parq, .json, .jsonl, .ndjson" + ) + } + } + + def apply(filePath: String, format: FileFormat)(implicit + conf: Configuration = hadoopConfiguration + ): FileSource = { + format match { + case Parquet => ParquetFileSource + case Json => JsonFileSource + case JsonArray => JsonArrayFileSource + case Delta => DeltaFileSource + case Unknown => apply(filePath) + } + } + + /** Load file with specific format + */ + def fromFile( + filePath: String, + bufferSize: Int = 500, + format: FileFormat = Unknown + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + val source = apply(filePath, format) + logger.info(s"📁 Detected format: ${source.format.name}") + source.fromFile(filePath, bufferSize) + } + + /** Load file with validation + */ + def fromFileValidated( + filePath: String, + bufferSize: Int = 500, + skipInvalid: Boolean = true, + format: FileFormat = Unknown + )(implicit + ec: ExecutionContext, + conf: Configuration = hadoopConfiguration + ): Source[String, NotUsed] = { + val source = apply(filePath, format) + logger.info(s"📁 Detected format: ${source.format.name}") + source.fromFileValidated(filePath, bufferSize, skipInvalid) + } + } +} diff --git a/core/src/test/scala/app/softnetwork/elastic/client/file/FileSourceSpec.scala b/core/src/test/scala/app/softnetwork/elastic/client/file/FileSourceSpec.scala new file mode 100644 index 00000000..c24cb834 --- /dev/null +++ b/core/src/test/scala/app/softnetwork/elastic/client/file/FileSourceSpec.scala @@ -0,0 +1,784 @@ +package app.softnetwork.elastic.client.file + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Sink +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.time.{Seconds, Span} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.example.data.simple.SimpleGroupFactory +import org.apache.parquet.schema.{MessageType, MessageTypeParser} +import io.delta.standalone.{DeltaLog, Operation} +import io.delta.standalone.actions.{AddFile, Metadata} +import io.delta.standalone.types.{IntegerType, StringType, StructField, StructType} + +import java.io.File +import java.nio.file.Files +import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ + +class FileSourceSpec extends AnyWordSpec with Matchers with ScalaFutures with BeforeAndAfterAll { + + implicit val system: ActorSystem = ActorSystem("FileSourceTest") + implicit val ec: ExecutionContext = system.dispatcher + implicit val conf: Configuration = hadoopConfiguration + implicit val patience: PatienceConfig = PatienceConfig(timeout = Span(10, Seconds)) + + "FileFormatDetector" should { + "detect Parquet files" in { + FileFormatDetector.detect("data/file.parquet") shouldBe Parquet + FileFormatDetector.detect("data/file.parq") shouldBe Parquet + } + + "detect JSON files" in { + FileFormatDetector.detect("data/file.jsonl") shouldBe Json + FileFormatDetector.detect("data/file.ndjson") shouldBe Json + } + + "detect unknown files" in { + FileFormatDetector.detect("data/file.txt") shouldBe Unknown + } + + "throw on unknown files with detectOrThrow" in { + assertThrows[IllegalArgumentException] { + FileFormatDetector.detectOrThrow("data/file.txt") + } + } + } + + "JsonFileFactory" should { + "read JSON lines file" in { + // Create a 
temporary file for testing + val tempFile = java.io.File.createTempFile("test", ".jsonl") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("""{"id":1,"name":"Alice","nested":{"age":30}}""") + writer.println("""{"id":2,"name":"Bob","nested":{"age":25}}""") + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath) + .runWith(Sink.seq) + .futureValue + + result should have size 2 + result.head should include("Alice") + result.last should include("Bob") + } + + "read JSON Array file (single line)" in { + val tempFile = java.io.File.createTempFile("test", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println( + """[{"id":1,"name":"Alice","nested": {"age":30}},{"id":2,"name":"Bob","nested":{"age":25}}]""" + ) + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath, format = JsonArray) + .runWith(Sink.seq) + .futureValue + + result should have size 2 + result.head should include("Alice") + result.last should include("Bob") + } + + "read JSON Array file (multi-line)" in { + val tempFile = java.io.File.createTempFile("test", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("[") + writer.println(""" { + "id":1, + "name":"Alice", + "nested":{ + "age":30 + } + },""") + writer.println(""" {"id":2,"name":"Bob","nested":{"age":25}}""") + writer.println("]") + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath) + .runWith(Sink.seq) + .futureValue + + result should have size 2 + result.head should include("Alice") + result.last should include("Bob") + } + + "read JSON Array file with nested arrays and objects" in { + val tempFile = File.createTempFile("test-nested", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("""[ + | { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0 }, + | { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0 }, + | { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", + | "children": [ + | { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09" }, + | { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09" } + | ], + | "childrenCount": 2 + | } + |]""".stripMargin) + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath, format = JsonArray) + .runWith(Sink.seq) + .futureValue + + result should have size 3 + + // Verify first element (no children) + result.head should include("Homer Simpson") + result.head should include("A12") + result.head should include("childrenCount") + + // Verify second element (no children) + result(1) should include("Moe Szyslak") + result(1) should include("A14") + + // Verify third element (with nested children array) + val thirdElement = result(2) + thirdElement should include("Barney Gumble2") + thirdElement should include("A16") + thirdElement should include("children") + thirdElement should include("Steve Gumble") + thirdElement should include("Josh Gumble") + thirdElement should include("parentId") + + // Parse and validate nested structure + + val mapper = JacksonConfig.objectMapper + + val jsonNode = mapper.readTree(thirdElement) + jsonNode.get("uuid").asText() shouldBe "A16" + jsonNode.get("childrenCount").asInt() shouldBe 2 + + val children = jsonNode.get("children") + children.isArray shouldBe true + 
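// Both children survive the streaming parse with their parentId preserved +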
children.size() shouldBe 2 + + children.get(0).get("name").asText() shouldBe "Steve Gumble" + children.get(0).get("parentId").asText() shouldBe "A16" + + children.get(1).get("name").asText() shouldBe "Josh Gumble" + children.get(1).get("parentId").asText() shouldBe "A16" + } + + "handle JSON Array with empty nested arrays" in { + val tempFile = File.createTempFile("test-empty-nested", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("""[ + | { "uuid": "A12", "name": "Test", "children": [] } + |]""".stripMargin) + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath, format = JsonArray) + .runWith(Sink.seq) + .futureValue + + result should have size 1 + result.head should include("children") + result.head should include("[]") + } + + "handle deeply nested JSON structures" in { + val tempFile = File.createTempFile("test-deep-nested", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("""[ + | { + | "level1": { + | "level2": { + | "level3": { + | "data": "deep value", + | "array": [1, 2, 3] + | } + | } + | } + | } + |]""".stripMargin) + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath, format = JsonArray) + .runWith(Sink.seq) + .futureValue + + result should have size 1 + result.head should include("deep value") + result.head should include("level1") + result.head should include("level2") + result.head should include("level3") + } + + "read Parquet file" in { + // Create temporary Parquet file + val tempDir = Files.createTempDirectory("parquet-test").toFile + tempDir.deleteOnExit() + val parquetFile = new File(tempDir, "test.parquet") + parquetFile.deleteOnExit() + + // Define schema + val schemaString = + """message TestRecord { + | required int32 id; + | required binary name (UTF8); + | required int32 age; + |}""".stripMargin + + val schema: MessageType = MessageTypeParser.parseMessageType(schemaString) + val groupFactory = new SimpleGroupFactory(schema) + + // Write Parquet file + val path = new Path(parquetFile.getAbsolutePath) + val writer = org.apache.parquet.hadoop.example.ExampleParquetWriter + .builder(path) + .withType(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withConf(conf) + .build() + + try { + // Write records + val record1 = groupFactory.newGroup() + record1.add("id", 1) + record1.add("name", "Alice") + record1.add("age", 30) + writer.write(record1) + + val record2 = groupFactory.newGroup() + record2.add("id", 2) + record2.add("name", "Bob") + record2.add("age", 25) + writer.write(record2) + + val record3 = groupFactory.newGroup() + record3.add("id", 3) + record3.add("name", "Charlie") + record3.add("age", 35) + writer.write(record3) + } finally { + writer.close() + } + + // Read Parquet file + val result = FileSourceFactory + .fromFile(parquetFile.getAbsolutePath, format = Parquet) + .runWith(Sink.seq) + .futureValue + + result should have size 3 + result.head should include("Alice") + result(1) should include("Bob") + result.last should include("Charlie") + + // Verify JSON structure + result.foreach { json => + json should include("id") + json should include("name") + json should include("age") + } + } + + "read Parquet file with nested structures" in { + val tempDir = Files.createTempDirectory("parquet-nested-test").toFile + tempDir.deleteOnExit() + val parquetFile = new File(tempDir, "nested.parquet") + parquetFile.deleteOnExit() + + val schemaString = + """message 
NestedRecord { + | required int32 id; + | required group person { + | required binary name (UTF8); + | required int32 age; + | } + | required group address { + | required binary city (UTF8); + | required binary country (UTF8); + | } + |}""".stripMargin + + val schema: MessageType = MessageTypeParser.parseMessageType(schemaString) + val groupFactory = new SimpleGroupFactory(schema) + + val path = new Path(parquetFile.getAbsolutePath) + val writer = org.apache.parquet.hadoop.example.ExampleParquetWriter + .builder(path) + .withType(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withConf(conf) + .build() + + try { + val record = groupFactory.newGroup() + record.add("id", 1) + + val person = record.addGroup("person") + person.add("name", "Alice") + person.add("age", 30) + + val address = record.addGroup("address") + address.add("city", "Paris") + address.add("country", "France") + + writer.write(record) + } finally { + writer.close() + } + + val result = FileSourceFactory + .fromFile(parquetFile.getAbsolutePath, format = Parquet) + .runWith(Sink.seq) + .futureValue + + result should have size 1 + val json = result.head + json should include("Alice") + json should include("Paris") + json should include("France") + } + + "get Parquet file metadata" in { + val tempDir = Files.createTempDirectory("parquet-meta-test").toFile + tempDir.deleteOnExit() + val parquetFile = new File(tempDir, "meta.parquet") + parquetFile.deleteOnExit() + + val schemaString = + """message TestRecord { + | required int32 id; + | required binary name (UTF8); + |}""".stripMargin + + val schema: MessageType = MessageTypeParser.parseMessageType(schemaString) + val groupFactory = new SimpleGroupFactory(schema) + + val path = new Path(parquetFile.getAbsolutePath) + val writer = org.apache.parquet.hadoop.example.ExampleParquetWriter + .builder(path) + .withType(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withConf(conf) + .build() + + try { + (1 to 100).foreach { i => + val record = groupFactory.newGroup() + record.add("id", i) + record.add("name", s"Person$i") + writer.write(record) + } + } finally { + writer.close() + } + + val metadata = ParquetFileSource.getFileMetadata(parquetFile.getAbsolutePath) + + metadata.numRowGroups should be > 0 + metadata.numRows shouldBe 100 + metadata.schema should include("id") + metadata.schema should include("name") + } + + "read Delta Lake table" in { + val tempDir = Files.createTempDirectory("delta-test").toFile + tempDir.deleteOnExit() + + val deltaLog = DeltaLog.forTable(conf, tempDir.getAbsolutePath) + + // Define Delta Standalone StructType schema + val schema = new StructType( + Array( + new StructField("id", new IntegerType(), false), + new StructField("name", new StringType(), false), + new StructField("age", new IntegerType(), false) + ) + ) + + // Create metadata with Delta Standalone StructType + val metadata = Metadata + .builder() + .schema(schema) + .build() + + // Write Parquet data file + val dataFile = new File(tempDir, "part-00000.parquet") + + val parquetSchema = MessageTypeParser.parseMessageType( + """message TestRecord { + | required int32 id; + | required binary name (UTF8); + | required int32 age; + |}""".stripMargin + ) + + val groupFactory = new SimpleGroupFactory(parquetSchema) + val path = new Path(dataFile.getAbsolutePath) + val writer = org.apache.parquet.hadoop.example.ExampleParquetWriter + .builder(path) + .withType(parquetSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withConf(conf) + .build() + + try { + 
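// Write two sample rows to the Parquet data file; the AddFile committed below registers it in the Delta transaction log +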
val record1 = groupFactory.newGroup() + record1.add("id", 1) + record1.add("name", "Alice") + record1.add("age", 30) + writer.write(record1) + + val record2 = groupFactory.newGroup() + record2.add("id", 2) + record2.add("name", "Bob") + record2.add("age", 25) + writer.write(record2) + } finally { + writer.close() + } + + // Create AddFile action - relative path from the table root + val addFile = AddFile + .builder( + dataFile.getName, + Map.empty[String, String].asJava, + dataFile.length(), + System.currentTimeMillis(), + true + ) + .build() + + // Create Operation + val operation = new Operation(Operation.Name.WRITE) + + // Commit to Delta log + val txn = deltaLog.startTransaction() + txn.commit(List(metadata, addFile).asJava, operation, "0") + + // Read Delta table + val result = FileSourceFactory + .fromFile(tempDir.getAbsolutePath, format = Delta) + .runWith(Sink.seq) + .futureValue + + result should have size 2 + result.head should include("Alice") + result.last should include("Bob") + + result.foreach { json => + json should include("id") + json should include("name") + json should include("age") + } + } + + "read Delta table with time travel" in { + val tempDir = Files.createTempDirectory("delta-timetravel-test").toFile + tempDir.deleteOnExit() + + val deltaLog = DeltaLog.forTable(conf, tempDir.getAbsolutePath) + + // Define Delta Standalone StructType schema + val schema = new StructType( + Array( + new StructField("id", new IntegerType(), false), + new StructField("value", new StringType(), false) + ) + ) + + val metadata = Metadata + .builder() + .schema(schema) + .build() + + // Version 0: Initial data + val dataFile1 = createDeltaDataFile(tempDir, "v0.parquet", List((1, "v0_data"))) + val addFile1 = createAddFile(dataFile1) + + val operation1 = new Operation(Operation.Name.WRITE) + val txn1 = deltaLog.startTransaction() + txn1.commit(List(metadata, addFile1).asJava, operation1, "0") + + // Version 1: Add more data + val dataFile2 = createDeltaDataFile(tempDir, "v1.parquet", List((2, "v1_data"))) + val addFile2 = createAddFile(dataFile2) + + val operation2 = new Operation(Operation.Name.WRITE) + val txn2 = deltaLog.startTransaction() + txn2.commit(List(addFile2).asJava, operation2, "1") + + // Read version 0 + val resultV0 = DeltaFileSource + .fromFileAtVersion(tempDir.getAbsolutePath, version = 0) + .runWith(Sink.seq) + .futureValue + + resultV0 should have size 1 + resultV0.head should include("v0_data") + + // Read version 1 (latest) + val resultV1 = FileSourceFactory + .fromFile(tempDir.getAbsolutePath, format = Delta) + .runWith(Sink.seq) + .futureValue + + resultV1 should have size 2 + } + + "get Delta table info" in { + val tempDir = Files.createTempDirectory("delta-info-test").toFile + tempDir.deleteOnExit() + + val deltaLog = DeltaLog.forTable(conf, tempDir.getAbsolutePath) + + // Define Delta Standalone StructType schema + val schema = new StructType( + Array( + new StructField("id", new IntegerType(), false) + ) + ) + + val metadata = Metadata + .builder() + .schema(schema) + .description("Test table") + .build() + + val operation = new Operation(Operation.Name.CREATE_TABLE) + val txn = deltaLog.startTransaction() + txn.commit(List(metadata).asJava, operation, "0") + + val info = DeltaFileSource.getTableInfo(tempDir.getAbsolutePath) + + info.version shouldBe 0 + info.description shouldBe Some("Test table") + info.schema should include("id") + } + + "read Delta table with partitions" in { + val tempDir = Files.createTempDirectory("delta-partition-test").toFile + 
tempDir.deleteOnExit() + + val deltaLog = DeltaLog.forTable(conf, tempDir.getAbsolutePath) + + // Define partitioned schema with Delta Standalone types + val schema = new StructType( + Array( + new StructField("id", new IntegerType(), false), + new StructField("category", new StringType(), false), + new StructField("value", new StringType(), false) + ) + ) + + val metadata = Metadata + .builder() + .schema(schema) + .partitionColumns(List("category").asJava) + .build() + + // Create data files for different partitions + val dataFileA = createDeltaDataFilePartitioned( + tempDir, + "category=A/part-00000.parquet", + List((1, "A", "data_A1"), (2, "A", "data_A2")) + ) + + val dataFileB = createDeltaDataFilePartitioned( + tempDir, + "category=B/part-00000.parquet", + List((3, "B", "data_B1"), (4, "B", "data_B2")) + ) + + val addFileA = AddFile + .builder( + "category=A/part-00000.parquet", + Map("category" -> "A").asJava, + dataFileA.length(), + System.currentTimeMillis(), + true + ) + .build() + + val addFileB = AddFile + .builder( + "category=B/part-00000.parquet", + Map("category" -> "B").asJava, + dataFileB.length(), + System.currentTimeMillis(), + true + ) + .build() + + val operation = new Operation(Operation.Name.WRITE) + val txn = deltaLog.startTransaction() + txn.commit(List(metadata, addFileA, addFileB).asJava, operation, "0") + + // Read all data + val resultAll = FileSourceFactory + .fromFile(tempDir.getAbsolutePath, format = Delta) + .runWith(Sink.seq) + .futureValue + + resultAll should have size 4 + } + + "handle non-existent file" in { + val result = FileSourceFactory + .fromFile("/non/existent/file.json", format = Json) + .runWith(Sink.seq) + + whenReady(result.failed) { ex => + ex shouldBe a[IllegalArgumentException] + ex.getMessage should include("does not exist") + } + } + + "handle empty file" in { + val tempFile = java.io.File.createTempFile("test", ".json") + tempFile.deleteOnExit() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath, format = Json) + .runWith(Sink.seq) + .futureValue + + result should have size 0 + } + + "handle invalid JSON Array" in { + val tempFile = File.createTempFile("test", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("""{"not":"an array"}""") + writer.close() + + val result = FileSourceFactory + .fromFile(tempFile.getAbsolutePath, format = JsonArray) + .runWith(Sink.seq) + + whenReady(result.failed) { ex => + ex shouldBe a[IllegalArgumentException] + ex.getMessage should include("JSON array") + } + } + } + + // Helper methods + private def createDeltaDataFile( + tempDir: File, + fileName: String, + data: List[(Int, String)] + ): File = { + val dataDir = new File(tempDir, "data") + dataDir.mkdirs() + val dataFile = new File(dataDir, fileName) + + val schema = MessageTypeParser.parseMessageType( + """message Record { + | required int32 id; + | required binary value (UTF8); + |}""".stripMargin + ) + + val groupFactory = new SimpleGroupFactory(schema) + val path = new Path(dataFile.getAbsolutePath) + val writer = org.apache.parquet.hadoop.example.ExampleParquetWriter + .builder(path) + .withType(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withConf(conf) + .build() + + try { + data.foreach { case (id, value) => + val record = groupFactory.newGroup() + record.add("id", id) + record.add("value", value) + writer.write(record) + } + } finally { + writer.close() + } + + dataFile + } + + private def createDeltaDataFilePartitioned( + tempDir: File, + 
relativePath: String, + data: List[(Int, String, String)] + ): File = { + val dataFile = new File(tempDir, relativePath) + dataFile.getParentFile.mkdirs() + + val schema = MessageTypeParser.parseMessageType( + """message Record { + | required int32 id; + | required binary category (UTF8); + | required binary value (UTF8); + |}""".stripMargin + ) + + val groupFactory = new SimpleGroupFactory(schema) + val path = new Path(dataFile.getAbsolutePath) + val writer = org.apache.parquet.hadoop.example.ExampleParquetWriter + .builder(path) + .withType(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withConf(conf) + .build() + + try { + data.foreach { case (id, category, value) => + val record = groupFactory.newGroup() + record.add("id", id) + record.add("category", category) + record.add("value", value) + writer.write(record) + } + } finally { + writer.close() + } + + dataFile + } + + private def createAddFile(dataFile: File): AddFile = { + AddFile + .builder( + s"data/${dataFile.getName}", + Map.empty[String, String].asJava, + dataFile.length(), + System.currentTimeMillis(), + true + ) + .build() + } + + override def afterAll(): Unit = { + system.terminate() + super.afterAll() + } +} diff --git a/documentation/client/bulk.md b/documentation/client/bulk.md index bae45c87..03780228 100644 --- a/documentation/client/bulk.md +++ b/documentation/client/bulk.md @@ -8,6 +8,7 @@ The **BulkApi** trait provides high-performance bulk operations for Elasticsearc **Features:** - **High-performance streaming** with Akka Streams +- **Multiple data sources**: In-memory, File-based (JSON, JSON Array, Parquet, Delta Lake) - **Automatic retry** with exponential backoff - **Parallel processing** with configurable balance - **Real-time progress tracking** and metrics @@ -23,6 +24,7 @@ The **BulkApi** trait provides high-performance bulk operations for Elasticsearc - Requires `SettingsApi` for index settings management - Requires `IndexApi` for individual document operations (retry) - Requires Akka Streams for reactive processing +- Requires Hadoop for file operations (Parquet, Delta Lake) --- @@ -43,6 +45,16 @@ Source[D, NotUsed] -> Return Either[FailedDocument, SuccessfulDocument] ``` +### Data Sources + +| Source Type | Format | Description | +|------------------|---------------|---------------------------------------| +| **In-Memory** | Scala objects | Direct streaming from collections | +| **JSON** | Text | Newline-delimited JSON (NDJSON) | +| **JSON Array** | Text | JSON array with nested structures | +| **Parquet** | Binary | Columnar storage format | +| **Delta Lake** | Directory | ACID transactional data lake | + ### Operation Types | Operation | Action | Behavior | @@ -88,7 +100,7 @@ case class BulkOptions( // Usage implicit val bulkOptions: BulkOptions = BulkOptions( - defaultIndex = "products", + defaultIndex = "products", maxBulkSize = 5000, balance = 8 ) @@ -197,52 +209,173 @@ case class BulkMetrics( } ``` +--- + +### bulkFromFile + +**NEW**: Loads and indexes documents directly from files with support for multiple formats. 
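+
+All examples below assume that an `ActorSystem`, a `BulkOptions` and, for Parquet or Delta sources, a Hadoop `Configuration` are in scope. A minimal setup (the actor system name and index name are illustrative) looks like this:
+
+```scala
+import akka.actor.ActorSystem
+import org.apache.hadoop.conf.Configuration
+
+implicit val system: ActorSystem = ActorSystem("bulk-system")
+implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "products")
+implicit val hadoopConf: Configuration = new Configuration() // only needed for Parquet / Delta
+```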
+ +**Signature:** + +```scala +def bulkFromFile( + filePath: String, + format: FileFormat = Json, + indexKey: Option[String] = None, + idKey: Option[String] = None, + suffixDateKey: Option[String] = None, + suffixDatePattern: Option[String] = None, + update: Option[Boolean] = None, + delete: Option[Boolean] = None, + callbacks: BulkCallbacks = BulkCallbacks.default +)(implicit + bulkOptions: BulkOptions, + system: ActorSystem, + conf: Configuration = new Configuration() +): Future[BulkResult] +``` + +**Parameters:** +- `filePath` - Path to the file (local or HDFS) +- `format` - File format: `Json`, `JsonArray`, `Parquet`, or `Delta` +- `indexKey` - Optional field name containing index name +- `idKey` - Optional field name containing document ID +- `suffixDateKey` - Optional date field for index suffix +- `suffixDatePattern` - Date pattern for suffix formatting +- `update` - If true, performs upsert operation +- `delete` - If true, performs delete operation +- `callbacks` - Event callbacks for monitoring +- `bulkOptions` - Implicit bulk configuration +- `system` - Implicit ActorSystem +- `conf` - Implicit Hadoop Configuration (for Parquet/Delta) + +**Supported File Formats:** + +```scala +sealed trait FileFormat +case object Json extends FileFormat // Newline-delimited JSON +case object JsonArray extends FileFormat // JSON array with nested objects +case object Parquet extends FileFormat // Apache Parquet columnar format +case object Delta extends FileFormat // Delta Lake tables +``` + +**Returns:** +- `Future[BulkResult]` with detailed success/failure information + **Examples:** +#### 1. Load from JSON File (NDJSON) + ```scala -import akka.actor.ActorSystem -import scala.concurrent.ExecutionContext.Implicits.global +// NDJSON file (one JSON object per line): +// {"id": "1", "name": "Product 1", "price": 10.0} +// {"id": "2", "name": "Product 2", "price": 20.0} -implicit val system: ActorSystem = ActorSystem("bulk-system") -implicit val bulkOptions: BulkOptions = BulkOptions( - defaultIndex = "products", - maxBulkSize = 1000, - balance = 4 -) +val result = client.bulkFromFile( + filePath = "/data/products.jsonl", + format = Json, + idKey = Some("id") +).futureValue -// Domain model -case class Product(id: String, name: String, price: Double, category: String) +println(s"Indexed ${result.successCount} products from JSON") +``` -// Basic bulk indexing -val products: Source[Product, NotUsed] = getProducts() // Large dataset +#### 2. Load from JSON Array with Nested Objects -val toJson: Product => String = product => s""" -{ - "id": "${product.id}", - "name": "${product.name}", - "price": ${product.price}, - "category": "${product.category}" -} -""" +```scala +// JSON Array file: +// [ +// { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0 }, +// { "uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", +// "children": [ +// { "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09" }, +// { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09" } +// ], +// "childrenCount": 2 +// } +// ] + +val result = client.bulkFromFile( + filePath = "/data/persons.json", + format = JsonArray, + idKey = Some("uuid") +).futureValue + +println(s"Indexed ${result.successCount} persons with nested structures") +``` + +#### 3. 
Load from Parquet File -val resultFuture: Future[BulkResult] = client.bulkWithResult( - items = products, - toDocument = toJson, +```scala +// Parquet file with schema: id, name, price, category +val result = client.bulkFromFile( + filePath = "/data/products.parquet", + format = Parquet, idKey = Some("id") -) +).futureValue -resultFuture.foreach { result => - println(s"✅ Success: ${result.successCount}") - println(s"❌ Failed: ${result.failedCount}") - println(s"📊 Throughput: ${result.metrics.throughput} docs/sec") - - // Handle failures - result.failedDocuments.foreach { failed => - println(s"Failed ID: ${failed.id}, Error: ${failed.error}") - } -} +println(s"Indexed ${result.successCount} products from Parquet") +println(s"Throughput: ${result.metrics.throughput} docs/sec") +``` + +#### 4. Load from Delta Lake Table + +```scala +// Delta Lake table directory structure: +// /data/delta-products/ +// ├── _delta_log/ +// │ ├── 00000000000000000000.json +// │ └── 00000000000000000001.json +// ├── part-00000.parquet +// └── part-00001.parquet + +val result = client.bulkFromFile( + filePath = "/data/delta-products", + format = Delta, + idKey = Some("id") +).futureValue + +println(s"Indexed ${result.successCount} products from Delta Lake") +``` -// With callbacks for real-time monitoring +#### 5. Load with Date-Based Index Suffixing + +```scala +// Load logs with automatic date-based index partitioning +val result = client.bulkFromFile( + filePath = "/data/logs.jsonl", + format = Json, + idKey = Some("id"), + suffixDateKey = Some("timestamp"), + suffixDatePattern = Some("yyyy-MM-dd") +)( + bulkOptions.copy(defaultIndex = "logs"), + system, + hadoopConf +).futureValue + +// Creates indices: logs-2024-01-15, logs-2024-01-16, etc. +println(s"Indexed ${result.successCount} logs across ${result.indices.size} indices") +result.indices.foreach(idx => println(s" - $idx")) +``` + +#### 6. Load with Update (Upsert) + +```scala +// Update existing documents or insert new ones +val result = client.bulkFromFile( + filePath = "/data/product-updates.json", + format = JsonArray, + idKey = Some("id"), + update = Some(true) // Upsert mode +).futureValue + +println(s"Updated/Inserted ${result.successCount} products") +``` + +#### 7. 
Load with Callbacks for Monitoring + +```scala val callbacks = BulkCallbacks( onSuccess = (id, index) => logger.info(s"✅ Indexed document $id in $index"), @@ -252,108 +385,145 @@ val callbacks = BulkCallbacks( onComplete = result => { logger.info(s""" - |Bulk operation completed: + |📊 File bulk operation completed: + | - File: /data/products.parquet | - Success: ${result.successCount} | - Failed: ${result.failedCount} | - Duration: ${result.metrics.durationMs}ms | - Throughput: ${result.metrics.throughput} docs/sec """.stripMargin) - } + }, + + onBatchComplete = (batchSize, metrics) => + logger.info(s"📦 Processed batch: $batchSize docs (${metrics.throughput} docs/sec)") ) -client.bulkWithResult( - items = products, - toDocument = toJson, +val result = client.bulkFromFile( + filePath = "/data/large-dataset.parquet", + format = Parquet, idKey = Some("id"), callbacks = callbacks -) - -// Bulk update (upsert) -client.bulkWithResult( - items = productUpdates, - toDocument = toJson, - idKey = Some("id"), - update = Some(true) // Upsert mode -).foreach { result => - println(s"Updated ${result.successCount} products") -} +).futureValue +``` -// Bulk delete -val obsoleteProducts: Source[Product, NotUsed] = client.scrollAs[Product]( - """ - |SELECT uuid AS id, name, price, category, outdated AS obsolete FROM products WHERE outdated = true - |""".stripMargin -) -val idsToDelete: Source[String, NotUsed] = obsoleteProducts.map(_._1.id) +#### 8. Load from HDFS -client.bulkWithResult( - items = idsToDelete, - toDocument = id => s"""{"id": "$id"}""", - idKey = Some("id"), - delete = Some(true) -) +```scala +// Configure Hadoop for HDFS access +implicit val hadoopConf: Configuration = new Configuration() +hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020") +hadoopConf.set("dfs.client.use.datanode.hostname", "true") + +// Load from HDFS +val result = client.bulkFromFile( + filePath = "hdfs://namenode:8020/data/products.parquet", + format = Parquet, + idKey = Some("id") +).futureValue -// Date-based index suffixing -case class LogEntry(id: String, message: String, timestamp: String) +println(s"Indexed ${result.successCount} products from HDFS") +``` -val logs: Source[LogEntry, NotUsed] = getLogEntries() +#### 9. 
Error Handling -client.bulkWithResult( - items = logs, - toDocument = log => s""" - { - "id": "${log.id}", - "message": "${log.message}", - "timestamp": "${log.timestamp}" - } - """, - idKey = Some("id"), - suffixDateKey = Some("timestamp"), // Field containing date - suffixDatePattern = Some("yyyy-MM-dd") // Pattern for suffix -)( - bulkOptions.copy(defaultIndex = "logs"), // Base index: "logs-2024-01-15" - system -) +```scala +val result = client.bulkFromFile( + filePath = "/data/products.json", + format = JsonArray, + idKey = Some("id") +).futureValue -// Error handling and retry analysis -resultFuture.foreach { result => - if (result.failedCount > 0) { - // Group errors by type - val errorsByType = result.failedDocuments - .groupBy(_.error) - .mapValues(_.size) - - errorsByType.foreach { case (errorType, count) => - println(s"Error: $errorType - Count: $count") - } - - // Identify retryable failures - val retryable = result.failedDocuments.filter(_.retryable) - println(s"Retryable failures: ${retryable.size}") +// Handle failures +if (result.failedCount > 0) { + println(s"⚠️ ${result.failedCount} documents failed to index") + + // Group errors by type + val errorsByType = result.failedDocuments + .groupBy(_.error) + .view.mapValues(_.size) + + errorsByType.foreach { case (errorType, count) => + println(s" - $errorType: $count failures") } + + // Write failures to file for investigation + val failedIds = result.failedDocuments.map(_.id) + java.nio.file.Files.write( + java.nio.file.Paths.get("/data/failed-ids.txt"), + failedIds.mkString("\n").getBytes + ) } +``` -// Performance tuning example -implicit val highThroughputOptions: BulkOptions = BulkOptions( +#### 10. Performance Tuning for Large Files + +```scala +// Optimize for large file processing +implicit val highPerformanceOptions: BulkOptions = BulkOptions( defaultIndex = "products", maxBulkSize = 5000, // Larger batches balance = 8, // More parallel workers - disableRefresh = true, // Disable auto-refresh for speed + disableRefresh = true, // Disable auto-refresh retryOnFailure = true, - maxRetries = 5 + maxRetries = 3, + logEvery = 20 // Log less frequently ) -client.bulkWithResult( - items = largeDataset, - toDocument = toJson, +val result = client.bulkFromFile( + filePath = "/data/large-products.parquet", + format = Parquet, idKey = Some("id") -).foreach { result => - // Manual refresh after bulk - result.indices.foreach(client.refresh) - println(s"Bulk completed: ${result.metrics.throughput} docs/sec") -} +).futureValue + +// Manual refresh after bulk +result.indices.foreach(client.refresh) + +println(s""" + |✅ Large file processing completed: + | - Documents: ${result.successCount} + | - Duration: ${result.metrics.durationMs}ms + | - Throughput: ${result.metrics.throughput} docs/sec +""".stripMargin) ``` +#### 11. 
Delta Lake with Specific Version + +```scala +// Load specific version of Delta table +val deltaPath = "/data/delta-products" + +// Get Delta table info +val tableInfo = DeltaFileSource.getTableInfo(deltaPath) +println(s"Delta table version: ${tableInfo.version}") +println(s"Number of files: ${tableInfo.numFiles}") +println(s"Size: ${tableInfo.sizeInBytes} bytes") + +// Load from specific version +val result = client.bulkFromFile( + filePath = deltaPath, + format = Delta, + idKey = Some("id") +).futureValue + +println(s"Indexed ${result.successCount} products from Delta Lake v${tableInfo.version}") +``` + +--- + +### File Format Comparison + +| Format | Speed | Size | Schema | Nested | Use Case | +|--------------|------------|---------|--------|--------|---------------------------------| +| **JSON** | Medium | Large | No | Yes | Semi-structured data | +| **JsonArray**| Medium | Large | No | Yes | Complex nested structures | +| **Parquet** | Very Fast | Small | Yes | Yes | Big data, analytics | +| **Delta** | Very Fast | Small | Yes | Yes | ACID transactions, time travel | + +**Recommendations :** +- **JSON/JsonArray**: APIs, logs, semi-structured data +- **Parquet**: Large datasets, columnar analytics +- **Delta Lake**: Data lakes, versioning, ACID compliance + --- ### bulkSource @@ -450,7 +620,7 @@ source } .runWith(Sink.ignore) -// Integration with other streams +// Integration with file streaming val csvSource: Source[String, NotUsed] = FileIO.fromPath(Paths.get("products.csv")) .via(Framing.delimiter(ByteString("\n"), 1024)) @@ -461,7 +631,7 @@ csvSource .grouped(1000) .flatMapConcat { batch => client.bulkSource( - items = batch.iterator, + items = Source(batch), toDocument = toJson, idKey = Some("id") ) @@ -525,119 +695,82 @@ def bulk[D]( --- -## Implementation Requirements +## File-Based Bulk Operations -### toBulkElasticAction +### File Validation -```scala -implicit private[client] def toBulkElasticAction( - a: BulkActionType -): BulkElasticAction -``` - -Converts internal `BulkActionType` to Elasticsearch-specific bulk action. 
- ---- - -### bulkFlow +All file-based operations automatically validate: +- ✅ File/directory existence +- ✅ Read permissions +- ✅ File format compatibility +- ✅ Non-empty content (with warnings) ```scala -private[client] def bulkFlow(implicit - bulkOptions: BulkOptions, - system: ActorSystem -): Flow[Seq[BulkActionType], BulkResultType, NotUsed] -``` - -**Implementation Example:** - -```scala -private[client] def bulkFlow(implicit - bulkOptions: BulkOptions, - system: ActorSystem -): Flow[Seq[BulkActionType], BulkResultType, NotUsed] = { - - implicit val ec: ExecutionContext = system.dispatcher - - Flow[Seq[BulkActionType]] - .mapAsync(1) { actions => - val bulkRequest = new BulkRequest() - - actions.foreach { action => - val elasticAction = toBulkElasticAction(action) - bulkRequest.add(elasticAction) - } - - Future { - client.bulk(bulkRequest, RequestOptions.DEFAULT) - } - } +// Automatic validation +try { + val result = client.bulkFromFile( + filePath = "/data/products.parquet", + format = Parquet, + idKey = Some("id") + ).futureValue +} catch { + case e: IllegalArgumentException if e.getMessage.contains("does not exist") => + println("File not found") + case e: IllegalArgumentException if e.getMessage.contains("not a file") => + println("Path is not a file") + case e: IllegalArgumentException if e.getMessage.contains("not a directory") => + println("Path is not a directory (required for Delta)") } ``` ---- +### File Metadata -### extractBulkResults +Get information about files before processing: ```scala -private[client] def extractBulkResults( - result: BulkResultType, - originalBatch: Seq[BulkItem] -): Seq[Either[FailedDocument, SuccessfulDocument]] +// Parquet metadata (available) +val parquetMeta = ParquetFileSource.getFileMetadata("/data/products.parquet") +println(s""" + |Parquet file: + | Rows: ${parquetMeta.numRows} + | Row groups: ${parquetMeta.numRowGroups} + | Columns: ${parquetMeta.schema.getFieldCount} +""".stripMargin) + +// Delta table info (available) +val deltaMeta = DeltaFileSource.getTableInfo("/data/delta-products") +println(s""" + |Delta table: + | Version: ${deltaMeta.version} + | Files: ${deltaMeta.numFiles} + | Size: ${deltaMeta.sizeInBytes} bytes + | Partitions: ${deltaMeta.partitionColumns.mkString(", ")} +""".stripMargin) + +// JSON Array metadata (available) +val jsonMeta = JsonArrayFileSource.getMetadata("/data/persons.json") +println(s""" + |JSON array: + | Elements: ${jsonMeta.elementCount} + | Has nested arrays: ${jsonMeta.hasNestedArrays} + | Has nested objects: ${jsonMeta.hasNestedObjects} + | Max depth: ${jsonMeta.maxDepth} +""".stripMargin) ``` -**Implementation Example:** +### Format-Specific Metadata Methods -```scala -private[client] def extractBulkResults( - result: BulkResponse, - originalBatch: Seq[BulkItem] -): Seq[Either[FailedDocument, SuccessfulDocument]] = { - - result.getItems.zip(originalBatch).map { case (item, original) => - if (item.isFailed) { - Left(FailedDocument( - id = original.id.getOrElse("unknown"), - index = original.index, - document = original.document, - error = item.getFailureMessage, - retryable = isRetryable(item.getFailure) - )) - } else { - Right(SuccessfulDocument( - id = item.getId, - index = item.getIndex - )) - } - } -} - -private def isRetryable(failure: BulkItemResponse.Failure): Boolean = { - val retryableErrors = Set( - "version_conflict_engine_exception", - "es_rejected_execution_exception", - "timeout_exception" - ) - retryableErrors.exists(failure.getMessage.contains) -} -``` - ---- - -### toBulkAction & 
actionToBulkItem - -```scala -private[client] def toBulkAction(bulkItem: BulkItem): BulkActionType - -private[client] def actionToBulkItem(action: BulkActionType): BulkItem -``` - -Bidirectional conversion between internal `BulkItem` and Elasticsearch-specific `BulkActionType`. +| Format | Metadata Method | Available Info | +|--------------|-------------------------------|---------------------------------------------------| +| **Parquet** | `getFileMetadata(path)` | ✅ Rows, row groups, schema, compression | +| **Delta** | `getTableInfo(path)` | ✅ Version, files, size, partitions | +| **JsonArray**| `getMetadata(path)` | ✅ Element count, nesting info, max depth | --- ## Common Patterns -### High-Throughput Indexing +### High-Throughput File Indexing ```scala // Optimize for maximum throughput @@ -650,20 +783,18 @@ implicit val highPerformanceOptions: BulkOptions = BulkOptions( logEvery = 50 // Less frequent logging ) -val result = client.bulkWithResult( - items = massiveDataset, - toDocument = toJson, +val result = client.bulkFromFile( + filePath = "/data/massive-dataset.parquet", + format = Parquet, idKey = Some("id") -) +).futureValue -result.foreach { r => - // Manual refresh once at the end - r.indices.foreach(client.refresh) - println(s"Indexed ${r.successCount} documents at ${r.metrics.throughput} docs/sec") -} +// Manual refresh once at the end +result.indices.foreach(client.refresh) +println(s"Indexed ${result.successCount} documents at ${result.metrics.throughput} docs/sec") ``` -### Reliable Indexing with Retry +### Reliable File Indexing with Retry ```scala // Optimize for reliability @@ -678,93 +809,99 @@ implicit val reliableOptions: BulkOptions = BulkOptions( retryBackoffMultiplier = 3.0 ) -val result = client.bulkWithResult( - items = criticalData, - toDocument = toJson, +val result = client.bulkFromFile( + filePath = "/data/critical-data.json", + format = JsonArray, idKey = Some("id") -) +).futureValue -result.foreach { r => - if (r.failedCount > 0) { - // Log all failures for investigation - r.failedDocuments.foreach { failed => - logger.error(s"Critical failure: ${failed.id} - ${failed.error}") - alerting.sendAlert(s"Failed to index critical document: ${failed.id}") - } +if (result.failedCount > 0) { + // Log all failures for investigation + result.failedDocuments.foreach { failed => + logger.error(s"Critical failure: ${failed.id} - ${failed.error}") + alerting.sendAlert(s"Failed to index critical document: ${failed.id}") } } ``` -### Time-Series Data with Date Suffixes +### Time-Series Data from Files ```scala -case class LogEntry( - id: String, - timestamp: String, // ISO format: "2024-01-15T10:30:00Z" - level: String, - message: String -) - -val logs: Source[LogEntry, NotUsed] = streamLogs() - +// Load logs with automatic date-based partitioning implicit val logOptions: BulkOptions = BulkOptions( - defaultIndex = "logs", // Base index + defaultIndex = "logs", maxBulkSize = 2000, balance = 4 ) -client.bulkWithResult( - items = logs, - toDocument = log => s""" - { - "id": "${log.id}", - "timestamp": "${log.timestamp}", - "level": "${log.level}", - "message": "${log.message}" - } - """, +val result = client.bulkFromFile( + filePath = "/data/application-logs.jsonl", + format = Json, idKey = Some("id"), suffixDateKey = Some("timestamp"), suffixDatePattern = Some("yyyy-MM-dd") -) +).futureValue + // Creates indices: logs-2024-01-15, logs-2024-01-16, etc. 
+println(s"Indexed ${result.successCount} logs across ${result.indices.size} daily indices") ``` -### Incremental Updates +### Batch Processing Multiple Files ```scala -case class ProductUpdate(id: String, price: Double, stock: Int) - -val updates: Source[ProductUpdate, NotUsed] = getProductUpdates() +val files = Seq( + "/data/products-2024-01.parquet", + "/data/products-2024-02.parquet", + "/data/products-2024-03.parquet" +) -client.bulkWithResult( - items = updates, - toDocument = update => s""" - { - "id": "${update.id}", - "price": ${update.price}, - "stock": ${update.stock} +val results = Future.sequence( + files.map { file => + client.bulkFromFile( + filePath = file, + format = Parquet, + idKey = Some("id") + ) } - """, - idKey = Some("id"), - update = Some(true) // Upsert mode -).foreach { result => - println(s"Updated ${result.successCount} products") +) + +results.foreach { resultList => + val totalSuccess = resultList.map(_.successCount).sum + val totalFailed = resultList.map(_.failedCount).sum + + println(s""" + |📊 Batch processing completed: + | - Files processed: ${files.size} + | - Total success: $totalSuccess + | - Total failed: $totalFailed + """.stripMargin) } ``` -### Batch Deletion +### Incremental Delta Lake Updates ```scala -val obsoleteIds: Source[String, NotUsed] = findObsoleteDocuments() +// Track last processed version +var lastVersion: Long = 0 -client.bulkWithResult( - items = obsoleteIds, - toDocument = id => s"""{"id": "$id"}""", - idKey = Some("id"), - delete = Some(true) -).foreach { result => - println(s"Deleted ${result.successCount} documents") +// Get current Delta table version +val tableInfo = DeltaFileSource.getTableInfo("/data/delta-products") + +if (tableInfo.version > lastVersion) { + println(s"Processing Delta updates from v$lastVersion to v${tableInfo.version}") + + val result = client.bulkFromFile( + filePath = "/data/delta-products", + format = Delta, + idKey = Some("id"), + update = Some(true) // Upsert mode + ).futureValue + + lastVersion = tableInfo.version + + println(s"Updated ${result.successCount} products from Delta Lake") +} else { + println("No new Delta versions to process") } ``` @@ -772,758 +909,339 @@ client.bulkWithResult( ## Performance Optimization -### Tuning Parameters +### Tuning Parameters for File Operations -| Parameter | Low Throughput | Balanced | High Throughput | -|------------------|-----------------|-----------|------------------| -| `maxBulkSize` | 500 | 1000 | 5000-10000 | -| `balance` | 1-2 | 4 | 8-16 | -| `disableRefresh` | false | false | true | -| `retryOnFailure` | true | true | false | +| Parameter | Small Files (<1GB) | Medium Files (1-10GB) | Large Files (>10GB) | +|------------------|--------------------|-----------------------|---------------------| +| `maxBulkSize` | 1000 | 5000 | 10000 | +| `balance` | 4 | 8 | 16 | +| `disableRefresh` | false | true | true | +| `retryOnFailure` | true | true | false | -### Memory Considerations +### Format-Specific Optimization ```scala -// For large documents, use smaller batches -implicit val largeDocOptions: BulkOptions = BulkOptions( - defaultIndex = "documents", - maxBulkSize = 100, // Fewer large documents per batch - balance = 2 +// JSON - Text parsing overhead +implicit val textOptions: BulkOptions = BulkOptions( + maxBulkSize = 2000, + balance = 4 ) -// For small documents, use larger batches -implicit val smallDocOptions: BulkOptions = BulkOptions( - defaultIndex = "events", - maxBulkSize = 10000, // Many small documents per batch - balance = 8 +// 
Parquet/Delta - Binary format, faster +implicit val binaryOptions: BulkOptions = BulkOptions( + maxBulkSize = 10000, + balance = 16 ) ``` -### Backpressure Handling - -```scala -// Akka Streams automatically handles backpressure -val source = client.bulkSource( - items = infiniteStream, - toDocument = toJson, - idKey = Some("id") -) - -// Add throttling if needed -source - .throttle(1000, 1.second) // Max 1000 docs/sec - .runWith(Sink.foreach { - case Right(success) => println(s"✅ ${success.id}") - case Left(failed) => println(s"❌ ${failed.id}") - }) -end_scalar -``` - ---- - -## Error Handling - -### Retryable vs Non-Retryable Errors +### Memory Considerations ```scala -// Retryable errors (automatic retry) -val retryableErrors = Set( - "version_conflict_engine_exception", // Concurrent modification - "es_rejected_execution_exception", // Queue full - "timeout_exception", // Temporary timeout - "connect_exception" // Network issue +// For large Parquet files with wide schemas +implicit val wideSchemaOptions: BulkOptions = BulkOptions( + maxBulkSize = 500, // Smaller batches + balance = 2 // Less parallelism ) -// Non-retryable errors (fail immediately) -val nonRetryableErrors = Set( - "mapper_parsing_exception", // Invalid document structure - "illegal_argument_exception", // Invalid field value - "index_not_found_exception" // Missing index -) -``` - -### Handling Failures - -```scala -val result = client.bulkWithResult( - items = products, - toDocument = toJson, - idKey = Some("id") +// For narrow schemas (few columns) +implicit val narrowSchemaOptions: BulkOptions = BulkOptions( + maxBulkSize = 10000, + balance = 8 ) - -result.foreach { r => - if (r.failedCount > 0) { - // Group by error type - val errorGroups = r.failedDocuments.groupBy(_.error) - - errorGroups.foreach { case (errorType, failures) => - println(s"Error: $errorType") - println(s"Count: ${failures.size}") - - // Handle specific error types - errorType match { - case e if e.contains("mapper_parsing") => - // Log invalid documents for review - failures.foreach { f => - logger.error(s"Invalid document: ${f.document}") - } - - case e if e.contains("version_conflict") => - // Retry with latest version - failures.foreach { f => - retryWithFreshVersion(f.id) - } - - case _ => - logger.error(s"Unhandled error: $errorType") - } - } - } -} ``` --- -## Monitoring and Metrics - -### Real-Time Progress Tracking - -```scala -val callbacks = BulkCallbacks( - onSuccess = (id, index) => { - metricsCollector.incrementSuccess() - }, - - onFailure = failed => { - metricsCollector.incrementFailure(failed.error) - }, - - onComplete = result => { - val metrics = result.metrics - logger.info(s""" - |Bulk Operation Summary: - | Duration: ${metrics.durationMs}ms - | Total Documents: ${metrics.totalDocuments} - | Success: ${result.successCount} - | Failed: ${result.failedCount} - | Throughput: ${metrics.throughput} docs/sec - | Batches: ${metrics.totalBatches} - | Indices: ${result.indices.mkString(", ")} - """.stripMargin) - - // Error breakdown - metrics.errorsByType.foreach { case (errorType, count) => - logger.info(s" $errorType: $count") - } - } -) - -client.bulkWithResult( - items = products, - toDocument = toJson, - idKey = Some("id"), - callbacks = callbacks -) -``` +## Error Handling -### Custom Metrics Collection +### File-Specific Errors ```scala -var successCount = 0 -var failureCount = 0 -val startTime = System.currentTimeMillis() - -client.bulkSource( - items = products, - toDocument = toJson, +val result = client.bulkFromFile( + 
filePath = "/data/products.json", + format = JsonArray, idKey = Some("id") -).runWith(Sink.foreach { - case Right(_) => - successCount += 1 - if (successCount % 1000 == 0) { - val elapsed = System.currentTimeMillis() - startTime - val throughput = (successCount * 1000.0) / elapsed - println(s"Progress: $successCount docs, $throughput docs/sec") - } - - case Left(_) => - failureCount += 1 -}) -``` - ---- - -## Best Practices - -**1. Choose Appropriate Batch Sizes** - -```scala -// ✅ Good - balanced batch size -implicit val options: BulkOptions = BulkOptions( - defaultIndex = "products", - maxBulkSize = 1000 // Good for most use cases ) -// ❌ Too small - overhead -implicit val tooSmall: BulkOptions = BulkOptions(maxBulkSize = 10) - -// ❌ Too large - memory issues -implicit val tooLarge: BulkOptions = BulkOptions(maxBulkSize = 100000) -``` - -**2. Disable Refresh for Large Bulks** - -```scala -// ✅ Good - disable refresh during bulk -implicit val options: BulkOptions = BulkOptions( - defaultIndex = "products", - disableRefresh = true -) - -val result = client.bulkWithResult(items, toJson, Some("id")) -result.foreach { r => - // Manual refresh once at the end - r.indices.foreach(client.refresh) -} -``` - -**3. Handle Failures Appropriately** - -```scala -// ✅ Good - detailed failure handling -result.foreach { r => - r.failedDocuments.foreach { failed => - if (failed.retryable) { - retryQueue.add(failed) - } else { - deadLetterQueue.add(failed) - } - } -} - -// ❌ Avoid - ignoring failures -result.foreach { r => - println(s"Success: ${r.successCount}") - // Failures ignored! -} -``` - -**4. Use Callbacks for Monitoring** - -```scala -// ✅ Good - real-time monitoring -val callbacks = BulkCallbacks( - onSuccess = (id, index) => recordSuccess(id, index), - onFailure = failed => recordFailure(failed.error), - onComplete = result => sendCompletionNotification(result) -) - -client.bulkWithResult(items, toJson, Some("id"), callbacks = callbacks) -``` - -**5. 
Tune Parallelism Based on Cluster Size** - -```scala -// Small cluster (1-3 nodes) -implicit val smallCluster: BulkOptions = BulkOptions(balance = 2) - -// Medium cluster (4-10 nodes) -implicit val mediumCluster: BulkOptions = BulkOptions(balance = 4) - -// Large cluster (10+ nodes) -implicit val largeCluster: BulkOptions = BulkOptions(balance = 8) -``` - ---- - -## Testing Scenarios - -### Test Basic Bulk Indexing - -```scala -def testBulkIndexing()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions( - defaultIndex = "test-bulk", - maxBulkSize = 100 - ) +result.recover { + case e: IllegalArgumentException if e.getMessage.contains("does not exist") => + logger.error(s"File not found: ${e.getMessage}") + BulkResult.empty - val testData = (1 to 1000).map { i => - Map("id" -> s"doc-$i", "name" -> s"Product $i", "price" -> (i * 10.0)) - } + case e: IllegalArgumentException if e.getMessage.contains("not a JSON array") => + logger.error(s"Invalid JSON format: ${e.getMessage}") + BulkResult.empty - val toJson: Map[String, Any] => String = doc => s""" - { - "id": "${doc("id")}", - "name": "${doc("name")}", - "price": ${doc("price")} - } - """ + case e: java.io.IOException => + logger.error(s"IO error reading file: ${e.getMessage}") + BulkResult.empty - client.bulkWithResult( - items = testData.iterator, - toDocument = toJson, - idKey = Some("id") - ).map { result => - assert(result.successCount == 1000, "All documents should be indexed") - assert(result.failedCount == 0, "No failures expected") - assert(result.indices.contains("test-bulk"), "Index should be created") - - println(s"✅ Bulk test passed: ${result.successCount} documents indexed") - } + case e: Exception => + logger.error(s"Unexpected error: ${e.getMessage}", e) + throw e } ``` -### Test Bulk Update +### Handling Partial Failures ```scala -def testBulkUpdate()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "test-bulk") - - for { - // First, index documents - _ <- client.bulkWithResult( - items = testData.iterator, - toDocument = toJson, - idKey = Some("id") - ) - - // Then, update them - updates = testData.map(doc => doc.updated("price", 999.99)) - updateResult <- client.bulkWithResult( - items = updates.iterator, - toDocument = toJson, - idKey = Some("id"), - update = Some(true) - ) - - _ = assert(updateResult.successCount == testData.size, "All updates should succeed") - - // Verify updates - doc <- client.get("doc-1", "test-bulk") - _ = assert(doc.contains("999.99"), "Price should be updated") - } yield { - println("✅ Bulk update test passed") - } -} -``` - -### Test Bulk Delete +val result = client.bulkFromFile( + filePath = "/data/products.parquet", + format = Parquet, + idKey = Some("id") +).futureValue -```scala -def testBulkDelete()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "test-bulk") +if (result.failedCount > 0) { + val failureRate = result.failedCount.toDouble / (result.successCount + result.failedCount) - for { - // Index documents - _ <- client.bulkWithResult( - items = testData.iterator, - toDocument = toJson, - idKey = Some("id") - ) + if (failureRate > 0.1) { + // More than 10% failures - investigate + logger.error(s"High failure rate: ${failureRate * 100}%") - // Delete them - deleteResult <- client.bulkWithResult( - items = testData.iterator, - toDocument = toJson, - idKey = Some("id"), - delete = Some(true) + // Write 
failed documents for reprocessing + val failedJson = result.failedDocuments.map(_.document).mkString("\n") + java.nio.file.Files.write( + java.nio.file.Paths.get("/data/failed-documents.jsonl"), + failedJson.getBytes ) - - _ = assert(deleteResult.successCount == testData.size, "All deletes should succeed") - - // Verify deletion - exists <- client.exists("doc-1", "test-bulk") - _ = assert(!exists, "Document should be deleted") - } yield { - println("✅ Bulk delete test passed") } } ``` -### Test Error Handling +--- -```scala -def testBulkErrorHandling()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions( - defaultIndex = "test-bulk", - retryOnFailure = false // Disable retry for testing - ) - - val mixedData = Seq( - """{"id": "valid-1", "name": "Valid Product"}""", - """{"id": "invalid", "name": INVALID_JSON}""", // Invalid JSON - """{"id": "valid-2", "name": "Another Valid"}""" - ) - - client.bulkWithResult( - items = mixedData.iterator, - toDocument = identity, - idKey = Some("id") - ).map { result => - assert(result.successCount == 2, "Two valid documents should succeed") - assert(result.failedCount == 1, "One invalid document should fail") - - val failed = result.failedDocuments.head - assert(failed.id == "invalid", "Failed document ID should match") - assert(failed.error.contains("parse"), "Error should mention parsing") - - println("✅ Error handling test passed") - } -} -``` +## Testing File-Based Bulk Operations -### Test Date-Based Index Suffixing +### Test JSON Array with Nested Objects ```scala -def testDateSuffixing()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "logs") +"bulkFromFile" should "handle JSON array with nested objects" in { + implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "test-json") - val logs = Seq( - """{"id": "log-1", "timestamp": "2024-01-15T10:00:00Z", "message": "Log 1"}""", - """{"id": "log-2", "timestamp": "2024-01-16T10:00:00Z", "message": "Log 2"}""", - """{"id": "log-3", "timestamp": "2024-01-17T10:00:00Z", "message": "Log 3"}""" - ) + val tempFile = java.io.File.createTempFile("test", ".json") + tempFile.deleteOnExit() - client.bulkWithResult( - items = logs.iterator, - toDocument = identity, - idKey = Some("id"), - suffixDateKey = Some("timestamp"), - suffixDatePattern = Some("yyyy-MM-dd") - ).map { result => - assert(result.successCount == 3, "All logs should be indexed") - assert(result.indices.contains("logs-2024-01-15"), "Index with date suffix should exist") - assert(result.indices.contains("logs-2024-01-16"), "Index with date suffix should exist") - assert(result.indices.contains("logs-2024-01-17"), "Index with date suffix should exist") - assert(result.indices.size == 3, "Three different indices should be created") - - println("✅ Date suffixing test passed") - } -} -``` - -### Test Retry Mechanism - -```scala -def testRetryMechanism()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions( - defaultIndex = "test-bulk", - retryOnFailure = true, - maxRetries = 3, - retryDelay = 100.millis - ) + val writer = new java.io.PrintWriter(tempFile) + writer.println("""[ + | {"uuid": "A16", "name": "Person", "children": [ + | {"name": "Child 1", "age": 10}, + | {"name": "Child 2", "age": 12} + | ]} + |]""".stripMargin) + writer.close() - var attemptCount = 0 + val result = client.bulkFromFile( + filePath = tempFile.getAbsolutePath, + format = JsonArray, + idKey = 
Some("uuid") + ).futureValue - // Simulate transient failure - val mockData = Seq("""{"id": "doc-1", "name": "Test"}""") + result.successCount shouldBe 1 + result.failedCount shouldBe 0 - client.bulkWithResult( - items = mockData.iterator, - toDocument = { doc => - attemptCount += 1 - if (attemptCount < 3) { - // Simulate transient error - throw new Exception("Simulated transient error") - } - doc - }, - idKey = Some("id") - ).map { result => - assert(result.successCount == 1, "Document should succeed after retry") - assert(attemptCount >= 2, "Should have retried at least once") - - println(s"✅ Retry test passed (attempts: $attemptCount)") - } + // Verify nested structure was preserved + val doc = client.get("A16", "test-json").get + doc should include("children") + doc should include("Child 1") } ``` -### Test Performance Metrics +### Test Parquet File Loading ```scala -def testPerformanceMetrics()(implicit system: ActorSystem): Future[Unit] = { - implicit val bulkOptions: BulkOptions = BulkOptions( - defaultIndex = "test-bulk", - maxBulkSize = 1000, - logEvery = 10 - ) - - val largeDataset = (1 to 10000).map { i => - s"""{"id": "doc-$i", "name": "Product $i"}""" - } +"bulkFromFile" should "load and index Parquet file" in { + implicit val bulkOptions: BulkOptions = BulkOptions(defaultIndex = "test-parquet") - client.bulkWithResult( - items = largeDataset.iterator, - toDocument = identity, + // Assume Parquet file exists + val result = client.bulkFromFile( + filePath = "/test-data/products.parquet", + format = Parquet, idKey = Some("id") - ).map { result => - val metrics = result.metrics - - assert(metrics.totalDocuments == 10000, "Total documents should match") - assert(metrics.totalBatches == 10, "Should have 10 batches (1000 each)") - assert(metrics.throughput > 0, "Throughput should be calculated") - assert(metrics.duration > 0, "Duration should be recorded") - - println(s""" - |✅ Performance test passed: - | Documents: ${metrics.totalDocuments} - | Batches: ${metrics.totalBatches} - | Duration: ${metrics.duration}ms - | Throughput: ${metrics.throughput} docs/sec - """.stripMargin) - } + ).futureValue + + result.successCount should be > 0 + result.failedCount shouldBe 0 + result.metrics.throughput should be > 0.0 } ``` --- -## Advanced Use Cases +## Best Practices for File-Based Operations -### Multi-Index Bulk Operations +**1. 
Choose the Right Format** ```scala -case class Document(id: String, index: String, data: String) - -val multiIndexDocs: Source[Document, NotUsed] = getDocuments() - -// Custom transformation to handle multiple indices -client.bulkWithResult( - items = multiIndexDocs, - toDocument = doc => s""" - { - "id": "${doc.id}", - "index": "${doc.index}", - "data": "${doc.data}" - } - """, - indexKey = Some("index"), // Dynamic index per document - idKey = Some("id") -)( - bulkOptions.copy(defaultIndex = "default"), // Fallback index - system -).foreach { result => - println(s"Indexed across ${result.indices.size} indices") - result.indices.foreach(idx => println(s" - $idx")) -} -``` - -### Conditional Bulk Operations +// ✅ Good - Use Parquet for large datasets +client.bulkFromFile("/data/big-dataset.parquet", Parquet, idKey = Some("id")) -```scala -def bulkWithCondition[D]( - items: Source[D, NotUsed], - toDocument: D => String, - condition: D => Boolean -)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = { - - val filteredItems = items.filter(condition) - - client.bulkWithResult( - items = filteredItems, - toDocument = toDocument, - idKey = Some("id") - ) -} - -// Usage: Only index products with price > 0 -bulkWithCondition( - items = products, - toDocument = toJson, - condition = (p: Product) => p.price > 0 -) +// ❌ Avoid - Json for large datasets (slow parsing) +client.bulkFromFile("/data/big-dataset.json", Json, idKey = Some("id")) ``` -### Bulk with Transformation Pipeline +**2. Validate Files Before Processing** ```scala -def bulkWithTransformation[D, T]( - items: Source[D, NotUsed], - transform: D => T, - toDocument: T => String -)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = { - - val transformedItems = items.map(transform) - - client.bulkWithResult( - items = transformedItems, - toDocument = toDocument, - idKey = Some("id") - ) -} - -// Usage: Enrich products before indexing -case class EnrichedProduct( - id: String, - name: String, - price: Double, - category: String, - enrichedAt: String -) - -def enrichProduct(product: Product): EnrichedProduct = { - EnrichedProduct( - id = product.id, - name = product.name, - price = product.price, - category = categorize(product), - enrichedAt = java.time.Instant.now().toString - ) -} - -bulkWithTransformation( - items = products, - transform = enrichProduct, - toDocument = toJson -) -``` - -### Bulk with External API Integration - -```scala -def bulkWithExternalEnrichment[D]( - items: Source[D, NotUsed], - enrichmentApi: D => Future[D], - toDocument: D => String -)(implicit - bulkOptions: BulkOptions, - system: ActorSystem, - ec: ExecutionContext -): Future[BulkResult] = { - - // Enrich in batches to avoid overwhelming external API - val enrichedFuture = Future.sequence( - items.grouped(100).map { batch => - Future.sequence(batch.map(enrichmentApi)) - } - ).map(_.flatten) - - enrichedFuture.flatMap { enrichedItems => - client.bulkWithResult( - items = enrichedItems.iterator, - toDocument = toDocument, - idKey = Some("id") - ) - } +// ✅ Good - Check file metadata first +val parquetMeta = ParquetFileSource.getFileMetadata("/data/products.parquet") +if (parquetMeta.numRows > 1000000) { + // Use optimized settings for large files + implicit val options: BulkOptions = BulkOptions(maxBulkSize = 10000, balance = 16) } ``` -### Bulk with Deduplication +**3. 
Handle Large Files Efficiently** ```scala -def bulkWithDeduplication[D]( - items: Source[D, NotUsed], - getId: D => String, - toDocument: D => String -)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = { - - val seen = scala.collection.mutable.Set[String]() - val dedupedItems = items.filter { item => - val id = getId(item) - if (seen.contains(id)) { - false - } else { - seen.add(id) - true - } - } - - client.bulkWithResult( - items = dedupedItems, - toDocument = toDocument, - idKey = Some("id") - ) -} -``` - ---- - -## Troubleshooting - -### Common Issues and Solutions - -**1. Out of Memory Errors** - -```scala -// Problem: Large batches causing OOM -implicit val problematic: BulkOptions = BulkOptions(maxBulkSize = 100000) - -// Solution: Reduce batch size -implicit val fixed: BulkOptions = BulkOptions(maxBulkSize = 1000) -``` - -**2. Slow Performance** - -```scala -// Problem: Sequential processing -implicit val slow: BulkOptions = BulkOptions(balance = 1) - -// Solution: Increase parallelism -implicit val fast: BulkOptions = BulkOptions( - balance = 8, - maxBulkSize = 5000, - disableRefresh = true +// ✅ Good - Disable refresh for large files +implicit val options: BulkOptions = BulkOptions( + disableRefresh = true, + maxBulkSize = 10000 ) + +val result = client.bulkFromFile("/data/huge-file.parquet", Parquet, Some("id")).futureValue +result.indices.foreach(client.refresh) // Manual refresh once ``` -**3. Too Many Retries** +**4. Monitor File Processing** ```scala -// Problem: Retrying non-retryable errors -implicit val wasteful: BulkOptions = BulkOptions( - retryOnFailure = true, - maxRetries = 10 +// ✅ Good - Use callbacks for monitoring +val callbacks = BulkCallbacks( + onBatchComplete = (size, metrics) => + println(s"Processed batch: $size docs at ${metrics.throughput} docs/sec"), + onComplete = result => + println(s"File processing: ${result.successCount} success, ${result.failedCount} failed") ) -// Solution: Identify and skip non-retryable errors -result.foreach { r => - r.failedDocuments.foreach { failed => - if (!failed.retryable) { - deadLetterQueue.add(failed) // Don't retry - } - } -} +client.bulkFromFile("/data/file.parquet", Parquet, Some("id"), callbacks = callbacks) ``` -**4. Index Refresh Issues** +**5. Use Delta Lake for Incremental Updates** ```scala -// Problem: Slow indexing due to frequent refresh -implicit val slow: BulkOptions = BulkOptions(disableRefresh = false) - -// Solution: Disable refresh during bulk, refresh once at end -implicit val fast: BulkOptions = BulkOptions(disableRefresh = true) - -client.bulkWithResult(items, toJson, Some("id")).foreach { result => - result.indices.foreach(client.refresh) // Manual refresh -} +// ✅ Good - Track Delta versions +val currentVersion = DeltaFileSource.getTableInfo("/data/delta").version +// Process only new data... 
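+// For instance (sketch, assuming a persisted `lastVersion`), reload with upsert only when the
+// version has advanced, as in the "Incremental Delta Lake Updates" pattern above:
+// if (currentVersion > lastVersion)
+//   client.bulkFromFile("/data/delta", Delta, idKey = Some("id"), update = Some(true))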
``` ---- - -## Comparison with Other Operations - -### Bulk vs Individual Operations - -| Aspect | Individual | Bulk | -|--------------------|------------------|-----------------------| -| **Performance** | Slow (1 req/doc) | Fast (1000s docs/req) | -| **Network** | High overhead | Minimal overhead | -| **Memory** | Low | Higher | -| **Error Handling** | Immediate | Batched | -| **Use Case** | Single documents | Large datasets | +**Example: Smart File Processing with Metadata** ```scala -// Individual indexing (slow) -products.foreach { product => - client.index("products", product.id, toJson(product)) +def smartBulkFromFile( + filePath: String, + format: FileFormat, + idKey: Option[String] +)(implicit system: ActorSystem, hadoopConf: Configuration): Future[BulkResult] = { + + // Auto-tune based on available metadata + implicit val bulkOptions: BulkOptions = format match { + case Parquet => + val meta = ParquetFileSource.getFileMetadata(filePath) + println(s"📊 Parquet file: ${meta.numRows} rows") + + if (meta.numRows > 10000000) { + // Very large file + BulkOptions( + defaultIndex = "data", + maxBulkSize = 10000, + balance = 16, + disableRefresh = true, + logEvery = 100 + ) + } else if (meta.numRows > 1000000) { + // Large file + BulkOptions( + defaultIndex = "data", + maxBulkSize = 5000, + balance = 8, + disableRefresh = true + ) + } else { + // Small file + BulkOptions( + defaultIndex = "data", + maxBulkSize = 1000, + balance = 4 + ) + } + + case Delta => + val info = DeltaFileSource.getTableInfo(filePath) + println(s"📊 Delta table: version ${info.version}, ${info.numFiles} files, ${info.sizeInBytes / 1024 / 1024}MB") + + if (info.sizeInBytes > 1024 * 1024 * 1024) { + // >1GB + BulkOptions( + defaultIndex = "data", + maxBulkSize = 10000, + balance = 16, + disableRefresh = true + ) + } else { + BulkOptions( + defaultIndex = "data", + maxBulkSize = 5000, + balance = 8 + ) + } + + case JsonArray => + val meta = JsonArrayFileSource.getMetadata(filePath) + println(s"📊 JSON array: ${meta.elementCount} elements, nested=${meta.hasNestedArrays}") + + if (meta.elementCount > 100000) { + BulkOptions( + defaultIndex = "data", + maxBulkSize = 5000, + balance = 8, + disableRefresh = true + ) + } else { + BulkOptions( + defaultIndex = "data", + maxBulkSize = 1000, + balance = 4 + ) + } + + case _ => + val path = new Path(filePath) + val fs = FileSystem.get(conf) + val status = fs.getFileStatus(path) + val sizeMB = status.getLen() / 1024 / 1024 + println(s"📊 $format file: ${sizeMB}MB") + + if (sizeMB > 100) { + BulkOptions( + defaultIndex = "data", + maxBulkSize = 10000, + balance = 16, + disableRefresh = true + ) + } else if (sizeMB > 10) { + BulkOptions( + defaultIndex = "data", + maxBulkSize = 5000, + balance = 8 + ) + } else { + BulkOptions( + defaultIndex = "data", + maxBulkSize = 1000, + balance = 4 + ) + } + } + + client.bulkFromFile(filePath, format, idKey) } -// Bulk indexing (fast) -client.bulkWithResult( - items = products, - toDocument = toJson, - idKey = Some("id") -) +// Usage +smartBulkFromFile("/data/products.parquet", Parquet, Some("id")) + .foreach { result => + println(s"✅ Indexed ${result.successCount} documents at ${result.metrics.throughput} docs/sec") + } ``` --- @@ -1532,37 +1250,55 @@ client.bulkWithResult( ### Key Takeaways -1. **Use bulk operations for large datasets** (> 100 documents) -2. **Tune batch size** based on document size and memory -3. **Disable refresh** during bulk, refresh once at end -4. **Enable retry** for production reliability -5. 
**Monitor metrics** for performance optimization -6. **Handle failures** appropriately (retry vs dead letter queue) -7. **Use callbacks** for real-time monitoring -8. **Adjust parallelism** based on cluster size +1. **File-based bulk operations** support JSON, JSON Array, Parquet, and Delta Lake +2. **Parquet and Delta** offer best performance for large datasets +3. **JSON Array** handles complex nested structures correctly +4. **Automatic validation** ensures file integrity before processing +5. **Same configuration** applies to both in-memory and file-based operations +6. **Streaming architecture** enables processing of files larger than memory +7. **Delta Lake** provides versioning and ACID compliance ### Quick Reference ```scala -// High-performance bulk indexing +// High-performance file indexing implicit val options: BulkOptions = BulkOptions( defaultIndex = "products", - maxBulkSize = 5000, - balance = 8, - disableRefresh = true, - retryOnFailure = true, - maxRetries = 3 + maxBulkSize = 10000, + balance = 16, + disableRefresh = true ) -client.bulkWithResult( - items = products, - toDocument = toJson, - idKey = Some("id"), - callbacks = BulkCallbacks.logging(logger) +implicit val hadoopConf: Configuration = new Configuration() + +// Load from Parquet +client.bulkFromFile( + filePath = "/data/products.parquet", + format = Parquet, + idKey = Some("id") ).foreach { result => result.indices.foreach(client.refresh) println(s"Indexed ${result.successCount} docs at ${result.metrics.throughput} docs/sec") } + +// Load from Delta Lake +client.bulkFromFile( + filePath = "/data/delta-products", + format = Delta, + idKey = Some("id"), + update = Some(true) +).foreach { result => + println(s"Updated ${result.successCount} products from Delta Lake") +} + +// Load JSON Array with nested objects +client.bulkFromFile( + filePath = "/data/persons.json", + format = JsonArray, + idKey = Some("uuid") +).foreach { result => + println(s"Indexed ${result.successCount} persons with nested structures") +} ``` --- diff --git a/es6/jest/src/test/scala/app/softnetwork/elastic/client/JestBulkApiSpec.scala b/es6/jest/src/test/scala/app/softnetwork/elastic/client/JestBulkApiSpec.scala new file mode 100644 index 00000000..87eaffc2 --- /dev/null +++ b/es6/jest/src/test/scala/app/softnetwork/elastic/client/JestBulkApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JestClientSpi +import app.softnetwork.elastic.scalatest.EmbeddedElasticTestKit + +class JestBulkApiSpec extends BulkApiSpec with EmbeddedElasticTestKit { + override lazy val client: BulkApi = new JestClientSpi().client(elasticConfig) +} diff --git a/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelBulkApiSpec.scala b/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelBulkApiSpec.scala new file mode 100644 index 00000000..e8db8fbc --- /dev/null +++ b/es6/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelBulkApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi +import app.softnetwork.elastic.scalatest.EmbeddedElasticTestKit + +class RestHighLevelBulkApiSpec extends BulkApiSpec with EmbeddedElasticTestKit { + override lazy val client: BulkApi = new RestHighLevelClientSpi().client(elasticConfig) +} diff --git a/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelBulkApiSpec.scala 
b/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelBulkApiSpec.scala new file mode 100644 index 00000000..9193e155 --- /dev/null +++ b/es7/rest/src/test/scala/app/softnetwork/elastic/client/RestHighLevelBulkApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.RestHighLevelClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class RestHighLevelBulkApiSpec extends BulkApiSpec with ElasticDockerTestKit { + override lazy val client: BulkApi = new RestHighLevelClientSpi().client(elasticConfig) +} diff --git a/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaBulkApiSpec.scala b/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaBulkApiSpec.scala new file mode 100644 index 00000000..7ccdcccc --- /dev/null +++ b/es8/java/src/test/scala/app/softnetwork/elastic/client/JavaBulkApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JavaClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class JavaBulkApiSpec extends BulkApiSpec with ElasticDockerTestKit { + override lazy val client: BulkApi = new JavaClientSpi().client(elasticConfig) +} diff --git a/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaBulkApiSpec.scala b/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaBulkApiSpec.scala new file mode 100644 index 00000000..7ccdcccc --- /dev/null +++ b/es9/java/src/test/scala/app/softnetwork/elastic/client/JavaBulkApiSpec.scala @@ -0,0 +1,8 @@ +package app.softnetwork.elastic.client + +import app.softnetwork.elastic.client.spi.JavaClientSpi +import app.softnetwork.elastic.scalatest.ElasticDockerTestKit + +class JavaBulkApiSpec extends BulkApiSpec with ElasticDockerTestKit { + override lazy val client: BulkApi = new JavaClientSpi().client(elasticConfig) +} diff --git a/project/SoftClient4es.scala b/project/SoftClient4es.scala index ac608802..e2c7b8a3 100644 --- a/project/SoftClient4es.scala +++ b/project/SoftClient4es.scala @@ -27,6 +27,13 @@ trait SoftClient4es { ) ) + lazy val excludeSlf4jAndLog4j: Seq[ExclusionRule] = Seq( + ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"), + ExclusionRule(organization = "org.slf4j", name = "slf4j-reload4j"), + ExclusionRule(organization = "log4j", name = "log4j"), + ExclusionRule(organization = "org.apache.logging.log4j") + ) + def jacksonDependencies(esVersion: String): Seq[ModuleID] = { val jackson2_19 = "2.19.0" val jackson2_13 = "2.13.3" diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/BulkApiSpec.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/BulkApiSpec.scala new file mode 100644 index 00000000..7ba465d4 --- /dev/null +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/BulkApiSpec.scala @@ -0,0 +1,256 @@ +/* + * Copyright 2025 SOFTNETWORK + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package app.softnetwork.elastic.client + +import akka.actor.ActorSystem +import app.softnetwork.elastic.client.bulk.BulkOptions +import app.softnetwork.elastic.client.file._ +import app.softnetwork.elastic.scalatest.ElasticTestKit +import app.softnetwork.persistence.generateUUID +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{Seconds, Span} +import org.slf4j.{Logger, LoggerFactory} + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContextExecutor} + +trait BulkApiSpec extends AnyFlatSpecLike with Matchers with ScalaFutures { + self: ElasticTestKit => + + lazy val log: Logger = LoggerFactory getLogger getClass.getName + + implicit val system: ActorSystem = ActorSystem(generateUUID()) + + implicit val executionContext: ExecutionContextExecutor = system.dispatcher + + implicit val patience: PatienceConfig = PatienceConfig(timeout = Span(10, Seconds)) + + def client: BulkApi + + override def beforeAll(): Unit = { + self.beforeAll() + createIndex("person") + } + + override def afterAll(): Unit = { + Await.result(system.terminate(), Duration(30, TimeUnit.SECONDS)) + self.afterAll() + } + + private val persons: List[String] = List( + """ { "uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0} """, + """ { "uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0} """, + """ { "uuid": "A16", "name": "Barney Gumble2", "birthDate": "1969-05-09", "children": [{ "parentId": "A16", "name": "Steve Gumble", "birthDate": "1999-05-09"}, { "parentId": "A16", "name": "Josh Gumble", "birthDate": "2002-05-09"}], "childrenCount": 2 } """ + ) + + "BulkClient" should "handle Json file format" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + val tempFile = java.io.File.createTempFile("persons", ".jsonl") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + persons.foreach(writer.println) + writer.close() + + val response = + client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid") + ) + .futureValue + + log.info(s"Bulk index response: $response") + response.failedCount shouldBe 0 + response.successCount shouldBe persons.size + response.indices should contain("person") + } + + it should "handle multiple elements JSON array" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + val tempFile = java.io.File.createTempFile("array_persons", ".jsonl") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + writer.println("[") + writer.println(persons.mkString(",\n")) + writer.println("]") + writer.close() + + val response = + client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid"), + format = JsonArray + ) + .futureValue + + log.info(s"Bulk index response: ${response}") + response.failedCount shouldBe 0 + response.successCount shouldBe persons.size + response.indices should contain("person") + } + + it should "handle empty JSON array" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + val tempFile = createTempJsonArrayFile("empty_array", List.empty) + + val response = client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid"), + format = JsonArray + ) + .futureValue + + response.successCount shouldBe 0 + response.failedCount shouldBe 0 + response.totalCount shouldBe 0 + } + + it should "handle single 
element JSON array" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + val singlePerson = List( + """{"uuid": "A99", "name": "Single Person", "birthDate": "1990-01-01", "childrenCount": 0}""" + ) + + val tempFile = createTempJsonArrayFile("single_person", singlePerson) + + val response = client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid"), + format = JsonArray + ) + .futureValue + + response.successCount shouldBe 1 + response.failedCount shouldBe 0 + } + + it should "handle JSON array with nested objects" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + val personsWithChildren = List( + """{"uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", "children": [{"name": "Steve", "birthDate": "1999-05-09"}, {"name": "Josh", "birthDate": "2002-05-09"}], "childrenCount": 2}""" + ) + + val tempFile = createTempJsonArrayFile("persons_with_children", personsWithChildren) + + val response = client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid"), + format = JsonArray + ) + .futureValue + + response.successCount shouldBe 1 + response.failedCount shouldBe 0 + + /*// Verify the document was indexed with nested structure + Thread.sleep(1000) // Wait for indexing + + val searchResult = + client.search("person", """{"query": {"term": {"uuid": "A16"}}}""").futureValue + searchResult.hits.total.value shouldBe 1 + + val doc = searchResult.hits.hits.head.source + doc should include("children") + doc should include("Steve") + doc should include("Josh")*/ + } + + it should "handle malformed JSON array gracefully" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + val tempFile = java.io.File.createTempFile("malformed_array", ".json") + tempFile.deleteOnExit() + + val writer = new java.io.PrintWriter(tempFile) + try { + writer.println("[{invalid json}]") + } finally { + writer.close() + } + + val result = client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid"), + format = JsonArray + ) + .futureValue + + result.successCount shouldBe 0 + result.failedCount shouldBe 0 + result.totalCount shouldBe 0 + } + + it should "handle large JSON array efficiently" in { + implicit val bulkOptions: BulkOptions = BulkOptions("person") + + // Generate 1000 persons + val largePersonsList = (1 to 1000).map { i => + s"""{"uuid": "P$i", "name": "Person $i", "birthDate": "1990-01-01", "childrenCount": 0}""" + }.toList + + val tempFile = createTempJsonArrayFile("large_array", largePersonsList) + + val startTime = System.currentTimeMillis() + + val response = client + .bulkFromFile( + tempFile.getAbsolutePath, + idKey = Some("uuid"), + format = JsonArray + ) + .futureValue + + val duration = System.currentTimeMillis() - startTime + + log.info(s"✅ Indexed ${response.successCount} documents in ${duration}ms") + + response.successCount shouldBe 1000 + response.failedCount shouldBe 0 + + // Performance check (adjust threshold as needed) + duration should be < 10000L // Should complete in less than 10 seconds + } + + def createTempJsonArrayFile( + prefix: String, + jsonStrings: List[String] + ): java.io.File = { + val tempFile = java.io.File.createTempFile(prefix, ".json") + tempFile.deleteOnExit() + val writer = new java.io.PrintWriter(tempFile) + writer.println("[") + writer.println(jsonStrings.mkString(",\n")) + writer.println("]") + writer.close() + tempFile + } +} diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala 
b/testkit/src/main/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala index c83d38d9..3e8d7ce1 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/ElasticClientSpec.scala @@ -132,16 +132,16 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M "Toggle refresh" should "work" in { pClient.toggleRefresh("person", enable = false).get shouldBe true var settings = pClient.loadSettings("person").get - new JsonParser() - .parse(settings) + JsonParser + .parseString(settings) .getAsJsonObject .get("refresh_interval") .getAsString shouldBe "-1" pClient.toggleRefresh("person", enable = true).get shouldBe true settings = pClient.loadSettings("person").get - new JsonParser() - .parse(settings) + JsonParser + .parseString(settings) .getAsJsonObject .get("refresh_interval") .getAsString shouldBe "1s" @@ -159,15 +159,15 @@ trait ElasticClientSpec extends AnyFlatSpecLike with ElasticDockerTestKit with M "Updating number of replicas" should "work" in { pClient.setReplicas("person", 3) - new JsonParser() - .parse(pClient.loadSettings("person").get) + JsonParser + .parseString(pClient.loadSettings("person").get) .getAsJsonObject .get("number_of_replicas") .getAsString shouldBe "3" pClient.setReplicas("person", 0) - new JsonParser() - .parse(pClient.loadSettings("person").get) + JsonParser + .parseString(pClient.loadSettings("person").get) .getAsJsonObject .get("number_of_replicas") .getAsString shouldBe "0"