diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index c299531af..505b74261 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -93,6 +93,7 @@ pub mod error;
 pub mod expressions;
 mod log_compaction;
 mod log_path;
+mod log_reader;
 pub mod metrics;
 pub mod scan;
 pub mod schema;
diff --git a/kernel/src/log_reader/checkpoint_manifest.rs b/kernel/src/log_reader/checkpoint_manifest.rs
new file mode 100644
index 000000000..bf21a9e8e
--- /dev/null
+++ b/kernel/src/log_reader/checkpoint_manifest.rs
@@ -0,0 +1,262 @@
+//! Manifest phase for log replay - processes single-part checkpoints and manifest checkpoints.
+
+use std::sync::{Arc, LazyLock};
+
+use itertools::Itertools;
+use url::Url;
+
+use crate::actions::visitors::SidecarVisitor;
+use crate::actions::{Add, Remove, Sidecar, ADD_NAME};
+use crate::actions::{REMOVE_NAME, SIDECAR_NAME};
+use crate::log_replay::ActionsBatch;
+use crate::path::ParsedLogPath;
+use crate::schema::{SchemaRef, StructField, StructType, ToSchema};
+use crate::utils::require;
+use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor};
+
+/// Phase that processes a single-part checkpoint. It also treats the checkpoint as a manifest
+/// file and extracts the sidecar actions during iteration.
+#[allow(unused)]
+pub(crate) struct CheckpointManifestReader {
+    actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
+    sidecar_visitor: SidecarVisitor,
+    log_root: Url,
+    is_complete: bool,
+    manifest_file: FileMeta,
+}
+
+impl CheckpointManifestReader {
+    /// Create a new manifest phase for a single-part checkpoint.
+    ///
+    /// The read schema always includes the sidecar column, since the manifest phase needs to
+    /// extract sidecar references for phase transitions.
+    ///
+    /// # Parameters
+    /// - `engine`: Engine for reading files
+    /// - `manifest`: The checkpoint manifest file to process
+    /// - `log_root`: Root URL for resolving sidecar paths
+    #[allow(unused)]
+    pub(crate) fn try_new(
+        engine: Arc<dyn Engine>,
+        manifest: &ParsedLogPath,
+        log_root: Url,
+    ) -> DeltaResult<Self> {
+        static MANIFEST_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+            Arc::new(StructType::new_unchecked([
+                StructField::nullable(ADD_NAME, Add::to_schema()),
+                StructField::nullable(REMOVE_NAME, Remove::to_schema()),
+                StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
+            ]))
+        });
+
+        let actions = match manifest.extension.as_str() {
+            "json" => engine.json_handler().read_json_files(
+                std::slice::from_ref(&manifest.location),
+                MANIFEST_READ_SCHEMA.clone(),
+                None,
+            )?,
+            "parquet" => engine.parquet_handler().read_parquet_files(
+                std::slice::from_ref(&manifest.location),
+                MANIFEST_READ_SCHEMA.clone(),
+                None,
+            )?,
+            extension => {
+                return Err(Error::generic(format!(
+                    "Unsupported checkpoint extension: {extension}",
+                )));
+            }
+        };
+
+        let actions = Box::new(actions.map(|batch_res| Ok(ActionsBatch::new(batch_res?, false))));
+        Ok(Self {
+            actions,
+            sidecar_visitor: SidecarVisitor::default(),
+            log_root,
+            is_complete: false,
+            manifest_file: manifest.location.clone(),
+        })
+    }
+
+    /// Extract the sidecars from the manifest file if there were any.
+    ///
+    /// NOTE: The iterator must be completely exhausted before calling this method.
+    #[allow(unused)]
+    pub(crate) fn extract_sidecars(self) -> DeltaResult<Vec<FileMeta>> {
+        require!(
+            self.is_complete,
+            Error::generic(format!(
+                "Cannot extract sidecars from in-progress ManifestReader for file: {}",
+                self.manifest_file.location
+            ))
+        );
+
+        let sidecars: Vec<_> = self
+            .sidecar_visitor
+            .sidecars
+            .into_iter()
+            .map(|s| s.to_filemeta(&self.log_root))
+            .try_collect()?;
+
+        Ok(sidecars)
+    }
+}
+
+impl Iterator for CheckpointManifestReader {
+    type Item = DeltaResult<ActionsBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let result = self.actions.next().map(|batch_result| {
+            let batch = batch_result?;
+            self.sidecar_visitor.visit_rows_of(batch.actions())?;
+            Ok(batch)
+        });
+
+        if result.is_none() {
+            self.is_complete = true;
+        }
+
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::{Array, StringArray, StructArray};
+    use crate::engine::arrow_data::EngineDataArrowExt as _;
+    use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
+    use crate::SnapshotRef;
+
+    use itertools::Itertools;
+    use std::sync::Arc;
+
+    /// Helper function to test the manifest phase against expected add paths and sidecars
+    fn verify_manifest_phase(
+        engine: Arc<dyn Engine>,
+        snapshot: SnapshotRef,
+        expected_add_paths: &[&str],
+        expected_sidecars: &[&str],
+    ) -> DeltaResult<()> {
+        let log_segment = snapshot.log_segment();
+        let log_root = log_segment.log_root.clone();
+        assert_eq!(log_segment.checkpoint_parts.len(), 1);
+        let checkpoint_file = &log_segment.checkpoint_parts[0];
+        let mut manifest_phase =
+            CheckpointManifestReader::try_new(engine.clone(), checkpoint_file, log_root)?;
+
+        // Extract add file paths and verify expectations
+        let mut file_paths = vec![];
+        for result in manifest_phase.by_ref() {
+            let batch = result?;
+            let ActionsBatch {
+                actions,
+                is_log_batch,
+            } = batch;
+            assert!(!is_log_batch, "Manifest should not be a log batch");
+
+            let record_batch = actions.try_into_record_batch()?;
+            let add = record_batch.column_by_name("add").unwrap();
+            let add_struct = add.as_any().downcast_ref::<StructArray>().unwrap();
+            let path = add_struct
+                .column_by_name("path")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+
+            let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec();
+            file_paths.extend(batch_paths);
+        }
+
+        // Verify collected add paths
+        file_paths.sort();
+        assert_eq!(
+            file_paths, expected_add_paths,
+            "CheckpointManifestReader should extract expected Add file paths from checkpoint"
+        );
+
+        // Check sidecars
+        let actual_sidecars = manifest_phase.extract_sidecars()?;
+
+        assert_eq!(
+            actual_sidecars.len(),
+            expected_sidecars.len(),
+            "Should collect exactly {} sidecars",
+            expected_sidecars.len()
+        );
+
+        // Extract and verify the sidecar paths
+        let mut collected_paths: Vec<String> = actual_sidecars
+            .iter()
+            .map(|fm| {
+                fm.location
+                    .path_segments()
+                    .and_then(|mut segments| segments.next_back())
+                    .unwrap_or("")
+                    .to_string()
+            })
+            .collect();
+
+        collected_paths.sort();
+        // Verify they're the expected sidecar files
+        assert_eq!(collected_paths, expected_sidecars.to_vec());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> {
+        let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
+        verify_manifest_phase(
+            engine,
+            snapshot,
+            &["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"],
+            &[],
+        )
+    }
+
+    #[test]
+    fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> {
+        let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
+
+        let manifest_phase = CheckpointManifestReader::try_new(
+            engine.clone(),
+            &snapshot.log_segment().checkpoint_parts[0],
+            snapshot.log_segment().log_root.clone(),
+        )?;
+
+        let result = manifest_phase.extract_sidecars();
+        assert_result_error_with_message(
+            result,
+            "Cannot extract sidecars from in-progress ManifestReader for file",
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> {
+        let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?;
+        verify_manifest_phase(
+            engine,
+            snapshot,
+            &[],
+            &[
+                "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet",
+                "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet",
+            ],
+        )
+    }
+
+    #[test]
+    fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> {
+        let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-parquet-with-sidecars")?;
+        verify_manifest_phase(
+            engine,
+            snapshot,
+            &[],
+            &[
+                "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet",
+                "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet",
+            ],
+        )
+    }
+}
diff --git a/kernel/src/log_reader/commit.rs b/kernel/src/log_reader/commit.rs
new file mode 100644
index 000000000..001280285
--- /dev/null
+++ b/kernel/src/log_reader/commit.rs
@@ -0,0 +1,101 @@
+//! Commit phase for log replay - processes JSON commit files.
+
+use crate::log_replay::ActionsBatch;
+use crate::log_segment::LogSegment;
+use crate::schema::SchemaRef;
+use crate::{DeltaResult, Engine};
+
+/// Phase that processes JSON commit files into [`ActionsBatch`]es
+pub(crate) struct CommitReader {
+    actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
+}
+
+impl CommitReader {
+    /// Create a new commit phase from a log segment.
+    ///
+    /// # Parameters
+    /// - `engine`: Engine for reading files
+    /// - `log_segment`: The log segment to process
+    /// - `schema`: The schema to use when reading the JSON commit files
+    pub(crate) fn try_new(
+        engine: &dyn Engine,
+        log_segment: &LogSegment,
+        schema: SchemaRef,
+    ) -> DeltaResult<Self> {
+        let commit_files = log_segment.find_commit_cover();
+        let actions = engine
+            .json_handler()
+            .read_json_files(&commit_files, schema, None)?
+            .map(|batch| batch.map(|b| ActionsBatch::new(b, true)));
+
+        Ok(Self {
+            actions: Box::new(actions),
+        })
+    }
+}
+
+impl Iterator for CommitReader {
+    type Item = DeltaResult<ActionsBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.actions.next()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::{StringArray, StructArray};
+    use crate::engine::arrow_data::EngineDataArrowExt as _;
+    use crate::scan::COMMIT_READ_SCHEMA;
+    use crate::utils::test_utils::load_test_table;
+    use itertools::Itertools;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_commit_phase_processes_commits() -> Result<(), Box<dyn std::error::Error>> {
+        let (engine, snapshot, _tempdir) = load_test_table("app-txn-no-checkpoint")?;
+        let log_segment = Arc::new(snapshot.log_segment().clone());
+
+        let schema = COMMIT_READ_SCHEMA.clone();
+        let commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?;
+
+        let mut file_paths = vec![];
+        for result in commit_phase {
+            let batch = result?;
+            let ActionsBatch {
+                actions,
+                is_log_batch,
+            } = batch;
+            assert!(is_log_batch);
+
+            let record_batch = actions.try_into_record_batch()?;
+            let add = record_batch.column_by_name("add").unwrap();
+            let add_struct = add.as_any().downcast_ref::<StructArray>().unwrap();
+
+            let path = add_struct
+                .column_by_name("path")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+
+            let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec();
+            file_paths.extend(batch_paths);
+        }
+
+        file_paths.sort();
+        let expected_files = vec![
+            "modified=2021-02-01/part-00001-80996595-a345-43b7-b213-e247d6f091f7-c000.snappy.parquet",
+            "modified=2021-02-01/part-00001-8ebcaf8b-0f48-4213-98c9-5c2156d20a7e-c000.snappy.parquet",
+            "modified=2021-02-02/part-00001-9a16b9f6-c12a-4609-a9c4-828eacb9526a-c000.snappy.parquet",
+            "modified=2021-02-02/part-00001-bfac5c74-426e-410f-ab74-21a64e518e9c-c000.snappy.parquet",
+        ];
+        assert_eq!(
+            file_paths, expected_files,
+            "CommitReader should find exactly the expected files"
+        );
+
+        Ok(())
+    }
+}
diff --git a/kernel/src/log_reader/mod.rs b/kernel/src/log_reader/mod.rs
new file mode 100644
index 000000000..b3c344e09
--- /dev/null
+++ b/kernel/src/log_reader/mod.rs
@@ -0,0 +1,2 @@
+pub(crate) mod checkpoint_manifest;
+pub(crate) mod commit;
diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs
index 3b223e633..10b60275a 100644
--- a/kernel/src/log_segment.rs
+++ b/kernel/src/log_segment.rs
@@ -11,6 +11,7 @@ use crate::actions::{
     PROTOCOL_NAME, SIDECAR_NAME,
 };
 use crate::last_checkpoint_hint::LastCheckpointHint;
+use crate::log_reader::commit::CommitReader;
 use crate::log_replay::ActionsBatch;
 use crate::metrics::{MetricEvent, MetricId, MetricsReporter};
 use crate::path::{LogPathFileType, ParsedLogPath};
@@ -330,15 +331,7 @@ impl LogSegment {
         meta_predicate: Option<PredicateRef>,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
         // `replay` expects commit files to be sorted in descending order, so the return value here is correct
-        let commits_and_compactions = self.find_commit_cover();
-        let commit_stream = engine
-            .json_handler()
-            .read_json_files(
-                &commits_and_compactions,
-                commit_read_schema,
-                meta_predicate.clone(),
-            )?
-            .map_ok(|batch| ActionsBatch::new(batch, true));
+        let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;
 
         let checkpoint_stream =
             self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;
@@ -367,7 +360,7 @@ impl LogSegment {
     /// returns files in DESCENDING ORDER, as that's what `replay` expects. This function assumes
     /// that all files in `self.ascending_commit_files` and `self.ascending_compaction_files` are in
     /// range for this log segment. This invariant is maintained by our listing code.
-    fn find_commit_cover(&self) -> Vec<FileMeta> {
+    pub(crate) fn find_commit_cover(&self) -> Vec<FileMeta> {
         // Create an iterator sorted in ascending order by (initial version, end version), e.g.
         // [00.json, 00.09.compacted.json, 00.99.compacted.json, 01.json, 02.json, ..., 10.json,
         // 10.19.compacted.json, 11.json, ...]
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index 7bf69eb72..e0fe51b3f 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -53,7 +53,7 @@ pub(crate) struct ScanLogReplayProcessor {
 
 impl ScanLogReplayProcessor {
     /// Create a new [`ScanLogReplayProcessor`] instance
-    fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
+    pub(crate) fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
         // Extract the physical predicate from StateInfo's PhysicalPredicate enum.
         // The DataSkippingFilter and partition_filter components expect the predicate
         // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 5146e78ab..5ac8482f1 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -41,7 +41,7 @@ pub(crate) mod state_info;
 
 // safety: we define get_commit_schema() and _know_ it contains ADD_NAME and REMOVE_NAME
 #[allow(clippy::unwrap_used)]
-static COMMIT_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+pub(crate) static COMMIT_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
     get_commit_schema()
         .project(&[ADD_NAME, REMOVE_NAME])
         .unwrap()
diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs
index 665ab7e33..f1240eb6c 100644
--- a/kernel/src/utils.rs
+++ b/kernel/src/utils.rs
@@ -152,17 +152,21 @@ pub(crate) mod test_utils {
     use crate::arrow::array::{RecordBatch, StringArray};
     use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
     use crate::engine::arrow_data::ArrowEngineData;
+    use crate::engine::default::DefaultEngine;
     use crate::engine::sync::SyncEngine;
-    use crate::Engine;
-    use crate::EngineData;
+    use crate::{DeltaResult, EngineData, Error, SnapshotRef};
+    use crate::{Engine, Snapshot};
 
     use itertools::Itertools;
     use object_store::local::LocalFileSystem;
     use object_store::ObjectStore;
     use serde::Serialize;
+    use std::path::PathBuf;
     use std::{path::Path, sync::Arc};
     use tempfile::TempDir;
    use test_utils::delta_path_for_version;
+    use test_utils::load_test_data;
+    use url::Url;
 
 #[derive(Serialize)]
 pub(crate) enum Action {
@@ -283,6 +287,41 @@
             }
         }
     }
+
+    /// Load a test table from the tests/data directory.
+    /// Tries the compressed (tar.zst) archive first, then falls back to an extracted copy.
+    /// Returns (engine, snapshot, optional tempdir). The TempDir must be kept alive
+    /// for the duration of the test to prevent premature cleanup of extracted files.
+    pub(crate) fn load_test_table(
+        table_name: &str,
+    ) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef, Option<TempDir>)> {
+        // Try loading compressed table first, fall back to extracted
+        let (path, tempdir) = match load_test_data("tests/data", table_name) {
+            Ok(test_dir) => {
+                let test_path = test_dir.path().join(table_name);
+                (test_path, Some(test_dir))
+            }
+            Err(_) => {
+                // Fall back to already-extracted table
+                let manifest_dir = env!("CARGO_MANIFEST_DIR");
+                let mut path = PathBuf::from(manifest_dir);
+                path.push("tests/data");
+                path.push(table_name);
+                let path = std::fs::canonicalize(path)
+                    .map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?;
+                (path, None)
+            }
+        };
+
+        // Create engine and snapshot from the resolved path
+        let url = Url::from_directory_path(&path)
+            .map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?;
+
+        let store = Arc::new(LocalFileSystem::new());
+        let engine = Arc::new(DefaultEngine::new(store));
+        let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
+        Ok((engine, snapshot, tempdir))
+    }
 }
 
 #[cfg(test)]
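A minimal crate-internal usage sketch of how the two new readers compose during replay, under stated assumptions: it only uses the APIs introduced in this diff (`CommitReader`, `CheckpointManifestReader`, `COMMIT_READ_SCHEMA`), and the `replay_with_sidecars` helper itself is hypothetical, not part of this change:

```rust
use std::sync::Arc;

use crate::log_reader::checkpoint_manifest::CheckpointManifestReader;
use crate::log_reader::commit::CommitReader;
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::scan::COMMIT_READ_SCHEMA;
use crate::{DeltaResult, Engine, FileMeta};

/// Hypothetical helper: replay commit batches, then the checkpoint manifest,
/// collecting any sidecar references found along the way.
fn replay_with_sidecars(
    engine: Arc<dyn Engine>,
    log_segment: &LogSegment,
) -> DeltaResult<(Vec<ActionsBatch>, Vec<FileMeta>)> {
    // Commit (and compaction) files come first, newest to oldest, as `replay` expects.
    let mut batches: Vec<ActionsBatch> =
        CommitReader::try_new(engine.as_ref(), log_segment, COMMIT_READ_SCHEMA.clone())?
            .collect::<DeltaResult<_>>()?;

    // Then the single-part checkpoint manifest, if this segment has one.
    let mut sidecars = Vec::new();
    if let Some(manifest) = log_segment.checkpoint_parts.first() {
        let mut reader = CheckpointManifestReader::try_new(
            engine.clone(),
            manifest,
            log_segment.log_root.clone(),
        )?;
        // Drain the reader so it records any sidecar actions it visits.
        for batch in reader.by_ref() {
            batches.push(batch?);
        }
        // Sidecars can only be extracted once the reader is exhausted.
        sidecars = reader.extract_sidecars()?;
    }
    Ok((batches, sidecars))
}
```

Note the ordering constraint the sketch relies on: `extract_sidecars` consumes the reader and errors unless iteration has completed, mirroring the `test_manifest_phase_early_finalize_error` test above.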