From 69a8312ad609546d2fbfac74f9d9082108bca41e Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 14:56:43 -0800 Subject: [PATCH 01/10] commit reader --- kernel/src/lib.rs | 1 + kernel/src/log_reader/commit.rs | 130 ++++++++++++++++++++++++++++++++ kernel/src/log_reader/mod.rs | 1 + kernel/src/log_segment.rs | 13 +--- kernel/src/scan/log_replay.rs | 2 +- kernel/src/scan/mod.rs | 2 +- 6 files changed, 137 insertions(+), 12 deletions(-) create mode 100644 kernel/src/log_reader/commit.rs create mode 100644 kernel/src/log_reader/mod.rs diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 6594290fb6..861f2b9ef1 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -93,6 +93,7 @@ pub mod error; pub mod expressions; mod log_compaction; mod log_path; +mod log_reader; pub mod metrics; pub mod scan; pub mod schema; diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs new file mode 100644 index 0000000000..da67eb2fda --- /dev/null +++ b/kernel/src/log_reader/commit.rs @@ -0,0 +1,130 @@ +//! Commit phase for log replay - processes JSON commit files. + +use crate::log_replay::ActionsBatch; +use crate::log_segment::LogSegment; +use crate::schema::SchemaRef; +use crate::{DeltaResult, Engine}; + +/// Phase that processes JSON commit files into [`ActionsBatch`]s +pub(crate) struct CommitReader { + actions: Box> + Send>, +} + +impl CommitReader { + /// Create a new commit phase from a log segment. + /// + /// # Parameters + /// - `engine`: Engine for reading files + /// - `log_segment`: The log segment to process + /// - `schema`: The schema to read the json files + pub(crate) fn try_new( + engine: &dyn Engine, + log_segment: &LogSegment, + schema: SchemaRef, + ) -> DeltaResult { + let commit_files = log_segment.find_commit_cover(); + let actions = engine + .json_handler() + .read_json_files(&commit_files, schema, None)? 
+ .map(|batch| batch.map(|b| ActionsBatch::new(b, true))); + + Ok(Self { + actions: Box::new(actions), + }) + } +} + +impl Iterator for CommitReader { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + self.actions.next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array::{StringArray, StructArray}; + use crate::engine::arrow_data::EngineDataArrowExt as _; + use crate::engine::default::executor::tokio::TokioBackgroundExecutor; + use crate::engine::default::DefaultEngine; + use crate::scan::COMMIT_READ_SCHEMA; + use crate::{Error, Snapshot, SnapshotRef}; + use itertools::Itertools; + use object_store::local::LocalFileSystem; + use std::path::PathBuf; + use std::sync::Arc; + use url::Url; + + fn load_test_table( + table_name: &str, + ) -> DeltaResult<( + Arc>, + SnapshotRef, + url::Url, + )> { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests/data"); + path.push(table_name); + + let path = std::fs::canonicalize(path) + .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + + let url = Url::from_directory_path(path) + .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; + + let store = Arc::new(LocalFileSystem::new()); + let engine = Arc::new(DefaultEngine::new(store)); + let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + + Ok((engine, snapshot, url)) + } + + #[test] + fn test_commit_phase_processes_commits() -> Result<(), Box> { + let (engine, snapshot, _url) = load_test_table("app-txn-no-checkpoint")?; + let log_segment = Arc::new(snapshot.log_segment().clone()); + + let schema = COMMIT_READ_SCHEMA.clone(); + let commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?; + + let mut file_paths = vec![]; + for result in commit_phase { + let batch = result?; + let ActionsBatch { + actions, + is_log_batch, + } = batch; + assert!(is_log_batch); + + let record_batch = actions.try_into_record_batch()?; + let add = record_batch.column_by_name("add").unwrap(); + let add_struct = add.as_any().downcast_ref::().unwrap(); + + let path = add_struct + .column_by_name("path") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec(); + file_paths.extend(batch_paths); + } + + file_paths.sort(); + let expected_files = vec![ + "modified=2021-02-01/part-00001-80996595-a345-43b7-b213-e247d6f091f7-c000.snappy.parquet", + "modified=2021-02-01/part-00001-8ebcaf8b-0f48-4213-98c9-5c2156d20a7e-c000.snappy.parquet", + "modified=2021-02-02/part-00001-9a16b9f6-c12a-4609-a9c4-828eacb9526a-c000.snappy.parquet", + "modified=2021-02-02/part-00001-bfac5c74-426e-410f-ab74-21a64e518e9c-c000.snappy.parquet", + ]; + assert_eq!( + file_paths, expected_files, + "CommitReader should find exactly the expected files" + ); + + Ok(()) + } +} diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs new file mode 100644 index 0000000000..d1337ef59a --- /dev/null +++ b/kernel/src/log_reader/mod.rs @@ -0,0 +1 @@ +pub(crate) mod commit; diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 3b223e633e..10b60275a0 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -11,6 +11,7 @@ use crate::actions::{ PROTOCOL_NAME, SIDECAR_NAME, }; use crate::last_checkpoint_hint::LastCheckpointHint; +use crate::log_reader::commit::CommitReader; use crate::log_replay::ActionsBatch; use crate::metrics::{MetricEvent, MetricId, MetricsReporter}; use 
crate::path::{LogPathFileType, ParsedLogPath}; @@ -330,15 +331,7 @@ impl LogSegment { meta_predicate: Option, ) -> DeltaResult> + Send> { // `replay` expects commit files to be sorted in descending order, so the return value here is correct - let commits_and_compactions = self.find_commit_cover(); - let commit_stream = engine - .json_handler() - .read_json_files( - &commits_and_compactions, - commit_read_schema, - meta_predicate.clone(), - )? - .map_ok(|batch| ActionsBatch::new(batch, true)); + let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?; let checkpoint_stream = self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?; @@ -367,7 +360,7 @@ impl LogSegment { /// returns files is DESCENDING ORDER, as that's what `replay` expects. This function assumes /// that all files in `self.ascending_commit_files` and `self.ascending_compaction_files` are in /// range for this log segment. This invariant is maintained by our listing code. - fn find_commit_cover(&self) -> Vec { + pub(crate) fn find_commit_cover(&self) -> Vec { // Create an iterator sorted in ascending order by (initial version, end version), e.g. // [00.json, 00.09.compacted.json, 00.99.compacted.json, 01.json, 02.json, ..., 10.json, // 10.19.compacted.json, 11.json, ...] diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 7bf69eb728..e0fe51b3fb 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -53,7 +53,7 @@ pub(crate) struct ScanLogReplayProcessor { impl ScanLogReplayProcessor { /// Create a new [`ScanLogReplayProcessor`] instance - fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { + pub(crate) fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { // Extract the physical predicate from StateInfo's PhysicalPredicate enum. // The DataSkippingFilter and partition_filter components expect the predicate // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 0a991b07e6..b1545b09ea 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -47,7 +47,7 @@ mod tests; // safety: we define get_commit_schema() and _know_ it contains ADD_NAME and REMOVE_NAME #[allow(clippy::unwrap_used)] -static COMMIT_READ_SCHEMA: LazyLock = LazyLock::new(|| { +pub(crate) static COMMIT_READ_SCHEMA: LazyLock = LazyLock::new(|| { get_commit_schema() .project(&[ADD_NAME, REMOVE_NAME]) .unwrap() From 41eba152cc46b0ab3c4daf5ba11d80d5b52e4975 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 15:20:45 -0800 Subject: [PATCH 02/10] manifest simplify --- kernel/src/log_reader/checkpoint_manifest.rs | 262 +++++++++++++++++++ kernel/src/log_reader/commit.rs | 33 +-- kernel/src/log_reader/mod.rs | 1 + kernel/src/utils.rs | 44 +++- 4 files changed, 306 insertions(+), 34 deletions(-) create mode 100644 kernel/src/log_reader/checkpoint_manifest.rs diff --git a/kernel/src/log_reader/checkpoint_manifest.rs b/kernel/src/log_reader/checkpoint_manifest.rs new file mode 100644 index 0000000000..bf21a9e8ed --- /dev/null +++ b/kernel/src/log_reader/checkpoint_manifest.rs @@ -0,0 +1,262 @@ +//! Manifest phase for log replay - processes single-part checkpoints and manifest checkpoints. 
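For orientation, a minimal sketch of the construct-then-iterate pattern these log readers follow, using the `CommitReader` introduced in the previous commit. This is crate-internal and assumes `engine: &dyn Engine` and `log_segment: &LogSegment` are already in scope, mirroring the `LogSegment::read_actions` call site above:

```ignore
use crate::log_reader::commit::CommitReader;
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::scan::COMMIT_READ_SCHEMA;
use crate::{DeltaResult, Engine};

fn replay_commits(engine: &dyn Engine, log_segment: &LogSegment) -> DeltaResult<()> {
    // Commit JSON files are read with the projected add/remove schema, newest first.
    let reader = CommitReader::try_new(engine, log_segment, COMMIT_READ_SCHEMA.clone())?;
    for batch in reader {
        let ActionsBatch { actions, is_log_batch } = batch?;
        debug_assert!(is_log_batch); // commit batches always come from the JSON log
        // ... hand `actions` to a LogReplayProcessor here ...
        let _ = actions;
    }
    Ok(())
}
```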
+ +use std::sync::{Arc, LazyLock}; + +use itertools::Itertools; +use url::Url; + +use crate::actions::visitors::SidecarVisitor; +use crate::actions::{Add, Remove, Sidecar, ADD_NAME}; +use crate::actions::{REMOVE_NAME, SIDECAR_NAME}; +use crate::log_replay::ActionsBatch; +use crate::path::ParsedLogPath; +use crate::schema::{SchemaRef, StructField, StructType, ToSchema}; +use crate::utils::require; +use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; + +/// Phase that processes single-part checkpoint. This also treats the checkpoint as a manifest file +/// and extracts the sidecar actions during iteration. +#[allow(unused)] +pub(crate) struct CheckpointManifestReader { + actions: Box> + Send>, + sidecar_visitor: SidecarVisitor, + log_root: Url, + is_complete: bool, + manifest_file: FileMeta, +} + +impl CheckpointManifestReader { + /// Create a new manifest phase for a single-part checkpoint. + /// + /// The schema is automatically augmented with the sidecar column since the manifest + /// phase needs to extract sidecar references for phase transitions. + /// + /// # Parameters + /// - `manifest_file`: The checkpoint manifest file to process + /// - `log_root`: Root URL for resolving sidecar paths + /// - `engine`: Engine for reading files + #[allow(unused)] + pub(crate) fn try_new( + engine: Arc, + manifest: &ParsedLogPath, + log_root: Url, + ) -> DeltaResult { + static MANIFEST_READ_SCHMEA: LazyLock = LazyLock::new(|| { + Arc::new(StructType::new_unchecked([ + StructField::nullable(ADD_NAME, Add::to_schema()), + StructField::nullable(REMOVE_NAME, Remove::to_schema()), + StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()), + ])) + }); + + let actions = match manifest.extension.as_str() { + "json" => engine.json_handler().read_json_files( + std::slice::from_ref(&manifest.location), + MANIFEST_READ_SCHMEA.clone(), + None, + )?, + "parquet" => engine.parquet_handler().read_parquet_files( + std::slice::from_ref(&manifest.location), + MANIFEST_READ_SCHMEA.clone(), + None, + )?, + extension => { + return Err(Error::generic(format!( + "Unsupported checkpoint extension: {extension}", + ))); + } + }; + + let actions = Box::new(actions.map(|batch_res| Ok(ActionsBatch::new(batch_res?, false)))); + Ok(Self { + actions, + sidecar_visitor: SidecarVisitor::default(), + log_root, + is_complete: false, + manifest_file: manifest.location.clone(), + }) + } + + /// Extract the sidecars from the manifest file if there were any. 
+ /// NOTE: The iterator must be completely exhausted before calling this + #[allow(unused)] + pub(crate) fn extract_sidecars(self) -> DeltaResult> { + require!( + self.is_complete, + Error::generic(format!( + "Cannot extract sidecars from in-progress ManifestReader for file: {}", + self.manifest_file.location + )) + ); + + let sidecars: Vec<_> = self + .sidecar_visitor + .sidecars + .into_iter() + .map(|s| s.to_filemeta(&self.log_root)) + .try_collect()?; + + Ok(sidecars) + } +} + +impl Iterator for CheckpointManifestReader { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + let result = self.actions.next().map(|batch_result| { + let batch = batch_result?; + self.sidecar_visitor.visit_rows_of(batch.actions())?; + Ok(batch) + }); + + if result.is_none() { + self.is_complete = true; + } + + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array::{Array, StringArray, StructArray}; + use crate::engine::arrow_data::EngineDataArrowExt as _; + use crate::utils::test_utils::{assert_result_error_with_message, load_test_table}; + use crate::SnapshotRef; + + use itertools::Itertools; + use std::sync::Arc; + + /// Helper function to test manifest phase with expected add paths and sidecars + fn verify_manifest_phase( + engine: Arc, + snapshot: SnapshotRef, + expected_add_paths: &[&str], + expected_sidecars: &[&str], + ) -> DeltaResult<()> { + let log_segment = snapshot.log_segment(); + let log_root = log_segment.log_root.clone(); + assert_eq!(log_segment.checkpoint_parts.len(), 1); + let checkpoint_file = &log_segment.checkpoint_parts[0]; + let mut manifest_phase = + CheckpointManifestReader::try_new(engine.clone(), checkpoint_file, log_root)?; + + // Extract add file paths and verify expectations + let mut file_paths = vec![]; + for result in manifest_phase.by_ref() { + let batch = result?; + let ActionsBatch { + actions, + is_log_batch, + } = batch; + assert!(!is_log_batch, "Manifest should not be a log batch"); + + let record_batch = actions.try_into_record_batch()?; + let add = record_batch.column_by_name("add").unwrap(); + let add_struct = add.as_any().downcast_ref::().unwrap(); + let path = add_struct + .column_by_name("path") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec(); + file_paths.extend(batch_paths); + } + + // Verify collected add paths + file_paths.sort(); + assert_eq!( + file_paths, expected_add_paths, + "CheckpointManifestReader should extract expected Add file paths from checkpoint" + ); + + // Check sidecars + let actual_sidecars = manifest_phase.extract_sidecars()?; + + assert_eq!( + actual_sidecars.len(), + expected_sidecars.len(), + "Should collect exactly {} actual_sidecars", + expected_sidecars.len() + ); + + // Extract and verify the sidecar paths + let mut collected_paths: Vec = actual_sidecars + .iter() + .map(|fm| { + fm.location + .path_segments() + .and_then(|mut segments| segments.next_back()) + .unwrap_or("") + .to_string() + }) + .collect(); + + collected_paths.sort(); + // Verify they're the expected sidecar files + assert_eq!(collected_paths, expected_sidecars.to_vec()); + + Ok(()) + } + + #[test] + fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?; + verify_manifest_phase( + engine, + snapshot, + &["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"], + &[], + ) + } + + #[test] + fn 
test_manifest_phase_early_finalize_error() -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?; + + let manifest_phase = CheckpointManifestReader::try_new( + engine.clone(), + &snapshot.log_segment().checkpoint_parts[0], + snapshot.log_segment().log_root.clone(), + )?; + + let result = manifest_phase.extract_sidecars(); + assert_result_error_with_message( + result, + "Cannot extract sidecars from in-progress ManifestReader for file", + ); + Ok(()) + } + + #[test] + fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?; + verify_manifest_phase( + engine, + snapshot, + &[], + &[ + "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet", + "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet", + ], + ) + } + + #[test] + fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-parquet-with-sidecars")?; + verify_manifest_phase( + engine, + snapshot, + &[], + &[ + "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet", + "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet", + ], + ) + } +} diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index da67eb2fda..0012802856 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -47,43 +47,14 @@ mod tests { use super::*; use crate::arrow::array::{StringArray, StructArray}; use crate::engine::arrow_data::EngineDataArrowExt as _; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; - use crate::engine::default::DefaultEngine; use crate::scan::COMMIT_READ_SCHEMA; - use crate::{Error, Snapshot, SnapshotRef}; + use crate::utils::test_utils::load_test_table; use itertools::Itertools; - use object_store::local::LocalFileSystem; - use std::path::PathBuf; use std::sync::Arc; - use url::Url; - - fn load_test_table( - table_name: &str, - ) -> DeltaResult<( - Arc>, - SnapshotRef, - url::Url, - )> { - let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - path.push("tests/data"); - path.push(table_name); - - let path = std::fs::canonicalize(path) - .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; - - let url = Url::from_directory_path(path) - .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; - - let store = Arc::new(LocalFileSystem::new()); - let engine = Arc::new(DefaultEngine::new(store)); - let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; - - Ok((engine, snapshot, url)) - } #[test] fn test_commit_phase_processes_commits() -> Result<(), Box> { - let (engine, snapshot, _url) = load_test_table("app-txn-no-checkpoint")?; + let (engine, snapshot, _tempdir) = load_test_table("app-txn-no-checkpoint")?; let log_segment = Arc::new(snapshot.log_segment().clone()); let schema = COMMIT_READ_SCHEMA.clone(); diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs index d1337ef59a..b3c344e09b 100644 --- a/kernel/src/log_reader/mod.rs +++ b/kernel/src/log_reader/mod.rs @@ -1 +1,2 @@ +pub(crate) mod checkpoint_manifest; pub(crate) mod commit; diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs index 8a4a8fb9a7..2b0f15cb1e 100644 --- a/kernel/src/utils.rs +++ 
b/kernel/src/utils.rs @@ -146,6 +146,7 @@ impl<'a, T: ToOwned + ?Sized> CowExt<(Cow<'a, T>, Cow<'a, T>)> for (Cow<'a, T>, #[cfg(test)] pub(crate) mod test_utils { + use std::path::PathBuf; use std::{path::Path, sync::Arc}; use itertools::Itertools; @@ -153,7 +154,8 @@ pub(crate) mod test_utils { use object_store::ObjectStore; use serde::Serialize; use tempfile::TempDir; - use test_utils::delta_path_for_version; + use test_utils::{delta_path_for_version, load_test_data}; + use url::Url; use crate::actions::{ get_all_actions_schema, Add, Cdc, CommitInfo, Metadata, Protocol, Remove, @@ -161,9 +163,10 @@ pub(crate) mod test_utils { use crate::arrow::array::{RecordBatch, StringArray}; use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use crate::engine::arrow_data::ArrowEngineData; + use crate::engine::default::DefaultEngine; use crate::engine::sync::SyncEngine; - use crate::Engine; - use crate::EngineData; + use crate::{DeltaResult, EngineData, Error, SnapshotRef}; + use crate::{Engine, Snapshot}; #[derive(Serialize)] pub(crate) enum Action { @@ -365,6 +368,41 @@ pub(crate) mod test_utils { &KernelDataType::Primitive(PrimitiveType::Integer) ); } + + /// Load a test table from tests/data directory. + /// Tries compressed (tar.zst) first, falls back to extracted. + /// Returns (engine, snapshot, optional tempdir). The TempDir must be kept alive + /// for the duration of the test to prevent premature cleanup of extracted files. + pub(crate) fn load_test_table( + table_name: &str, + ) -> DeltaResult<(Arc, SnapshotRef, Option)> { + // Try loading compressed table first, fall back to extracted + let (path, tempdir) = match load_test_data("tests/data", table_name) { + Ok(test_dir) => { + let test_path = test_dir.path().join(table_name); + (test_path, Some(test_dir)) + } + Err(_) => { + // Fall back to already-extracted table + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let mut path = PathBuf::from(manifest_dir); + path.push("tests/data"); + path.push(table_name); + let path = std::fs::canonicalize(path) + .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + (path, None) + } + }; + + // Create engine and snapshot from the resolved path + let url = Url::from_directory_path(&path) + .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; + + let store = Arc::new(LocalFileSystem::new()); + let engine = Arc::new(DefaultEngine::new(store)); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + Ok((engine, snapshot, tempdir)) + } } #[cfg(test)] From 748141bef5ea004ed31133e46679f84f1f68783d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 16:55:53 -0800 Subject: [PATCH 03/10] rename sequential phase --- kernel/src/lib.rs | 1 + kernel/src/parallel/mod.rs | 1 + kernel/src/parallel/sequential_phase.rs | 364 ++++++++++++++++++++++++ 3 files changed, 366 insertions(+) create mode 100644 kernel/src/parallel/mod.rs create mode 100644 kernel/src/parallel/sequential_phase.rs diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 861f2b9ef1..3c365c8332 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -95,6 +95,7 @@ mod log_compaction; mod log_path; mod log_reader; pub mod metrics; +mod parallel; pub mod scan; pub mod schema; pub mod snapshot; diff --git a/kernel/src/parallel/mod.rs b/kernel/src/parallel/mod.rs new file mode 100644 index 0000000000..a39cfa30f0 --- /dev/null +++ b/kernel/src/parallel/mod.rs @@ -0,0 +1 @@ +pub(crate) mod sequential_phase; diff --git 
a/kernel/src/parallel/sequential_phase.rs b/kernel/src/parallel/sequential_phase.rs new file mode 100644 index 0000000000..8de8b1b086 --- /dev/null +++ b/kernel/src/parallel/sequential_phase.rs @@ -0,0 +1,364 @@ +//! Sequential log replay processor that happens before the parallel phase. +//! +//! This module provides sequential phase log replay that processes commits and +//! single-part checkpoint manifests, then returns the processor and any files (sidecars or +//! multi-part checkpoint parts) for parallel processing by the parallel phase. This phase +//! must be completed before the parallel phase can start. +//! +//! For multi-part checkpoints, the sequential phase skips manifest processing and returns +//! the checkpoint parts for parallel processing. + +use std::sync::Arc; + +use itertools::Itertools; + +use crate::actions::get_commit_schema; +use crate::log_reader::checkpoint_manifest::CheckpointManifestReader; +use crate::log_reader::commit::CommitReader; +use crate::log_replay::LogReplayProcessor; +use crate::log_segment::LogSegment; +use crate::utils::require; +use crate::{DeltaResult, Engine, Error, FileMeta}; + +/// Sequential log replay processor for parallel execution. +/// +/// This iterator processes log replay sequentially: +/// 1. Commit files (JSON) +/// 2. Manifest (single-part checkpoint, if present) +/// +/// After exhaustion, call `finish()` to extract: +/// - The processor (for serialization and distribution) +/// - Files (sidecars or multi-part checkpoint parts) for parallel processing +/// +/// # Type Parameters +/// - `P`: A [`LogReplayProcessor`] implementation that processes action batches +/// +/// # Example +/// +/// ```ignore +/// let mut sequential = SequentialPhase::try_new(processor, log_segment, engine)?; +/// +/// // Iterate over sequential batches +/// for batch in sequential.by_ref() { +/// let metadata = batch?; +/// // Process metadata +/// } +/// +/// // Extract processor and files for distribution (if needed) +/// match sequential.finish()? { +/// AfterSequential::Parallel { processor, files } => { +/// // Parallel phase needed - distribute files for parallel processing. +/// // If crossing the network boundary, the processor must be serialized. +/// let serialized = processor.serialize()?; +/// let partitions = partition_files(files, num_workers); +/// for (worker, partition) in partitions { +/// worker.send(serialized.clone(), partition)?; +/// } +/// } +/// AfterSequential::Done(processor) => { +/// // No parallel phase needed - all processing complete sequentially +/// println!("Log replay complete"); +/// } +/// } +/// ``` +#[allow(unused)] +pub(crate) struct SequentialPhase { + // The processor that will be used to process the action batches + processor: P, + // The commit reader that will be used to read the commit files + commit_phase: CommitReader, + // The checkpoint manifest reader that will be used to read the checkpoint manifest files. + // If the checkpoint is single-part, this will be Some(CheckpointManifestReader). + checkpoint_manifest_phase: Option, + // Whether the iterator has been fully exhausted + is_finished: bool, + // Checkpoint parts for potential parallel phase processing + checkpoint_parts: Vec, +} + +unsafe impl Send for SequentialPhase
{} + +/// Result of sequential log replay processing. +#[allow(unused)] +pub(crate) enum AfterSequential { + /// All processing complete sequentially - no parallel phase needed. + Done(P), + /// Parallel phase needed - distribute files for parallel processing. + Parallel { processor: P, files: Vec }, +} + +impl SequentialPhase
{ + /// Create a new sequential phase log replay. + /// + /// # Parameters + /// - `processor`: The log replay processor + /// - `log_segment`: The log segment to process + /// - `engine`: Engine for reading files + #[allow(unused)] + pub(crate) fn try_new( + processor: P, + log_segment: &LogSegment, + engine: Arc, + ) -> DeltaResult { + let commit_phase = + CommitReader::try_new(engine.as_ref(), log_segment, get_commit_schema().clone())?; + + // Concurrently start reading the checkpoint manifest. Only create a checkpoint manifest + // reader if the checkpoint is single-part. + let checkpoint_manifest_phase = match log_segment.checkpoint_parts.as_slice() { + [single_part] => Some(CheckpointManifestReader::try_new( + engine, + single_part, + log_segment.log_root.clone(), + )?), + _ => None, + }; + + let checkpoint_parts = log_segment + .checkpoint_parts + .iter() + .map(|path| path.location.clone()) + .collect_vec(); + + Ok(Self { + processor, + commit_phase, + checkpoint_manifest_phase, + is_finished: false, + checkpoint_parts, + }) + } + + /// Complete sequential phase and extract processor + files for distribution. + /// + /// Must be called after the iterator is exhausted. + /// + /// # Returns + /// - `Done`: All processing done sequentially - no parallel phase needed + /// - `Parallel`: Parallel phase needed. The resulting files may be processed + /// in parallel. + /// + /// # Errors + /// Returns an error if called before iterator exhaustion. + #[allow(unused)] + pub(crate) fn finish(self) -> DeltaResult> { + if !self.is_finished { + return Err(Error::generic( + "Must exhaust iterator before calling finish()", + )); + } + + let parallel_files = match self.checkpoint_manifest_phase { + Some(manifest_reader) => manifest_reader.extract_sidecars()?, + None => { + let parts = self.checkpoint_parts; + require!( + parts.len() != 1, + Error::generic( + "Invariant violation: If there is exactly one checkpoint part, + there must be a manifest reader" + ) + ); + // If this is a multi-part checkpoint, use the checkpoint parts for parallel phase + parts + } + }; + + if parallel_files.is_empty() { + Ok(AfterSequential::Done(self.processor)) + } else { + Ok(AfterSequential::Parallel { + processor: self.processor, + files: parallel_files, + }) + } + } +} + +impl Iterator for SequentialPhase
{ + type Item = DeltaResult; + + fn next(&mut self) -> Option { + let next = self + .commit_phase + .next() + .or_else(|| self.checkpoint_manifest_phase.as_mut()?.next()); + + if next.is_none() { + self.is_finished = true; + } + + next.map(|batch_res| { + batch_res.and_then(|batch| self.processor.process_actions_batch(batch)) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::scan::log_replay::ScanLogReplayProcessor; + use crate::scan::state_info::StateInfo; + use crate::utils::test_utils::{assert_result_error_with_message, load_test_table}; + use std::sync::Arc; + + /// Core helper function to verify sequential processing with expected adds and sidecars. + fn verify_sequential_processing( + table_name: &str, + expected_adds: &[&str], + expected_sidecars: &[&str], + ) -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table(table_name)?; + let log_segment = snapshot.log_segment(); + + let state_info = Arc::new(StateInfo::try_new( + snapshot.schema(), + snapshot.table_configuration(), + None, + (), + )?); + + let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?; + + // Process all batches and collect Add file paths + let mut file_paths = Vec::new(); + for result in sequential.by_ref() { + let metadata = result?; + file_paths = metadata.visit_scan_files( + file_paths, + |ps: &mut Vec, path, _, _, _, _, _| { + ps.push(path.to_string()); + }, + )?; + } + + // Assert collected adds match expected + file_paths.sort(); + assert_eq!( + file_paths, expected_adds, + "Sequential phase should collect expected Add file paths" + ); + + // Call finish() and verify result based on expected sidecars + let result = sequential.finish()?; + match (expected_sidecars, result) { + (sidecars, AfterSequential::Done(_)) => { + assert!( + sidecars.is_empty(), + "Expected Done but got sidecars {:?}", + sidecars + ); + } + (expected_sidecars, AfterSequential::Parallel { files, .. 
}) => { + assert_eq!( + files.len(), + expected_sidecars.len(), + "Should collect exactly {} sidecar files", + expected_sidecars.len() + ); + + // Extract and verify sidecar file paths + let mut collected_paths = files + .iter() + .map(|fm| { + fm.location + .path_segments() + .and_then(|mut segments| segments.next_back()) + .unwrap_or("") + .to_string() + }) + .collect_vec(); + + collected_paths.sort(); + assert_eq!(collected_paths, expected_sidecars); + } + } + + Ok(()) + } + + #[test] + fn test_sequential_v2_with_commits_only() -> DeltaResult<()> { + verify_sequential_processing( + "table-without-dv-small", + &["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"], + &[], // No sidecars + ) + } + + #[test] + fn test_sequential_v2_with_sidecars() -> DeltaResult<()> { + verify_sequential_processing( + "v2-checkpoints-json-with-sidecars", + &[], // No adds in sequential phase (all in checkpoint sidecars) + &[ + "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet", + "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet", + ], + ) + } + + #[test] + fn test_sequential_finish_before_exhaustion_error() -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?; + let log_segment = snapshot.log_segment(); + + let state_info = Arc::new(StateInfo::try_new( + snapshot.schema(), + snapshot.table_configuration(), + None, + (), + )?); + + let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?; + + // Call next() once but don't exhaust the iterator + if let Some(result) = sequential.next() { + result?; + } + + // Try to call finish() before exhausting the iterator + let result = sequential.finish(); + assert_result_error_with_message(result, "Must exhaust iterator before calling finish()"); + + Ok(()) + } + + #[test] + fn test_sequential_checkpoint_without_sidecars() -> DeltaResult<()> { + verify_sequential_processing( + "v2-checkpoints-json-without-sidecars", + &[ + // Adds from checkpoint manifest processed in sequential phase + "test%25file%25prefix-part-00000-0e32f92c-e232-4daa-b734-369d1a800502-c000.snappy.parquet", + "test%25file%25prefix-part-00000-91daf7c5-9ba0-4f76-aefd-0c3b21d33c6c-c000.snappy.parquet", + "test%25file%25prefix-part-00001-a5c41be1-ded0-4b18-a638-a927d233876e-c000.snappy.parquet", + ], + &[], // No sidecars + ) + } + + #[test] + fn test_sequential_parquet_checkpoint_with_sidecars() -> DeltaResult<()> { + verify_sequential_processing( + "v2-checkpoints-parquet-with-sidecars", + &[], // No adds in sequential phase + &[ + // Expected sidecars + "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet", + "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet", + ], + ) + } + + #[test] + fn test_sequential_checkpoint_no_commits() -> DeltaResult<()> { + verify_sequential_processing( + "with_checkpoint_no_last_checkpoint", + &["part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet"], // Add from commit 3 + &[], // No sidecars + ) + } +} From 6227645580128ccd91946416e5fbb15ed46e3f64 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 9 Dec 2025 16:37:28 -0800 Subject: [PATCH 04/10] address comments --- kernel/src/lib.rs | 2 +- kernel/src/parallel/sequential_phase.rs | 30 ++++++++++++++----------- 2 files changed, 18 
insertions(+), 14 deletions(-) diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 3c365c8332..3ebafbb6ab 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -95,7 +95,7 @@ mod log_compaction; mod log_path; mod log_reader; pub mod metrics; -mod parallel; +pub(crate) mod parallel; pub mod scan; pub mod schema; pub mod snapshot; diff --git a/kernel/src/parallel/sequential_phase.rs b/kernel/src/parallel/sequential_phase.rs index 8de8b1b086..c16e526d7b 100644 --- a/kernel/src/parallel/sequential_phase.rs +++ b/kernel/src/parallel/sequential_phase.rs @@ -66,7 +66,7 @@ pub(crate) struct SequentialPhase { // The processor that will be used to process the action batches processor: P, // The commit reader that will be used to read the commit files - commit_phase: CommitReader, + commit_phase: Option, // The checkpoint manifest reader that will be used to read the checkpoint manifest files. // If the checkpoint is single-part, this will be Some(CheckpointManifestReader). checkpoint_manifest_phase: Option, @@ -76,8 +76,6 @@ pub(crate) struct SequentialPhase { checkpoint_parts: Vec, } -unsafe impl Send for SequentialPhase
{} - /// Result of sequential log replay processing. #[allow(unused)] pub(crate) enum AfterSequential { @@ -100,8 +98,11 @@ impl SequentialPhase
{ log_segment: &LogSegment, engine: Arc, ) -> DeltaResult { - let commit_phase = - CommitReader::try_new(engine.as_ref(), log_segment, get_commit_schema().clone())?; + let commit_phase = Some(CommitReader::try_new( + engine.as_ref(), + log_segment, + get_commit_schema().clone(), + )?); // Concurrently start reading the checkpoint manifest. Only create a checkpoint manifest // reader if the checkpoint is single-part. @@ -181,16 +182,19 @@ impl Iterator for SequentialPhase
{ fn next(&mut self) -> Option { let next = self .commit_phase - .next() - .or_else(|| self.checkpoint_manifest_phase.as_mut()?.next()); - - if next.is_none() { + .as_mut() + .and_then(|commit_phase| commit_phase.next()) + .or_else(|| { + self.commit_phase = None; + self.checkpoint_manifest_phase.as_mut()?.next() + }); + + let Some(result) = next else { self.is_finished = true; - } + return None; + }; - next.map(|batch_res| { - batch_res.and_then(|batch| self.processor.process_actions_batch(batch)) - }) + Some(result.and_then(|batch| self.processor.process_actions_batch(batch))) } } From 4257d32d1a6eec7904b70d9eda7a643899a1086e Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 3 Dec 2025 16:50:25 -0800 Subject: [PATCH 05/10] serde for expression/predicate --- kernel/src/expressions/column_names.rs | 3 +- kernel/src/expressions/mod.rs | 72 ++++++++++++++++++++------ kernel/src/expressions/scalars.rs | 11 ++-- kernel/src/schema/mod.rs | 2 +- 4 files changed, 65 insertions(+), 23 deletions(-) diff --git a/kernel/src/expressions/column_names.rs b/kernel/src/expressions/column_names.rs index 27b6160001..7cc748b794 100644 --- a/kernel/src/expressions/column_names.rs +++ b/kernel/src/expressions/column_names.rs @@ -7,7 +7,7 @@ use std::iter::Peekable; use std::ops::Deref; /// A (possibly nested) column name. -#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Eq, Ord)] +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] pub struct ColumnName { path: Vec, } @@ -449,6 +449,7 @@ macro_rules! __joined_column_expr { } #[doc(inline)] pub use __joined_column_expr as joined_column_expr; +use serde::{Deserialize, Serialize}; #[cfg(test)] mod test { diff --git a/kernel/src/expressions/mod.rs b/kernel/src/expressions/mod.rs index ccba3d5c89..469e840fec 100644 --- a/kernel/src/expressions/mod.rs +++ b/kernel/src/expressions/mod.rs @@ -5,6 +5,7 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use itertools::Itertools; +use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; pub use self::column_names::{ column_expr, column_expr_ref, column_name, column_pred, joined_column_expr, joined_column_name, @@ -31,14 +32,14 @@ pub type PredicateRef = std::sync::Arc; //////////////////////////////////////////////////////////////////////// /// A unary predicate operator. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub enum UnaryPredicateOp { /// Unary Is Null IsNull, } /// A binary predicate operator. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum BinaryPredicateOp { /// Comparison Less Than LessThan, @@ -53,14 +54,14 @@ pub enum BinaryPredicateOp { } /// A unary expression operator. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum UnaryExpressionOp { /// Convert struct data to JSON-encoded strings ToJson, } /// A binary expression operator. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum BinaryExpressionOp { /// Arithmetic Plus Plus, @@ -73,14 +74,14 @@ pub enum BinaryExpressionOp { } /// A variadic expression operator. 
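The point of threading `Serialize`/`Deserialize` through these operator enums (and, below, through the expression and predicate trees themselves) is that a predicate can cross a process boundary as JSON. A minimal sketch of the round trip, assuming `serde_json` as used by the tests added later in this series:

```ignore
use crate::expressions::{column_expr, Expression, Predicate};

fn roundtrip() -> Result<(), serde_json::Error> {
    let pred = Predicate::gt(column_expr!("score"), Expression::literal(90));
    let json = serde_json::to_string(&pred)?;
    let restored: Predicate = serde_json::from_str(&json)?;
    assert_eq!(pred, restored); // simple literal predicates derive PartialEq and compare equal
    Ok(())
}
```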
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum VariadicExpressionOp { /// Collapse multiple values into one by taking the first non-null value Coalesce, } /// A junction (AND/OR) predicate operator. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub enum JunctionPredicateOp { /// Conjunction And, @@ -190,7 +191,7 @@ pub type OpaquePredicateOpRef = Arc; // Expressions and predicates //////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct UnaryPredicate { /// The operator. pub op: UnaryPredicateOp, @@ -198,7 +199,7 @@ pub struct UnaryPredicate { pub expr: Box, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BinaryPredicate { /// The operator. pub op: BinaryPredicateOp, @@ -208,7 +209,7 @@ pub struct BinaryPredicate { pub right: Box, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct UnaryExpression { /// The operator. pub op: UnaryExpressionOp, @@ -216,7 +217,7 @@ pub struct UnaryExpression { pub expr: Box, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BinaryExpression { /// The operator. pub op: BinaryExpressionOp, @@ -226,7 +227,7 @@ pub struct BinaryExpression { pub right: Box, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct VariadicExpression { /// The operator. pub op: VariadicExpressionOp, @@ -234,7 +235,7 @@ pub struct VariadicExpression { pub exprs: Vec, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct JunctionPredicate { /// The operator. pub op: JunctionPredicateOp, @@ -250,6 +251,22 @@ pub struct OpaquePredicate { pub op: OpaquePredicateOpRef, pub exprs: Vec, } +fn fail_serialize_opaque_predicate( + _value: &OpaquePredicate, + _serializer: S, +) -> Result +where + S: Serializer, +{ + Err(ser::Error::custom("Cannot serialize Opaque Expression")) +} + +fn fail_deserialize_opaque_predicate<'de, D>(_deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + Err(de::Error::custom("Cannot deserialize Opaque Expression")) +} impl OpaquePredicate { fn new(op: OpaquePredicateOpRef, exprs: impl IntoIterator) -> Self { @@ -274,9 +291,28 @@ impl OpaqueExpression { } } +fn fail_serialize_opaque_expression( + _value: &OpaqueExpression, + _serializer: S, +) -> Result +where + S: Serializer, +{ + Err(ser::Error::custom("Cannot serialize Opaque Expression")) +} + +fn fail_deserialize_opaque_expression<'de, D>( + _deserializer: D, +) -> Result +where + D: Deserializer<'de>, +{ + Err(de::Error::custom("Cannot deserialize Opaque Expression")) +} + /// A transformation affecting a single field (one pieces of a [`Transform`]). The transformation /// could insert 0+ new fields after the target, or could replace the target with 0+ a new fields). -#[derive(Debug, Clone, PartialEq, Default)] +#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] pub struct FieldTransform { /// The list of expressions this field transform emits at the target location. 
pub exprs: Vec, @@ -291,7 +327,7 @@ pub struct FieldTransform { /// not specifically mentioned by the transform is passed through, unmodified and with the same /// relative field ordering. This is particularly useful for wide schemas where only a few columns /// need to be modified and/or dropped, or where a small number of columns need to be injected. -#[derive(Debug, Clone, PartialEq, Default)] +#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] pub struct Transform { /// The path to the nested input struct this transform operates on (if any). If no path is /// given, the transform operates directly on top-level columns. @@ -373,7 +409,7 @@ impl Transform { /// These expressions do not track or validate data types, other than the type /// of literals. It is up to the expression evaluator to validate the /// expression against a schema and add appropriate casts as required. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Expression { /// A literal value. Literal(Scalar), @@ -394,6 +430,8 @@ pub enum Expression { Variadic(VariadicExpression), /// An expression that the engine defines and implements. Kernel interacts with the expression /// only through methods provided by the [`OpaqueExpressionOp`] trait. + #[serde(serialize_with = "fail_serialize_opaque_expression")] + #[serde(deserialize_with = "fail_deserialize_opaque_expression")] Opaque(OpaqueExpression), /// An unknown expression (i.e. one that neither kernel nor engine attempts to evaluate). For /// data skipping purposes, kernel treats unknown expressions as if they were literal NULL @@ -411,7 +449,7 @@ pub enum Expression { /// These predicates do not track or validate data types, other than the type /// of literals. It is up to the predicate evaluator to validate the /// predicate against a schema and add appropriate casts as required. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Predicate { /// A boolean-valued expression, useful for e.g. `AND(, )`. BooleanExpression(Expression), @@ -430,6 +468,8 @@ pub enum Predicate { Junction(JunctionPredicate), /// A predicate that the engine defines and implements. Kernel interacts with the predicate /// only through methods provided by the [`OpaquePredicateOp`] trait. + #[serde(serialize_with = "fail_serialize_opaque_predicate")] + #[serde(deserialize_with = "fail_deserialize_opaque_predicate")] Opaque(OpaquePredicate), /// An unknown predicate (i.e. one that neither kernel nor engine attempts to evaluate). 
For /// data skipping purposes, kernel treats unknown predicates as if they were literal NULL values diff --git a/kernel/src/expressions/scalars.rs b/kernel/src/expressions/scalars.rs index 73e4a9db8d..9e153d58a0 100644 --- a/kernel/src/expressions/scalars.rs +++ b/kernel/src/expressions/scalars.rs @@ -4,13 +4,14 @@ use std::fmt::{Display, Formatter}; use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc}; use itertools::Itertools; +use serde::{Deserialize, Serialize}; use crate::schema::derive_macro_utils::ToDataType; use crate::schema::{ArrayType, DataType, DecimalType, MapType, PrimitiveType, StructField}; use crate::utils::require; use crate::{DeltaResult, Error}; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct DecimalData { bits: i128, ty: DecimalType, @@ -54,7 +55,7 @@ fn get_decimal_precision(value: i128) -> u8 { value.unsigned_abs().checked_ilog10().map_or(0, |p| p + 1) as _ } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ArrayData { tpe: ArrayType, /// This exists currently for literal list comparisons, but should not be depended on see below @@ -99,7 +100,7 @@ impl ArrayData { } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct MapData { data_type: MapType, pairs: Vec<(Scalar, Scalar)>, @@ -157,7 +158,7 @@ impl MapData { } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StructData { fields: Vec, values: Vec, @@ -215,7 +216,7 @@ impl StructData { /// A single value, which can be null. Used for representing literal values /// in [Expressions][crate::expressions::Expression]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum Scalar { /// 32bit integer Integer(i32), diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index 86687033d5..e7db494b57 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -1227,7 +1227,7 @@ fn default_true() -> bool { true } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] pub struct DecimalType { precision: u8, scale: u8, From 2e700abd0d6e12553819285b311c4ba1c49b4bf1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 9 Dec 2025 17:11:39 -0800 Subject: [PATCH 06/10] serde tests --- kernel/src/expressions/mod.rs | 502 ++++++++++++++++++++++++++++++++++ 1 file changed, 502 insertions(+) diff --git a/kernel/src/expressions/mod.rs b/kernel/src/expressions/mod.rs index 469e840fec..144b97991d 100644 --- a/kernel/src/expressions/mod.rs +++ b/kernel/src/expressions/mod.rs @@ -1030,8 +1030,34 @@ impl> std::ops::Div for Expression { #[cfg(test)] mod tests { + use std::fmt::Debug; + + use serde::de::DeserializeOwned; + use serde::Serialize; + use super::{column_expr, column_pred, Expression as Expr, Predicate as Pred}; + /// Helper function to verify roundtrip serialization/deserialization + fn assert_roundtrip(value: &T) { + let json = serde_json::to_string(value).expect("serialization should succeed"); + let deserialized: T = serde_json::from_str(&json).expect("deserialization should succeed"); + assert_eq!(value, &deserialized, "roundtrip should preserve value"); + } + + /// Helper function for types where PartialEq may not work correctly (e.g., types + /// containing Null, Array, Map, or Struct scalars which return None from partial_cmp). 
+ /// This compares the JSON representation instead. + fn assert_roundtrip_via_json(value: &T) { + let json = serde_json::to_string(value).expect("serialization should succeed"); + let deserialized: T = serde_json::from_str(&json).expect("deserialization should succeed"); + let reserialized = + serde_json::to_string(&deserialized).expect("reserialization should succeed"); + assert_eq!( + json, reserialized, + "roundtrip should produce identical JSON" + ); + } + #[test] fn test_expression_format() { let cases = [ @@ -1094,4 +1120,480 @@ mod tests { assert_eq!(result, expected); } } + + // ==================== Serde Roundtrip Tests ==================== + + mod serde_tests { + use std::sync::Arc; + + use crate::expressions::scalars::{ArrayData, DecimalData, MapData, StructData}; + use crate::expressions::{ + column_expr, column_name, BinaryExpressionOp, BinaryPredicateOp, ColumnName, + Expression, Predicate, Scalar, Transform, UnaryExpressionOp, VariadicExpressionOp, + }; + use crate::schema::{ArrayType, DataType, DecimalType, MapType, StructField}; + + use super::{assert_roundtrip, assert_roundtrip_via_json}; + + // ==================== Expression::Literal Tests ==================== + + #[test] + fn test_literal_scalars_roundtrip() { + // Test all primitive scalar types that have proper PartialEq + let cases: Vec = vec![ + // Numeric types + Expression::literal(42i32), // Integer + Expression::literal(9999999999i64), // Long + Expression::literal(123i16), // Short + Expression::literal(42i8), // Byte + Expression::literal(1.12345677_32), // Float + Expression::literal(1.12345667_64), // Double + // String and Boolean + Expression::literal("hello world"), + Expression::literal(true), + Expression::literal(false), + // Temporal types + Expression::Literal(Scalar::Timestamp(1234567890000000)), + Expression::Literal(Scalar::TimestampNtz(1234567890000000)), + Expression::Literal(Scalar::Date(19000)), + // Binary + Expression::Literal(Scalar::Binary(vec![1, 2, 3, 4, 5])), + // Decimal + Expression::Literal(Scalar::Decimal( + DecimalData::try_new(12345i128, DecimalType::try_new(10, 2).unwrap()).unwrap(), + )), + ]; + + for expr in &cases { + assert_roundtrip(expr); + } + } + + #[test] + fn test_literal_complex_scalars_roundtrip() { + // Test complex scalar types that need JSON comparison (partial_cmp returns None) + let cases: Vec = vec![ + // Null with different types + Expression::null_literal(DataType::INTEGER), + Expression::null_literal(DataType::STRING), + Expression::null_literal(DataType::BOOLEAN), + // Array + Expression::Literal(Scalar::Array( + ArrayData::try_new( + ArrayType::new(DataType::INTEGER, false), + vec![Scalar::Integer(1), Scalar::Integer(2), Scalar::Integer(3)], + ) + .unwrap(), + )), + // Map + Expression::Literal(Scalar::Map( + MapData::try_new( + MapType::new(DataType::STRING, DataType::INTEGER, false), + vec![ + (Scalar::String("a".to_string()), Scalar::Integer(1)), + (Scalar::String("b".to_string()), Scalar::Integer(2)), + ], + ) + .unwrap(), + )), + // Struct + Expression::Literal(Scalar::Struct( + StructData::try_new( + vec![ + StructField::nullable("x", DataType::INTEGER), + StructField::nullable("y", DataType::STRING), + ], + vec![Scalar::Integer(42), Scalar::String("hello".to_string())], + ) + .unwrap(), + )), + ]; + + for expr in &cases { + assert_roundtrip_via_json(expr); + } + } + + // ==================== Expression::Column Tests ==================== + + #[test] + fn test_column_expressions_roundtrip() { + let cases: Vec = vec![ + column_expr!("my_column"), + 
Expression::column(["parent", "child"]), + Expression::column(["a", "b", "c", "d"]), + ]; + + for expr in &cases { + assert_roundtrip(expr); + } + } + + #[test] + fn test_column_names_roundtrip() { + let cases: Vec = vec![ + column_name!("simple"), + ColumnName::new(["a", "b", "c"]), + ColumnName::new::<&str>([]), + ]; + + for col in &cases { + assert_roundtrip(col); + } + } + + // ==================== Expression Operations Tests ==================== + + #[test] + fn test_unary_expression_roundtrip() { + let expr = Expression::unary(UnaryExpressionOp::ToJson, column_expr!("data")); + assert_roundtrip(&expr); + } + + #[test] + fn test_binary_expressions_roundtrip() { + let ops = [ + BinaryExpressionOp::Plus, + BinaryExpressionOp::Minus, + BinaryExpressionOp::Multiply, + BinaryExpressionOp::Divide, + ]; + + for op in ops { + let expr = Expression::binary(op, column_expr!("a"), Expression::literal(10)); + assert_roundtrip(&expr); + } + } + + #[test] + fn test_variadic_expression_roundtrip() { + let expr = Expression::variadic( + VariadicExpressionOp::Coalesce, + [ + column_expr!("a"), + column_expr!("b"), + Expression::literal("default"), + ], + ); + assert_roundtrip(&expr); + } + + #[test] + fn test_nested_arithmetic_expression_roundtrip() { + // (a + b) * (c - d) / 2 + let left = Expression::binary( + BinaryExpressionOp::Plus, + column_expr!("a"), + column_expr!("b"), + ); + let right = Expression::binary( + BinaryExpressionOp::Minus, + column_expr!("c"), + column_expr!("d"), + ); + let mul = Expression::binary(BinaryExpressionOp::Multiply, left, right); + let expr = Expression::binary(BinaryExpressionOp::Divide, mul, Expression::literal(2)); + assert_roundtrip(&expr); + } + + // ==================== Expression::Struct/Transform/Other Tests ==================== + + #[test] + fn test_struct_expression_roundtrip() { + let expr = Expression::struct_from([ + Arc::new(column_expr!("x")), + Arc::new(Expression::literal(42)), + Arc::new(Expression::literal("hello")), + ]); + assert_roundtrip(&expr); + } + + #[test] + fn test_transform_expressions_roundtrip() { + let cases: Vec = vec![ + // Identity transform + Expression::transform(Transform::new_top_level()), + // Drop field + Expression::transform(Transform::new_top_level().with_dropped_field("old_column")), + // Replace field + Expression::transform( + Transform::new_top_level() + .with_replaced_field("original", Arc::new(Expression::literal(0))), + ), + // Insert fields + Expression::transform( + Transform::new_top_level() + .with_inserted_field(Some("after_col"), Arc::new(column_expr!("new_col"))) + .with_inserted_field( + None::, + Arc::new(Expression::literal("prepended")), + ), + ), + // Nested transform + Expression::transform( + Transform::new_nested(["parent", "child"]).with_dropped_field("to_drop"), + ), + ]; + + for expr in &cases { + assert_roundtrip(expr); + } + } + + #[test] + fn test_expression_wrapping_predicate_roundtrip() { + let pred = Predicate::eq(column_expr!("x"), Expression::literal(10)); + let expr = Expression::from_pred(pred); + assert_roundtrip(&expr); + } + + #[test] + fn test_expression_unknown_roundtrip() { + let expr = Expression::unknown("some_unknown_function()"); + assert_roundtrip(&expr); + } + + // ==================== Predicate Tests ==================== + + #[test] + fn test_predicate_basics_roundtrip() { + let cases: Vec = vec![ + // Boolean expression + Predicate::from_expr(column_expr!("is_active")), + // Literals + Predicate::literal(true), + Predicate::literal(false), + // NOT + 
Predicate::not(Predicate::from_expr(column_expr!("x"))), + // Nested NOT + Predicate::not(Predicate::not(Predicate::gt( + column_expr!("x"), + Expression::literal(5), + ))), + // Unknown + Predicate::unknown("some_unknown_predicate()"), + // Unary predicates + Predicate::is_null(column_expr!("nullable_col")), + Predicate::is_not_null(column_expr!("nullable_col")), + ]; + + for pred in &cases { + assert_roundtrip(pred); + } + } + + #[test] + fn test_predicate_null_literal_roundtrip() { + // Null literal needs JSON comparison + let pred = Predicate::null_literal(); + assert_roundtrip_via_json(&pred); + } + + #[test] + fn test_predicate_comparisons_roundtrip() { + let cases: Vec = vec![ + Predicate::eq(column_expr!("x"), Expression::literal(42)), + Predicate::ne(column_expr!("status"), Expression::literal("active")), + Predicate::lt(column_expr!("age"), Expression::literal(18)), + Predicate::le(column_expr!("price"), Expression::literal(100)), + Predicate::gt(column_expr!("score"), Expression::literal(90)), + Predicate::ge(column_expr!("quantity"), Expression::literal(1)), + Predicate::distinct(column_expr!("a"), column_expr!("b")), + ]; + + for pred in &cases { + assert_roundtrip(pred); + } + } + + #[test] + fn test_predicate_in_roundtrip() { + // IN with array needs JSON comparison + let array_data = ArrayData::try_new( + ArrayType::new(DataType::INTEGER, false), + vec![Scalar::Integer(1), Scalar::Integer(2), Scalar::Integer(3)], + ) + .unwrap(); + let pred = Predicate::binary( + BinaryPredicateOp::In, + column_expr!("x"), + Expression::Literal(Scalar::Array(array_data)), + ); + assert_roundtrip_via_json(&pred); + } + + #[test] + fn test_predicate_junctions_roundtrip() { + let cases: Vec = vec![ + // Simple AND + Predicate::and( + Predicate::gt(column_expr!("x"), Expression::literal(0)), + Predicate::lt(column_expr!("x"), Expression::literal(100)), + ), + // Simple OR + Predicate::or( + Predicate::eq(column_expr!("status"), Expression::literal("active")), + Predicate::eq(column_expr!("status"), Expression::literal("pending")), + ), + // Multiple AND + Predicate::and_from([ + Predicate::gt(column_expr!("x"), Expression::literal(0)), + Predicate::lt(column_expr!("x"), Expression::literal(100)), + Predicate::is_not_null(column_expr!("x")), + ]), + // Multiple OR + Predicate::or_from([ + Predicate::eq(column_expr!("type"), Expression::literal("A")), + Predicate::eq(column_expr!("type"), Expression::literal("B")), + Predicate::eq(column_expr!("type"), Expression::literal("C")), + ]), + // Nested: (a > 0 AND b < 100) OR (c = 'special') + Predicate::or( + Predicate::and( + Predicate::gt(column_expr!("a"), Expression::literal(0)), + Predicate::lt(column_expr!("b"), Expression::literal(100)), + ), + Predicate::eq(column_expr!("c"), Expression::literal("special")), + ), + ]; + + for pred in &cases { + assert_roundtrip(pred); + } + } + + // ==================== Complex Nested Structures ==================== + + #[test] + fn test_deeply_nested_structures_roundtrip() { + // COALESCE(a + b, c * d, 0) > 100 + let add = Expression::binary( + BinaryExpressionOp::Plus, + column_expr!("a"), + column_expr!("b"), + ); + let mul = Expression::binary( + BinaryExpressionOp::Multiply, + column_expr!("c"), + column_expr!("d"), + ); + let coalesce = Expression::variadic( + VariadicExpressionOp::Coalesce, + [add, mul, Expression::literal(0)], + ); + let pred = Predicate::gt(coalesce, Expression::literal(100)); + assert_roundtrip(&pred); + + // Expression wrapping a predicate that references expressions + let 
inner_pred = Predicate::and( + Predicate::eq(column_expr!("x"), Expression::literal(1)), + Predicate::gt( + Expression::binary( + BinaryExpressionOp::Plus, + column_expr!("y"), + column_expr!("z"), + ), + Expression::literal(10), + ), + ); + let expr = Expression::from_pred(inner_pred); + assert_roundtrip(&expr); + } + + // ==================== Opaque Variant Failure Tests ==================== + + #[test] + fn test_opaque_expression_serialize_fails() { + use crate::expressions::{OpaqueExpressionOp, ScalarExpressionEvaluator}; + use crate::DeltaResult; + + #[derive(Debug, PartialEq)] + struct TestOpaqueExprOp; + + impl OpaqueExpressionOp for TestOpaqueExprOp { + fn name(&self) -> &str { + "test_opaque" + } + fn eval_expr_scalar( + &self, + _eval_expr: &ScalarExpressionEvaluator<'_>, + _exprs: &[Expression], + ) -> DeltaResult { + Ok(Scalar::Integer(0)) + } + } + + let expr = Expression::opaque(TestOpaqueExprOp, [Expression::literal(1)]); + let result = serde_json::to_string(&expr); + assert!( + result.is_err(), + "Opaque expression serialization should fail" + ); + assert!( + result + .unwrap_err() + .to_string() + .contains("Cannot serialize Opaque Expression"), + "Error should mention opaque expression" + ); + } + + #[test] + fn test_opaque_predicate_serialize_fails() { + use crate::expressions::{OpaquePredicateOp, ScalarExpressionEvaluator}; + use crate::kernel_predicates::{ + DirectDataSkippingPredicateEvaluator, DirectPredicateEvaluator, + IndirectDataSkippingPredicateEvaluator, + }; + use crate::DeltaResult; + + #[derive(Debug, PartialEq)] + struct TestOpaquePredOp; + + impl OpaquePredicateOp for TestOpaquePredOp { + fn name(&self) -> &str { + "test_opaque_pred" + } + fn eval_pred_scalar( + &self, + _eval_expr: &ScalarExpressionEvaluator<'_>, + _eval_pred: &DirectPredicateEvaluator<'_>, + _exprs: &[Expression], + _inverted: bool, + ) -> DeltaResult> { + Ok(Some(true)) + } + fn eval_as_data_skipping_predicate( + &self, + _evaluator: &DirectDataSkippingPredicateEvaluator<'_>, + _exprs: &[Expression], + _inverted: bool, + ) -> Option { + Some(true) + } + fn as_data_skipping_predicate( + &self, + _evaluator: &IndirectDataSkippingPredicateEvaluator<'_>, + _exprs: &[Expression], + _inverted: bool, + ) -> Option { + None + } + } + + let pred = Predicate::opaque(TestOpaquePredOp, [Expression::literal(1)]); + let result = serde_json::to_string(&pred); + assert!( + result.is_err(), + "Opaque predicate serialization should fail" + ); + assert!( + result + .unwrap_err() + .to_string() + .contains("Cannot serialize Opaque Expression"), + "Error should mention opaque expression" + ); + } + } } From c016ad815161e6d44e7280c194b3407f3322905f Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 25 Nov 2025 13:27:45 -0800 Subject: [PATCH 07/10] serde --- kernel/src/log_replay.rs | 4 +- kernel/src/scan/log_replay.rs | 413 +++++++++++++++++++++++++++++++++- kernel/src/scan/state_info.rs | 2 +- kernel/src/transforms.rs | 3 +- 4 files changed, 410 insertions(+), 12 deletions(-) diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs index ea4f617ec9..dfa0767a3d 100644 --- a/kernel/src/log_replay.rs +++ b/kernel/src/log_replay.rs @@ -26,8 +26,8 @@ use tracing::debug; /// The subset of file action fields that uniquely identifies it in the log, used for deduplication /// of adds and removes during log replay. 
-#[derive(Debug, Hash, Eq, PartialEq, Clone)] -pub(crate) struct FileActionKey { +#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)] +pub struct FileActionKey { pub(crate) path: String, pub(crate) dv_unique_id: Option, } diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index e0fe51b3fb..b91b1f8c9a 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -2,6 +2,8 @@ use std::clone::Clone; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, LazyLock}; +use delta_kernel_derive::internal_api; + use super::data_skipping::DataSkippingFilter; use super::state_info::StateInfo; use super::{PhysicalPredicate, ScanMetadata}; @@ -14,10 +16,36 @@ use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, Log use crate::scan::Scalar; use crate::schema::ToSchema as _; use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType}; -use crate::transforms::{get_transform_expr, parse_partition_values}; +use crate::table_features::ColumnMappingMode; +use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec}; use crate::utils::require; use crate::{DeltaResult, Engine, Error, ExpressionEvaluator}; +/// Internal serializable state (schemas, transform spec, column mapping, etc.) +/// NOTE: This is opaque to the user - it is passed through as a blob. +#[derive(serde::Serialize, serde::Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct InternalScanState { + logical_schema: Arc, + physical_schema: Arc, + predicate_schema: Option>, + transform_spec: Option>, + column_mapping_mode: ColumnMappingMode, +} + +/// Public-facing serialized processor state for distributed processing. +/// +/// This struct contains all the information needed to reconstruct a `ScanLogReplayProcessor` +/// on remote compute nodes, enabling distributed log replay processing. +pub struct SerializableScanState { + /// Optional predicate for data skipping (if provided) + pub predicate: Option, + /// Opaque internal state blob + pub internal_state_blob: Vec, + /// Set of file action keys that have already been processed. + pub seen_file_keys: HashSet, +} + /// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan. /// /// During a table scan, the processor reads batches of log actions (in reverse chronological order) @@ -54,6 +82,23 @@ pub(crate) struct ScanLogReplayProcessor { impl ScanLogReplayProcessor { /// Create a new [`ScanLogReplayProcessor`] instance pub(crate) fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { + Self::new_with_seen_files(engine, state_info, Default::default()) + } + + /// Create new [`ScanLogReplayProcessor`] with pre-populated seen_file_keys. + /// + /// This is useful when reconstructing a processor from serialized state, where the + /// seen_file_keys have already been computed during a previous phase of log replay. + /// + /// # Parameters + /// - `engine`: Engine for creating evaluators and filters + /// - `state_info`: StateInfo containing schemas, transforms, and predicates + /// - `seen_file_keys`: Pre-computed set of file action keys that have been seen + pub(crate) fn new_with_seen_files( + engine: &dyn Engine, + state_info: Arc, + seen_file_keys: HashSet, + ) -> DeltaResult { // Extract the physical predicate from StateInfo's PhysicalPredicate enum. 
// The DataSkippingFilter and partition_filter components expect the predicate // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from @@ -80,10 +125,109 @@ impl ScanLogReplayProcessor { get_add_transform_expr(), SCAN_ROW_DATATYPE.clone(), )?, - seen_file_keys: Default::default(), + seen_file_keys, state_info, }) } + + /// Serialize the processor state for distributed processing. + /// + /// Consumes the processor and returns a `SerializableScanState` containing: + /// - The predicate (if any) for data skipping + /// - An opaque internal state blob (schemas, transform spec, column mapping mode) + /// - The set of seen file keys including their deletion vector information + /// + /// The returned state can be used with `from_serializable_state` to reconstruct the + /// processor on remote compute nodes. + /// + /// WARNING: The SerializableScanState may only be deserialized using an equal binary version + /// of delta-kernel-rs. Using different versions for serialization and deserialization leads to + /// undefined behaviour! + #[internal_api] + #[allow(unused)] + pub(crate) fn into_serializable_state(self) -> DeltaResult { + let StateInfo { + logical_schema, + physical_schema, + physical_predicate, + transform_spec, + column_mapping_mode, + } = self.state_info.as_ref().clone(); + + // Extract predicate from PhysicalPredicate + let (predicate, predicate_schema) = match physical_predicate { + PhysicalPredicate::Some(pred, schema) => (Some(pred), Some(schema)), + _ => (None, None), + }; + + // Serialize internal state to JSON blob (schemas, transform spec, and column mapping mode) + let internal_state = InternalScanState { + logical_schema, + physical_schema, + transform_spec, + predicate_schema, + column_mapping_mode, + }; + let internal_state_blob = serde_json::to_vec(&internal_state) + .map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?; + + let state = SerializableScanState { + predicate, + internal_state_blob, + seen_file_keys: self.seen_file_keys, + }; + + Ok(state) + } + + /// Reconstruct a processor from serialized state. + /// + /// Creates a new processor with the provided state. All fields (partition_filter, + /// data_skipping_filter, add_transform, and seen_file_keys) are reconstructed from + /// the serialized state and engine. + /// + /// # Parameters + /// - `engine`: Engine for creating evaluators and filters + /// - `state`: The serialized state containing predicate, internal state blob, and seen file keys + /// + /// # Returns + /// A new `ScanLogReplayProcessor` wrapped in an Arc. + /// + #[internal_api] + #[allow(unused)] + pub(crate) fn from_serializable_state( + engine: &dyn Engine, + state: SerializableScanState, + ) -> DeltaResult> { + // Deserialize internal state from json + let internal_state: InternalScanState = serde_json::from_slice(&state.internal_state_blob) + .map_err(|e| Error::generic(format!("Failed to deserialize internal state: {}", e)))?; + + // Reconstruct PhysicalPredicate from predicate and predicate schema + let physical_predicate = match state.predicate { + Some(predicate) => { + let Some(predicate_schema) = internal_state.predicate_schema else { + return Err(Error::generic( + "Invalid serialized internal state. 
Expected predicate schema.", + )); + }; + PhysicalPredicate::Some(predicate, predicate_schema) + } + None => PhysicalPredicate::None, + }; + + let state_info = Arc::new(StateInfo { + logical_schema: internal_state.logical_schema, + physical_schema: internal_state.physical_schema, + physical_predicate, + transform_spec: internal_state.transform_spec, + column_mapping_mode: internal_state.column_mapping_mode, + }); + + let processor = Self::new_with_seen_files(engine, state_info, state.seen_file_keys)?; + + Ok(Arc::new(processor)) + } } /// A visitor that deduplicates a stream of add and remove actions into a stream of valid adds. Log @@ -406,9 +550,11 @@ pub(crate) fn scan_action_iter( #[cfg(test)] mod tests { + use std::collections::HashSet; use std::{collections::HashMap, sync::Arc}; use crate::actions::get_commit_schema; + use crate::engine::sync::SyncEngine; use crate::expressions::{BinaryExpressionOp, Scalar, VariadicExpressionOp}; use crate::log_replay::ActionsBatch; use crate::scan::state::{DvInfo, Stats}; @@ -422,15 +568,15 @@ mod tests { }; use crate::scan::PhysicalPredicate; use crate::schema::MetadataColumnSpec; + use crate::schema::{DataType, SchemaRef, StructField, StructType}; use crate::table_features::ColumnMappingMode; + use crate::utils::test_utils::assert_result_error_with_message; use crate::Expression as Expr; - use crate::{ - engine::sync::SyncEngine, - schema::{DataType, SchemaRef, StructField, StructType}, - ExpressionRef, - }; + use crate::ExpressionRef; - use super::scan_action_iter; + use super::{ + scan_action_iter, InternalScanState, ScanLogReplayProcessor, SerializableScanState, + }; // dv-info is more complex to validate, we validate that works in the test for visit_scan_files // in state.rs @@ -632,4 +778,255 @@ mod tests { } } } + + #[test] + fn test_serialization_basic_state_and_dv_dropping() { + // Test basic StateInfo preservation and FileActionKey preservation + let engine = SyncEngine::new(); + let schema: SchemaRef = Arc::new(StructType::new_unchecked([ + StructField::new("id", DataType::INTEGER, true), + StructField::new("value", DataType::STRING, true), + ])); + let mut processor = ScanLogReplayProcessor::new( + &engine, + Arc::new(get_simple_state_info(schema.clone(), vec![]).unwrap()), + ) + .unwrap(); + + // Add file keys with and without DV info + let key1 = crate::log_replay::FileActionKey::new("file1.parquet", None); + let key2 = crate::log_replay::FileActionKey::new("file2.parquet", Some("dv-1".to_string())); + let key3 = crate::log_replay::FileActionKey::new("file3.parquet", Some("dv-2".to_string())); + processor.seen_file_keys.insert(key1.clone()); + processor.seen_file_keys.insert(key2.clone()); + processor.seen_file_keys.insert(key3.clone()); + + let state_info = processor.state_info.clone(); + let deserialized = ScanLogReplayProcessor::from_serializable_state( + &engine, + processor.into_serializable_state().unwrap(), + ) + .unwrap(); + + // Verify StateInfo fields preserved + assert_eq!( + deserialized.state_info.logical_schema, + state_info.logical_schema + ); + assert_eq!( + deserialized.state_info.physical_schema, + state_info.physical_schema + ); + assert_eq!( + deserialized.state_info.column_mapping_mode, + state_info.column_mapping_mode + ); + + // Verify all file keys are preserved with their DV info + assert_eq!(deserialized.seen_file_keys.len(), 3); + assert!(deserialized.seen_file_keys.contains(&key1)); + assert!(deserialized.seen_file_keys.contains(&key2)); + assert!(deserialized.seen_file_keys.contains(&key3)); + } + + 
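For orientation, here is a minimal sketch of how the two methods above are meant to be used together. It is not part of this patch; it assumes the items above are in scope within the crate, and the transport between driver and worker is left out entirely.

// Illustrative sketch only: the driver/worker handoff that
// `into_serializable_state` / `from_serializable_state` enable.
fn hand_off_scan_state(
    engine: &dyn Engine,
    processor: ScanLogReplayProcessor,
) -> DeltaResult<Arc<ScanLogReplayProcessor>> {
    // Driver side: capture the predicate, the opaque internal-state blob, and
    // the seen-file set accumulated while replaying the JSON commits.
    let state: SerializableScanState = processor.into_serializable_state()?;

    // ... ship `state` to a worker here (transport not shown) ...

    // Worker side: rebuild an equivalent processor. Both sides must run the
    // same delta-kernel-rs build, per the warning on `into_serializable_state`.
    ScanLogReplayProcessor::from_serializable_state(engine, state)
}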
#[test] + fn test_serialization_with_predicate() { + // Test that PhysicalPredicate and predicate schema are preserved + let engine = SyncEngine::new(); + let schema: SchemaRef = Arc::new(StructType::new_unchecked([ + StructField::new("id", DataType::INTEGER, true), + StructField::new("value", DataType::STRING, true), + ])); + let predicate = Arc::new(crate::expressions::Predicate::eq( + Expr::column(["id"]), + Expr::literal(10i32), + )); + let state_info = Arc::new( + get_state_info( + schema.clone(), + vec![], + Some(predicate.clone()), + HashMap::new(), + vec![], + ) + .unwrap(), + ); + let original_pred_schema = match &state_info.physical_predicate { + PhysicalPredicate::Some(_, s) => s.clone(), + _ => panic!("Expected predicate"), + }; + let processor = ScanLogReplayProcessor::new(&engine, state_info.clone()).unwrap(); + let deserialized = ScanLogReplayProcessor::from_serializable_state( + &engine, + processor.into_serializable_state().unwrap(), + ) + .unwrap(); + + match &deserialized.state_info.physical_predicate { + PhysicalPredicate::Some(pred, pred_schema) => { + assert_eq!(pred.as_ref(), predicate.as_ref()); + assert_eq!(pred_schema.as_ref(), original_pred_schema.as_ref()); + } + _ => panic!("Expected PhysicalPredicate::Some"), + } + } + + #[test] + fn test_serialization_with_transforms() { + // Test transform_spec preservation (partition columns + row tracking) + let engine = SyncEngine::new(); + let schema: SchemaRef = Arc::new(StructType::new_unchecked([ + StructField::new("value", DataType::INTEGER, true), + StructField::new("date", DataType::DATE, true), + ])); + let state_info = Arc::new( + get_state_info( + schema, + vec!["date".to_string()], + None, + [ + ("delta.enableRowTracking", "true"), + ( + "delta.rowTracking.materializedRowIdColumnName", + "row_id_col", + ), + ] + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + vec![("row_id", MetadataColumnSpec::RowId)], + ) + .unwrap(), + ); + let original_transform = state_info.transform_spec.clone(); + assert!(original_transform.is_some()); + let processor = ScanLogReplayProcessor::new(&engine, state_info.clone()).unwrap(); + let deserialized = ScanLogReplayProcessor::from_serializable_state( + &engine, + processor.into_serializable_state().unwrap(), + ) + .unwrap(); + assert_eq!(deserialized.state_info.transform_spec, original_transform); + } + + #[test] + fn test_serialization_column_mapping_modes() { + // Test that different ColumnMappingMode values are preserved + let engine = SyncEngine::new(); + for mode in [ + ColumnMappingMode::None, + ColumnMappingMode::Id, + ColumnMappingMode::Name, + ] { + let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( + "id", + DataType::INTEGER, + true, + )])); + let state_info = Arc::new(StateInfo { + logical_schema: schema.clone(), + physical_schema: schema, + physical_predicate: PhysicalPredicate::None, + transform_spec: None, + column_mapping_mode: mode, + }); + let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); + let deserialized = ScanLogReplayProcessor::from_serializable_state( + &engine, + processor.into_serializable_state().unwrap(), + ) + .unwrap(); + assert_eq!(deserialized.state_info.column_mapping_mode, mode); + } + } + + #[test] + fn test_serialization_edge_cases() { + // Test edge cases: empty seen_file_keys, no predicate, no transform_spec + let engine = SyncEngine::new(); + let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( + "id", + DataType::INTEGER, + true, + )])); + let 
state_info = Arc::new(StateInfo { + logical_schema: schema.clone(), + physical_schema: schema, + physical_predicate: PhysicalPredicate::None, + transform_spec: None, + column_mapping_mode: ColumnMappingMode::None, + }); + let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); + let serialized = processor.into_serializable_state().unwrap(); + assert!(serialized.predicate.is_none()); + let deserialized = + ScanLogReplayProcessor::from_serializable_state(&engine, serialized).unwrap(); + assert_eq!(deserialized.seen_file_keys.len(), 0); + assert!(deserialized.state_info.transform_spec.is_none()); + } + + #[test] + fn test_serialization_invalid_json() { + // Test that invalid JSON blobs are properly rejected + let engine = SyncEngine::new(); + let invalid_state = SerializableScanState { + predicate: None, + internal_state_blob: vec![0, 1, 2, 3, 255], // Invalid JSON + seen_file_keys: HashSet::new(), + }; + assert!(ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state).is_err()); + } + + #[test] + fn test_serialization_missing_predicate_schema() { + // Test that missing predicate_schema when predicate exists is detected + let engine = SyncEngine::new(); + let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( + "id", + DataType::INTEGER, + true, + )])); + let invalid_internal_state = InternalScanState { + logical_schema: schema.clone(), + physical_schema: schema, + predicate_schema: None, // Missing! + transform_spec: None, + column_mapping_mode: ColumnMappingMode::None, + }; + let predicate = Arc::new(crate::expressions::Predicate::column(["id"])); + let invalid_blob = serde_json::to_vec(&invalid_internal_state).unwrap(); + let invalid_state = SerializableScanState { + predicate: Some(predicate), // Predicate exists but schema is None + internal_state_blob: invalid_blob, + seen_file_keys: HashSet::new(), + }; + let result = ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state); + assert!(result.is_err()); + if let Err(e) = result { + assert!(e.to_string().contains("predicate schema")); + } + } + + #[test] + fn deserialize_internal_state_with_extra_fields_fails() { + let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( + "id", + DataType::INTEGER, + true, + )])); + let invalid_internal_state = InternalScanState { + logical_schema: schema.clone(), + physical_schema: schema, + predicate_schema: None, + transform_spec: None, + column_mapping_mode: ColumnMappingMode::None, + }; + let blob = serde_json::to_string(&invalid_internal_state).unwrap(); + let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap(); + obj["new_field"] = serde_json::json!("my_new_value"); + let invalid_blob = obj.to_string(); + + let res: Result<InternalScanState, _> = serde_json::from_str(&invalid_blob); + assert_result_error_with_message(res, "unknown field"); + } } diff --git a/kernel/src/scan/state_info.rs b/kernel/src/scan/state_info.rs index 44abf33e54..2d1c6ed8af 100644 --- a/kernel/src/scan/state_info.rs +++ b/kernel/src/scan/state_info.rs @@ -15,7 +15,7 @@ use crate::transforms::{FieldTransformSpec, TransformSpec}; use crate::{DeltaResult, Error, PredicateRef, StructField}; /// All the state needed to process a scan. 
-#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct StateInfo { /// The logical schema for this scan pub(crate) logical_schema: SchemaRef, diff --git a/kernel/src/transforms.rs b/kernel/src/transforms.rs index da111f173f..d0c80a3f32 100644 --- a/kernel/src/transforms.rs +++ b/kernel/src/transforms.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; use std::sync::Arc; use itertools::Itertools; +use serde::{Deserialize, Serialize}; use crate::expressions::{ BinaryExpressionOp, Expression, ExpressionRef, Scalar, Transform, VariadicExpressionOp, @@ -23,7 +24,7 @@ pub(crate) type TransformSpec = Vec; /// /// These transformations are "sparse" - they only specify what changes, while unchanged fields /// pass through implicitly in their original order. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub(crate) enum FieldTransformSpec { /// Insert the given expression after the named input column (None = prepend instead) // NOTE: It's quite likely we will sometimes need to reorder columns for one reason or another, From 61f0908590d905f97ee8052aadaa5b64f74fc4de Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 26 Nov 2025 15:41:09 -0800 Subject: [PATCH 08/10] deduplicator --- .../src/action_reconciliation/log_replay.rs | 1 + kernel/src/log_replay.rs | 46 ++++--------- kernel/src/log_replay/deduplicator.rs | 65 +++++++++++++++++++ kernel/src/scan/log_replay.rs | 58 ++++++++--------- 4 files changed, 106 insertions(+), 64 deletions(-) create mode 100644 kernel/src/log_replay/deduplicator.rs diff --git a/kernel/src/action_reconciliation/log_replay.rs b/kernel/src/action_reconciliation/log_replay.rs index aecfa6ae75..5671d1b64c 100644 --- a/kernel/src/action_reconciliation/log_replay.rs +++ b/kernel/src/action_reconciliation/log_replay.rs @@ -31,6 +31,7 @@ //! actions selected //! use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _}; +use crate::log_replay::deduplicator::Deduplicator; use crate::log_replay::{ ActionsBatch, FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor, }; diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs index dfa0767a3d..8881a2c3cf 100644 --- a/kernel/src/log_replay.rs +++ b/kernel/src/log_replay.rs @@ -13,8 +13,8 @@ //! This module provides structures for efficient batch processing, focusing on file action //! deduplication with `FileActionDeduplicator` which tracks unique files across log batches //! to minimize memory usage for tables with extensive history. -use crate::actions::deletion_vector::DeletionVectorDescriptor; -use crate::engine_data::{GetData, TypedGetData}; +use crate::engine_data::GetData; +use crate::log_replay::deduplicator::Deduplicator; use crate::scan::data_skipping::DataSkippingFilter; use crate::{DeltaResult, EngineData}; @@ -24,6 +24,8 @@ use std::collections::HashSet; use tracing::debug; +pub(crate) mod deduplicator; + /// The subset of file action fields that uniquely identifies it in the log, used for deduplication /// of adds and removes during log replay. #[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)] @@ -56,7 +58,8 @@ pub(crate) struct FileActionDeduplicator<'seen> { seen_file_keys: &'seen mut HashSet, // TODO: Consider renaming to `is_commit_batch`, `deduplicate_batch`, or `save_batch` // to better reflect its role in deduplication logic. - /// Whether we're processing a log batch (as opposed to a checkpoint) + /// Whether we're processing a commit log JSON file (`true`) or a checkpoint file (`false`). 
+ /// When `true`, file actions are added to `seen_file_keys` as they're processed. is_log_batch: bool, /// Index of the getter containing the add.path column add_path_index: usize, @@ -86,12 +89,14 @@ impl<'seen> FileActionDeduplicator<'seen> { remove_dv_start_index, } } +} +impl<'seen> Deduplicator for FileActionDeduplicator<'seen> { /// Checks if log replay already processed this logical file (in which case the current action /// should be ignored). If not already seen, register it so we can recognize future duplicates. /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it /// and should process it. - pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool { + fn check_and_record_seen(&mut self, key: FileActionKey) -> bool { // Note: each (add.path + add.dv_unique_id()) pair has a // unique Add + Remove pair in the log. For example: // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json @@ -117,35 +122,6 @@ impl<'seen> FileActionDeduplicator<'seen> { } } - /// Extracts the deletion vector unique ID if it exists. - /// - /// This function retrieves the necessary fields for constructing a deletion vector unique ID - /// by accessing `getters` at `dv_start_index` and the following two indices. Specifically: - /// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`). - /// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`). - /// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`). - fn extract_dv_unique_id<'a>( - &self, - i: usize, - getters: &[&'a dyn GetData<'a>], - dv_start_index: usize, - ) -> DeltaResult> { - match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? { - Some(storage_type) => { - let path_or_inline = - getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?; - let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?; - - Ok(Some(DeletionVectorDescriptor::unique_id_from_parts( - storage_type, - path_or_inline, - offset, - ))) - } - None => Ok(None), - } - } - /// Extracts a file action key and determines if it's an add operation. /// This method examines the data at the given index using the provided getters /// to identify whether a file action exists and what type it is. @@ -159,7 +135,7 @@ impl<'seen> FileActionDeduplicator<'seen> { /// - `Ok(Some((key, is_add)))`: When a file action is found, returns the key and whether it's an add operation /// - `Ok(None)`: When no file action is found /// - `Err(...)`: On any error during extraction - pub(crate) fn extract_file_action<'a>( + fn extract_file_action<'a>( &self, i: usize, getters: &[&'a dyn GetData<'a>], @@ -190,7 +166,7 @@ impl<'seen> FileActionDeduplicator<'seen> { /// /// `true` indicates we are processing a batch from a commit file. /// `false` indicates we are processing a batch from a checkpoint. - pub(crate) fn is_log_batch(&self) -> bool { + fn is_log_batch(&self) -> bool { self.is_log_batch } } diff --git a/kernel/src/log_replay/deduplicator.rs b/kernel/src/log_replay/deduplicator.rs new file mode 100644 index 0000000000..e8fd6993a5 --- /dev/null +++ b/kernel/src/log_replay/deduplicator.rs @@ -0,0 +1,65 @@ +//! Deduplication abstraction for log replay processors. +//! +//! The [`Deduplicator`] trait supports two deduplication strategies: +//! +//! 
- **JSON commit files** (`is_log_batch = true`): Tracks (path, dv_unique_id) and records each key in +//! the seen-file set as it is processed. Implementation: [`FileActionDeduplicator`] +//! +//! - **Checkpoint files** (`is_log_batch = false`): Uses (path, dv_unique_id) to filter actions +//! against a read-only seen-file set pre-populated during the commit phase. Future implementation. +//! +//! [`FileActionDeduplicator`]: crate::log_replay::FileActionDeduplicator + +use crate::actions::deletion_vector::DeletionVectorDescriptor; +use crate::engine_data::{GetData, TypedGetData}; +use crate::log_replay::FileActionKey; +use crate::DeltaResult; + +pub(crate) trait Deduplicator { + /// Extracts a file action key from the data. Returns `(key, is_add)` if found. + /// + /// TODO: Remove the skip_removes parameter in the future. The caller is responsible for using the + /// correct Deduplicator instance depending on whether the batch belongs to a commit or to a + /// checkpoint. + fn extract_file_action<'a>( + &self, + i: usize, + getters: &[&'a dyn GetData<'a>], + skip_removes: bool, + ) -> DeltaResult<Option<(FileActionKey, bool)>>; + + /// Checks if this file has been seen. When `is_log_batch()` is `true`, updates the seen-file set + /// to track new files. Returns `true` if the file should be filtered out. + fn check_and_record_seen(&mut self, key: FileActionKey) -> bool; + + /// Returns `true` for commit log batches (which update the seen-file set), `false` for checkpoints (read-only). + fn is_log_batch(&self) -> bool; + + /// Extracts the deletion vector unique ID if it exists. + /// + /// This function retrieves the necessary fields for constructing a deletion vector unique ID + /// by accessing `getters` at `dv_start_index` and the following two indices. Specifically: + /// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`). + /// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`). + /// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`). + fn extract_dv_unique_id<'a>( + &self, + i: usize, + getters: &[&'a dyn GetData<'a>], + dv_start_index: usize, + ) -> DeltaResult<Option<String>> { + let Some(storage_type) = + getters[dv_start_index].get_opt(i, "deletionVector.storageType")? 
+ else { + return Ok(None); + }; + let path_or_inline = getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?; + let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?; + + Ok(Some(DeletionVectorDescriptor::unique_id_from_parts( + storage_type, + path_or_inline, + offset, + ))) + } +} diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index b91b1f8c9a..dcdd72e199 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -12,6 +12,7 @@ use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef}; use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _}; +use crate::log_replay::deduplicator::Deduplicator; use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor}; use crate::scan::Scalar; use crate::schema::ToSchema as _; @@ -80,6 +81,15 @@ pub(crate) struct ScanLogReplayProcessor { } impl ScanLogReplayProcessor { + // These index positions correspond to the order of columns defined in + // `selected_column_names_and_types()` + const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters + const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters + const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns + const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters + const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters + const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns + /// Create a new [`ScanLogReplayProcessor`] instance pub(crate) fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { Self::new_with_seen_files(engine, state_info, Default::default()) @@ -234,40 +244,23 @@ impl ScanLogReplayProcessor { /// replay visits actions newest-first, so once we've seen a file action for a given (path, dvId) /// pair, we should ignore all subsequent (older) actions for that same (path, dvId) pair. If the /// first action for a given file is a remove, then that file does not show up in the result at all. 
-struct AddRemoveDedupVisitor<'seen> { - deduplicator: FileActionDeduplicator<'seen>, +struct AddRemoveDedupVisitor { + deduplicator: D, selection_vector: Vec, state_info: Arc, partition_filter: Option, row_transform_exprs: Vec>, } -impl AddRemoveDedupVisitor<'_> { - // These index positions correspond to the order of columns defined in - // `selected_column_names_and_types()` - const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters - const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters - const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns - const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters - const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters - const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns - +impl AddRemoveDedupVisitor { fn new( - seen: &mut HashSet, + deduplicator: D, selection_vector: Vec, state_info: Arc, partition_filter: Option, - is_log_batch: bool, - ) -> AddRemoveDedupVisitor<'_> { + ) -> AddRemoveDedupVisitor { AddRemoveDedupVisitor { - deduplicator: FileActionDeduplicator::new( - seen, - is_log_batch, - Self::ADD_PATH_INDEX, - Self::REMOVE_PATH_INDEX, - Self::ADD_DV_START_INDEX, - Self::REMOVE_DV_START_INDEX, - ), + deduplicator, selection_vector, state_info, partition_filter, @@ -319,8 +312,8 @@ impl AddRemoveDedupVisitor<'_> { // encounter if the table's schema was replaced after the most recent checkpoint. let partition_values = match &self.state_info.transform_spec { Some(transform) if is_add => { - let partition_values = - getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?; + let partition_values = getters[ScanLogReplayProcessor::ADD_PARTITION_VALUES_INDEX] + .get(i, "add.partitionValues")?; let partition_values = parse_partition_values( &self.state_info.logical_schema, transform, @@ -340,7 +333,7 @@ impl AddRemoveDedupVisitor<'_> { return Ok(false); } let base_row_id: Option = - getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?; + getters[ScanLogReplayProcessor::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?; let transform = self .state_info .transform_spec @@ -363,7 +356,7 @@ impl AddRemoveDedupVisitor<'_> { } } -impl RowVisitor for AddRemoveDedupVisitor<'_> { +impl RowVisitor for AddRemoveDedupVisitor { fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) { // NOTE: The visitor assumes a schema with adds first and removes optionally afterward. 
static NAMES_AND_TYPES: LazyLock = LazyLock::new(|| { @@ -509,12 +502,19 @@ impl LogReplayProcessor for ScanLogReplayProcessor { let selection_vector = self.build_selection_vector(actions.as_ref())?; assert_eq!(selection_vector.len(), actions.len()); - let mut visitor = AddRemoveDedupVisitor::new( + let deduplicator = FileActionDeduplicator::new( &mut self.seen_file_keys, + is_log_batch, + Self::ADD_PATH_INDEX, + Self::REMOVE_PATH_INDEX, + Self::ADD_DV_START_INDEX, + Self::REMOVE_DV_START_INDEX, + ); + let mut visitor = AddRemoveDedupVisitor::new( + deduplicator, selection_vector, self.state_info.clone(), self.partition_filter.clone(), - is_log_batch, ); visitor.visit_rows_of(actions.as_ref())?; From 6865ff9760fa5716bc75d92cef9d43422804118d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 26 Nov 2025 16:00:11 -0800 Subject: [PATCH 09/10] checkpoint deduplicator --- kernel/src/log_replay.rs | 92 +++++++++++++++++++++++++++ kernel/src/log_replay/deduplicator.rs | 57 +++++++++++++++++ kernel/src/scan/log_replay.rs | 48 +++++++++++++- 3 files changed, 195 insertions(+), 2 deletions(-) diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs index 8881a2c3cf..e922ccc7dd 100644 --- a/kernel/src/log_replay.rs +++ b/kernel/src/log_replay.rs @@ -203,6 +203,13 @@ impl ActionsBatch { } } +#[internal_api] +#[allow(unused)] +pub(crate) trait ParallelizableLogReplayProcessor { + type Output; + fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult; +} + /// A trait for processing batches of actions from Delta transaction logs during log replay. /// /// Log replay processors scan transaction logs in **reverse chronological order** (newest to oldest), @@ -336,6 +343,7 @@ pub(crate) trait HasSelectionVector { #[cfg(test)] mod tests { + use super::deduplicator::CheckpointDeduplicator; use super::*; use crate::engine_data::GetData; use crate::DeltaResult; @@ -584,4 +592,88 @@ mod tests { let deduplicator_checkpoint = create_deduplicator(&mut seen, false); assert!(!deduplicator_checkpoint.is_log_batch()); } + + // ==================== CheckpointDeduplicator Tests ==================== + + #[test] + fn test_checkpoint_extract_file_action_add() -> DeltaResult<()> { + let seen = HashSet::new(); + let deduplicator = CheckpointDeduplicator::try_new(&seen, 0, 2)?; + + let mut mock_add = MockGetData::new(); + mock_add.add_string(0, "add.path", "checkpoint_file.parquet"); + let getters = create_getters_with_mocks(Some(&mock_add), None); + let result = deduplicator.extract_file_action(0, &getters, false)?; + + assert!(result.is_some()); + let (key, is_add) = result.unwrap(); + assert_eq!(key.path, "checkpoint_file.parquet"); + assert!(key.dv_unique_id.is_none()); + assert!(is_add); + + Ok(()) + } + + #[test] + fn test_checkpoint_extract_file_action_with_deletion_vector() -> DeltaResult<()> { + let seen = HashSet::new(); + let deduplicator = CheckpointDeduplicator::try_new(&seen, 0, 2)?; + + let mut mock_dv = MockGetData::new(); + mock_dv.add_string(0, "add.path", "file_with_dv.parquet"); + mock_dv.add_string(0, "deletionVector.storageType", "s3"); + mock_dv.add_string(0, "deletionVector.pathOrInlineDv", "path/to/dv"); + mock_dv.add_int(0, "deletionVector.offset", 100); + let getters = create_getters_with_mocks(Some(&mock_dv), None); + let result = deduplicator.extract_file_action(0, &getters, false)?; + + assert!(result.is_some()); + let (key, is_add) = result.unwrap(); + assert_eq!(key.path, "file_with_dv.parquet"); + assert!(matches!( + key.dv_unique_id.as_deref(), + 
Some("s3path/to/dv@100") + )); + assert!(is_add); + + Ok(()) + } + + #[test] + fn test_checkpoint_deduplicator_filters_commit_duplicates() -> DeltaResult<()> { + let mut seen = HashSet::new(); + + // Files "seen" during commit processing + seen.insert(FileActionKey::new("modified_in_commit.parquet", None)); + seen.insert(FileActionKey::new( + "modified_with_dv.parquet", + Some("dv123".to_string()), + )); + + let mut deduplicator = CheckpointDeduplicator::try_new(&seen, 0, 2)?; + + // File modified in commit - should be filtered from checkpoint + let commit_modified = FileActionKey::new("modified_in_commit.parquet", None); + assert!( + deduplicator.check_and_record_seen(commit_modified), + "Files seen in commits should be filtered from checkpoint" + ); + + // File with DV modified in commit - should be filtered + let commit_modified_dv = + FileActionKey::new("modified_with_dv.parquet", Some("dv123".to_string())); + assert!( + deduplicator.check_and_record_seen(commit_modified_dv), + "Files with DVs seen in commits should be filtered from checkpoint" + ); + + // File only in checkpoint - should NOT be filtered + let checkpoint_only = FileActionKey::new("checkpoint_only.parquet", None); + assert!( + !deduplicator.check_and_record_seen(checkpoint_only), + "Files only in checkpoint should not be filtered" + ); + + Ok(()) + } } diff --git a/kernel/src/log_replay/deduplicator.rs b/kernel/src/log_replay/deduplicator.rs index e8fd6993a5..ba91271b10 100644 --- a/kernel/src/log_replay/deduplicator.rs +++ b/kernel/src/log_replay/deduplicator.rs @@ -10,6 +10,8 @@ //! //! [`FileActionDeduplicator`]: crate::log_replay::FileActionDeduplicator +use std::collections::HashSet; + use crate::actions::deletion_vector::DeletionVectorDescriptor; use crate::engine_data::{GetData, TypedGetData}; use crate::log_replay::FileActionKey; @@ -63,3 +65,58 @@ pub(crate) trait Deduplicator { ))) } } + +/// Read-only deduplicator for checkpoint processing. +/// +/// Unlike [`FileActionDeduplicator`] which mutably tracks files, this uses an immutable +/// reference to filter checkpoint actions against files already seen from commits. +/// Only handles add actions (no removes), and never modifies the seen set. +/// +/// [`FileActionDeduplicator`]: crate::log_replay::FileActionDeduplicator +#[allow(unused)] +pub(crate) struct CheckpointDeduplicator<'a> { + seen_file_keys: &'a HashSet, + add_path_index: usize, + add_dv_start_index: usize, +} + +impl<'a> CheckpointDeduplicator<'a> { + #[allow(unused)] + pub(crate) fn try_new( + seen_file_keys: &'a HashSet, + add_path_index: usize, + add_dv_start_index: usize, + ) -> DeltaResult { + Ok(CheckpointDeduplicator { + seen_file_keys, + add_path_index, + add_dv_start_index, + }) + } +} + +impl Deduplicator for CheckpointDeduplicator<'_> { + /// Extracts add action key only (checkpoints skip removes). `skip_removes` is ignored. + fn extract_file_action<'b>( + &self, + i: usize, + getters: &[&'b dyn GetData<'b>], + _skip_removes: bool, + ) -> DeltaResult> { + let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? else { + return Ok(None); + }; + let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?; + Ok(Some((FileActionKey::new(path, dv_unique_id), true))) + } + + /// Read-only check against seen set. Returns `true` if file should be filtered out. + fn check_and_record_seen(&mut self, key: FileActionKey) -> bool { + self.seen_file_keys.contains(&key) + } + + /// Always `false` - checkpoint batches never update the seen set. 
+ fn is_log_batch(&self) -> bool { + false + } +} diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index dcdd72e199..016b97543d 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -12,8 +12,11 @@ use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef}; use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _}; -use crate::log_replay::deduplicator::Deduplicator; -use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor}; +use crate::log_replay::deduplicator::{CheckpointDeduplicator, Deduplicator}; +use crate::log_replay::{ + ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor, + ParallelizableLogReplayProcessor, +}; use crate::scan::Scalar; use crate::schema::ToSchema as _; use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType}; @@ -488,6 +491,47 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef { EXPR.clone() } +impl ParallelizableLogReplayProcessor for ScanLogReplayProcessor { + type Output = ::Output; + fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult { + let ActionsBatch { + actions, + is_log_batch, + } = actions_batch; + require!( + !is_log_batch, + Error::generic("Parallel checkpoint processor may only be applied to checkpoint files") + ); + + // Build an initial selection vector for the batch which has had the data skipping filter + // applied. The selection vector is further updated by the deduplication visitor to remove + // rows that are not valid adds. + let selection_vector = self.build_selection_vector(actions.as_ref())?; + assert_eq!(selection_vector.len(), actions.len()); + + let deduplicator = CheckpointDeduplicator::try_new( + &self.seen_file_keys, + Self::ADD_PATH_INDEX, + Self::ADD_DV_START_INDEX, + )?; + let mut visitor = AddRemoveDedupVisitor::new( + deduplicator, + selection_vector, + self.state_info.clone(), + self.partition_filter.clone(), + ); + visitor.visit_rows_of(actions.as_ref())?; + + // TODO: Teach expression eval to respect the selection vector we just computed so carefully! + let result = self.add_transform.evaluate(actions.as_ref())?; + ScanMetadata::try_new( + result, + visitor.selection_vector, + visitor.row_transform_exprs, + ) + } +} + impl LogReplayProcessor for ScanLogReplayProcessor { type Output = ScanMetadata; From 8d59594b63db32fdf208026f8784b7622675e3d8 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 16:20:38 -0800 Subject: [PATCH 10/10] add leaf reader --- kernel/src/log_reader/checkpoint_leaf.rs | 136 +++++++++++++++++++++++ kernel/src/log_reader/mod.rs | 1 + 2 files changed, 137 insertions(+) create mode 100644 kernel/src/log_reader/checkpoint_leaf.rs diff --git a/kernel/src/log_reader/checkpoint_leaf.rs b/kernel/src/log_reader/checkpoint_leaf.rs new file mode 100644 index 0000000000..e65244e4f3 --- /dev/null +++ b/kernel/src/log_reader/checkpoint_leaf.rs @@ -0,0 +1,136 @@ +//! Sidecar phase for log replay - processes sidecar/leaf parquet files. + +use std::sync::Arc; + +use itertools::Itertools; + +use crate::log_replay::ActionsBatch; +use crate::schema::SchemaRef; +use crate::{DeltaResult, Engine, FileMeta}; + +/// Phase that processes a leaf-level checkpoint file. 
A leaf-level checkpoint is any checkpoint +/// file that does not reference another checkpoint file. This includes: +/// - Sidecar files in a table with V2-Checkpoint that contains a manifest file +/// - Multi-part checkpoint files +/// - Single-part checkpoint files +#[allow(unused)] +pub(crate) struct CheckpointLeafReader { + actions: Box> + Send>, +} + +impl CheckpointLeafReader { + /// Create a new sidecar phase from file list. + /// + /// # Parameters + /// - `files`: Sidecar/leaf files to process + /// - `engine`: Engine for reading files + /// - `schema`: Schema to use when reading sidecar files (projected based on processor requirements) + #[allow(unused)] + pub(crate) fn try_new( + engine: Arc, + files: Vec, + schema: SchemaRef, + ) -> DeltaResult { + let actions = engine + .parquet_handler() + .read_parquet_files(&files, schema, None)? + .map_ok(|batch| ActionsBatch::new(batch, false)); + + Ok(Self { + actions: Box::new(actions), + }) + } +} + +impl Iterator for CheckpointLeafReader { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + self.actions.next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::actions::{get_commit_schema, ADD_NAME}; + use crate::arrow::array::{Array, StringArray, StructArray}; + use crate::engine::arrow_data::EngineDataArrowExt as _; + use crate::log_reader::checkpoint_manifest::CheckpointManifestReader; + use crate::utils::test_utils::load_test_table; + use itertools::Itertools; + + #[test] + fn test_sidecar_phase_processes_files() -> DeltaResult<()> { + let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?; + let log_segment = snapshot.log_segment(); + assert_eq!( + log_segment.checkpoint_parts.len(), + 1, + "There should be a single manifest checkpoint" + ); + + let mut manifest_phase = CheckpointManifestReader::try_new( + engine.clone(), + &log_segment.checkpoint_parts[0], + log_segment.log_root.clone(), + )?; + + // Drain manifest phase + for batch in manifest_phase.by_ref() { + let _batch = batch?; + } + + let sidecars = manifest_phase.extract_sidecars()?; + assert_eq!(sidecars.len(), 2, "There should be two sidecar files"); + + let schema = get_commit_schema().project(&[ADD_NAME])?; + let sidecar_phase = CheckpointLeafReader::try_new(engine.clone(), sidecars, schema)?; + + let mut sidecar_file_paths = Vec::new(); + for result in sidecar_phase { + let batch = result?; + let ActionsBatch { + actions, + is_log_batch, + } = batch; + assert!(!is_log_batch, "Sidecars should not be log batches"); + + let record_batch = actions.try_into_record_batch()?; + let add = record_batch.column_by_name("add").unwrap(); + let add_struct = add.as_any().downcast_ref::().unwrap(); + let path = add_struct + .column_by_name("path") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec(); + sidecar_file_paths.extend(batch_paths); + } + + sidecar_file_paths.sort(); + + assert_eq!( + sidecar_file_paths.len(), + 101, + "CheckpointLeafReader should find exactly 101 files from sidecars" + ); + + // Verify first few files match expected (sampling to keep test readable) + let expected_first_files = [ + "test%25file%25prefix-part-00000-01086c52-1b86-48d0-8889-517fe626849d-c000.snappy.parquet", + "test%25file%25prefix-part-00000-0fd71c0e-fd08-4685-87d6-aae77532d3ea-c000.snappy.parquet", + "test%25file%25prefix-part-00000-2710dd7f-9fa5-429d-b3fb-c005ba16e062-c000.snappy.parquet", + ]; + + assert_eq!( + &sidecar_file_paths[..3], + 
&expected_first_files[..], + "CheckpointLeafReader should process files in expected order" + ); + + Ok(()) + } +} diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs index b3c344e09b..b3d0a5f50c 100644 --- a/kernel/src/log_reader/mod.rs +++ b/kernel/src/log_reader/mod.rs @@ -1,2 +1,3 @@ +pub(crate) mod checkpoint_leaf; pub(crate) mod checkpoint_manifest; pub(crate) mod commit;
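Taken together, this series splits log replay into a sequential commit phase and a checkpoint phase whose batches can be processed independently. The following sketch, which is not part of the patches, shows how the new pieces are intended to compose inside the crate; the single-threaded loop stands in for whatever parallel execution an engine would actually provide, and it assumes the items introduced above are in scope.

// Illustrative sketch only, assuming the items from this patch series are in scope.
fn replay_checkpoint_leaves(
    engine: Arc<dyn Engine>,
    commit_phase_processor: ScanLogReplayProcessor, // seen-file set already populated from commits
    leaf_files: Vec<FileMeta>,
    schema: SchemaRef,
) -> DeltaResult<Vec<ScanMetadata>> {
    // Freeze the commit-phase state and rebuild a shareable processor. On a real
    // engine the serialized state would be shipped to each worker instead.
    let state = commit_phase_processor.into_serializable_state()?;
    let processor = ScanLogReplayProcessor::from_serializable_state(engine.as_ref(), state)?;

    // Read checkpoint leaves (sidecars, multi-part, or single-part checkpoints)...
    let reader = CheckpointLeafReader::try_new(engine.clone(), leaf_files, schema)?;

    // ...and run each batch through the read-only checkpoint path. Because
    // `process_actions_batch` takes `&self`, these calls could run in parallel.
    reader
        .map(|batch| processor.process_actions_batch(batch?))
        .collect()
}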