Skip to content

Commit 493cca1

Browse files
committed
extract file deduplicator
1 parent c8b8469 commit 493cca1

File tree

4 files changed

+80
-61
lines changed

4 files changed

+80
-61
lines changed

kernel/src/action_reconciliation/log_replay.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
//! actions selected
3232
//!
3333
use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
34+
use crate::log_replay::deduplicator::Deduplicator;
3435
use crate::log_replay::{
3536
ActionsBatch, FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor,
3637
};

kernel/src/log_replay.rs

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//! to minimize memory usage for tables with extensive history.
1616
use crate::actions::deletion_vector::DeletionVectorDescriptor;
1717
use crate::engine_data::{GetData, TypedGetData};
18+
use crate::log_replay::deduplicator::Deduplicator;
1819
use crate::scan::data_skipping::DataSkippingFilter;
1920
use crate::{DeltaResult, EngineData};
2021

@@ -24,6 +25,8 @@ use std::collections::HashSet;
2425

2526
use tracing::debug;
2627

28+
pub(crate) mod deduplicator;
29+
2730
/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
2831
/// of adds and removes during log replay.
2932
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
@@ -87,36 +90,6 @@ impl<'seen> FileActionDeduplicator<'seen> {
8790
}
8891
}
8992

90-
/// Checks if log replay already processed this logical file (in which case the current action
91-
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
92-
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
93-
/// and should process it.
94-
pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
95-
// Note: each (add.path + add.dv_unique_id()) pair has a
96-
// unique Add + Remove pair in the log. For example:
97-
// https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json
98-
99-
if self.seen_file_keys.contains(&key) {
100-
debug!(
101-
"Ignoring duplicate ({}, {:?}) in scan, is log {}",
102-
key.path, key.dv_unique_id, self.is_log_batch
103-
);
104-
true
105-
} else {
106-
debug!(
107-
"Including ({}, {:?}) in scan, is log {}",
108-
key.path, key.dv_unique_id, self.is_log_batch
109-
);
110-
if self.is_log_batch {
111-
// Remember file actions from this batch so we can ignore duplicates as we process
112-
// batches from older commit and/or checkpoint files. We don't track checkpoint
113-
// batches because they are already the oldest actions and never replace anything.
114-
self.seen_file_keys.insert(key);
115-
}
116-
false
117-
}
118-
}
119-
12093
/// Extracts the deletion vector unique ID if it exists.
12194
///
12295
/// This function retrieves the necessary fields for constructing a deletion vector unique ID
@@ -145,6 +118,38 @@ impl<'seen> FileActionDeduplicator<'seen> {
145118
None => Ok(None),
146119
}
147120
}
121+
}
122+
impl<'seen> Deduplicator for FileActionDeduplicator<'seen> {
123+
type Key = FileActionKey;
124+
/// Checks if log replay already processed this logical file (in which case the current action
125+
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
126+
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
127+
/// and should process it.
128+
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
129+
// Note: each (add.path + add.dv_unique_id()) pair has a
130+
// unique Add + Remove pair in the log. For example:
131+
// https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json
132+
133+
if self.seen_file_keys.contains(&key) {
134+
debug!(
135+
"Ignoring duplicate ({}, {:?}) in scan, is log {}",
136+
key.path, key.dv_unique_id, self.is_log_batch
137+
);
138+
true
139+
} else {
140+
debug!(
141+
"Including ({}, {:?}) in scan, is log {}",
142+
key.path, key.dv_unique_id, self.is_log_batch
143+
);
144+
if self.is_log_batch {
145+
// Remember file actions from this batch so we can ignore duplicates as we process
146+
// batches from older commit and/or checkpoint files. We don't track checkpoint
147+
// batches because they are already the oldest actions and never replace anything.
148+
self.seen_file_keys.insert(key);
149+
}
150+
false
151+
}
152+
}
148153

149154
/// Extracts a file action key and determines if it's an add operation.
150155
/// This method examines the data at the given index using the provided getters
@@ -159,7 +164,7 @@ impl<'seen> FileActionDeduplicator<'seen> {
159164
/// - `Ok(Some((key, is_add)))`: When a file action is found, returns the key and whether it's an add operation
160165
/// - `Ok(None)`: When no file action is found
161166
/// - `Err(...)`: On any error during extraction
162-
pub(crate) fn extract_file_action<'a>(
167+
fn extract_file_action<'a>(
163168
&self,
164169
i: usize,
165170
getters: &[&'a dyn GetData<'a>],
@@ -190,7 +195,7 @@ impl<'seen> FileActionDeduplicator<'seen> {
190195
///
191196
/// `true` indicates we are processing a batch from a commit file.
192197
/// `false` indicates we are processing a batch from a checkpoint.
193-
pub(crate) fn is_log_batch(&self) -> bool {
198+
fn is_log_batch(&self) -> bool {
194199
self.is_log_batch
195200
}
196201
}
@@ -362,6 +367,7 @@ pub(crate) trait HasSelectionVector {
362367
mod tests {
363368
use super::*;
364369
use crate::engine_data::GetData;
370+
use crate::log_replay::deduplicator::Deduplicator;
365371
use crate::DeltaResult;
366372
use std::collections::{HashMap, HashSet};
367373

kernel/src/log_replay/deduplicator.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use crate::{engine_data::GetData, DeltaResult};
2+
3+
pub(crate) trait Deduplicator {
4+
type Key;
5+
fn extract_file_action<'a>(
6+
&self,
7+
i: usize,
8+
getters: &[&'a dyn GetData<'a>],
9+
skip_removes: bool,
10+
) -> DeltaResult<Option<(Self::Key, bool)>>;
11+
fn check_and_record_seen(&mut self, key: Self::Key) -> bool;
12+
fn is_log_batch(&self) -> bool;
13+
}

kernel/src/scan/log_replay.rs

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::actions::get_log_add_schema;
1212
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
1313
use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef};
1414
use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _};
15+
use crate::log_replay::deduplicator::Deduplicator;
1516
use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
1617
use crate::scan::Scalar;
1718
use crate::schema::ToSchema as _;
@@ -46,6 +47,15 @@ pub struct SerializableScanState {
4647
pub seen_file_keys: HashSet<FileActionKey>,
4748
}
4849

50+
// These index positions correspond to the order of columns defined in
51+
// `selected_column_names_and_types()`
52+
pub(crate) const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters
53+
const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters
54+
pub(crate) const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns
55+
const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters
56+
pub(crate) const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters
57+
pub(crate) const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns
58+
4959
/// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan.
5060
///
5161
/// During a table scan, the processor reads batches of log actions (in reverse chronological order)
@@ -226,40 +236,23 @@ impl ScanLogReplayProcessor {
226236
/// replay visits actions newest-first, so once we've seen a file action for a given (path, dvId)
227237
/// pair, we should ignore all subsequent (older) actions for that same (path, dvId) pair. If the
228238
/// first action for a given file is a remove, then that file does not show up in the result at all.
229-
struct AddRemoveDedupVisitor<'seen> {
230-
deduplicator: FileActionDeduplicator<'seen>,
239+
struct AddRemoveDedupVisitor<D: Deduplicator> {
240+
deduplicator: D,
231241
selection_vector: Vec<bool>,
232242
state_info: Arc<StateInfo>,
233243
partition_filter: Option<PredicateRef>,
234244
row_transform_exprs: Vec<Option<ExpressionRef>>,
235245
}
236246

237-
impl AddRemoveDedupVisitor<'_> {
238-
// These index positions correspond to the order of columns defined in
239-
// `selected_column_names_and_types()`
240-
const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters
241-
const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters
242-
const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns
243-
const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters
244-
const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters
245-
const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns
246-
247+
impl<D: Deduplicator> AddRemoveDedupVisitor<D> {
247248
fn new(
248-
seen: &mut HashSet<FileActionKey>,
249+
deduplicator: D,
249250
selection_vector: Vec<bool>,
250251
state_info: Arc<StateInfo>,
251252
partition_filter: Option<PredicateRef>,
252-
is_log_batch: bool,
253-
) -> AddRemoveDedupVisitor<'_> {
253+
) -> AddRemoveDedupVisitor<D> {
254254
AddRemoveDedupVisitor {
255-
deduplicator: FileActionDeduplicator::new(
256-
seen,
257-
is_log_batch,
258-
Self::ADD_PATH_INDEX,
259-
Self::REMOVE_PATH_INDEX,
260-
Self::ADD_DV_START_INDEX,
261-
Self::REMOVE_DV_START_INDEX,
262-
),
255+
deduplicator,
263256
selection_vector,
264257
state_info,
265258
partition_filter,
@@ -312,7 +305,7 @@ impl AddRemoveDedupVisitor<'_> {
312305
let partition_values = match &self.state_info.transform_spec {
313306
Some(transform) if is_add => {
314307
let partition_values =
315-
getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
308+
getters[ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
316309
let partition_values = parse_partition_values(
317310
&self.state_info.logical_schema,
318311
transform,
@@ -331,8 +324,7 @@ impl AddRemoveDedupVisitor<'_> {
331324
if self.deduplicator.check_and_record_seen(file_key) || !is_add {
332325
return Ok(false);
333326
}
334-
let base_row_id: Option<i64> =
335-
getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
327+
let base_row_id: Option<i64> = getters[BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
336328
let transform = self
337329
.state_info
338330
.transform_spec
@@ -355,7 +347,7 @@ impl AddRemoveDedupVisitor<'_> {
355347
}
356348
}
357349

358-
impl RowVisitor for AddRemoveDedupVisitor<'_> {
350+
impl<D: Deduplicator> RowVisitor for AddRemoveDedupVisitor<D> {
359351
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
360352
// NOTE: The visitor assumes a schema with adds first and removes optionally afterward.
361353
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
@@ -501,12 +493,19 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
501493
let selection_vector = self.build_selection_vector(actions.as_ref())?;
502494
assert_eq!(selection_vector.len(), actions.len());
503495

504-
let mut visitor = AddRemoveDedupVisitor::new(
496+
let deduplicator = FileActionDeduplicator::new(
505497
&mut self.seen_file_keys,
498+
is_log_batch,
499+
ADD_PATH_INDEX,
500+
REMOVE_PATH_INDEX,
501+
ADD_DV_START_INDEX,
502+
REMOVE_DV_START_INDEX,
503+
);
504+
let mut visitor = AddRemoveDedupVisitor::new(
505+
deduplicator,
506506
selection_vector,
507507
self.state_info.clone(),
508508
self.partition_filter.clone(),
509-
is_log_batch,
510509
);
511510
visitor.visit_rows_of(actions.as_ref())?;
512511

0 commit comments

Comments
 (0)