From 9116c74c04204bd82a2f21ae8e314d167a9c04ac Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 14:56:43 -0800 Subject: [PATCH 01/14] commit reader --- kernel/src/lib.rs | 1 + kernel/src/log_reader/commit.rs | 131 ++++++++++++++++++++++++++++++++ kernel/src/log_reader/mod.rs | 1 + kernel/src/log_segment.rs | 13 +--- kernel/src/scan/log_replay.rs | 2 +- 5 files changed, 137 insertions(+), 11 deletions(-) create mode 100644 kernel/src/log_reader/commit.rs create mode 100644 kernel/src/log_reader/mod.rs diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index c299531af3..505b742619 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -93,6 +93,7 @@ pub mod error; pub mod expressions; mod log_compaction; mod log_path; +mod log_reader; pub mod metrics; pub mod scan; pub mod schema; diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs new file mode 100644 index 0000000000..e0d66c8955 --- /dev/null +++ b/kernel/src/log_reader/commit.rs @@ -0,0 +1,131 @@ +//! Commit phase for log replay - processes JSON commit files. + +use std::sync::Arc; + +use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME}; +use crate::log_replay::ActionsBatch; +use crate::log_segment::LogSegment; +use crate::schema::SchemaRef; +use crate::{DeltaResult, Engine}; + +/// Phase that processes JSON commit files. +pub(crate) struct CommitReader { + actions: Box> + Send>, +} + +impl CommitReader { + /// Create a new commit phase from a log segment. + /// + /// # Parameters + /// - `log_segment`: The log segment to process + /// - `engine`: Engine for reading files + pub(crate) fn try_new( + engine: &dyn Engine, + log_segment: &LogSegment, + schema: SchemaRef, + ) -> DeltaResult { + let commit_files = log_segment.find_commit_cover(); + let actions = engine + .json_handler() + .read_json_files(&commit_files, schema, None)? + .map(|batch| batch.map(|b| ActionsBatch::new(b, true))); + + Ok(Self { + actions: Box::new(actions), + }) + } +} + +impl Iterator for CommitReader { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + self.actions.next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::engine::default::executor::tokio::TokioBackgroundExecutor; + use crate::engine::default::DefaultEngine; + use crate::log_replay::LogReplayProcessor; + use crate::scan::log_replay::ScanLogReplayProcessor; + use crate::scan::state_info::StateInfo; + use object_store::local::LocalFileSystem; + use std::path::PathBuf; + use std::sync::Arc as StdArc; + + fn load_test_table( + table_name: &str, + ) -> DeltaResult<( + StdArc>, + StdArc, + url::Url, + )> { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests/data"); + path.push(table_name); + + let path = std::fs::canonicalize(path) + .map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + + let url = url::Url::from_directory_path(path) + .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?; + + let store = StdArc::new(LocalFileSystem::new()); + let engine = StdArc::new(DefaultEngine::new(store)); + let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + + Ok((engine, snapshot, url)) + } + + #[test] + fn test_commit_phase_processes_commits() -> DeltaResult<()> { + let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?; + let log_segment = StdArc::new(snapshot.log_segment().clone()); + + let state_info = StdArc::new(StateInfo::try_new( + snapshot.schema(), + snapshot.table_configuration(), + None, + (), + )?); + + let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + let mut commit_phase = CommitPhase::try_new(&log_segment, engine.clone())?; + + let mut batch_count = 0; + let mut file_paths = Vec::new(); + + for result in commit_phase { + let batch = result?; + let metadata = processor.process_actions_batch(batch)?; + let paths = metadata.visit_scan_files( + vec![], + |ps: &mut Vec, path, _, _, _, _, _| { + ps.push(path.to_string()); + }, + )?; + file_paths.extend(paths); + batch_count += 1; + } + + // table-without-dv-small has exactly 1 commit file + assert_eq!( + batch_count, 1, + "table-without-dv-small should have exactly 1 commit batch" + ); + + // table-without-dv-small has exactly 1 add file + file_paths.sort(); + let expected_files = + vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"]; + assert_eq!( + file_paths, expected_files, + "CommitPhase should find exactly the expected file" + ); + + Ok(()) + } +} diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs new file mode 100644 index 0000000000..d1337ef59a --- /dev/null +++ b/kernel/src/log_reader/mod.rs @@ -0,0 +1 @@ +pub(crate) mod commit; diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 3b223e633e..10b60275a0 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -11,6 +11,7 @@ use crate::actions::{ PROTOCOL_NAME, SIDECAR_NAME, }; use crate::last_checkpoint_hint::LastCheckpointHint; +use crate::log_reader::commit::CommitReader; use crate::log_replay::ActionsBatch; use crate::metrics::{MetricEvent, MetricId, MetricsReporter}; use crate::path::{LogPathFileType, ParsedLogPath}; @@ -330,15 +331,7 @@ impl LogSegment { meta_predicate: Option, ) -> DeltaResult> + Send> { // `replay` expects commit files to be sorted in descending order, so the return value here is correct - let commits_and_compactions = self.find_commit_cover(); - let commit_stream = engine - .json_handler() - .read_json_files( - &commits_and_compactions, - commit_read_schema, - meta_predicate.clone(), - )? - .map_ok(|batch| ActionsBatch::new(batch, true)); + let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?; let checkpoint_stream = self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?; @@ -367,7 +360,7 @@ impl LogSegment { /// returns files is DESCENDING ORDER, as that's what `replay` expects. This function assumes /// that all files in `self.ascending_commit_files` and `self.ascending_compaction_files` are in /// range for this log segment. This invariant is maintained by our listing code. - fn find_commit_cover(&self) -> Vec { + pub(crate) fn find_commit_cover(&self) -> Vec { // Create an iterator sorted in ascending order by (initial version, end version), e.g. // [00.json, 00.09.compacted.json, 00.99.compacted.json, 01.json, 02.json, ..., 10.json, // 10.19.compacted.json, 11.json, ...] diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 7bf69eb728..e0fe51b3fb 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -53,7 +53,7 @@ pub(crate) struct ScanLogReplayProcessor { impl ScanLogReplayProcessor { /// Create a new [`ScanLogReplayProcessor`] instance - fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { + pub(crate) fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { // Extract the physical predicate from StateInfo's PhysicalPredicate enum. // The DataSkippingFilter and partition_filter components expect the predicate // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from From 7c04cd7a47bf38c8eddb845bb1ec0fac1689bca0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 15:46:59 -0800 Subject: [PATCH 02/14] improve commit --- kernel/src/log_reader/commit.rs | 6 ++++-- kernel/src/scan/mod.rs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index e0d66c8955..17ac94bf38 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -52,6 +52,7 @@ mod tests { use crate::log_replay::LogReplayProcessor; use crate::scan::log_replay::ScanLogReplayProcessor; use crate::scan::state_info::StateInfo; + use crate::scan::COMMIT_READ_SCHEMA; use object_store::local::LocalFileSystem; use std::path::PathBuf; use std::sync::Arc as StdArc; @@ -93,7 +94,8 @@ mod tests { )?); let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; - let mut commit_phase = CommitPhase::try_new(&log_segment, engine.clone())?; + let schema = COMMIT_READ_SCHEMA.clone(); + let mut commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?; let mut batch_count = 0; let mut file_paths = Vec::new(); @@ -123,7 +125,7 @@ mod tests { vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"]; assert_eq!( file_paths, expected_files, - "CommitPhase should find exactly the expected file" + "CommitReader should find exactly the expected file" ); Ok(()) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 5146e78aba..5ac8482f14 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -41,7 +41,7 @@ pub(crate) mod state_info; // safety: we define get_commit_schema() and _know_ it contains ADD_NAME and REMOVE_NAME #[allow(clippy::unwrap_used)] -static COMMIT_READ_SCHEMA: LazyLock = LazyLock::new(|| { +pub(crate) static COMMIT_READ_SCHEMA: LazyLock = LazyLock::new(|| { get_commit_schema() .project(&[ADD_NAME, REMOVE_NAME]) .unwrap() From 20c2a95ce19c63431fa6b2a5bfcd22f9ab7cfbf2 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 24 Nov 2025 10:41:55 -0800 Subject: [PATCH 03/14] cleanup --- kernel/src/log_reader/commit.rs | 98 ++++++++++++++++----------------- 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index 17ac94bf38..9c6a3dbe0b 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -1,14 +1,11 @@ //! Commit phase for log replay - processes JSON commit files. -use std::sync::Arc; - -use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME}; use crate::log_replay::ActionsBatch; use crate::log_segment::LogSegment; use crate::schema::SchemaRef; use crate::{DeltaResult, Engine}; -/// Phase that processes JSON commit files. +/// Phase that processes JSON commit files as [`ActionsBatch`] pub(crate) struct CommitReader { actions: Box> + Send>, } @@ -17,8 +14,9 @@ impl CommitReader { /// Create a new commit phase from a log segment. /// /// # Parameters - /// - `log_segment`: The log segment to process /// - `engine`: Engine for reading files + /// - `log_segment`: The log segment to process + /// - `schema`: The schema to read the json files pub(crate) fn try_new( engine: &dyn Engine, log_segment: &LogSegment, @@ -47,21 +45,23 @@ impl Iterator for CommitReader { #[cfg(test)] mod tests { use super::*; + use crate::arrow::array::{StringArray, StructArray}; + use crate::engine::arrow_data::EngineDataArrowExt as _; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::DefaultEngine; - use crate::log_replay::LogReplayProcessor; - use crate::scan::log_replay::ScanLogReplayProcessor; - use crate::scan::state_info::StateInfo; use crate::scan::COMMIT_READ_SCHEMA; + use crate::{Error, Snapshot}; + use itertools::Itertools; use object_store::local::LocalFileSystem; use std::path::PathBuf; - use std::sync::Arc as StdArc; + use std::sync::Arc; + use url::Url; fn load_test_table( table_name: &str, ) -> DeltaResult<( - StdArc>, - StdArc, + Arc>, + Arc, url::Url, )> { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -69,60 +69,56 @@ mod tests { path.push(table_name); let path = std::fs::canonicalize(path) - .map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; - let url = url::Url::from_directory_path(path) - .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?; + let url = Url::from_directory_path(path) + .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; - let store = StdArc::new(LocalFileSystem::new()); - let engine = StdArc::new(DefaultEngine::new(store)); - let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + let store = Arc::new(LocalFileSystem::new()); + let engine = Arc::new(DefaultEngine::new(store)); + let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; Ok((engine, snapshot, url)) } #[test] - fn test_commit_phase_processes_commits() -> DeltaResult<()> { - let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?; - let log_segment = StdArc::new(snapshot.log_segment().clone()); - - let state_info = StdArc::new(StateInfo::try_new( - snapshot.schema(), - snapshot.table_configuration(), - None, - (), - )?); - - let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; - let schema = COMMIT_READ_SCHEMA.clone(); - let mut commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?; + fn test_commit_phase_processes_commits() -> Result<(), Box> { + let (engine, snapshot, _url) = load_test_table("app-txn-no-checkpoint")?; + let log_segment = Arc::new(snapshot.log_segment().clone()); - let mut batch_count = 0; - let mut file_paths = Vec::new(); + let schema = COMMIT_READ_SCHEMA.clone(); + let commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?; + let mut file_paths = vec![]; for result in commit_phase { let batch = result?; - let metadata = processor.process_actions_batch(batch)?; - let paths = metadata.visit_scan_files( - vec![], - |ps: &mut Vec, path, _, _, _, _, _| { - ps.push(path.to_string()); - }, - )?; - file_paths.extend(paths); - batch_count += 1; + let ActionsBatch { + actions, + is_log_batch, + } = batch; + assert!(is_log_batch); + + let record_batch = actions.try_into_record_batch()?; + let add = record_batch.column_by_name("add").unwrap(); + let add_struct = add.as_any().downcast_ref::().unwrap(); + + let path = add_struct + .column_by_name("path") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + let batch_paths = path.iter().flatten().map(|s| s.to_string()).collect_vec(); + file_paths.extend(batch_paths); } - // table-without-dv-small has exactly 1 commit file - assert_eq!( - batch_count, 1, - "table-without-dv-small should have exactly 1 commit batch" - ); - - // table-without-dv-small has exactly 1 add file file_paths.sort(); - let expected_files = - vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"]; + let mut expected_files = + vec!["modified=2021-02-02/part-00001-9a16b9f6-c12a-4609-a9c4-828eacb9526a-c000.snappy.parquet", "modified=2021-02-01/part-00001-8ebcaf8b-0f48-4213-98c9-5c2156d20a7e-c000.snappy.parquet" + ,"modified=2021-02-02/part-00001-bfac5c74-426e-410f-ab74-21a64e518e9c-c000.snappy.parquet","modified=2021-02-01/part-00001-80996595-a345-43b7-b213-e247d6f091f7-c000.snappy.parquet" + ]; + expected_files.sort(); assert_eq!( file_paths, expected_files, "CommitReader should find exactly the expected file" From 703dc4ee1c20ac0858691aa9ecd0ec8a2d788cf4 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 24 Nov 2025 10:56:14 -0800 Subject: [PATCH 04/14] commit reader --- kernel/src/log_reader/commit.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index 9c6a3dbe0b..da67eb2fda 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -5,7 +5,7 @@ use crate::log_segment::LogSegment; use crate::schema::SchemaRef; use crate::{DeltaResult, Engine}; -/// Phase that processes JSON commit files as [`ActionsBatch`] +/// Phase that processes JSON commit files into [`ActionsBatch`]s pub(crate) struct CommitReader { actions: Box> + Send>, } @@ -50,7 +50,7 @@ mod tests { use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::DefaultEngine; use crate::scan::COMMIT_READ_SCHEMA; - use crate::{Error, Snapshot}; + use crate::{Error, Snapshot, SnapshotRef}; use itertools::Itertools; use object_store::local::LocalFileSystem; use std::path::PathBuf; @@ -61,7 +61,7 @@ mod tests { table_name: &str, ) -> DeltaResult<( Arc>, - Arc, + SnapshotRef, url::Url, )> { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -109,19 +109,20 @@ mod tests { .downcast_ref::() .unwrap(); - let batch_paths = path.iter().flatten().map(|s| s.to_string()).collect_vec(); + let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec(); file_paths.extend(batch_paths); } file_paths.sort(); - let mut expected_files = - vec!["modified=2021-02-02/part-00001-9a16b9f6-c12a-4609-a9c4-828eacb9526a-c000.snappy.parquet", "modified=2021-02-01/part-00001-8ebcaf8b-0f48-4213-98c9-5c2156d20a7e-c000.snappy.parquet" - ,"modified=2021-02-02/part-00001-bfac5c74-426e-410f-ab74-21a64e518e9c-c000.snappy.parquet","modified=2021-02-01/part-00001-80996595-a345-43b7-b213-e247d6f091f7-c000.snappy.parquet" - ]; - expected_files.sort(); + let expected_files = vec![ + "modified=2021-02-01/part-00001-80996595-a345-43b7-b213-e247d6f091f7-c000.snappy.parquet", + "modified=2021-02-01/part-00001-8ebcaf8b-0f48-4213-98c9-5c2156d20a7e-c000.snappy.parquet", + "modified=2021-02-02/part-00001-9a16b9f6-c12a-4609-a9c4-828eacb9526a-c000.snappy.parquet", + "modified=2021-02-02/part-00001-bfac5c74-426e-410f-ab74-21a64e518e9c-c000.snappy.parquet", + ]; assert_eq!( file_paths, expected_files, - "CommitReader should find exactly the expected file" + "CommitReader should find exactly the expected files" ); Ok(()) From fc9c3b674a25cbfde470c8c2fc94bec46ba8a1e0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 15:20:45 -0800 Subject: [PATCH 05/14] manifest v1 --- kernel/src/log_reader/manifest.rs | 289 ++++++++++++++++++++++++++++++ kernel/src/log_reader/mod.rs | 1 + 2 files changed, 290 insertions(+) create mode 100644 kernel/src/log_reader/manifest.rs diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs new file mode 100644 index 0000000000..b94e29a472 --- /dev/null +++ b/kernel/src/log_reader/manifest.rs @@ -0,0 +1,289 @@ +//! Manifest phase for log replay - processes single-part checkpoint manifest files. + +use std::sync::Arc; + +use url::Url; + +use crate::actions::Sidecar; +use crate::actions::{get_all_actions_schema, visitors::SidecarVisitor, SIDECAR_NAME}; +use crate::expressions::Transform; +use crate::log_replay::ActionsBatch; +use crate::schema::{Schema, SchemaRef, StructField, ToSchema}; +use crate::{DeltaResult, Engine, Error, Expression, ExpressionEvaluator, FileMeta, RowVisitor}; + +/// Phase that processes single-part checkpoint manifest files. +/// +/// Extracts sidecar references while processing the manifest. +pub(crate) struct ManifestPhase { + actions: Box> + Send>, + sidecar_visitor: SidecarVisitor, + original_schema: SchemaRef, + log_root: Url, +} + +/// Possible transitions after ManifestPhase completes. +pub(crate) enum AfterManifest { + /// Has sidecars → return sidecar files + Sidecars { sidecars: Vec }, + /// No sidecars + Done, +} + +impl ManifestPhase { + /// Create a new manifest phase for a single-part checkpoint. + /// + /// The schema is automatically augmented with the sidecar column since the manifest + /// phase needs to extract sidecar references for phase transitions. + /// + /// # Parameters + /// - `manifest_file`: The checkpoint manifest file to process + /// - `log_root`: Root URL for resolving sidecar paths + /// - `engine`: Engine for reading files + /// - `base_schema`: Schema columns required by the processor (will be augmented with sidecar) + pub fn new( + manifest_file: FileMeta, + log_root: Url, + engine: Arc, + ) -> DeltaResult { + let files = vec![manifest_file.clone()]; + + // Determine file type from extension + let extension = manifest_file + .location + .path() + .rsplit('.') + .next() + .unwrap_or(""); + + let actions = match extension { + "json" => { + engine + .json_handler() + .read_json_files(&files, sidecar_schema.clone(), None)? + } + "parquet" => { + engine + .parquet_handler() + .read_parquet_files(&files, sidecar_schema.clone(), None)? + } + ext => { + return Err(Error::generic(format!( + "Unsupported checkpoint extension: {}", + ext + ))) + } + }; + + let actions = actions.map(|batch| batch.map(|b| ActionsBatch::new(b, false))); + + Ok(Self { + actions: Box::new(actions), + sidecar_visitor: SidecarVisitor::default(), + log_root, + original_schema, + projector, + }) + } + + /// Transition to the next phase. + /// + /// Returns an enum indicating what comes next: + /// - `Sidecars`: Extracted sidecar files + /// - `Done`: No sidecars found + pub(crate) fn finalize(self) -> DeltaResult { + // TODO: Check that stream is exhausted. We can track a boolean flag on whether we saw a None yet. + let sidecars = self + .sidecar_visitor + .sidecars + .into_iter() + .map(|s| s.to_filemeta(&self.log_root)) + .collect::, _>>()?; + + if sidecars.is_empty() { + Ok(AfterManifest::Done) + } else { + Ok(AfterManifest::Sidecars { sidecars }) + } + } +} + +impl Iterator for ManifestPhase { + type Item = DeltaResult; + + fn next(&mut self) -> Option { + self.actions.next().map(|batch_result| { + batch_result.and_then(|batch| { + // Extract sidecar references from the batch + self.sidecar_visitor.visit_rows_of(batch.actions())?; + + // Return the batch + // TODO: un-select sidecar actions + // TODO: project out sidecar actions + let batch = self.projector.evaluate(batch); + Ok(batch) + }) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::engine::default::executor::tokio::TokioBackgroundExecutor; + use crate::engine::default::DefaultEngine; + use crate::log_replay::LogReplayProcessor; + use crate::scan::log_replay::ScanLogReplayProcessor; + use crate::scan::state_info::StateInfo; + use object_store::local::LocalFileSystem; + use std::path::PathBuf; + use std::sync::Arc as StdArc; + + fn load_test_table( + table_name: &str, + ) -> DeltaResult<( + StdArc>, + StdArc, + url::Url, + )> { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests/data"); + path.push(table_name); + + let path = std::fs::canonicalize(path) + .map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + + let url = url::Url::from_directory_path(path) + .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?; + + let store = StdArc::new(LocalFileSystem::new()); + let engine = StdArc::new(DefaultEngine::new(store)); + let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + + Ok((engine, snapshot, url)) + } + + #[test] + fn test_manifest_phase_with_checkpoint() -> DeltaResult<()> { + // Use a table with v2 checkpoints where adds might be in sidecars + let (engine, snapshot, log_root) = load_test_table("v2-checkpoints-json-with-sidecars")?; + let log_segment = snapshot.log_segment(); + + // Check if there are any checkpoint parts + if log_segment.checkpoint_parts.is_empty() { + println!("Test table has no checkpoint parts, skipping"); + return Ok(()); + } + + let state_info = StdArc::new(StateInfo::try_new( + snapshot.schema(), + snapshot.table_configuration(), + None, + (), + )?); + + let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + + // Get the first checkpoint part + let checkpoint_file = &log_segment.checkpoint_parts[0]; + let manifest_file = checkpoint_file.location.clone(); + + let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?; + + let mut manifest_phase = + ManifestPhase::new(manifest_file, log_root.clone(), engine.clone(), schema)?; + + // Count batches and collect results + let mut batch_count = 0; + let mut file_paths = Vec::new(); + + while let Some(result) = manifest_phase.next() { + let batch = result?; + let metadata = processor.process_actions_batch(batch)?; + let paths = metadata.visit_scan_files( + vec![], + |ps: &mut Vec, path, _, _, _, _, _| { + ps.push(path.to_string()); + }, + )?; + file_paths.extend(paths); + batch_count += 1; + } + + // For v2 checkpoints with sidecars, the manifest might not contain adds directly. + // In this test table, all adds are in sidecars, so manifest should be empty. + assert_eq!( + batch_count, 1, + "Single manifest file should produce exactly 1 batch" + ); + + // Verify the manifest itself contains no add files (they're all in sidecars) + file_paths.sort(); + assert_eq!( + file_paths.len(), 0, + "For this v2 checkpoint with sidecars, manifest should contain 0 add files (all in sidecars)" + ); + + Ok(()) + } + + #[test] + fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> { + let (engine, snapshot, log_root) = load_test_table("v2-checkpoints-json-with-sidecars")?; + let log_segment = snapshot.log_segment(); + + if log_segment.checkpoint_parts.is_empty() { + println!("Test table has no checkpoint parts, skipping"); + return Ok(()); + } + + let checkpoint_file = &log_segment.checkpoint_parts[0]; + let manifest_file = checkpoint_file.location.clone(); + + let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?; + + let mut manifest_phase = + ManifestPhase::new(manifest_file, log_root.clone(), engine.clone(), schema)?; + + // Drain the phase + while manifest_phase.next().is_some() {} + + // Check if sidecars were collected + let next = manifest_phase.into_next()?; + + match next { + AfterManifest::Sidecars { sidecars } => { + // For the v2-checkpoints-json-with-sidecars test table at version 6, + // there are exactly 2 sidecar files + assert_eq!( + sidecars.len(), + 2, + "Should collect exactly 2 sidecars for checkpoint at version 6" + ); + + // Extract and verify the sidecar paths + let mut collected_paths: Vec = sidecars + .iter() + .map(|fm| { + // Get the filename from the URL path + fm.location + .path_segments() + .and_then(|segments| segments.last()) + .unwrap_or("") + .to_string() + }) + .collect(); + + collected_paths.sort(); + + // Verify they're the expected sidecar files for version 6 + assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet"); + assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet"); + } + AfterManifest::Done => { + panic!("Expected sidecars for v2-checkpoints-json-with-sidecars table"); + } + } + + Ok(()) + } +} diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs index d1337ef59a..1ae42a4ba4 100644 --- a/kernel/src/log_reader/mod.rs +++ b/kernel/src/log_reader/mod.rs @@ -1 +1,2 @@ pub(crate) mod commit; +pub(crate) mod manifest; From b2bf9d399b6ecf86a6da1e1fb414db0fda88c346 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 15:38:15 -0800 Subject: [PATCH 06/14] manifest --- kernel/src/log_reader/manifest.rs | 70 ++++++++++++++++++------------- kernel/src/scan/mod.rs | 2 +- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index b94e29a472..9e83472fe7 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -1,14 +1,16 @@ //! Manifest phase for log replay - processes single-part checkpoint manifest files. -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; +use itertools::Itertools; use url::Url; -use crate::actions::Sidecar; use crate::actions::{get_all_actions_schema, visitors::SidecarVisitor, SIDECAR_NAME}; +use crate::actions::{get_commit_schema, Sidecar, ADD_NAME}; use crate::expressions::Transform; use crate::log_replay::ActionsBatch; use crate::schema::{Schema, SchemaRef, StructField, ToSchema}; +use crate::utils::require; use crate::{DeltaResult, Engine, Error, Expression, ExpressionEvaluator, FileMeta, RowVisitor}; /// Phase that processes single-part checkpoint manifest files. @@ -17,8 +19,9 @@ use crate::{DeltaResult, Engine, Error, Expression, ExpressionEvaluator, FileMet pub(crate) struct ManifestPhase { actions: Box> + Send>, sidecar_visitor: SidecarVisitor, - original_schema: SchemaRef, + manifest_file: FileMeta, log_root: Url, + is_complete: bool, } /// Possible transitions after ManifestPhase completes. @@ -39,12 +42,18 @@ impl ManifestPhase { /// - `manifest_file`: The checkpoint manifest file to process /// - `log_root`: Root URL for resolving sidecar paths /// - `engine`: Engine for reading files - /// - `base_schema`: Schema columns required by the processor (will be augmented with sidecar) pub fn new( manifest_file: FileMeta, log_root: Url, engine: Arc, ) -> DeltaResult { + #[allow(clippy::unwrap_used)] + static MANIFEST_READ_SCHMEA: LazyLock = LazyLock::new(|| { + get_commit_schema() + .project(&[ADD_NAME, SIDECAR_NAME]) + .unwrap() + }); + let files = vec![manifest_file.clone()]; // Determine file type from extension @@ -59,13 +68,13 @@ impl ManifestPhase { "json" => { engine .json_handler() - .read_json_files(&files, sidecar_schema.clone(), None)? - } - "parquet" => { - engine - .parquet_handler() - .read_parquet_files(&files, sidecar_schema.clone(), None)? + .read_json_files(&files, MANIFEST_READ_SCHMEA.clone(), None)? } + "parquet" => engine.parquet_handler().read_parquet_files( + &files, + MANIFEST_READ_SCHMEA.clone(), + None, + )?, ext => { return Err(Error::generic(format!( "Unsupported checkpoint extension: {}", @@ -80,8 +89,8 @@ impl ManifestPhase { actions: Box::new(actions), sidecar_visitor: SidecarVisitor::default(), log_root, - original_schema, - projector, + manifest_file, + is_complete: false, }) } @@ -91,13 +100,20 @@ impl ManifestPhase { /// - `Sidecars`: Extracted sidecar files /// - `Done`: No sidecars found pub(crate) fn finalize(self) -> DeltaResult { - // TODO: Check that stream is exhausted. We can track a boolean flag on whether we saw a None yet. - let sidecars = self + require!( + self.is_complete, + Error::generic(format!( + "Finalized called on ManifestReader for file {:?}", + self.manifest_file.location + )) + ); + + let sidecars: Vec<_> = self .sidecar_visitor .sidecars .into_iter() .map(|s| s.to_filemeta(&self.log_root)) - .collect::, _>>()?; + .try_collect()?; if sidecars.is_empty() { Ok(AfterManifest::Done) @@ -111,18 +127,18 @@ impl Iterator for ManifestPhase { type Item = DeltaResult; fn next(&mut self) -> Option { - self.actions.next().map(|batch_result| { + let result = self.actions.next().map(|batch_result| { batch_result.and_then(|batch| { - // Extract sidecar references from the batch self.sidecar_visitor.visit_rows_of(batch.actions())?; - - // Return the batch - // TODO: un-select sidecar actions - // TODO: project out sidecar actions - let batch = self.projector.evaluate(batch); Ok(batch) }) - }) + }); + + if result.is_none() { + self.is_complete = true; + } + + result } } @@ -187,10 +203,8 @@ mod tests { let checkpoint_file = &log_segment.checkpoint_parts[0]; let manifest_file = checkpoint_file.location.clone(); - let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?; - let mut manifest_phase = - ManifestPhase::new(manifest_file, log_root.clone(), engine.clone(), schema)?; + ManifestPhase::new(manifest_file, log_root.clone(), engine.clone())?; // Count batches and collect results let mut batch_count = 0; @@ -242,13 +256,13 @@ mod tests { let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?; let mut manifest_phase = - ManifestPhase::new(manifest_file, log_root.clone(), engine.clone(), schema)?; + ManifestPhase::new(manifest_file, log_root.clone(), engine.clone())?; // Drain the phase while manifest_phase.next().is_some() {} // Check if sidecars were collected - let next = manifest_phase.into_next()?; + let next = manifest_phase.finalize()?; match next { AfterManifest::Sidecars { sidecars } => { diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 5ac8482f14..9ad23dd02e 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -13,7 +13,7 @@ use self::log_replay::get_scan_metadata_transform_expr; use crate::actions::deletion_vector::{ deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor, }; -use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME}; +use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME, SIDECAR_NAME}; use crate::engine_data::FilteredEngineData; use crate::expressions::transforms::ExpressionTransform; use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Scalar}; From 8ccc3a615948e6c45971d9419ef1c04cabd2f4d6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 15:50:46 -0800 Subject: [PATCH 07/14] allow_unused --- kernel/src/log_reader/manifest.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index 9e83472fe7..f22984063a 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -5,17 +5,17 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use url::Url; -use crate::actions::{get_all_actions_schema, visitors::SidecarVisitor, SIDECAR_NAME}; -use crate::actions::{get_commit_schema, Sidecar, ADD_NAME}; -use crate::expressions::Transform; +use crate::actions::{get_commit_schema, ADD_NAME}; +use crate::actions::{visitors::SidecarVisitor, SIDECAR_NAME}; use crate::log_replay::ActionsBatch; -use crate::schema::{Schema, SchemaRef, StructField, ToSchema}; +use crate::schema::SchemaRef; use crate::utils::require; -use crate::{DeltaResult, Engine, Error, Expression, ExpressionEvaluator, FileMeta, RowVisitor}; +use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; /// Phase that processes single-part checkpoint manifest files. /// /// Extracts sidecar references while processing the manifest. +#[allow(unused)] pub(crate) struct ManifestPhase { actions: Box> + Send>, sidecar_visitor: SidecarVisitor, @@ -25,6 +25,7 @@ pub(crate) struct ManifestPhase { } /// Possible transitions after ManifestPhase completes. +#[allow(unused)] pub(crate) enum AfterManifest { /// Has sidecars → return sidecar files Sidecars { sidecars: Vec }, @@ -42,7 +43,8 @@ impl ManifestPhase { /// - `manifest_file`: The checkpoint manifest file to process /// - `log_root`: Root URL for resolving sidecar paths /// - `engine`: Engine for reading files - pub fn new( + #[allow(unused)] + pub(crate) fn new( manifest_file: FileMeta, log_root: Url, engine: Arc, @@ -99,6 +101,7 @@ impl ManifestPhase { /// Returns an enum indicating what comes next: /// - `Sidecars`: Extracted sidecar files /// - `Done`: No sidecars found + #[allow(unused)] pub(crate) fn finalize(self) -> DeltaResult { require!( self.is_complete, From e63e7a84962a0e13108dbb82eb3f611ffbf1bfca Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 18 Nov 2025 16:05:21 -0800 Subject: [PATCH 08/14] improve test --- kernel/src/log_reader/manifest.rs | 69 ++++++++++++------------------- 1 file changed, 26 insertions(+), 43 deletions(-) diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index f22984063a..cfd133780e 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -5,10 +5,11 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use url::Url; -use crate::actions::{get_commit_schema, ADD_NAME}; -use crate::actions::{visitors::SidecarVisitor, SIDECAR_NAME}; +use crate::actions::visitors::SidecarVisitor; +use crate::actions::SIDECAR_NAME; +use crate::actions::{Add, Sidecar, ADD_NAME}; use crate::log_replay::ActionsBatch; -use crate::schema::SchemaRef; +use crate::schema::{SchemaRef, StructField, StructType, ToSchema}; use crate::utils::require; use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; @@ -49,11 +50,11 @@ impl ManifestPhase { log_root: Url, engine: Arc, ) -> DeltaResult { - #[allow(clippy::unwrap_used)] static MANIFEST_READ_SCHMEA: LazyLock = LazyLock::new(|| { - get_commit_schema() - .project(&[ADD_NAME, SIDECAR_NAME]) - .unwrap() + Arc::new(StructType::new_unchecked([ + StructField::nullable(ADD_NAME, Add::to_schema()), + StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()), + ])) }); let files = vec![manifest_file.clone()]; @@ -148,43 +149,37 @@ impl Iterator for ManifestPhase { #[cfg(test)] mod tests { use super::*; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::DefaultEngine; use crate::log_replay::LogReplayProcessor; use crate::scan::log_replay::ScanLogReplayProcessor; use crate::scan::state_info::StateInfo; + use crate::SnapshotRef; use object_store::local::LocalFileSystem; - use std::path::PathBuf; - use std::sync::Arc as StdArc; + use std::sync::Arc; + use tempfile::TempDir; fn load_test_table( table_name: &str, - ) -> DeltaResult<( - StdArc>, - StdArc, - url::Url, - )> { - let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - path.push("tests/data"); - path.push(table_name); - - let path = std::fs::canonicalize(path) - .map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?; - - let url = url::Url::from_directory_path(path) + ) -> DeltaResult<(Arc, SnapshotRef, Url, TempDir)> { + let test_dir = test_utils::load_test_data("tests/data", table_name) + .map_err(|e| crate::Error::Generic(format!("Failed to load test data: {}", e)))?; + let test_path = test_dir.path().join(table_name); + + let url = url::Url::from_directory_path(&test_path) .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?; - let store = StdArc::new(LocalFileSystem::new()); - let engine = StdArc::new(DefaultEngine::new(store)); + let store = Arc::new(LocalFileSystem::new()); + let engine = Arc::new(DefaultEngine::new(store)); let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?; - Ok((engine, snapshot, url)) + Ok((engine, snapshot, url, test_dir)) } #[test] fn test_manifest_phase_with_checkpoint() -> DeltaResult<()> { // Use a table with v2 checkpoints where adds might be in sidecars - let (engine, snapshot, log_root) = load_test_table("v2-checkpoints-json-with-sidecars")?; + let (engine, snapshot, log_root, _tempdir) = + load_test_table("v2-checkpoints-json-with-sidecars")?; let log_segment = snapshot.log_segment(); // Check if there are any checkpoint parts @@ -193,7 +188,7 @@ mod tests { return Ok(()); } - let state_info = StdArc::new(StateInfo::try_new( + let state_info = Arc::new(StateInfo::try_new( snapshot.schema(), snapshot.table_configuration(), None, @@ -210,10 +205,9 @@ mod tests { ManifestPhase::new(manifest_file, log_root.clone(), engine.clone())?; // Count batches and collect results - let mut batch_count = 0; let mut file_paths = Vec::new(); - while let Some(result) = manifest_phase.next() { + for result in manifest_phase { let batch = result?; let metadata = processor.process_actions_batch(batch)?; let paths = metadata.visit_scan_files( @@ -223,18 +217,8 @@ mod tests { }, )?; file_paths.extend(paths); - batch_count += 1; } - - // For v2 checkpoints with sidecars, the manifest might not contain adds directly. - // In this test table, all adds are in sidecars, so manifest should be empty. - assert_eq!( - batch_count, 1, - "Single manifest file should produce exactly 1 batch" - ); - // Verify the manifest itself contains no add files (they're all in sidecars) - file_paths.sort(); assert_eq!( file_paths.len(), 0, "For this v2 checkpoint with sidecars, manifest should contain 0 add files (all in sidecars)" @@ -245,7 +229,8 @@ mod tests { #[test] fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> { - let (engine, snapshot, log_root) = load_test_table("v2-checkpoints-json-with-sidecars")?; + let (engine, snapshot, log_root, _tempdir) = + load_test_table("v2-checkpoints-json-with-sidecars")?; let log_segment = snapshot.log_segment(); if log_segment.checkpoint_parts.is_empty() { @@ -256,8 +241,6 @@ mod tests { let checkpoint_file = &log_segment.checkpoint_parts[0]; let manifest_file = checkpoint_file.location.clone(); - let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?; - let mut manifest_phase = ManifestPhase::new(manifest_file, log_root.clone(), engine.clone())?; From f95ab27a921a9caa5046127d55be1215b08664f2 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 24 Nov 2025 14:56:23 -0800 Subject: [PATCH 09/14] fix manifest --- kernel/src/log_reader/commit.rs | 33 +--- kernel/src/log_reader/manifest.rs | 293 +++++++++++++++++------------- kernel/src/scan/mod.rs | 2 +- kernel/src/utils.rs | 38 +++- 4 files changed, 203 insertions(+), 163 deletions(-) diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index da67eb2fda..e98e719c57 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -47,43 +47,14 @@ mod tests { use super::*; use crate::arrow::array::{StringArray, StructArray}; use crate::engine::arrow_data::EngineDataArrowExt as _; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; - use crate::engine::default::DefaultEngine; use crate::scan::COMMIT_READ_SCHEMA; - use crate::{Error, Snapshot, SnapshotRef}; + use crate::utils::test_utils::load_extracted_test_table; use itertools::Itertools; - use object_store::local::LocalFileSystem; - use std::path::PathBuf; use std::sync::Arc; - use url::Url; - - fn load_test_table( - table_name: &str, - ) -> DeltaResult<( - Arc>, - SnapshotRef, - url::Url, - )> { - let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - path.push("tests/data"); - path.push(table_name); - - let path = std::fs::canonicalize(path) - .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; - - let url = Url::from_directory_path(path) - .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; - - let store = Arc::new(LocalFileSystem::new()); - let engine = Arc::new(DefaultEngine::new(store)); - let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; - - Ok((engine, snapshot, url)) - } #[test] fn test_commit_phase_processes_commits() -> Result<(), Box> { - let (engine, snapshot, _url) = load_test_table("app-txn-no-checkpoint")?; + let (engine, snapshot) = load_extracted_test_table(env!("CARGO_MANIFEST_DIR"), "app-txn-no-checkpoint")?; let log_segment = Arc::new(snapshot.log_segment().clone()); let schema = COMMIT_READ_SCHEMA.clone(); diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index cfd133780e..1163dd07bb 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -6,9 +6,10 @@ use itertools::Itertools; use url::Url; use crate::actions::visitors::SidecarVisitor; -use crate::actions::SIDECAR_NAME; -use crate::actions::{Add, Sidecar, ADD_NAME}; +use crate::actions::{Add, Remove, Sidecar, ADD_NAME}; +use crate::actions::{REMOVE_NAME, SIDECAR_NAME}; use crate::log_replay::ActionsBatch; +use crate::path::ParsedLogPath; use crate::schema::{SchemaRef, StructField, StructType, ToSchema}; use crate::utils::require; use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; @@ -20,7 +21,6 @@ use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; pub(crate) struct ManifestPhase { actions: Box> + Send>, sidecar_visitor: SidecarVisitor, - manifest_file: FileMeta, log_root: Url, is_complete: bool, } @@ -45,54 +45,43 @@ impl ManifestPhase { /// - `log_root`: Root URL for resolving sidecar paths /// - `engine`: Engine for reading files #[allow(unused)] - pub(crate) fn new( - manifest_file: FileMeta, + pub(crate) fn try_new( + manifest: &ParsedLogPath, log_root: Url, engine: Arc, ) -> DeltaResult { static MANIFEST_READ_SCHMEA: LazyLock = LazyLock::new(|| { Arc::new(StructType::new_unchecked([ StructField::nullable(ADD_NAME, Add::to_schema()), + StructField::nullable(REMOVE_NAME, Remove::to_schema()), StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()), ])) }); - let files = vec![manifest_file.clone()]; - - // Determine file type from extension - let extension = manifest_file - .location - .path() - .rsplit('.') - .next() - .unwrap_or(""); - - let actions = match extension { - "json" => { - engine - .json_handler() - .read_json_files(&files, MANIFEST_READ_SCHMEA.clone(), None)? - } + let actions = match manifest.extension.as_str() { + "json" => engine.json_handler().read_json_files( + std::slice::from_ref(&manifest.location), + MANIFEST_READ_SCHMEA.clone(), + None, + )?, "parquet" => engine.parquet_handler().read_parquet_files( - &files, + std::slice::from_ref(&manifest.location), MANIFEST_READ_SCHMEA.clone(), None, )?, - ext => { + extension => { return Err(Error::generic(format!( "Unsupported checkpoint extension: {}", - ext - ))) + extension + ))); } }; - let actions = actions.map(|batch| batch.map(|b| ActionsBatch::new(b, false))); - + let actions = Box::new(actions.map(|batch_res| Ok(ActionsBatch::new(batch_res?, false)))); Ok(Self { - actions: Box::new(actions), + actions, sidecar_visitor: SidecarVisitor::default(), log_root, - manifest_file, is_complete: false, }) } @@ -106,10 +95,7 @@ impl ManifestPhase { pub(crate) fn finalize(self) -> DeltaResult { require!( self.is_complete, - Error::generic(format!( - "Finalized called on ManifestReader for file {:?}", - self.manifest_file.location - )) + Error::generic("Finalize called on manifest reader but it was not exausted") ); let sidecars: Vec<_> = self @@ -149,115 +135,89 @@ impl Iterator for ManifestPhase { #[cfg(test)] mod tests { use super::*; - use crate::engine::default::DefaultEngine; - use crate::log_replay::LogReplayProcessor; - use crate::scan::log_replay::ScanLogReplayProcessor; - use crate::scan::state_info::StateInfo; + use crate::utils::test_utils::{ + assert_result_error_with_message, create_engine_and_snapshot_from_path, + load_extracted_test_table, + }; use crate::SnapshotRef; - use object_store::local::LocalFileSystem; use std::sync::Arc; - use tempfile::TempDir; - - fn load_test_table( - table_name: &str, - ) -> DeltaResult<(Arc, SnapshotRef, Url, TempDir)> { - let test_dir = test_utils::load_test_data("tests/data", table_name) - .map_err(|e| crate::Error::Generic(format!("Failed to load test data: {}", e)))?; - let test_path = test_dir.path().join(table_name); - let url = url::Url::from_directory_path(&test_path) - .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?; - - let store = Arc::new(LocalFileSystem::new()); - let engine = Arc::new(DefaultEngine::new(store)); - let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?; - - Ok((engine, snapshot, url, test_dir)) - } + /// Core helper function to test manifest phase with expected add paths and sidecars + fn verify_manifest_phase( + engine: Arc, + snapshot: SnapshotRef, + expected_add_paths: &[&str], + expected_sidecars: &[&str], + ) -> DeltaResult<()> { + use crate::arrow::array::{Array, StringArray, StructArray}; + use crate::engine::arrow_data::EngineDataArrowExt as _; + use itertools::Itertools; - #[test] - fn test_manifest_phase_with_checkpoint() -> DeltaResult<()> { - // Use a table with v2 checkpoints where adds might be in sidecars - let (engine, snapshot, log_root, _tempdir) = - load_test_table("v2-checkpoints-json-with-sidecars")?; let log_segment = snapshot.log_segment(); - // Check if there are any checkpoint parts if log_segment.checkpoint_parts.is_empty() { - println!("Test table has no checkpoint parts, skipping"); - return Ok(()); + panic!("Test table has no checkpoint parts"); } - let state_info = Arc::new(StateInfo::try_new( - snapshot.schema(), - snapshot.table_configuration(), - None, - (), - )?); - - let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; - - // Get the first checkpoint part let checkpoint_file = &log_segment.checkpoint_parts[0]; - let manifest_file = checkpoint_file.location.clone(); - let mut manifest_phase = - ManifestPhase::new(manifest_file, log_root.clone(), engine.clone())?; + let mut manifest_phase = ManifestPhase::try_new( + checkpoint_file, + snapshot.log_segment().log_root.clone(), + engine.clone(), + )?; - // Count batches and collect results - let mut file_paths = Vec::new(); - - for result in manifest_phase { + // Extract add file paths and verify expectations + let mut file_paths = vec![]; + for result in manifest_phase.by_ref() { let batch = result?; - let metadata = processor.process_actions_batch(batch)?; - let paths = metadata.visit_scan_files( - vec![], - |ps: &mut Vec, path, _, _, _, _, _| { - ps.push(path.to_string()); - }, - )?; - file_paths.extend(paths); + let ActionsBatch { + actions, + is_log_batch, + } = batch; + assert!(!is_log_batch, "Manifest should not be a log batch"); + + let record_batch = actions.try_into_record_batch()?; + let add = record_batch.column_by_name("add").unwrap(); + let add_struct = add.as_any().downcast_ref::().unwrap(); + + // If we expect no add paths (they're in sidecars), verify all adds are null + // Extract add paths + let path = add_struct + .column_by_name("path") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec(); + file_paths.extend(batch_paths); } - // Verify the manifest itself contains no add files (they're all in sidecars) + + // Verify collected add paths + file_paths.sort(); assert_eq!( - file_paths.len(), 0, - "For this v2 checkpoint with sidecars, manifest should contain 0 add files (all in sidecars)" + file_paths, expected_add_paths, + "ManifestPhase should extract expected Add file paths from checkpoint" ); - Ok(()) - } - - #[test] - fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> { - let (engine, snapshot, log_root, _tempdir) = - load_test_table("v2-checkpoints-json-with-sidecars")?; - let log_segment = snapshot.log_segment(); - - if log_segment.checkpoint_parts.is_empty() { - println!("Test table has no checkpoint parts, skipping"); - return Ok(()); - } - - let checkpoint_file = &log_segment.checkpoint_parts[0]; - let manifest_file = checkpoint_file.location.clone(); - - let mut manifest_phase = - ManifestPhase::new(manifest_file, log_root.clone(), engine.clone())?; - - // Drain the phase - while manifest_phase.next().is_some() {} - - // Check if sidecars were collected + // Check sidecars let next = manifest_phase.finalize()?; - match next { - AfterManifest::Sidecars { sidecars } => { - // For the v2-checkpoints-json-with-sidecars test table at version 6, - // there are exactly 2 sidecar files + match (next, expected_sidecars) { + (AfterManifest::Sidecars { .. }, []) => { + panic!("Expected to be Done, but found Sidecars") + } + (AfterManifest::Done, []) => { /* Empty expected sidecars is Done */ } + (AfterManifest::Done, sidecars) => { + panic!("Expected manifest phase to be Done, but got {:?}", sidecars) + } + (AfterManifest::Sidecars { sidecars }, expected_sidecars) => { assert_eq!( sidecars.len(), - 2, - "Should collect exactly 2 sidecars for checkpoint at version 6" + expected_sidecars.len(), + "Should collect exactly {} sidecars", + expected_sidecars.len() ); // Extract and verify the sidecar paths @@ -267,23 +227,98 @@ mod tests { // Get the filename from the URL path fm.location .path_segments() - .and_then(|segments| segments.last()) + .and_then(|mut segments| segments.next_back()) .unwrap_or("") .to_string() }) .collect(); collected_paths.sort(); - - // Verify they're the expected sidecar files for version 6 - assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet"); - assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet"); - } - AfterManifest::Done => { - panic!("Expected sidecars for v2-checkpoints-json-with-sidecars table"); + // Verify they're the expected sidecar files + assert_eq!(collected_paths, expected_sidecars.to_vec()); } } Ok(()) } + + /// Helper function to test manifest phase with expected add paths and sidecars. + /// Works with both compressed (tar.zst) and already-extracted test tables. + fn test_manifest_phase( + table_name: &str, + expected_add_paths: &[&str], + expected_sidecars: &[&str], + ) -> DeltaResult<()> { + // Try loading as compressed table first, fall back to extracted + let (engine, snapshot, _tempdir) = + match test_utils::load_test_data("tests/data", table_name) { + Ok(test_dir) => { + let test_path = test_dir.path().join(table_name); + let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?; + (engine, snapshot, Some(test_dir)) + } + Err(_) => { + let (engine, snapshot) = + load_extracted_test_table(env!("CARGO_MANIFEST_DIR"), table_name)?; + (engine, snapshot, None) + } + }; + + verify_manifest_phase(engine, snapshot, expected_add_paths, expected_sidecars) + } + + #[test] + fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> { + test_manifest_phase( + "with_checkpoint_no_last_checkpoint", + &["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"], + &[], // No sidecars + ) + } + + #[test] + fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> { + let (engine, snapshot) = load_extracted_test_table( + env!("CARGO_MANIFEST_DIR"), + "with_checkpoint_no_last_checkpoint", + )?; + + let manifest_phase = ManifestPhase::try_new( + &snapshot.log_segment().checkpoint_parts[0], + snapshot.log_segment().log_root.clone(), + engine.clone(), + )?; + + // Attempt to finalize without draining the iterator + let result = manifest_phase.finalize(); + + // Should get an error about not being exhausted + assert_result_error_with_message(result, "not exausted"); + + Ok(()) + } + + #[test] + fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> { + test_manifest_phase( + "v2-checkpoints-json-with-sidecars", + &[], // No add paths in manifest (they're in sidecars) + &[ + "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet", + "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet", + ], + ) + } + + #[test] + fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> { + test_manifest_phase( + "v2-checkpoints-parquet-with-sidecars", + &[], // No add paths in manifest (they're in sidecars) + &[ + "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet", + "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet", + ], + ) + } } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 9ad23dd02e..5ac8482f14 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -13,7 +13,7 @@ use self::log_replay::get_scan_metadata_transform_expr; use crate::actions::deletion_vector::{ deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor, }; -use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME, SIDECAR_NAME}; +use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME}; use crate::engine_data::FilteredEngineData; use crate::expressions::transforms::ExpressionTransform; use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Scalar}; diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs index 665ab7e336..a08c9bed01 100644 --- a/kernel/src/utils.rs +++ b/kernel/src/utils.rs @@ -152,9 +152,10 @@ pub(crate) mod test_utils { use crate::arrow::array::{RecordBatch, StringArray}; use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use crate::engine::arrow_data::ArrowEngineData; + use crate::engine::default::DefaultEngine; use crate::engine::sync::SyncEngine; - use crate::Engine; - use crate::EngineData; + use crate::{DeltaResult, EngineData, Error, SnapshotRef}; + use crate::{Engine, Snapshot}; use itertools::Itertools; use object_store::local::LocalFileSystem; @@ -163,6 +164,7 @@ pub(crate) mod test_utils { use std::{path::Path, sync::Arc}; use tempfile::TempDir; use test_utils::delta_path_for_version; + use url::Url; #[derive(Serialize)] pub(crate) enum Action { @@ -283,6 +285,38 @@ pub(crate) mod test_utils { } } } + + /// Helper to create engine and snapshot from a path. + /// Returns (engine, snapshot) tuple. + pub(crate) fn create_engine_and_snapshot_from_path( + path: &Path, + ) -> DeltaResult<(Arc, SnapshotRef)> { + let url = Url::from_directory_path(path) + .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; + + let store = Arc::new(LocalFileSystem::new()); + let engine = Arc::new(DefaultEngine::new(store)); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + Ok((engine, snapshot)) + } + + /// Load an already-extracted test table from the filesystem. + /// Returns (engine, snapshot) tuple. + pub(crate) fn load_extracted_test_table( + manifest_dir: &str, + table_name: &str, + ) -> DeltaResult<(Arc, SnapshotRef)> { + use std::path::PathBuf; + + let mut path = PathBuf::from(manifest_dir); + path.push("tests/data"); + path.push(table_name); + + let path = std::fs::canonicalize(path) + .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + + create_engine_and_snapshot_from_path(&path) + } } #[cfg(test)] From 3ac0de83b71ebc8a3508a98e2c1157e351e8faef Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 24 Nov 2025 15:22:55 -0800 Subject: [PATCH 10/14] more cleanup --- kernel/src/log_reader/commit.rs | 2 +- kernel/src/log_reader/manifest.rs | 72 +++++++++++-------------------- kernel/src/utils.rs | 5 +-- 3 files changed, 29 insertions(+), 50 deletions(-) diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index e98e719c57..74c2a17967 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -54,7 +54,7 @@ mod tests { #[test] fn test_commit_phase_processes_commits() -> Result<(), Box> { - let (engine, snapshot) = load_extracted_test_table(env!("CARGO_MANIFEST_DIR"), "app-txn-no-checkpoint")?; + let (engine, snapshot) = load_extracted_test_table("app-txn-no-checkpoint")?; let log_segment = Arc::new(snapshot.log_segment().clone()); let schema = COMMIT_READ_SCHEMA.clone(); diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index 1163dd07bb..3e69298200 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -1,4 +1,4 @@ -//! Manifest phase for log replay - processes single-part checkpoint manifest files. +//! Manifest phase for log replay - processes single-part checkpoints and manifest checkpoints. use std::sync::{Arc, LazyLock}; @@ -14,9 +14,8 @@ use crate::schema::{SchemaRef, StructField, StructType, ToSchema}; use crate::utils::require; use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; -/// Phase that processes single-part checkpoint manifest files. -/// -/// Extracts sidecar references while processing the manifest. +/// Phase that processes single-part checkpoint. This also treats the checkpoint as a manifest file +/// and extracts the sidecar actions during iteration. #[allow(unused)] pub(crate) struct ManifestPhase { actions: Box> + Send>, @@ -28,8 +27,8 @@ pub(crate) struct ManifestPhase { /// Possible transitions after ManifestPhase completes. #[allow(unused)] pub(crate) enum AfterManifest { - /// Has sidecars → return sidecar files - Sidecars { sidecars: Vec }, + /// Sidecars extracted from the manifest phase + Sidecars(Vec), /// No sidecars Done, } @@ -108,7 +107,7 @@ impl ManifestPhase { if sidecars.is_empty() { Ok(AfterManifest::Done) } else { - Ok(AfterManifest::Sidecars { sidecars }) + Ok(AfterManifest::Sidecars(sidecars)) } } } @@ -139,8 +138,10 @@ mod tests { assert_result_error_with_message, create_engine_and_snapshot_from_path, load_extracted_test_table, }; + use crate::SnapshotRef; use std::sync::Arc; + use test_utils::load_test_data; /// Core helper function to test manifest phase with expected add paths and sidecars fn verify_manifest_phase( @@ -154,18 +155,10 @@ mod tests { use itertools::Itertools; let log_segment = snapshot.log_segment(); - - if log_segment.checkpoint_parts.is_empty() { - panic!("Test table has no checkpoint parts"); - } - + let log_root = log_segment.log_root.clone(); + assert_eq!(log_segment.checkpoint_parts.len(), 1); let checkpoint_file = &log_segment.checkpoint_parts[0]; - - let mut manifest_phase = ManifestPhase::try_new( - checkpoint_file, - snapshot.log_segment().log_root.clone(), - engine.clone(), - )?; + let mut manifest_phase = ManifestPhase::try_new(checkpoint_file, log_root, engine.clone())?; // Extract add file paths and verify expectations let mut file_paths = vec![]; @@ -180,9 +173,6 @@ mod tests { let record_batch = actions.try_into_record_batch()?; let add = record_batch.column_by_name("add").unwrap(); let add_struct = add.as_any().downcast_ref::().unwrap(); - - // If we expect no add paths (they're in sidecars), verify all adds are null - // Extract add paths let path = add_struct .column_by_name("path") .unwrap() @@ -205,14 +195,14 @@ mod tests { let next = manifest_phase.finalize()?; match (next, expected_sidecars) { - (AfterManifest::Sidecars { .. }, []) => { - panic!("Expected to be Done, but found Sidecars") + (AfterManifest::Sidecars(sidecars), []) => { + panic!("Expected to be Done, but found Sidecars: {:?}", sidecars) } (AfterManifest::Done, []) => { /* Empty expected sidecars is Done */ } (AfterManifest::Done, sidecars) => { panic!("Expected manifest phase to be Done, but got {:?}", sidecars) } - (AfterManifest::Sidecars { sidecars }, expected_sidecars) => { + (AfterManifest::Sidecars(sidecars), expected_sidecars) => { assert_eq!( sidecars.len(), expected_sidecars.len(), @@ -224,7 +214,6 @@ mod tests { let mut collected_paths: Vec = sidecars .iter() .map(|fm| { - // Get the filename from the URL path fm.location .path_segments() .and_then(|mut segments| segments.next_back()) @@ -250,19 +239,17 @@ mod tests { expected_sidecars: &[&str], ) -> DeltaResult<()> { // Try loading as compressed table first, fall back to extracted - let (engine, snapshot, _tempdir) = - match test_utils::load_test_data("tests/data", table_name) { - Ok(test_dir) => { - let test_path = test_dir.path().join(table_name); - let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?; - (engine, snapshot, Some(test_dir)) - } - Err(_) => { - let (engine, snapshot) = - load_extracted_test_table(env!("CARGO_MANIFEST_DIR"), table_name)?; - (engine, snapshot, None) - } - }; + let (engine, snapshot, _tempdir) = match load_test_data("tests/data", table_name) { + Ok(test_dir) => { + let test_path = test_dir.path().join(table_name); + let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?; + (engine, snapshot, Some(test_dir)) + } + Err(_) => { + let (engine, snapshot) = load_extracted_test_table(table_name)?; + (engine, snapshot, None) + } + }; verify_manifest_phase(engine, snapshot, expected_add_paths, expected_sidecars) } @@ -278,10 +265,7 @@ mod tests { #[test] fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> { - let (engine, snapshot) = load_extracted_test_table( - env!("CARGO_MANIFEST_DIR"), - "with_checkpoint_no_last_checkpoint", - )?; + let (engine, snapshot) = load_extracted_test_table("with_checkpoint_no_last_checkpoint")?; let manifest_phase = ManifestPhase::try_new( &snapshot.log_segment().checkpoint_parts[0], @@ -289,12 +273,8 @@ mod tests { engine.clone(), )?; - // Attempt to finalize without draining the iterator let result = manifest_phase.finalize(); - - // Should get an error about not being exhausted assert_result_error_with_message(result, "not exausted"); - Ok(()) } diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs index a08c9bed01..473ce2947c 100644 --- a/kernel/src/utils.rs +++ b/kernel/src/utils.rs @@ -161,6 +161,7 @@ pub(crate) mod test_utils { use object_store::local::LocalFileSystem; use object_store::ObjectStore; use serde::Serialize; + use std::path::PathBuf; use std::{path::Path, sync::Arc}; use tempfile::TempDir; use test_utils::delta_path_for_version; @@ -303,11 +304,9 @@ pub(crate) mod test_utils { /// Load an already-extracted test table from the filesystem. /// Returns (engine, snapshot) tuple. pub(crate) fn load_extracted_test_table( - manifest_dir: &str, table_name: &str, ) -> DeltaResult<(Arc, SnapshotRef)> { - use std::path::PathBuf; - + let manifest_dir = env!("CARGO_MANIFEST_DIR"); let mut path = PathBuf::from(manifest_dir); path.push("tests/data"); path.push(table_name); From cba514127b523a7b21151cbdd62f6912d09af77a Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 24 Nov 2025 15:43:58 -0800 Subject: [PATCH 11/14] address pr reviews --- kernel/src/log_reader/manifest.rs | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index 3e69298200..7e0d57b9e2 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -22,6 +22,7 @@ pub(crate) struct ManifestPhase { sidecar_visitor: SidecarVisitor, log_root: Url, is_complete: bool, + manifest_file: FileMeta, } /// Possible transitions after ManifestPhase completes. @@ -70,8 +71,7 @@ impl ManifestPhase { )?, extension => { return Err(Error::generic(format!( - "Unsupported checkpoint extension: {}", - extension + "Unsupported checkpoint extension: {extension}", ))); } }; @@ -82,6 +82,7 @@ impl ManifestPhase { sidecar_visitor: SidecarVisitor::default(), log_root, is_complete: false, + manifest_file: manifest.location.clone(), }) } @@ -94,7 +95,10 @@ impl ManifestPhase { pub(crate) fn finalize(self) -> DeltaResult { require!( self.is_complete, - Error::generic("Finalize called on manifest reader but it was not exausted") + Error::generic(format!( + "Cannot finalize in-progress ManifestReader for file: {}", + self.manifest_file.location + )) ); let sidecars: Vec<_> = self @@ -117,10 +121,9 @@ impl Iterator for ManifestPhase { fn next(&mut self) -> Option { let result = self.actions.next().map(|batch_result| { - batch_result.and_then(|batch| { - self.sidecar_visitor.visit_rows_of(batch.actions())?; - Ok(batch) - }) + let batch = batch_result?; + self.sidecar_visitor.visit_rows_of(batch.actions())?; + Ok(batch) }); if result.is_none() { @@ -134,12 +137,15 @@ impl Iterator for ManifestPhase { #[cfg(test)] mod tests { use super::*; + use crate::arrow::array::{Array, StringArray, StructArray}; + use crate::engine::arrow_data::EngineDataArrowExt as _; use crate::utils::test_utils::{ assert_result_error_with_message, create_engine_and_snapshot_from_path, load_extracted_test_table, }; - use crate::SnapshotRef; + + use itertools::Itertools; use std::sync::Arc; use test_utils::load_test_data; @@ -150,10 +156,6 @@ mod tests { expected_add_paths: &[&str], expected_sidecars: &[&str], ) -> DeltaResult<()> { - use crate::arrow::array::{Array, StringArray, StructArray}; - use crate::engine::arrow_data::EngineDataArrowExt as _; - use itertools::Itertools; - let log_segment = snapshot.log_segment(); let log_root = log_segment.log_root.clone(); assert_eq!(log_segment.checkpoint_parts.len(), 1); @@ -274,7 +276,10 @@ mod tests { )?; let result = manifest_phase.finalize(); - assert_result_error_with_message(result, "not exausted"); + assert_result_error_with_message( + result, + "Cannot finalize in-progress ManifestReader for file", + ); Ok(()) } From bc84caa1465ce8c4d55d26450896b68b47d0455f Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 24 Nov 2025 15:55:24 -0800 Subject: [PATCH 12/14] rename to CheckpointManifestReader --- kernel/src/log_reader/manifest.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/manifest.rs index 7e0d57b9e2..62d623e874 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/manifest.rs @@ -17,7 +17,7 @@ use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor}; /// Phase that processes single-part checkpoint. This also treats the checkpoint as a manifest file /// and extracts the sidecar actions during iteration. #[allow(unused)] -pub(crate) struct ManifestPhase { +pub(crate) struct CheckpointManifestReader { actions: Box> + Send>, sidecar_visitor: SidecarVisitor, log_root: Url, @@ -25,7 +25,7 @@ pub(crate) struct ManifestPhase { manifest_file: FileMeta, } -/// Possible transitions after ManifestPhase completes. +/// Possible transitions after CheckpointManifestReader completes. #[allow(unused)] pub(crate) enum AfterManifest { /// Sidecars extracted from the manifest phase @@ -34,7 +34,7 @@ pub(crate) enum AfterManifest { Done, } -impl ManifestPhase { +impl CheckpointManifestReader { /// Create a new manifest phase for a single-part checkpoint. /// /// The schema is automatically augmented with the sidecar column since the manifest @@ -116,7 +116,7 @@ impl ManifestPhase { } } -impl Iterator for ManifestPhase { +impl Iterator for CheckpointManifestReader { type Item = DeltaResult; fn next(&mut self) -> Option { @@ -160,7 +160,8 @@ mod tests { let log_root = log_segment.log_root.clone(); assert_eq!(log_segment.checkpoint_parts.len(), 1); let checkpoint_file = &log_segment.checkpoint_parts[0]; - let mut manifest_phase = ManifestPhase::try_new(checkpoint_file, log_root, engine.clone())?; + let mut manifest_phase = + CheckpointManifestReader::try_new(checkpoint_file, log_root, engine.clone())?; // Extract add file paths and verify expectations let mut file_paths = vec![]; @@ -190,7 +191,7 @@ mod tests { file_paths.sort(); assert_eq!( file_paths, expected_add_paths, - "ManifestPhase should extract expected Add file paths from checkpoint" + "CheckpointManifestReader should extract expected Add file paths from checkpoint" ); // Check sidecars @@ -269,7 +270,7 @@ mod tests { fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> { let (engine, snapshot) = load_extracted_test_table("with_checkpoint_no_last_checkpoint")?; - let manifest_phase = ManifestPhase::try_new( + let manifest_phase = CheckpointManifestReader::try_new( &snapshot.log_segment().checkpoint_parts[0], snapshot.log_segment().log_root.clone(), engine.clone(), From 1bcc46ec84649d967450fa1419b5ee3ff3d6bc71 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 25 Nov 2025 11:55:17 -0800 Subject: [PATCH 13/14] address comments --- .../{manifest.rs => checkpoint_manifest.rs} | 87 +++++++------------ kernel/src/log_reader/mod.rs | 2 +- 2 files changed, 31 insertions(+), 58 deletions(-) rename kernel/src/log_reader/{manifest.rs => checkpoint_manifest.rs} (79%) diff --git a/kernel/src/log_reader/manifest.rs b/kernel/src/log_reader/checkpoint_manifest.rs similarity index 79% rename from kernel/src/log_reader/manifest.rs rename to kernel/src/log_reader/checkpoint_manifest.rs index 62d623e874..1a9ca87753 100644 --- a/kernel/src/log_reader/manifest.rs +++ b/kernel/src/log_reader/checkpoint_manifest.rs @@ -25,15 +25,6 @@ pub(crate) struct CheckpointManifestReader { manifest_file: FileMeta, } -/// Possible transitions after CheckpointManifestReader completes. -#[allow(unused)] -pub(crate) enum AfterManifest { - /// Sidecars extracted from the manifest phase - Sidecars(Vec), - /// No sidecars - Done, -} - impl CheckpointManifestReader { /// Create a new manifest phase for a single-part checkpoint. /// @@ -86,17 +77,14 @@ impl CheckpointManifestReader { }) } - /// Transition to the next phase. - /// - /// Returns an enum indicating what comes next: - /// - `Sidecars`: Extracted sidecar files - /// - `Done`: No sidecars found + /// Extract the sidecars from the manifest file if there were any. + /// NOTE: The iterator must be completely exhausted before calling this #[allow(unused)] - pub(crate) fn finalize(self) -> DeltaResult { + pub(crate) fn extract_sidecars(self) -> DeltaResult> { require!( self.is_complete, Error::generic(format!( - "Cannot finalize in-progress ManifestReader for file: {}", + "Cannot extract sidecars from in-progress ManifestReader for file: {}", self.manifest_file.location )) ); @@ -108,11 +96,7 @@ impl CheckpointManifestReader { .map(|s| s.to_filemeta(&self.log_root)) .try_collect()?; - if sidecars.is_empty() { - Ok(AfterManifest::Done) - } else { - Ok(AfterManifest::Sidecars(sidecars)) - } + Ok(sidecars) } } @@ -195,41 +179,30 @@ mod tests { ); // Check sidecars - let next = manifest_phase.finalize()?; + let actual_sidecars = manifest_phase.extract_sidecars()?; - match (next, expected_sidecars) { - (AfterManifest::Sidecars(sidecars), []) => { - panic!("Expected to be Done, but found Sidecars: {:?}", sidecars) - } - (AfterManifest::Done, []) => { /* Empty expected sidecars is Done */ } - (AfterManifest::Done, sidecars) => { - panic!("Expected manifest phase to be Done, but got {:?}", sidecars) - } - (AfterManifest::Sidecars(sidecars), expected_sidecars) => { - assert_eq!( - sidecars.len(), - expected_sidecars.len(), - "Should collect exactly {} sidecars", - expected_sidecars.len() - ); - - // Extract and verify the sidecar paths - let mut collected_paths: Vec = sidecars - .iter() - .map(|fm| { - fm.location - .path_segments() - .and_then(|mut segments| segments.next_back()) - .unwrap_or("") - .to_string() - }) - .collect(); - - collected_paths.sort(); - // Verify they're the expected sidecar files - assert_eq!(collected_paths, expected_sidecars.to_vec()); - } - } + assert_eq!( + actual_sidecars.len(), + expected_sidecars.len(), + "Should collect exactly {} actual_sidecars", + expected_sidecars.len() + ); + + // Extract and verify the sidecar paths + let mut collected_paths: Vec = actual_sidecars + .iter() + .map(|fm| { + fm.location + .path_segments() + .and_then(|mut segments| segments.next_back()) + .unwrap_or("") + .to_string() + }) + .collect(); + + collected_paths.sort(); + // Verify they're the expected sidecar files + assert_eq!(collected_paths, expected_sidecars.to_vec()); Ok(()) } @@ -276,10 +249,10 @@ mod tests { engine.clone(), )?; - let result = manifest_phase.finalize(); + let result = manifest_phase.extract_sidecars(); assert_result_error_with_message( result, - "Cannot finalize in-progress ManifestReader for file", + "Cannot extract sidecars from in-progress ManifestReader for file", ); Ok(()) } diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs index 1ae42a4ba4..b3c344e09b 100644 --- a/kernel/src/log_reader/mod.rs +++ b/kernel/src/log_reader/mod.rs @@ -1,2 +1,2 @@ +pub(crate) mod checkpoint_manifest; pub(crate) mod commit; -pub(crate) mod manifest; From a534df501dc2c8c80014b439695e10f4355b6d63 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 1 Dec 2025 15:03:21 -0800 Subject: [PATCH 14/14] simplify --- kernel/src/log_reader/checkpoint_manifest.rs | 63 +++++++------------- kernel/src/log_reader/commit.rs | 4 +- kernel/src/utils.rs | 52 +++++++++------- 3 files changed, 52 insertions(+), 67 deletions(-) diff --git a/kernel/src/log_reader/checkpoint_manifest.rs b/kernel/src/log_reader/checkpoint_manifest.rs index 1a9ca87753..bf21a9e8ed 100644 --- a/kernel/src/log_reader/checkpoint_manifest.rs +++ b/kernel/src/log_reader/checkpoint_manifest.rs @@ -37,9 +37,9 @@ impl CheckpointManifestReader { /// - `engine`: Engine for reading files #[allow(unused)] pub(crate) fn try_new( + engine: Arc, manifest: &ParsedLogPath, log_root: Url, - engine: Arc, ) -> DeltaResult { static MANIFEST_READ_SCHMEA: LazyLock = LazyLock::new(|| { Arc::new(StructType::new_unchecked([ @@ -123,17 +123,13 @@ mod tests { use super::*; use crate::arrow::array::{Array, StringArray, StructArray}; use crate::engine::arrow_data::EngineDataArrowExt as _; - use crate::utils::test_utils::{ - assert_result_error_with_message, create_engine_and_snapshot_from_path, - load_extracted_test_table, - }; + use crate::utils::test_utils::{assert_result_error_with_message, load_test_table}; use crate::SnapshotRef; use itertools::Itertools; use std::sync::Arc; - use test_utils::load_test_data; - /// Core helper function to test manifest phase with expected add paths and sidecars + /// Helper function to test manifest phase with expected add paths and sidecars fn verify_manifest_phase( engine: Arc, snapshot: SnapshotRef, @@ -145,7 +141,7 @@ mod tests { assert_eq!(log_segment.checkpoint_parts.len(), 1); let checkpoint_file = &log_segment.checkpoint_parts[0]; let mut manifest_phase = - CheckpointManifestReader::try_new(checkpoint_file, log_root, engine.clone())?; + CheckpointManifestReader::try_new(engine.clone(), checkpoint_file, log_root)?; // Extract add file paths and verify expectations let mut file_paths = vec![]; @@ -207,46 +203,25 @@ mod tests { Ok(()) } - /// Helper function to test manifest phase with expected add paths and sidecars. - /// Works with both compressed (tar.zst) and already-extracted test tables. - fn test_manifest_phase( - table_name: &str, - expected_add_paths: &[&str], - expected_sidecars: &[&str], - ) -> DeltaResult<()> { - // Try loading as compressed table first, fall back to extracted - let (engine, snapshot, _tempdir) = match load_test_data("tests/data", table_name) { - Ok(test_dir) => { - let test_path = test_dir.path().join(table_name); - let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?; - (engine, snapshot, Some(test_dir)) - } - Err(_) => { - let (engine, snapshot) = load_extracted_test_table(table_name)?; - (engine, snapshot, None) - } - }; - - verify_manifest_phase(engine, snapshot, expected_add_paths, expected_sidecars) - } - #[test] fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> { - test_manifest_phase( - "with_checkpoint_no_last_checkpoint", + let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?; + verify_manifest_phase( + engine, + snapshot, &["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"], - &[], // No sidecars + &[], ) } #[test] fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> { - let (engine, snapshot) = load_extracted_test_table("with_checkpoint_no_last_checkpoint")?; + let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?; let manifest_phase = CheckpointManifestReader::try_new( + engine.clone(), &snapshot.log_segment().checkpoint_parts[0], snapshot.log_segment().log_root.clone(), - engine.clone(), )?; let result = manifest_phase.extract_sidecars(); @@ -259,9 +234,11 @@ mod tests { #[test] fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> { - test_manifest_phase( - "v2-checkpoints-json-with-sidecars", - &[], // No add paths in manifest (they're in sidecars) + let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?; + verify_manifest_phase( + engine, + snapshot, + &[], &[ "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet", "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet", @@ -271,9 +248,11 @@ mod tests { #[test] fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> { - test_manifest_phase( - "v2-checkpoints-parquet-with-sidecars", - &[], // No add paths in manifest (they're in sidecars) + let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-parquet-with-sidecars")?; + verify_manifest_phase( + engine, + snapshot, + &[], &[ "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet", "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet", diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs index 74c2a17967..0012802856 100644 --- a/kernel/src/log_reader/commit.rs +++ b/kernel/src/log_reader/commit.rs @@ -48,13 +48,13 @@ mod tests { use crate::arrow::array::{StringArray, StructArray}; use crate::engine::arrow_data::EngineDataArrowExt as _; use crate::scan::COMMIT_READ_SCHEMA; - use crate::utils::test_utils::load_extracted_test_table; + use crate::utils::test_utils::load_test_table; use itertools::Itertools; use std::sync::Arc; #[test] fn test_commit_phase_processes_commits() -> Result<(), Box> { - let (engine, snapshot) = load_extracted_test_table("app-txn-no-checkpoint")?; + let (engine, snapshot, _tempdir) = load_test_table("app-txn-no-checkpoint")?; let log_segment = Arc::new(snapshot.log_segment().clone()); let schema = COMMIT_READ_SCHEMA.clone(); diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs index 473ce2947c..f1240eb6cf 100644 --- a/kernel/src/utils.rs +++ b/kernel/src/utils.rs @@ -165,6 +165,7 @@ pub(crate) mod test_utils { use std::{path::Path, sync::Arc}; use tempfile::TempDir; use test_utils::delta_path_for_version; + use test_utils::load_test_data; use url::Url; #[derive(Serialize)] @@ -287,34 +288,39 @@ pub(crate) mod test_utils { } } - /// Helper to create engine and snapshot from a path. - /// Returns (engine, snapshot) tuple. - pub(crate) fn create_engine_and_snapshot_from_path( - path: &Path, - ) -> DeltaResult<(Arc, SnapshotRef)> { - let url = Url::from_directory_path(path) + /// Load a test table from tests/data directory. + /// Tries compressed (tar.zst) first, falls back to extracted. + /// Returns (engine, snapshot, optional tempdir). The TempDir must be kept alive + /// for the duration of the test to prevent premature cleanup of extracted files. + pub(crate) fn load_test_table( + table_name: &str, + ) -> DeltaResult<(Arc, SnapshotRef, Option)> { + // Try loading compressed table first, fall back to extracted + let (path, tempdir) = match load_test_data("tests/data", table_name) { + Ok(test_dir) => { + let test_path = test_dir.path().join(table_name); + (test_path, Some(test_dir)) + } + Err(_) => { + // Fall back to already-extracted table + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let mut path = PathBuf::from(manifest_dir); + path.push("tests/data"); + path.push(table_name); + let path = std::fs::canonicalize(path) + .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; + (path, None) + } + }; + + // Create engine and snapshot from the resolved path + let url = Url::from_directory_path(&path) .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?; let store = Arc::new(LocalFileSystem::new()); let engine = Arc::new(DefaultEngine::new(store)); let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; - Ok((engine, snapshot)) - } - - /// Load an already-extracted test table from the filesystem. - /// Returns (engine, snapshot) tuple. - pub(crate) fn load_extracted_test_table( - table_name: &str, - ) -> DeltaResult<(Arc, SnapshotRef)> { - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - let mut path = PathBuf::from(manifest_dir); - path.push("tests/data"); - path.push(table_name); - - let path = std::fs::canonicalize(path) - .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?; - - create_engine_and_snapshot_from_path(&path) + Ok((engine, snapshot, tempdir)) } }