
Commit 2d707a4

fix json array source file + init file source specifications
1 parent 1de3c70

File tree

2 files changed (+197 additions, -45 deletions)

core/src/main/scala/app/softnetwork/elastic/client/file/package.scala

Lines changed: 61 additions & 45 deletions
@@ -3,7 +3,7 @@ package app.softnetwork.elastic.client
 import akka.NotUsed
 import akka.stream.scaladsl.Source
 import com.fasterxml.jackson.annotation.JsonInclude
-import com.fasterxml.jackson.core.{JsonParser, JsonToken}
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import com.fasterxml.jackson.databind.{
   DeserializationFeature,
@@ -15,13 +15,14 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
 import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
-import io.delta.standalone.{DeltaLog, Snapshot}
+import io.delta.standalone.DeltaLog
 import io.delta.standalone.data.{CloseableIterator, RowRecord}
 import io.delta.standalone.types._
+import org.apache.parquet.io.SeekableInputStream
 import org.slf4j.{Logger, LoggerFactory}

 import java.io.{BufferedReader, InputStream, InputStreamReader}
@@ -391,8 +392,8 @@ package object file {

     override def format: FileFormat = JsonArray

-    /** Reads a JSON Array file and streams each element
-      */
+    private val jsonFactory = new JsonFactory()
+
     override def fromFile(
       filePath: String,
       bufferSize: Int = 500
@@ -401,29 +402,34 @@ package object file {
       conf: Configuration = hadoopConfiguration
     ): Source[String, NotUsed] = {

-      // Validate file before processing
       validateFile(filePath)

       var elementCount = 0L
       val startTime = System.currentTimeMillis()

       Source
-        .unfoldResource[String, (BufferedReader, JsonParser)](
-          // Create: Open the file and JSON parser
+        .unfoldResource[String, (InputStream, JsonParser)](
+          // Create: Open file via Hadoop and create JSON parser
           create = () => {
-            logger.info(s"📂 Opening JSON Array file: $filePath")
+            logger.info(s"📂 Opening JSON Array file via Hadoop: $filePath")
             Try {
-              val is: InputStream = HadoopInputFile.fromPath(new Path(filePath), conf).newStream()
-              val reader = new BufferedReader(new InputStreamReader(is, "UTF-8"))
-              val factory = mapper.getFactory
-              val parser = factory.createParser(reader)
-
-              // Vérifier que c'est bien un array
-              if (parser.nextToken() != JsonToken.START_ARRAY) {
-                throw new IllegalArgumentException(s"File is not a JSON array: $filePath")
+              val is: SeekableInputStream =
+                HadoopInputFile.fromPath(new Path(filePath), conf).newStream()
+
+              // Create Jackson parser on top of Hadoop SeekableInputStream
+              val parser = jsonFactory.createParser(is)
+
+              // Expect array start
+              val token = parser.nextToken()
+              if (token != JsonToken.START_ARRAY) {
+                is.close()
+                throw new IllegalArgumentException(
+                  s"Expected JSON array, but found: ${token}. File: $filePath"
+                )
               }

-              (reader, parser)
+              logger.info(s"📊 Started parsing JSON Array via Hadoop FS")
+              (is, parser)
             } match {
               case Success(result) => result
               case Failure(ex) =>
@@ -432,48 +438,63 @@ package object file {
             }
           },

-          // Read: Read the next JSON object from the array
+          // Read: Parse next element from array
           read = { case (_, parser) =>
             blocking {
               Try {
-                if (parser.nextToken() == JsonToken.START_OBJECT) {
+                val token = parser.nextToken()
+
+                if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
                   elementCount += 1
+
                   if (elementCount % 10000 == 0) {
                     val elapsed = (System.currentTimeMillis() - startTime) / 1000.0
                     val throughput = elementCount / elapsed
-                    logger.info(
-                      f"📊 Read $elementCount elements from JSON Array ($throughput%.2f elements/sec)"
-                    )
+                    logger.info(f"📊 Parsed $elementCount elements ($throughput%.2f elements/sec)")
                   }

-                  // Read the full object
-                  val node = mapper.readTree(parser)
+                  // Parse current element as JsonNode
+                  val node: JsonNode = mapper.readTree(parser)
                   Some(mapper.writeValueAsString(node))
-                } else {
+
+                } else if (token == JsonToken.END_ARRAY || token == null) {
                   val elapsed = (System.currentTimeMillis() - startTime) / 1000.0
                   logger.info(
                     s"✅ Finished reading JSON Array: $elementCount elements in ${elapsed}s"
                   )
                   None
+
+                } else {
+                  // Skip unexpected tokens
+                  logger.warn(s"⚠️ Unexpected token in JSON Array: $token")
+                  None
                 }
               } match {
                 case Success(result) => result
                 case Failure(ex) =>
-                  logger.error(s"❌ Error reading JSON Array element at position $elementCount", ex)
+                  logger.error(s"❌ Error parsing JSON Array element at position $elementCount", ex)
                   None
               }
             }
           },

-          // Close: Close parser and reader
-          close = { case (reader, parser) =>
+          // Close: Close parser and Hadoop input stream
+          close = { case (inputStream, parser) =>
             Try {
-              parser.close()
-              reader.close()
+              parser.close() // This also closes the underlying stream
            } match {
-              case Success(_) => logger.debug(s"🔒 Closed JSON Array reader for: $filePath")
+              case Success(_) =>
+                logger.debug(s"🔒 Closed JSON Array parser for: $filePath")
+              case Failure(ex) =>
+                logger.warn(s"⚠️ Failed to close JSON Array parser: ${ex.getMessage}")
+            }
+
+            // Ensure Hadoop stream is closed
+            Try(inputStream.close()) match {
+              case Success(_) =>
+                logger.debug(s"🔒 Closed Hadoop input stream for: $filePath")
              case Failure(ex) =>
-                logger.warn(s"⚠️ Failed to close JSON Array reader: ${ex.getMessage}")
+                logger.warn(s"⚠️ Failed to close Hadoop input stream: ${ex.getMessage}")
            }
          }
        )
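For readers who want the parsing pattern above in isolation: the rewritten reader leans on Jackson's streaming API, where the parser is positioned on START_ARRAY once and mapper.readTree then consumes exactly one element per call, so the file is never buffered whole. A minimal standalone sketch of that pattern (the local path, object name, and printing are illustrative, not part of the commit):

    import com.fasterxml.jackson.core.{JsonFactory, JsonToken}
    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
    import java.io.File

    object JsonArrayStreamSketch {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        // JsonFactory.createParser also accepts InputStream/Reader; a local
        // file stands in here for the Hadoop stream used in the commit.
        val parser = new JsonFactory().createParser(new File("/tmp/example.json"))
        try {
          require(parser.nextToken() == JsonToken.START_ARRAY, "expected a JSON array")
          // Each nextToken() lands on the next element's first token (or
          // END_ARRAY/null); readTree then consumes that single element.
          Iterator
            .continually(parser.nextToken())
            .takeWhile(t => t != null && t != JsonToken.END_ARRAY)
            .foreach { _ =>
              val node: JsonNode = mapper.readTree(parser)
              println(mapper.writeValueAsString(node))
            }
        } finally parser.close()
      }
    }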
@@ -504,7 +525,6 @@ package object file {
         throw new IllegalArgumentException(s"File is not a JSON array: $filePath")
       }

-      import scala.jdk.CollectionConverters._
       arrayNode.elements().asScala.map(node => mapper.writeValueAsString(node)).toList
     } finally {
       is.close()
@@ -849,18 +869,17 @@ package object file {
     val startTime = System.currentTimeMillis()

     Source
-      .unfoldResource[String, (DeltaLog, CloseableIterator[RowRecord], Snapshot)](
+      .unfoldResource[String, CloseableIterator[RowRecord]](
        create = () => {
          logger.info(s"📂 Opening Delta Lake table at version $version: $filePath")
          val deltaLog = DeltaLog.forTable(conf, filePath)
          val snapshot = deltaLog.getSnapshotForVersionAsOf(version)

          logger.info(s"📊 Delta table version $version, files: ${snapshot.getAllFiles.size()}")

-          val iterator = snapshot.open()
-          (deltaLog, iterator, snapshot)
+          snapshot.open()
        },
-      read = { case (deltaLog, iterator, snapshot) =>
+      read = iterator =>
        blocking {
          Try {
            if (iterator.hasNext) {
@@ -886,11 +905,8 @@ package object file {
              logger.error(s"❌ Error reading Delta row at position $rowCount", ex)
              None
          }
-        }
-      },
-      close = { case (deltaLog, iterator, snapshot) =>
-        Try(iterator.close())
-      }
+        },
+      close = iterator => Try(iterator.close())
      )
      .buffer(bufferSize, akka.stream.OverflowStrategy.backpressure)
  }
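The simplification above works because Source.unfoldResource only hands its state to the three callbacks, and of the three values previously carried in the tuple only the CloseableIterator actually needs closing (neither DeltaLog nor Snapshot exposes a close method in delta-standalone). For reference, a self-contained sketch of the same create/read/close shape over an ordinary file (names are illustrative):

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import java.io.BufferedReader
    import java.nio.file.{Files, Paths}

    // read returning None completes the stream, after which close runs once.
    def lineSource(path: String): Source[String, NotUsed] =
      Source.unfoldResource[String, BufferedReader](
        create = () => Files.newBufferedReader(Paths.get(path)),
        read = reader => Option(reader.readLine()), // None at EOF
        close = reader => reader.close()
      )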
@@ -961,7 +977,7 @@ package object file {
    } else if (lowerPath.endsWith(".jsonl") || lowerPath.endsWith(".ndjson")) {
      Json
    } else if (lowerPath.endsWith(".json")) {
-      // Distinguer JSON Lines vs JSON Array en lisant le premier caractère
+      // Distinguishing JSON Lines vs JSON Array by reading the first character
      detectJsonType(filePath)
    } else if (isDeltaTable(filePath)) {
      Delta
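The commit does not show detectJsonType's body, so the following is a purely hypothetical illustration of a first-character probe (the helper name and details are assumptions, not this repository's code): skip leading whitespace and treat '[' as a JSON Array, anything else as JSON Lines.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.util.HadoopInputFile

    // Hypothetical sketch only — not the detectJsonType from this repository.
    def probeJsonType(filePath: String, conf: Configuration): FileFormat = {
      val is = HadoopInputFile.fromPath(new Path(filePath), conf).newStream()
      try {
        var c = is.read()
        while (c != -1 && Character.isWhitespace(c)) c = is.read()
        if (c == '[') JsonArray else Json
      } finally is.close()
    }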
@@ -978,7 +994,7 @@ package object file {
    Try {
      val fs = FileSystem.get(conf)
      val deltaLogPath = new Path(filePath, "_delta_log")
-      fs.exists(deltaLogPath) && fs.isDirectory(deltaLogPath)
+      fs.exists(deltaLogPath) && fs.getFileStatus(deltaLogPath).isDirectory
    }.getOrElse(false)
  }
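On the hunk above: FileSystem.isDirectory(Path) is deprecated in recent Hadoop releases in favor of getFileStatus(...).isDirectory, which is presumably what prompted the change. Note that getFileStatus throws FileNotFoundException for a missing path, so the exists(...) guard and the enclosing Try both stay load-bearing. An equivalent standalone helper (the name is illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.util.Try

    // True only when <tablePath>/_delta_log exists and is a directory;
    // any IO failure (including FileNotFoundException) yields false.
    def looksLikeDeltaTable(tablePath: String, conf: Configuration): Boolean =
      Try {
        val fs = FileSystem.get(conf)
        val deltaLogPath = new Path(tablePath, "_delta_log")
        fs.exists(deltaLogPath) && fs.getFileStatus(deltaLogPath).isDirectory
      }.getOrElse(false)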

FileSourceSpec.scala (new test file)

Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
+package app.softnetwork.elastic.client.file
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Sink
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.time.{Seconds, Span}
+
+import scala.concurrent.ExecutionContext
+
+class FileSourceSpec extends AnyWordSpec with Matchers with ScalaFutures with BeforeAndAfterAll {
+
+  implicit val system: ActorSystem = ActorSystem("FileSourceTest")
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val patience: PatienceConfig = PatienceConfig(timeout = Span(10, Seconds))
+
+  "FileFormatDetector" should {
+    "detect Parquet files" in {
+      FileFormatDetector.detect("data/file.parquet") shouldBe Parquet
+      FileFormatDetector.detect("data/file.parq") shouldBe Parquet
+    }
+
+    "detect JSON files" in {
+      FileFormatDetector.detect("data/file.jsonl") shouldBe Json
+      FileFormatDetector.detect("data/file.ndjson") shouldBe Json
+    }
+
+    "detect unknown files" in {
+      FileFormatDetector.detect("data/file.txt") shouldBe Unknown
+    }
+
+    "throw on unknown files with detectOrThrow" in {
+      assertThrows[IllegalArgumentException] {
+        FileFormatDetector.detectOrThrow("data/file.txt")
+      }
+    }
+  }
+
+  "JsonFileSource" should {
+    "read JSON lines file" in {
+      // Create a temporary file for testing
+      val tempFile = java.io.File.createTempFile("test", ".jsonl")
+      tempFile.deleteOnExit()
+
+      val writer = new java.io.PrintWriter(tempFile)
+      writer.println("""{"id":1,"name":"Alice"}""")
+      writer.println("""{"id":2,"name":"Bob"}""")
+      writer.close()
+
+      val result = JsonFileSource
+        .fromFile(tempFile.getAbsolutePath)
+        .runWith(Sink.seq)
+        .futureValue
+
+      result should have size 2
+      result.head should include("Alice")
+      result.last should include("Bob")
+    }
+  }
+
+  "JsonFileFactory" should {
+    "read JSON lines file" in {
+      // Create a temporary file for testing
+      val tempFile = java.io.File.createTempFile("test", ".jsonl")
+      tempFile.deleteOnExit()
+
+      val writer = new java.io.PrintWriter(tempFile)
+      writer.println("""{"id":1,"name":"Alice","nested":{"age":30}}""")
+      writer.println("""{"id":2,"name":"Bob","nested":{"age":25}}""")
+      writer.close()
+
+      val result = FileSourceFactory
+        .fromFile(tempFile.getAbsolutePath)
+        .runWith(Sink.seq)
+        .futureValue
+
+      result should have size 2
+      result.head should include("Alice")
+      result.last should include("Bob")
+    }
+
+    "read JSON Array file (single line)" in {
+      val tempFile = java.io.File.createTempFile("test", ".json")
+      tempFile.deleteOnExit()
+
+      val writer = new java.io.PrintWriter(tempFile)
+      writer.println(
+        """[{"id":1,"name":"Alice","nested": {"age":30}},{"id":2,"name":"Bob","nested":{"age":25}}]"""
+      )
+      writer.close()
+
+      val result = FileSourceFactory
+        .fromFile(tempFile.getAbsolutePath, format = JsonArray)
+        .runWith(Sink.seq)
+        .futureValue
+
+      result should have size 2
+      result.head should include("Alice")
+      result.last should include("Bob")
+    }
+
+    "read JSON Array file (multi-line)" in {
+      val tempFile = java.io.File.createTempFile("test", ".json")
+      tempFile.deleteOnExit()
+
+      val writer = new java.io.PrintWriter(tempFile)
+      writer.println("[")
+      writer.println(""" {
+        "id":1,
+        "name":"Alice",
+        "nested":{
+          "age":30
+        }
+      },""")
+      writer.println(""" {"id":2,"name":"Bob","nested":{"age":25}}""")
+      writer.println("]")
+      writer.close()
+
+      val result = FileSourceFactory
+        .fromFile(tempFile.getAbsolutePath)
+        .runWith(Sink.seq)
+        .futureValue
+
+      result should have size 2
+      result.head should include("Alice")
+      result.last should include("Bob")
+    }
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    super.afterAll()
+  }
+}

0 commit comments
