Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
39b6132
commit reader
OussamaSaoudi Nov 18, 2025
95291f7
improve commit
OussamaSaoudi Nov 18, 2025
ed7888c
cleanup
OussamaSaoudi Nov 24, 2025
fd419c8
commit reader
OussamaSaoudi Nov 24, 2025
527b388
manifest v1
OussamaSaoudi Nov 18, 2025
33209b2
manifest
OussamaSaoudi Nov 18, 2025
548c0da
allow_unused
OussamaSaoudi Nov 18, 2025
aed338a
improve test
OussamaSaoudi Nov 19, 2025
d5c941e
fix manifest
OussamaSaoudi Nov 24, 2025
c28e6bb
more cleanup
OussamaSaoudi Nov 24, 2025
42545f4
address pr reviews
OussamaSaoudi Nov 24, 2025
7353753
rename to CheckpointManifestReader
OussamaSaoudi Nov 24, 2025
6d676c4
address comments
OussamaSaoudi Nov 25, 2025
011d91c
simplify
OussamaSaoudi Dec 1, 2025
c98829b
implement driver
OussamaSaoudi Nov 19, 2025
5822506
driver
OussamaSaoudi Nov 19, 2025
c1dd394
driver
OussamaSaoudi Nov 21, 2025
aa1c82e
fix driver state management
OussamaSaoudi Nov 21, 2025
700cf69
update sidecars
OussamaSaoudi Nov 25, 2025
57d6c0d
simplify driver
OussamaSaoudi Nov 25, 2025
0f38889
improve driver
OussamaSaoudi Nov 25, 2025
7599a89
cleanup driver
OussamaSaoudi Nov 25, 2025
1cca411
more cleanup
OussamaSaoudi Nov 26, 2025
eaae67e
address comments
OussamaSaoudi Nov 26, 2025
a523cf5
format
OussamaSaoudi Nov 26, 2025
30c8194
cleanup
OussamaSaoudi Dec 1, 2025
4feb9dd
remove unnecessary change
OussamaSaoudi Dec 1, 2025
79eb035
remove Arc<LogSegment>
OussamaSaoudi Dec 2, 2025
afd32bc
apease clippy
OussamaSaoudi Dec 3, 2025
64956c5
appease clippy more
OussamaSaoudi Dec 3, 2025
d6d6c5a
rename distributed to parallel
OussamaSaoudi Dec 6, 2025
b324727
simplify driver
OussamaSaoudi Nov 25, 2025
f538fa0
serde first draft
OussamaSaoudi Nov 19, 2025
989c98b
remove old driver naming
OussamaSaoudi Nov 26, 2025
2a01a68
remove public-facing api
OussamaSaoudi Dec 2, 2025
93469b9
add back send
OussamaSaoudi Dec 2, 2025
04962e1
cleanup
OussamaSaoudi Dec 2, 2025
341ed5a
fix format
OussamaSaoudi Dec 2, 2025
83b108d
update to fileactionkey
OussamaSaoudi Dec 3, 2025
c67ab36
address feedback
OussamaSaoudi Dec 6, 2025
294a0e9
fix test
OussamaSaoudi Dec 8, 2025
c8b8469
copy pasta.
OussamaSaoudi Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ pub mod error;
pub mod expressions;
mod log_compaction;
mod log_path;
mod log_reader;
pub mod metrics;
mod parallel;
pub mod scan;
pub mod schema;
pub mod snapshot;
Expand Down
262 changes: 262 additions & 0 deletions kernel/src/log_reader/checkpoint_manifest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
//! Manifest phase for log replay - processes single-part checkpoints and manifest checkpoints.

use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use url::Url;

use crate::actions::visitors::SidecarVisitor;
use crate::actions::{Add, Remove, Sidecar, ADD_NAME};
use crate::actions::{REMOVE_NAME, SIDECAR_NAME};
use crate::log_replay::ActionsBatch;
use crate::path::ParsedLogPath;
use crate::schema::{SchemaRef, StructField, StructType, ToSchema};
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor};

/// Phase that processes single-part checkpoint. This also treats the checkpoint as a manifest file
/// and extracts the sidecar actions during iteration.
#[allow(unused)]
pub(crate) struct CheckpointManifestReader {
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
sidecar_visitor: SidecarVisitor,
log_root: Url,
is_complete: bool,
manifest_file: FileMeta,
}

impl CheckpointManifestReader {
/// Create a new manifest phase for a single-part checkpoint.
///
/// The schema is automatically augmented with the sidecar column since the manifest
/// phase needs to extract sidecar references for phase transitions.
///
/// # Parameters
/// - `manifest_file`: The checkpoint manifest file to process
/// - `log_root`: Root URL for resolving sidecar paths
/// - `engine`: Engine for reading files
#[allow(unused)]
pub(crate) fn try_new(
engine: Arc<dyn Engine>,
manifest: &ParsedLogPath,
log_root: Url,
) -> DeltaResult<Self> {
static MANIFEST_READ_SCHMEA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, Add::to_schema()),
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
]))
});

let actions = match manifest.extension.as_str() {
"json" => engine.json_handler().read_json_files(
std::slice::from_ref(&manifest.location),
MANIFEST_READ_SCHMEA.clone(),
None,
)?,
"parquet" => engine.parquet_handler().read_parquet_files(
std::slice::from_ref(&manifest.location),
MANIFEST_READ_SCHMEA.clone(),
None,
)?,
extension => {
return Err(Error::generic(format!(
"Unsupported checkpoint extension: {extension}",
)));
}
};

let actions = Box::new(actions.map(|batch_res| Ok(ActionsBatch::new(batch_res?, false))));
Ok(Self {
actions,
sidecar_visitor: SidecarVisitor::default(),
log_root,
is_complete: false,
manifest_file: manifest.location.clone(),
})
}

/// Extract the sidecars from the manifest file if there were any.
/// NOTE: The iterator must be completely exhausted before calling this
#[allow(unused)]
pub(crate) fn extract_sidecars(self) -> DeltaResult<Vec<FileMeta>> {
require!(
self.is_complete,
Error::generic(format!(
"Cannot extract sidecars from in-progress ManifestReader for file: {}",
self.manifest_file.location
))
);

let sidecars: Vec<_> = self
.sidecar_visitor
.sidecars
.into_iter()
.map(|s| s.to_filemeta(&self.log_root))
.try_collect()?;

Ok(sidecars)
}
}

impl Iterator for CheckpointManifestReader {
type Item = DeltaResult<ActionsBatch>;

fn next(&mut self) -> Option<Self::Item> {
let result = self.actions.next().map(|batch_result| {
let batch = batch_result?;
self.sidecar_visitor.visit_rows_of(batch.actions())?;
Ok(batch)
});

if result.is_none() {
self.is_complete = true;
}

result
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array::{Array, StringArray, StructArray};
use crate::engine::arrow_data::EngineDataArrowExt as _;
use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
use crate::SnapshotRef;

use itertools::Itertools;
use std::sync::Arc;

/// Helper function to test manifest phase with expected add paths and sidecars
fn verify_manifest_phase(
engine: Arc<dyn Engine>,
snapshot: SnapshotRef,
expected_add_paths: &[&str],
expected_sidecars: &[&str],
) -> DeltaResult<()> {
let log_segment = snapshot.log_segment();
let log_root = log_segment.log_root.clone();
assert_eq!(log_segment.checkpoint_parts.len(), 1);
let checkpoint_file = &log_segment.checkpoint_parts[0];
let mut manifest_phase =
CheckpointManifestReader::try_new(engine.clone(), checkpoint_file, log_root)?;

// Extract add file paths and verify expectations
let mut file_paths = vec![];
for result in manifest_phase.by_ref() {
let batch = result?;
let ActionsBatch {
actions,
is_log_batch,
} = batch;
assert!(!is_log_batch, "Manifest should not be a log batch");

let record_batch = actions.try_into_record_batch()?;
let add = record_batch.column_by_name("add").unwrap();
let add_struct = add.as_any().downcast_ref::<StructArray>().unwrap();
let path = add_struct
.column_by_name("path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec();
file_paths.extend(batch_paths);
}

// Verify collected add paths
file_paths.sort();
assert_eq!(
file_paths, expected_add_paths,
"CheckpointManifestReader should extract expected Add file paths from checkpoint"
);

// Check sidecars
let actual_sidecars = manifest_phase.extract_sidecars()?;

assert_eq!(
actual_sidecars.len(),
expected_sidecars.len(),
"Should collect exactly {} actual_sidecars",
expected_sidecars.len()
);

// Extract and verify the sidecar paths
let mut collected_paths: Vec<String> = actual_sidecars
.iter()
.map(|fm| {
fm.location
.path_segments()
.and_then(|mut segments| segments.next_back())
.unwrap_or("")
.to_string()
})
.collect();

collected_paths.sort();
// Verify they're the expected sidecar files
assert_eq!(collected_paths, expected_sidecars.to_vec());

Ok(())
}

#[test]
fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> {
let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
verify_manifest_phase(
engine,
snapshot,
&["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"],
&[],
)
}

#[test]
fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> {
let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;

let manifest_phase = CheckpointManifestReader::try_new(
engine.clone(),
&snapshot.log_segment().checkpoint_parts[0],
snapshot.log_segment().log_root.clone(),
)?;

let result = manifest_phase.extract_sidecars();
assert_result_error_with_message(
result,
"Cannot extract sidecars from in-progress ManifestReader for file",
);
Ok(())
}

#[test]
fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> {
let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?;
verify_manifest_phase(
engine,
snapshot,
&[],
&[
"00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet",
"00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet",
],
)
}

#[test]
fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> {
let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-parquet-with-sidecars")?;
verify_manifest_phase(
engine,
snapshot,
&[],
&[
"00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet",
"00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet",
],
)
}
}
101 changes: 101 additions & 0 deletions kernel/src/log_reader/commit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//! Commit phase for log replay - processes JSON commit files.

use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine};

/// Phase that processes JSON commit files into [`ActionsBatch`]s
pub(crate) struct CommitReader {
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
}

impl CommitReader {
/// Create a new commit phase from a log segment.
///
/// # Parameters
/// - `engine`: Engine for reading files
/// - `log_segment`: The log segment to process
/// - `schema`: The schema to read the json files
pub(crate) fn try_new(
engine: &dyn Engine,
log_segment: &LogSegment,
schema: SchemaRef,
) -> DeltaResult<Self> {
let commit_files = log_segment.find_commit_cover();
let actions = engine
.json_handler()
.read_json_files(&commit_files, schema, None)?
.map(|batch| batch.map(|b| ActionsBatch::new(b, true)));

Ok(Self {
actions: Box::new(actions),
})
}
}

impl Iterator for CommitReader {
type Item = DeltaResult<ActionsBatch>;

fn next(&mut self) -> Option<Self::Item> {
self.actions.next()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array::{StringArray, StructArray};
use crate::engine::arrow_data::EngineDataArrowExt as _;
use crate::scan::COMMIT_READ_SCHEMA;
use crate::utils::test_utils::load_test_table;
use itertools::Itertools;
use std::sync::Arc;

#[test]
fn test_commit_phase_processes_commits() -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot, _tempdir) = load_test_table("app-txn-no-checkpoint")?;
let log_segment = Arc::new(snapshot.log_segment().clone());

let schema = COMMIT_READ_SCHEMA.clone();
let commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?;

let mut file_paths = vec![];
for result in commit_phase {
let batch = result?;
let ActionsBatch {
actions,
is_log_batch,
} = batch;
assert!(is_log_batch);

let record_batch = actions.try_into_record_batch()?;
let add = record_batch.column_by_name("add").unwrap();
let add_struct = add.as_any().downcast_ref::<StructArray>().unwrap();

let path = add_struct
.column_by_name("path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec();
file_paths.extend(batch_paths);
}

file_paths.sort();
let expected_files = vec![
"modified=2021-02-01/part-00001-80996595-a345-43b7-b213-e247d6f091f7-c000.snappy.parquet",
"modified=2021-02-01/part-00001-8ebcaf8b-0f48-4213-98c9-5c2156d20a7e-c000.snappy.parquet",
"modified=2021-02-02/part-00001-9a16b9f6-c12a-4609-a9c4-828eacb9526a-c000.snappy.parquet",
"modified=2021-02-02/part-00001-bfac5c74-426e-410f-ab74-21a64e518e9c-c000.snappy.parquet",
];
assert_eq!(
file_paths, expected_files,
"CommitReader should find exactly the expected files"
);

Ok(())
}
}
2 changes: 2 additions & 0 deletions kernel/src/log_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod checkpoint_manifest;
pub(crate) mod commit;
4 changes: 2 additions & 2 deletions kernel/src/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tracing::debug;

/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
/// of adds and removes during log replay.
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub(crate) struct FileActionKey {
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
pub struct FileActionKey {
pub(crate) path: String,
pub(crate) dv_unique_id: Option<String>,
}
Expand Down
Loading
Loading