@@ -12,8 +12,11 @@ use crate::actions::get_log_add_schema;
1212use crate :: engine_data:: { GetData , RowVisitor , TypedGetData as _} ;
1313use crate :: expressions:: { column_name, ColumnName , Expression , ExpressionRef , PredicateRef } ;
1414use crate :: kernel_predicates:: { DefaultKernelPredicateEvaluator , KernelPredicateEvaluator as _} ;
15- use crate :: log_replay:: deduplicator:: Deduplicator ;
16- use crate :: log_replay:: { ActionsBatch , FileActionDeduplicator , FileActionKey , LogReplayProcessor } ;
15+ use crate :: log_replay:: deduplicator:: { self , CheckpointDeduplicator , Deduplicator } ;
16+ use crate :: log_replay:: {
17+ ActionsBatch , FileActionDeduplicator , FileActionKey , LogReplayProcessor ,
18+ ParallelizableLogReplayProcessor ,
19+ } ;
1720use crate :: scan:: Scalar ;
1821use crate :: schema:: ToSchema as _;
1922use crate :: schema:: { ColumnNamesAndTypes , DataType , MapType , StructField , StructType } ;
@@ -480,6 +483,37 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
480483 EXPR . clone ( )
481484}
482485
486+ impl ParallelizableLogReplayProcessor for ScanLogReplayProcessor {
487+ fn process_actions_batch ( & self , actions_batch : ActionsBatch ) -> DeltaResult < Self :: Output > {
488+ let ActionsBatch {
489+ actions,
490+ is_log_batch : _,
491+ } = actions_batch;
492+ // Build an initial selection vector for the batch which has had the data skipping filter
493+ // applied. The selection vector is further updated by the deduplication visitor to remove
494+ // rows that are not valid adds.
495+ let selection_vector = self . build_selection_vector ( actions. as_ref ( ) ) ?;
496+ assert_eq ! ( selection_vector. len( ) , actions. len( ) ) ;
497+
498+ let deduplicator = CheckpointDeduplicator :: try_new ( & self . seen_file_keys , ADD_PATH_INDEX ) ?;
499+ let mut visitor = AddRemoveDedupVisitor :: new (
500+ deduplicator,
501+ selection_vector,
502+ self . state_info . clone ( ) ,
503+ self . partition_filter . clone ( ) ,
504+ ) ;
505+ visitor. visit_rows_of ( actions. as_ref ( ) ) ?;
506+
507+ // TODO: Teach expression eval to respect the selection vector we just computed so carefully!
508+ let result = self . add_transform . evaluate ( actions. as_ref ( ) ) ?;
509+ ScanMetadata :: try_new (
510+ result,
511+ visitor. selection_vector ,
512+ visitor. row_transform_exprs ,
513+ )
514+ }
515+ }
516+
483517impl LogReplayProcessor for ScanLogReplayProcessor {
484518 type Output = ScanMetadata ;
485519
0 commit comments