33use std:: sync:: Arc ;
44use std:: sync:: Mutex ;
55
6- use delta_kernel:: arrow:: array:: { Array , ArrayData , RecordBatch , StructArray } ;
6+ use delta_kernel:: arrow:: array:: { Array , ArrayData , StructArray } ;
77use delta_kernel:: arrow:: ffi:: to_ffi;
8- use delta_kernel:: engine:: arrow_data:: ArrowEngineData ;
8+ use delta_kernel:: engine:: arrow_data:: EngineDataArrowExt ;
99use delta_kernel:: table_changes:: scan:: TableChangesScan ;
1010use delta_kernel:: table_changes:: TableChanges ;
1111use delta_kernel:: EngineData ;
@@ -319,11 +319,7 @@ fn scan_table_changes_next_impl(data: &ScanTableChangesIterator) -> DeltaResult<
319319 return Ok ( ArrowFFIData :: empty ( ) ) ;
320320 } ;
321321
322- let record_batch: RecordBatch = data
323- . into_any ( )
324- . downcast :: < ArrowEngineData > ( )
325- . map_err ( |_| delta_kernel:: Error :: EngineDataType ( "ArrowEngineData" . to_string ( ) ) ) ?
326- . into ( ) ;
322+ let record_batch = data. try_into_record_batch ( ) ?;
327323
328324 let batch_struct_array: StructArray = record_batch. into ( ) ;
329325 let array_data: ArrayData = batch_struct_array. into_data ( ) ;
@@ -346,6 +342,7 @@ mod tests {
346342 use delta_kernel:: arrow:: record_batch:: RecordBatch ;
347343 use delta_kernel:: arrow:: util:: pretty:: pretty_format_batches;
348344 use delta_kernel:: engine:: arrow_conversion:: TryIntoArrow as _;
345+ use delta_kernel:: engine:: arrow_data:: ArrowEngineData ;
349346 use delta_kernel:: engine:: default:: DefaultEngine ;
350347 use delta_kernel:: schema:: { DataType , StructField , StructType } ;
351348 use delta_kernel:: Engine ;
@@ -355,7 +352,7 @@ mod tests {
355352 use std:: sync:: Arc ;
356353 use test_utils:: {
357354 actions_to_string_with_metadata, add_commit, generate_batch, record_batch_to_bytes,
358- to_arrow , IntoArray as _, TestAction ,
355+ IntoArray as _, TestAction ,
359356 } ;
360357
361358 const PARQUET_FILE1 : & str =
@@ -480,7 +477,7 @@ mod tests {
480477 ) -> DeltaResult < Vec < RecordBatch > > {
481478 let scan_results = scan. execute ( engine) ?;
482479 scan_results
483- . map ( |data| -> DeltaResult < _ > { to_arrow ( data? ) } )
480+ . map ( EngineDataArrowExt :: try_into_record_batch )
484481 . try_collect ( )
485482 }
486483
@@ -699,7 +696,7 @@ mod tests {
699696 }
700697 let engine_data =
701698 ok_or_panic ( unsafe { get_engine_data ( data. array , & data. schema , allocate_err) } ) ;
702- let record_batch = unsafe { to_arrow ( engine_data. into_inner ( ) ) } ?;
699+ let record_batch = unsafe { engine_data. into_inner ( ) . try_into_record_batch ( ) } ?;
703700
704701 println ! ( "Batch ({i}) num rows {:?}" , record_batch. num_rows( ) ) ;
705702 batches. push ( record_batch) ;
0 commit comments