@@ -41,9 +41,8 @@ pub struct SerializableScanState {
4141 pub predicate : Option < PredicateRef > ,
4242 /// Opaque internal state blob containing schemas, transform spec, and column mapping mode
4343 pub internal_state_blob : Vec < u8 > ,
44- /// Iterator over file paths that have been removed from the table or already processed.
45- /// Note: Deletion vector info is intentionally dropped; only file paths are retained.
46- pub seen_file_keys : Box < dyn Iterator < Item = String > > ,
44+ /// Set of file action keys that have been removed from the table or already processed.
45+ pub seen_file_keys : HashSet < FileActionKey > ,
4746}
4847
4948/// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan.
@@ -135,16 +134,10 @@ impl ScanLogReplayProcessor {
135134 /// Consumes the processor and returns a `SerializableScanState` containing:
136135 /// - The predicate (if any) for data skipping
137136 /// - An opaque internal state blob (schemas, transform spec, column mapping mode)
138- /// - An iterator over seen file paths (DV info is intentionally dropped)
137+ /// - The set of seen file keys including their deletion vector information
139138 ///
140139 /// The returned state can be used with `from_serializable_state` to reconstruct the
141140 /// processor on remote compute nodes.
142- ///
143- /// # Note
144- /// Deletion vector information is intentionally dropped from `seen_file_keys` during
145- /// serialization because the checkpoint phase does not need to examine deletion vectors.
146- /// If a `(path, dv_info)` has been seen, we can safely skip any `(path, null)` or
147- /// `(path, dv_info)` entries in checkpoint files.
148141 #[ internal_api]
149142 #[ allow( unused) ]
150143 pub ( crate ) fn into_serializable_state ( self ) -> DeltaResult < SerializableScanState > {
@@ -165,14 +158,10 @@ impl ScanLogReplayProcessor {
165158 let internal_state_blob = serde_json:: to_vec ( & internal_state)
166159 . map_err ( |e| Error :: generic ( format ! ( "Failed to serialize internal state: {}" , e) ) ) ?;
167160
168- // NOTE: We only provide the path since the checkpoint phase does not need to look at
169- // deletion vectors. If a (path, dv_info) has been seen, we can safely skip any (path, null)
170- // or (path, dv_info) that is present in a checkpoint file.
171- let seen_file_keys = Box :: new ( self . seen_file_keys . into_iter ( ) . map ( |key| key. path ) ) ;
172161 let state = SerializableScanState {
173162 predicate,
174163 internal_state_blob,
175- seen_file_keys,
164+ seen_file_keys : self . seen_file_keys ,
176165 } ;
177166
178167 Ok ( state)
@@ -186,15 +175,10 @@ impl ScanLogReplayProcessor {
186175 ///
187176 /// # Parameters
188177 /// - `engine`: Engine for creating evaluators and filters
189- /// - `state`: The serialized state containing predicate, internal state blob, and seen file paths
178+ /// - `state`: The serialized state containing predicate, internal state blob, and seen file keys
190179 ///
191180 /// # Returns
192- /// An `Arc<ScanLogReplayProcessor>`. This is wrapped in an Arc to indicate that the
193- /// seen_file_keys set may never be updated again if it was built from a `SerializableScanState`.
194- /// This is because serialization removes deletion vector information from the seen_file_keys.
195- /// Thus, it is only safe to apply this processor to checkpoint actions (actions that will not
196- /// modify the deduplication set). Checkpoint actions should be removed if their path is in the
197- /// seen_file_keys set.
181+ /// A new `ScanLogReplayProcessor` wrapped in an Arc.
198182 ///
199183 #[ internal_api]
200184 #[ allow( unused) ]
@@ -227,11 +211,7 @@ impl ScanLogReplayProcessor {
227211 column_mapping_mode : internal_state. column_mapping_mode ,
228212 } ) ;
229213
230- let seen_file_keys = state
231- . seen_file_keys
232- . map ( |path| FileActionKey :: new ( path, None ) )
233- . collect ( ) ;
234- let processor = Self :: new_with_seen_files ( engine, state_info, seen_file_keys) ?;
214+ let processor = Self :: new_with_seen_files ( engine, state_info, state. seen_file_keys ) ?;
235215
236216 Ok ( Arc :: new ( processor) )
237217 }
@@ -557,6 +537,7 @@ pub(crate) fn scan_action_iter(
557537
558538#[ cfg( test) ]
559539mod tests {
540+ use std:: collections:: HashSet ;
560541 use std:: { collections:: HashMap , sync:: Arc } ;
561542
562543 use crate :: actions:: get_commit_schema;
@@ -786,7 +767,7 @@ mod tests {
786767
787768 #[ test]
788769 fn test_serialization_basic_state_and_dv_dropping ( ) {
789- // Test basic StateInfo preservation and DV info dropping
770+ // Test basic StateInfo preservation and FileActionKey preservation
790771 let engine = SyncEngine :: new ( ) ;
791772 let schema: SchemaRef = Arc :: new ( StructType :: new_unchecked ( [
792773 StructField :: new ( "id" , DataType :: INTEGER , true ) ,
@@ -799,21 +780,12 @@ mod tests {
799780 . unwrap ( ) ;
800781
801782 // Add file keys with and without DV info
802- processor
803- . seen_file_keys
804- . insert ( crate :: log_replay:: FileActionKey :: new ( "file1.parquet" , None ) ) ;
805- processor
806- . seen_file_keys
807- . insert ( crate :: log_replay:: FileActionKey :: new (
808- "file2.parquet" ,
809- Some ( "dv-1" . to_string ( ) ) ,
810- ) ) ;
811- processor
812- . seen_file_keys
813- . insert ( crate :: log_replay:: FileActionKey :: new (
814- "file3.parquet" ,
815- Some ( "dv-2" . to_string ( ) ) ,
816- ) ) ;
783+ let key1 = crate :: log_replay:: FileActionKey :: new ( "file1.parquet" , None ) ;
784+ let key2 = crate :: log_replay:: FileActionKey :: new ( "file2.parquet" , Some ( "dv-1" . to_string ( ) ) ) ;
785+ let key3 = crate :: log_replay:: FileActionKey :: new ( "file3.parquet" , Some ( "dv-2" . to_string ( ) ) ) ;
786+ processor. seen_file_keys . insert ( key1. clone ( ) ) ;
787+ processor. seen_file_keys . insert ( key2. clone ( ) ) ;
788+ processor. seen_file_keys . insert ( key3. clone ( ) ) ;
817789
818790 let state_info = processor. state_info . clone ( ) ;
819791 let deserialized = ScanLogReplayProcessor :: from_serializable_state (
@@ -836,19 +808,11 @@ mod tests {
836808 state_info. column_mapping_mode
837809 ) ;
838810
839- // Verify DV info dropped but paths preserved
811+ // Verify all file keys are preserved with their DV info
840812 assert_eq ! ( deserialized. seen_file_keys. len( ) , 3 ) ;
841- for key in & deserialized. seen_file_keys {
842- assert ! ( key. dv_unique_id. is_none( ) , "DV info should be dropped" ) ;
843- }
844- let paths: std:: collections:: HashSet < _ > = deserialized
845- . seen_file_keys
846- . iter ( )
847- . map ( |k| k. path . as_str ( ) )
848- . collect ( ) ;
849- assert ! ( paths. contains( "file1.parquet" ) ) ;
850- assert ! ( paths. contains( "file2.parquet" ) ) ;
851- assert ! ( paths. contains( "file3.parquet" ) ) ;
813+ assert ! ( deserialized. seen_file_keys. contains( & key1) ) ;
814+ assert ! ( deserialized. seen_file_keys. contains( & key2) ) ;
815+ assert ! ( deserialized. seen_file_keys. contains( & key3) ) ;
852816 }
853817
854818 #[ test]
@@ -994,7 +958,7 @@ mod tests {
994958 let invalid_state = SerializableScanState {
995959 predicate : None ,
996960 internal_state_blob : vec ! [ 0 , 1 , 2 , 3 , 255 ] , // Invalid JSON
997- seen_file_keys : Box :: new ( std :: iter :: empty ( ) ) ,
961+ seen_file_keys : HashSet :: new ( ) ,
998962 } ;
999963 assert ! ( ScanLogReplayProcessor :: from_serializable_state( & engine, invalid_state) . is_err( ) ) ;
1000964 }
@@ -1020,7 +984,7 @@ mod tests {
1020984 let invalid_state = SerializableScanState {
1021985 predicate : Some ( predicate) , // Predicate exists but schema is None
1022986 internal_state_blob : invalid_blob,
1023- seen_file_keys : Box :: new ( std :: iter :: empty ( ) ) ,
987+ seen_file_keys : HashSet :: new ( ) ,
1024988 } ;
1025989 let result = ScanLogReplayProcessor :: from_serializable_state ( & engine, invalid_state) ;
1026990 assert ! ( result. is_err( ) ) ;
0 commit comments