@@ -15,7 +15,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
1515import com .fasterxml .jackson .module .scala .DefaultScalaModule
1616import org .apache .avro .generic .GenericRecord
1717import org .apache .hadoop .conf .Configuration
18- import org .apache .hadoop .fs .{FSDataInputStream , FileSystem , Path }
18+ import org .apache .hadoop .fs .{FileSystem , Path }
1919import org .apache .parquet .avro .AvroParquetReader
2020import org .apache .parquet .hadoop .ParquetReader
2121import org .apache .parquet .hadoop .util .HadoopInputFile
@@ -176,25 +176,42 @@ package object file {
176176 }
177177
178178 /** Validates that the file exists and is readable
179+ *
180+ * @param filePath
181+ * Path to validate
182+ * @param checkIsFile
183+ * If true, validates it's a file; if false, validates it's a directory
184+ * @param conf
185+ * Hadoop configuration
179186 */
180- protected def validateFile (filePath : String )(implicit conf : Configuration ): Unit = {
187+ protected def validateFile (
188+ filePath : String ,
189+ checkIsFile : Boolean = true
190+ )(implicit conf : Configuration ): Unit = {
181191 val path = new Path (filePath)
182192 val fs = FileSystem .get(conf)
183193
184194 if (! fs.exists(path)) {
185195 throw new IllegalArgumentException (s " File does not exist: $filePath" )
186196 }
187197
188- if (! fs.isFile(path)) {
198+ val status = fs.getFileStatus(path)
199+
200+ if (checkIsFile && ! status.isFile) {
189201 throw new IllegalArgumentException (s " Path is not a file: $filePath" )
190202 }
191203
192- val status = fs.getFileStatus(path)
193- if (status.getLen == 0 ) {
204+ if (! checkIsFile && ! status.isDirectory) {
205+ throw new IllegalArgumentException (s " Path is not a directory: $filePath" )
206+ }
207+
208+ if (checkIsFile && status.getLen == 0 ) {
194209 logger.warn(s " ⚠️ File is empty: $filePath" )
195210 }
196211
197- logger.info(s " 📁 Loading file: $filePath ( ${status.getLen} bytes) " )
212+ val pathType = if (checkIsFile) " file" else " directory"
213+ val sizeInfo = if (checkIsFile) s " ( ${status.getLen} bytes) " else " "
214+ logger.info(s " 📁 Loading $pathType: $filePath $sizeInfo" )
198215 }
199216 }
200217
@@ -227,7 +244,12 @@ package object file {
227244 conf : Configuration = hadoopConfiguration
228245 ): Source [String , NotUsed ] = {
229246 // Validate file before processing
230- validateFile(filePath)
247+ Try (validateFile(filePath)) match {
248+ case Success (_) => // OK
249+ case Failure (ex) =>
250+ logger.error(s " ❌ Validation failed for Parquet file: $filePath" , ex)
251+ return Source .failed(ex)
252+ }
231253
232254 var recordCount = 0L
233255 val startTime = System .currentTimeMillis()
@@ -308,7 +330,12 @@ package object file {
308330 ): Source [String , NotUsed ] = {
309331
310332 // Validate file before processing
311- validateFile(filePath)
333+ Try (validateFile(filePath)) match {
334+ case Success (_) => // OK
335+ case Failure (ex) =>
336+ logger.error(s " ❌ Validation failed for JSON file: $filePath" , ex)
337+ return Source .failed(ex)
338+ }
312339
313340 var lineCount = 0L
314341 val startTime = System .currentTimeMillis()
@@ -402,7 +429,12 @@ package object file {
402429 conf : Configuration = hadoopConfiguration
403430 ): Source [String , NotUsed ] = {
404431
405- validateFile(filePath)
432+ Try (validateFile(filePath)) match {
433+ case Success (_) => // OK
434+ case Failure (ex) =>
435+ logger.error(s " ❌ Validation failed for JSON Array file: $filePath" , ex)
436+ return Source .failed(ex)
437+ }
406438
407439 var elementCount = 0L
408440 val startTime = System .currentTimeMillis()
@@ -550,7 +582,12 @@ package object file {
550582 conf : Configuration = hadoopConfiguration
551583 ): Source [String , NotUsed ] = {
552584
553- validateFile(filePath)
585+ Try (validateFile(filePath, checkIsFile = false )) match {
586+ case Success (_) => // OK
587+ case Failure (ex) =>
588+ logger.error(s " ❌ Validation failed for Delta Lake table: $filePath" , ex)
589+ return Source .failed(ex)
590+ }
554591
555592 var rowCount = 0L
556593 val startTime = System .currentTimeMillis()
@@ -863,7 +900,12 @@ package object file {
863900 conf : Configuration = hadoopConfiguration
864901 ): Source [String , NotUsed ] = {
865902
866- validateFile(filePath)
903+ Try (validateFile(filePath, checkIsFile = false )) match {
904+ case Success (_) => // OK
905+ case Failure (ex) =>
906+ logger.error(s " ❌ Validation failed for Delta Lake table: $filePath" , ex)
907+ return Source .failed(ex)
908+ }
867909
868910 var rowCount = 0L
869911 val startTime = System .currentTimeMillis()
@@ -922,7 +964,12 @@ package object file {
922964 conf : Configuration = hadoopConfiguration
923965 ): Source [String , NotUsed ] = {
924966
925- validateFile(filePath)
967+ Try (validateFile(filePath, checkIsFile = false )) match {
968+ case Success (_) => // OK
969+ case Failure (ex) =>
970+ logger.error(s " ❌ Validation failed for Delta Lake table: $filePath" , ex)
971+ return Source .failed(ex)
972+ }
926973
927974 logger.info(s " 📂 Opening Delta Lake table at timestamp $timestampMillis: $filePath" )
928975
@@ -970,7 +1017,6 @@ package object file {
9701017
9711018 def detect (filePath : String )(implicit conf : Configuration = hadoopConfiguration): FileFormat = {
9721019 val lowerPath = filePath.toLowerCase
973- val path = new Path (filePath)
9741020
9751021 if (lowerPath.endsWith(" .parquet" ) || lowerPath.endsWith(" .parq" )) {
9761022 Parquet
0 commit comments