Skip to content

Commit 6b81919

Browse files
committed
add leaf reader
1 parent 92737a4 commit 6b81919

File tree

2 files changed

+184
-0
lines changed

2 files changed

+184
-0
lines changed

kernel/src/log_reader/leaf.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
//! Sidecar phase for log replay - processes sidecar/leaf parquet files.
2+
3+
use std::sync::Arc;
4+
5+
use crate::log_replay::ActionsBatch;
6+
use crate::schema::SchemaRef;
7+
use crate::{DeltaResult, Engine, FileMeta};
8+
9+
/// Phase that processes sidecar or leaf parquet files.
10+
///
11+
/// This phase is distributable - you can partition `files` and create multiple
12+
/// instances on different executors.
13+
#[allow(unused)]
14+
pub(crate) struct LeafCheckpointReader {
15+
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
16+
}
17+
18+
impl LeafCheckpointReader {
19+
/// Create a new sidecar phase from file list.
20+
///
21+
/// # Distributability
22+
///
23+
/// This phase is designed to be distributable. To distribute:
24+
/// 1. Partition `files` across N executors
25+
/// 2. Create N `LeafCheckpointReader` instances, one per executor with its file partition
26+
///
27+
/// # Parameters
28+
/// - `files`: Sidecar/leaf files to process
29+
/// - `engine`: Engine for reading files
30+
/// - `schema`: Schema to use when reading sidecar files (projected based on processor requirements)
31+
#[allow(unused)]
32+
pub(crate) fn new(
33+
files: Vec<FileMeta>,
34+
engine: Arc<dyn Engine>,
35+
schema: SchemaRef,
36+
) -> DeltaResult<Self> {
37+
let actions = engine
38+
.parquet_handler()
39+
.read_parquet_files(&files, schema, None)?
40+
.map(|batch| batch.map(|b| ActionsBatch::new(b, false)));
41+
42+
Ok(Self {
43+
actions: Box::new(actions),
44+
})
45+
}
46+
}
47+
48+
impl Iterator for LeafCheckpointReader {
49+
type Item = DeltaResult<ActionsBatch>;
50+
51+
fn next(&mut self) -> Option<Self::Item> {
52+
self.actions.next()
53+
}
54+
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
use super::*;
59+
use crate::actions::{get_commit_schema, ADD_NAME};
60+
use crate::engine::default::DefaultEngine;
61+
use crate::log_reader::manifest::{AfterManifest, ManifestPhase};
62+
use crate::log_replay::LogReplayProcessor;
63+
use crate::scan::log_replay::ScanLogReplayProcessor;
64+
use crate::scan::state_info::StateInfo;
65+
use crate::{Error, Snapshot, SnapshotRef};
66+
use object_store::local::LocalFileSystem;
67+
use std::sync::Arc;
68+
use tempfile::TempDir;
69+
use url::Url;
70+
71+
fn load_test_table(
72+
table_name: &str,
73+
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef, Url, TempDir)> {
74+
let test_dir = test_utils::load_test_data("tests/data", table_name)
75+
.map_err(|e| Error::generic(format!("Failed to load test data: {}", e)))?;
76+
let test_path = test_dir.path().join(table_name);
77+
78+
let url = url::Url::from_directory_path(&test_path)
79+
.map_err(|_| Error::generic("Failed to create URL from path"))?;
80+
81+
let store = Arc::new(LocalFileSystem::new());
82+
let engine = Arc::new(DefaultEngine::new(store));
83+
let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
84+
85+
Ok((engine, snapshot, url, test_dir))
86+
}
87+
88+
#[test]
89+
fn test_sidecar_phase_processes_files() -> DeltaResult<()> {
90+
let (engine, snapshot, _table_root, _tempdir) =
91+
load_test_table("v2-checkpoints-json-with-sidecars")?;
92+
let log_segment = snapshot.log_segment();
93+
94+
let state_info = Arc::new(StateInfo::try_new(
95+
snapshot.schema(),
96+
snapshot.table_configuration(),
97+
None,
98+
(),
99+
)?);
100+
101+
let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
102+
103+
// First we need to run through manifest phase to get the sidecar files
104+
if log_segment.checkpoint_parts.is_empty() {
105+
println!("Test table has no checkpoint parts, skipping");
106+
return Ok(());
107+
}
108+
109+
// Get the first checkpoint part
110+
let checkpoint_file = &log_segment.checkpoint_parts[0];
111+
let manifest_file = checkpoint_file.location.clone();
112+
113+
let mut manifest_phase =
114+
ManifestPhase::new(manifest_file, log_segment.log_root.clone(), engine.clone())?;
115+
116+
// Drain manifest phase and apply processor
117+
for batch in manifest_phase.by_ref() {
118+
let batch = batch?;
119+
processor.process_actions_batch(batch)?;
120+
}
121+
122+
let after_manifest = manifest_phase.finalize()?;
123+
124+
match after_manifest {
125+
AfterManifest::Sidecars { sidecars } => {
126+
println!("Testing with {} sidecar files", sidecars.len());
127+
128+
let schema = get_commit_schema().project(&[ADD_NAME])?;
129+
130+
let mut sidecar_phase =
131+
LeafCheckpointReader::new(sidecars, engine.clone(), schema)?;
132+
133+
let mut sidecar_file_paths = Vec::new();
134+
let mut batch_count = 0;
135+
136+
while let Some(result) = sidecar_phase.next() {
137+
let batch = result?;
138+
let metadata = processor.process_actions_batch(batch)?;
139+
let paths = metadata.visit_scan_files(
140+
vec![],
141+
|ps: &mut Vec<String>, path, _, _, _, _, _| {
142+
ps.push(path.to_string());
143+
},
144+
)?;
145+
sidecar_file_paths.extend(paths);
146+
batch_count += 1;
147+
}
148+
149+
sidecar_file_paths.sort();
150+
151+
// v2-checkpoints-json-with-sidecars has exactly 2 sidecar files with 101 total files
152+
assert_eq!(
153+
batch_count, 2,
154+
"LeafCheckpointReader should process exactly 2 sidecar batches"
155+
);
156+
157+
assert_eq!(
158+
sidecar_file_paths.len(),
159+
101,
160+
"LeafCheckpointReader should find exactly 101 files from sidecars"
161+
);
162+
163+
// Verify first few files match expected (sampling to keep test readable)
164+
let expected_first_files = vec![
165+
"test%25file%25prefix-part-00000-01086c52-1b86-48d0-8889-517fe626849d-c000.snappy.parquet",
166+
"test%25file%25prefix-part-00000-0fd71c0e-fd08-4685-87d6-aae77532d3ea-c000.snappy.parquet",
167+
"test%25file%25prefix-part-00000-2710dd7f-9fa5-429d-b3fb-c005ba16e062-c000.snappy.parquet",
168+
];
169+
170+
assert_eq!(
171+
&sidecar_file_paths[..3],
172+
&expected_first_files[..],
173+
"LeafCheckpointReader should process files in expected order"
174+
);
175+
}
176+
AfterManifest::Done => {
177+
println!("No sidecars found - test inconclusive");
178+
}
179+
}
180+
181+
Ok(())
182+
}
183+
}

kernel/src/log_reader/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub(crate) mod commit;
2+
pub(crate) mod leaf;
23
pub(crate) mod manifest;

0 commit comments

Comments
 (0)