Skip to content

Commit b6a76ff

Browse files
committed
Add file-source metadata methods and license headers
1 parent cc7805e commit b6a76ff

File tree

3 files changed

+180
-2
lines changed

3 files changed

+180
-2
lines changed

core/src/main/scala/app/softnetwork/elastic/client/file/package.scala

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2025 SOFTNETWORK
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package app.softnetwork.elastic.client
218

319
import akka.NotUsed
@@ -17,7 +33,7 @@ import org.apache.avro.generic.GenericRecord
1733
import org.apache.hadoop.conf.Configuration
1834
import org.apache.hadoop.fs.{FileSystem, Path}
1935
import org.apache.parquet.avro.AvroParquetReader
20-
import org.apache.parquet.hadoop.ParquetReader
36+
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetReader}
2137
import org.apache.parquet.hadoop.util.HadoopInputFile
2238
import io.delta.standalone.DeltaLog
2339
import io.delta.standalone.data.{CloseableIterator, RowRecord}
@@ -311,6 +327,36 @@ package object file {
311327
)
312328
.buffer(bufferSize, akka.stream.OverflowStrategy.backpressure)
313329
}
330+
331+
/** Summary metadata extracted from a Parquet file footer.
  *
  * @param numRowGroups number of row groups in the file
  * @param numRows      total record count across all row groups
  * @param schema       the Parquet message schema rendered as a string
  */
case class ParquetMetadata(
  numRowGroups: Int,
  numRows: Long,
  schema: String
)

/** Reads only the footer of a Parquet file and returns its metadata,
  * without scanning any data pages.
  *
  * @param filePath path to the Parquet file (any Hadoop-supported filesystem)
  * @param conf     implicit Hadoop configuration used to resolve the filesystem
  * @return row-group count, record count and schema of the file
  * @throws Exception rethrows (after logging) whatever `validateFile` raised
  *                   when the path is missing or is not a regular file
  */
def getFileMetadata(filePath: String)(implicit conf: Configuration): ParquetMetadata = {
  import scala.util.Using

  // Fail fast — and loudly — if the path does not point to a readable file.
  Try(validateFile(filePath, checkIsFile = true)) match {
    case Success(_) => // OK
    case Failure(ex) =>
      logger.error(s"❌ Validation failed for Parquet file: $filePath", ex)
      throw ex
  }

  val inputFile = HadoopInputFile.fromPath(new Path(filePath), conf)

  // Using.resource guarantees the reader is closed even if footer parsing
  // throws, replacing the manual try/finally.
  Using.resource(ParquetFileReader.open(inputFile)) { reader =>
    val fileMetaData = reader.getFooter.getFileMetaData
    ParquetMetadata(
      numRowGroups = reader.getRowGroups.size(),
      numRows = reader.getRecordCount,
      schema = fileMetaData.getSchema.toString
    )
  }
}
314360
}
315361

316362
/** Source for JSON files (NDJSON or JSON Lines) */
@@ -543,7 +589,12 @@ package object file {
543589
conf: Configuration = hadoopConfiguration
544590
): Source[String, NotUsed] = {
545591

546-
validateFile(filePath)
592+
Try(validateFile(filePath)) match {
593+
case Success(_) => // OK
594+
case Failure(ex) =>
595+
logger.error(s"❌ Validation failed for JSON Array file: $filePath", ex)
596+
return Source.failed(ex)
597+
}
547598

548599
logger.info(s"📂 Loading JSON Array file in memory: $filePath")
549600

@@ -566,6 +617,75 @@ package object file {
566617
.mapConcat(identity)
567618
.buffer(bufferSize, akka.stream.OverflowStrategy.backpressure)
568619
}
620+
621+
/** Structural metadata describing a JSON array file.
  *
  * @param elementCount     number of top-level elements in the array
  * @param hasNestedArrays  true if any element contains a nested array
  * @param hasNestedObjects true if any element contains a nested object
  * @param maxDepth         maximum container-nesting depth across all elements
  */
case class JsonArrayMetadata(
  elementCount: Int,
  hasNestedArrays: Boolean,
  hasNestedObjects: Boolean,
  maxDepth: Int
)

/** Parses a JSON array file and returns structural metadata about it.
  *
  * @param filePath path to the JSON array file
  * @param conf     implicit Hadoop configuration used to resolve the filesystem
  * @return element count, nesting flags and maximum depth of the array
  * @throws IllegalArgumentException if the file's root node is not a JSON array
  * @throws Exception rethrows (after logging) whatever `validateFile` raised
  */
def getMetadata(filePath: String)(implicit conf: Configuration): JsonArrayMetadata = {
  import scala.util.Using

  // Fail fast — and loudly — if the path does not point to a readable file.
  Try(validateFile(filePath, checkIsFile = true)) match {
    case Success(_) => // OK
    case Failure(ex) =>
      logger.error(s"❌ Validation failed for JSON Array file: $filePath", ex)
      throw ex
  }

  // NOTE(review): the whole document is parsed into memory here; fine for
  // metadata inspection, but not suitable for arbitrarily large files.
  Using.resource(HadoopInputFile.fromPath(new Path(filePath), conf).newStream()) { is =>
    val arrayNode = mapper.readTree(is)
    if (!arrayNode.isArray) {
      throw new IllegalArgumentException(s"File is not a JSON array: $filePath")
    }

    val elements = arrayNode.elements().asScala.toList
    JsonArrayMetadata(
      elementCount = elements.size,
      hasNestedArrays = elements.exists(hasArrayField),
      hasNestedObjects = elements.exists(hasObjectField),
      maxDepth = elements.map(calculateDepth).maxOption.getOrElse(0)
    )
  }
}
661+
662+
/** True if `node` itself is an array, or if any object field (recursively
  * through object values) holds an array. Only object fields are descended
  * into; a nested array is detected because the array node matches first.
  */
private def hasArrayField(node: JsonNode): Boolean =
  node match {
    // Single match expression instead of an early `return` — same semantics.
    case n if n.isArray  => true
    case n if n.isObject => n.fields().asScala.exists(entry => hasArrayField(entry.getValue))
    case _               => false
  }
670+
671+
/** True if `node` itself is an object, or if any array element (recursively
  * through array elements) holds an object. Mirror image of `hasArrayField`.
  */
private def hasObjectField(node: JsonNode): Boolean =
  node match {
    // Single match expression instead of an early `return` — same semantics.
    case n if n.isObject => true
    case n if n.isArray  => n.elements().asScala.exists(hasObjectField)
    case _               => false
  }
679+
680+
/** Maximum container-nesting depth of `node`: each array or object level
  * contributes one, scalar values contribute zero.
  */
private def calculateDepth(node: JsonNode): Int = {
  // Depths of the node's direct children, for either container kind.
  def childDepths: Iterator[Int] =
    if (node.isArray) node.elements().asScala.map(calculateDepth)
    else node.fields().asScala.map(field => calculateDepth(field.getValue))

  if (node.isArray || node.isObject) 1 + childDepths.maxOption.getOrElse(0)
  else 0
}
569689
}
570690

571691
/** Source for Delta Lake files using Delta Standalone

core/src/test/scala/app/softnetwork/elastic/client/file/FileSourceSpec.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,48 @@ class FileSourceSpec extends AnyWordSpec with Matchers with ScalaFutures with Be
359359
json should include("France")
360360
}
361361

362+
/** Writes a small two-column Parquet file (100 records) and checks that
  * `ParquetFileSource.getFileMetadata` reports its footer correctly.
  */
"get Parquet file metadata" in {
  val tempDir = Files.createTempDirectory("parquet-meta-test").toFile
  tempDir.deleteOnExit()
  val parquetFile = new File(tempDir, "meta.parquet")
  parquetFile.deleteOnExit()

  // Two-column schema: a required int32 id and a required UTF8 string name.
  val schemaString =
    """message TestRecord {
      | required int32 id;
      | required binary name (UTF8);
      |}""".stripMargin

  val schema: MessageType = MessageTypeParser.parseMessageType(schemaString)
  val groupFactory = new SimpleGroupFactory(schema)

  val path = new Path(parquetFile.getAbsolutePath)

  // Using.resource closes the writer even if a write fails; the original
  // built the writer outside the try block, so a failure between build()
  // and the try could leak the writer.
  scala.util.Using.resource(
    org.apache.parquet.hadoop.example.ExampleParquetWriter
      .builder(path)
      .withType(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withConf(conf)
      .build()
  ) { writer =>
    (1 to 100).foreach { i =>
      val record = groupFactory.newGroup()
      record.add("id", i)
      record.add("name", s"Person$i")
      writer.write(record)
    }
  }

  val metadata = ParquetFileSource.getFileMetadata(parquetFile.getAbsolutePath)

  metadata.numRowGroups should be > 0
  metadata.numRows shouldBe 100
  metadata.schema should include("id")
  metadata.schema should include("name")
}
403+
362404
"read Delta Lake table" in {
363405
val tempDir = Files.createTempDirectory("delta-test").toFile
364406
tempDir.deleteOnExit()

testkit/src/main/scala/app/softnetwork/elastic/client/BulkApiSpec.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2025 SOFTNETWORK
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package app.softnetwork.elastic.client
218

319
import akka.actor.ActorSystem

0 commit comments

Comments
 (0)