Skip to content

Commit 87bd895

Browse files
committed
update to fileactionkey
1 parent 74cdbd4 commit 87bd895

File tree

2 files changed

+22
-58
lines changed

2 files changed

+22
-58
lines changed

kernel/src/log_replay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use tracing::debug;
2727
/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
2828
/// of adds and removes during log replay.
2929
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
30-
pub(crate) struct FileActionKey {
30+
pub struct FileActionKey {
3131
pub(crate) path: String,
3232
pub(crate) dv_unique_id: Option<String>,
3333
}

kernel/src/scan/log_replay.rs

Lines changed: 21 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ pub struct SerializableScanState {
4141
pub predicate: Option<PredicateRef>,
4242
/// Opaque internal state blob containing schemas, transform spec, and column mapping mode
4343
pub internal_state_blob: Vec<u8>,
44-
/// Iterator over file paths that have been removed from the table or already processed.
45-
/// Note: Deletion vector info is intentionally dropped; only file paths are retained.
46-
pub seen_file_keys: Box<dyn Iterator<Item = String>>,
44+
/// Set of file action keys that have been removed from the table or already processed.
45+
pub seen_file_keys: HashSet<FileActionKey>,
4746
}
4847

4948
/// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan.
@@ -135,16 +134,10 @@ impl ScanLogReplayProcessor {
135134
/// Consumes the processor and returns a `SerializableScanState` containing:
136135
/// - The predicate (if any) for data skipping
137136
/// - An opaque internal state blob (schemas, transform spec, column mapping mode)
138-
/// - An iterator over seen file paths (DV info is intentionally dropped)
137+
/// - The set of seen file keys including their deletion vector information
139138
///
140139
/// The returned state can be used with `from_serializable_state` to reconstruct the
141140
/// processor on remote compute nodes.
142-
///
143-
/// # Note
144-
/// Deletion vector information is intentionally dropped from `seen_file_keys` during
145-
/// serialization because the checkpoint phase does not need to examine deletion vectors.
146-
/// If a `(path, dv_info)` has been seen, we can safely skip any `(path, null)` or
147-
/// `(path, dv_info)` entries in checkpoint files.
148141
#[internal_api]
149142
#[allow(unused)]
150143
pub(crate) fn into_serializable_state(self) -> DeltaResult<SerializableScanState> {
@@ -165,14 +158,10 @@ impl ScanLogReplayProcessor {
165158
let internal_state_blob = serde_json::to_vec(&internal_state)
166159
.map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?;
167160

168-
// NOTE: We only provide the path since the checkpoint phase does not need to look at
169-
// deletion vectors. If a (path, dv_info) has been seen, we can safely skip any (path, null)
170-
// or (path, dv_info) that is present in a checkpoint file.
171-
let seen_file_keys = Box::new(self.seen_file_keys.into_iter().map(|key| key.path));
172161
let state = SerializableScanState {
173162
predicate,
174163
internal_state_blob,
175-
seen_file_keys,
164+
seen_file_keys: self.seen_file_keys,
176165
};
177166

178167
Ok(state)
@@ -186,15 +175,10 @@ impl ScanLogReplayProcessor {
186175
///
187176
/// # Parameters
188177
/// - `engine`: Engine for creating evaluators and filters
189-
/// - `state`: The serialized state containing predicate, internal state blob, and seen file paths
178+
/// - `state`: The serialized state containing predicate, internal state blob, and seen file keys
190179
///
191180
/// # Returns
192-
/// An `Arc<ScanLogReplayProcessor>`. This is wrapped in an Arc to indicate that the
193-
/// seen_file_keys set may never be updated again if it was built from a `SerializableScanState`.
194-
/// This is because serialization removes deletion vector information from the seen_file_keys.
195-
/// Thus, it is only safe to apply this processor to checkpoint actions (actions that will not
196-
/// modify the deduplication set). Checkpoint actions should be removed if their path is in the
197-
/// seen_file_keys set.
181+
/// A new `ScanLogReplayProcessor` wrapped in an Arc.
198182
///
199183
#[internal_api]
200184
#[allow(unused)]
@@ -227,11 +211,7 @@ impl ScanLogReplayProcessor {
227211
column_mapping_mode: internal_state.column_mapping_mode,
228212
});
229213

230-
let seen_file_keys = state
231-
.seen_file_keys
232-
.map(|path| FileActionKey::new(path, None))
233-
.collect();
234-
let processor = Self::new_with_seen_files(engine, state_info, seen_file_keys)?;
214+
let processor = Self::new_with_seen_files(engine, state_info, state.seen_file_keys)?;
235215

236216
Ok(Arc::new(processor))
237217
}
@@ -557,6 +537,7 @@ pub(crate) fn scan_action_iter(
557537

558538
#[cfg(test)]
559539
mod tests {
540+
use std::collections::HashSet;
560541
use std::{collections::HashMap, sync::Arc};
561542

562543
use crate::actions::get_commit_schema;
@@ -786,7 +767,7 @@ mod tests {
786767

787768
#[test]
788769
fn test_serialization_basic_state_and_dv_dropping() {
789-
// Test basic StateInfo preservation and DV info dropping
770+
// Test that StateInfo and the seen FileActionKeys (including their DV info) are preserved
790771
let engine = SyncEngine::new();
791772
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
792773
StructField::new("id", DataType::INTEGER, true),
@@ -799,21 +780,12 @@ mod tests {
799780
.unwrap();
800781

801782
// Add file keys with and without DV info
802-
processor
803-
.seen_file_keys
804-
.insert(crate::log_replay::FileActionKey::new("file1.parquet", None));
805-
processor
806-
.seen_file_keys
807-
.insert(crate::log_replay::FileActionKey::new(
808-
"file2.parquet",
809-
Some("dv-1".to_string()),
810-
));
811-
processor
812-
.seen_file_keys
813-
.insert(crate::log_replay::FileActionKey::new(
814-
"file3.parquet",
815-
Some("dv-2".to_string()),
816-
));
783+
let key1 = crate::log_replay::FileActionKey::new("file1.parquet", None);
784+
let key2 = crate::log_replay::FileActionKey::new("file2.parquet", Some("dv-1".to_string()));
785+
let key3 = crate::log_replay::FileActionKey::new("file3.parquet", Some("dv-2".to_string()));
786+
processor.seen_file_keys.insert(key1.clone());
787+
processor.seen_file_keys.insert(key2.clone());
788+
processor.seen_file_keys.insert(key3.clone());
817789

818790
let state_info = processor.state_info.clone();
819791
let deserialized = ScanLogReplayProcessor::from_serializable_state(
@@ -836,19 +808,11 @@ mod tests {
836808
state_info.column_mapping_mode
837809
);
838810

839-
// Verify DV info dropped but paths preserved
811+
// Verify all file keys are preserved with their DV info
840812
assert_eq!(deserialized.seen_file_keys.len(), 3);
841-
for key in &deserialized.seen_file_keys {
842-
assert!(key.dv_unique_id.is_none(), "DV info should be dropped");
843-
}
844-
let paths: std::collections::HashSet<_> = deserialized
845-
.seen_file_keys
846-
.iter()
847-
.map(|k| k.path.as_str())
848-
.collect();
849-
assert!(paths.contains("file1.parquet"));
850-
assert!(paths.contains("file2.parquet"));
851-
assert!(paths.contains("file3.parquet"));
813+
assert!(deserialized.seen_file_keys.contains(&key1));
814+
assert!(deserialized.seen_file_keys.contains(&key2));
815+
assert!(deserialized.seen_file_keys.contains(&key3));
852816
}
853817

854818
#[test]
@@ -994,7 +958,7 @@ mod tests {
994958
let invalid_state = SerializableScanState {
995959
predicate: None,
996960
internal_state_blob: vec![0, 1, 2, 3, 255], // Invalid JSON
997-
seen_file_keys: Box::new(std::iter::empty()),
961+
seen_file_keys: HashSet::new(),
998962
};
999963
assert!(ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state).is_err());
1000964
}
@@ -1020,7 +984,7 @@ mod tests {
1020984
let invalid_state = SerializableScanState {
1021985
predicate: Some(predicate), // Predicate exists but schema is None
1022986
internal_state_blob: invalid_blob,
1023-
seen_file_keys: Box::new(std::iter::empty()),
987+
seen_file_keys: HashSet::new(),
1024988
};
1025989
let result = ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state);
1026990
assert!(result.is_err());

0 commit comments

Comments
 (0)