Skip to content

Commit 7e33173

Browse files
committed
make dedup compatible
1 parent acc045d commit 7e33173

File tree

3 files changed

+42
-6
lines changed

3 files changed

+42
-6
lines changed

kernel/src/log_replay.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
//! deduplication with `FileActionDeduplicator` which tracks unique files across log batches
1515
//! to minimize memory usage for tables with extensive history.
1616
use crate::engine_data::GetData;
17-
use crate::log_replay::deduplicator::Deduplicator;
17+
use crate::log_replay::deduplicator::{extract_dv_unique_id, Deduplicator};
1818
use crate::scan::data_skipping::DataSkippingFilter;
1919
use crate::{DeltaResult, EngineData};
2020

@@ -144,7 +144,7 @@ impl<'seen> Deduplicator for FileActionDeduplicator<'seen> {
144144
) -> DeltaResult<Option<(FileActionKey, bool)>> {
145145
// Try to extract an add action by the required path column
146146
if let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? {
147-
let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
147+
let dv_unique_id = extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
148148
return Ok(Some((FileActionKey::new(path, dv_unique_id), true)));
149149
}
150150

@@ -155,7 +155,7 @@ impl<'seen> Deduplicator for FileActionDeduplicator<'seen> {
155155

156156
// Try to extract a remove action by the required path column
157157
if let Some(path) = getters[self.remove_path_index].get_str(i, "remove.path")? {
158-
let dv_unique_id = self.extract_dv_unique_id(i, getters, self.remove_dv_start_index)?;
158+
let dv_unique_id = extract_dv_unique_id(i, getters, self.remove_dv_start_index)?;
159159
return Ok(Some((FileActionKey::new(path, dv_unique_id), false)));
160160
}
161161

kernel/src/log_replay/deduplicator.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,25 @@ pub(crate) trait Deduplicator {
7171
pub(crate) struct CheckpointDeduplicator<'a> {
7272
seen_file_keys: &'a HashSet<FileActionKey>,
7373
add_path_index: usize,
74+
add_dv_start_index: usize,
7475
}
7576
impl CheckpointDeduplicator<'_> {
7677
#[allow(unused)]
7778
pub(crate) fn try_new<'a>(
7879
seen_file_keys: &'a HashSet<FileActionKey>,
7980
add_path_index: usize,
81+
add_dv_start_index: usize,
8082
) -> DeltaResult<CheckpointDeduplicator<'a>> {
8183
Ok(CheckpointDeduplicator {
8284
seen_file_keys,
8385
add_path_index,
86+
add_dv_start_index,
8487
})
8588
}
8689
}
8790

8891
impl Deduplicator for CheckpointDeduplicator<'_> {
89-
type Key = String;
92+
type Key = FileActionKey;
9093

9194
fn extract_file_action<'a>(
9295
&self,
@@ -96,7 +99,8 @@ impl Deduplicator for CheckpointDeduplicator<'_> {
9699
) -> DeltaResult<Option<(Self::Key, bool)>> {
97100
// Try to extract an add action by the required path column
98101
if let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? {
99-
Ok(Some((path.to_string(), true)))
102+
let dv_unique_id = extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
103+
Ok(Some((FileActionKey::new(path, dv_unique_id), true)))
100104
} else {
101105
Ok(None)
102106
}
@@ -110,3 +114,31 @@ impl Deduplicator for CheckpointDeduplicator<'_> {
110114
false
111115
}
112116
}
117+
118+
/// Extracts the deletion vector unique ID if it exists.
119+
///
120+
/// This function retrieves the necessary fields for constructing a deletion vector unique ID
121+
/// by accessing `getters` at `dv_start_index` and the following two indices. Specifically:
122+
/// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`).
123+
/// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`).
124+
/// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`).
125+
pub(crate) fn extract_dv_unique_id<'a>(
126+
i: usize,
127+
getters: &[&'a dyn GetData<'a>],
128+
dv_start_index: usize,
129+
) -> DeltaResult<Option<String>> {
130+
match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? {
131+
Some(storage_type) => {
132+
let path_or_inline =
133+
getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
134+
let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;
135+
136+
Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
137+
storage_type,
138+
path_or_inline,
139+
offset,
140+
)))
141+
}
142+
None => Ok(None),
143+
}
144+
}

kernel/src/scan/log_replay.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,11 @@ impl ParallelizableLogReplayProcessor for ScanLogReplayProcessor {
495495
let selection_vector = self.build_selection_vector(actions.as_ref())?;
496496
assert_eq!(selection_vector.len(), actions.len());
497497

498-
let deduplicator = CheckpointDeduplicator::try_new(&self.seen_file_keys, ADD_PATH_INDEX)?;
498+
let deduplicator = CheckpointDeduplicator::try_new(
499+
&self.seen_file_keys,
500+
ADD_PATH_INDEX,
501+
ADD_DV_START_INDEX,
502+
)?;
499503
let mut visitor = AddRemoveDedupVisitor::new(
500504
deduplicator,
501505
selection_vector,

0 commit comments

Comments
 (0)