Skip to content

Commit cfcf46c

Browse files
committed
manifest v1
1 parent a5493e5 commit cfcf46c

File tree

2 files changed

+290
-0
lines changed

2 files changed

+290
-0
lines changed

kernel/src/log_reader/manifest.rs

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
//! Manifest phase for log replay - processes single-part checkpoint manifest files.
2+
3+
use std::sync::Arc;
4+
5+
use url::Url;
6+
7+
use crate::actions::Sidecar;
8+
use crate::actions::{get_all_actions_schema, visitors::SidecarVisitor, SIDECAR_NAME};
9+
use crate::expressions::Transform;
10+
use crate::log_replay::ActionsBatch;
11+
use crate::schema::{Schema, SchemaRef, StructField, ToSchema};
12+
use crate::{DeltaResult, Engine, Error, Expression, ExpressionEvaluator, FileMeta, RowVisitor};
13+
14+
/// Phase that processes single-part checkpoint manifest files.
15+
///
16+
/// Extracts sidecar references while processing the manifest.
17+
pub(crate) struct ManifestPhase {
18+
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
19+
sidecar_visitor: SidecarVisitor,
20+
original_schema: SchemaRef,
21+
log_root: Url,
22+
}
23+
24+
/// Possible transitions after ManifestPhase completes.
25+
pub(crate) enum AfterManifest {
26+
/// Has sidecars → return sidecar files
27+
Sidecars { sidecars: Vec<FileMeta> },
28+
/// No sidecars
29+
Done,
30+
}
31+
32+
impl ManifestPhase {
33+
/// Create a new manifest phase for a single-part checkpoint.
34+
///
35+
/// The schema is automatically augmented with the sidecar column since the manifest
36+
/// phase needs to extract sidecar references for phase transitions.
37+
///
38+
/// # Parameters
39+
/// - `manifest_file`: The checkpoint manifest file to process
40+
/// - `log_root`: Root URL for resolving sidecar paths
41+
/// - `engine`: Engine for reading files
42+
/// - `base_schema`: Schema columns required by the processor (will be augmented with sidecar)
43+
pub fn new(
44+
manifest_file: FileMeta,
45+
log_root: Url,
46+
engine: Arc<dyn Engine>,
47+
) -> DeltaResult<Self> {
48+
let files = vec![manifest_file.clone()];
49+
50+
// Determine file type from extension
51+
let extension = manifest_file
52+
.location
53+
.path()
54+
.rsplit('.')
55+
.next()
56+
.unwrap_or("");
57+
58+
let actions = match extension {
59+
"json" => {
60+
engine
61+
.json_handler()
62+
.read_json_files(&files, sidecar_schema.clone(), None)?
63+
}
64+
"parquet" => {
65+
engine
66+
.parquet_handler()
67+
.read_parquet_files(&files, sidecar_schema.clone(), None)?
68+
}
69+
ext => {
70+
return Err(Error::generic(format!(
71+
"Unsupported checkpoint extension: {}",
72+
ext
73+
)))
74+
}
75+
};
76+
77+
let actions = actions.map(|batch| batch.map(|b| ActionsBatch::new(b, false)));
78+
79+
Ok(Self {
80+
actions: Box::new(actions),
81+
sidecar_visitor: SidecarVisitor::default(),
82+
log_root,
83+
original_schema,
84+
projector,
85+
})
86+
}
87+
88+
/// Transition to the next phase.
89+
///
90+
/// Returns an enum indicating what comes next:
91+
/// - `Sidecars`: Extracted sidecar files
92+
/// - `Done`: No sidecars found
93+
pub(crate) fn finalize(self) -> DeltaResult<AfterManifest> {
94+
// TODO: Check that stream is exhausted. We can track a boolean flag on whether we saw a None yet.
95+
let sidecars = self
96+
.sidecar_visitor
97+
.sidecars
98+
.into_iter()
99+
.map(|s| s.to_filemeta(&self.log_root))
100+
.collect::<Result<Vec<_>, _>>()?;
101+
102+
if sidecars.is_empty() {
103+
Ok(AfterManifest::Done)
104+
} else {
105+
Ok(AfterManifest::Sidecars { sidecars })
106+
}
107+
}
108+
}
109+
110+
impl Iterator for ManifestPhase {
111+
type Item = DeltaResult<ActionsBatch>;
112+
113+
fn next(&mut self) -> Option<Self::Item> {
114+
self.actions.next().map(|batch_result| {
115+
batch_result.and_then(|batch| {
116+
// Extract sidecar references from the batch
117+
self.sidecar_visitor.visit_rows_of(batch.actions())?;
118+
119+
// Return the batch
120+
// TODO: un-select sidecar actions
121+
// TODO: project out sidecar actions
122+
let batch = self.projector.evaluate(batch);
123+
Ok(batch)
124+
})
125+
})
126+
}
127+
}
128+
129+
#[cfg(test)]
130+
mod tests {
131+
use super::*;
132+
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
133+
use crate::engine::default::DefaultEngine;
134+
use crate::log_replay::LogReplayProcessor;
135+
use crate::scan::log_replay::ScanLogReplayProcessor;
136+
use crate::scan::state_info::StateInfo;
137+
use object_store::local::LocalFileSystem;
138+
use std::path::PathBuf;
139+
use std::sync::Arc as StdArc;
140+
141+
fn load_test_table(
142+
table_name: &str,
143+
) -> DeltaResult<(
144+
StdArc<DefaultEngine<TokioBackgroundExecutor>>,
145+
StdArc<crate::Snapshot>,
146+
url::Url,
147+
)> {
148+
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
149+
path.push("tests/data");
150+
path.push(table_name);
151+
152+
let path = std::fs::canonicalize(path)
153+
.map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?;
154+
155+
let url = url::Url::from_directory_path(path)
156+
.map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?;
157+
158+
let store = StdArc::new(LocalFileSystem::new());
159+
let engine = StdArc::new(DefaultEngine::new(store));
160+
let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
161+
162+
Ok((engine, snapshot, url))
163+
}
164+
165+
#[test]
166+
fn test_manifest_phase_with_checkpoint() -> DeltaResult<()> {
167+
// Use a table with v2 checkpoints where adds might be in sidecars
168+
let (engine, snapshot, log_root) = load_test_table("v2-checkpoints-json-with-sidecars")?;
169+
let log_segment = snapshot.log_segment();
170+
171+
// Check if there are any checkpoint parts
172+
if log_segment.checkpoint_parts.is_empty() {
173+
println!("Test table has no checkpoint parts, skipping");
174+
return Ok(());
175+
}
176+
177+
let state_info = StdArc::new(StateInfo::try_new(
178+
snapshot.schema(),
179+
snapshot.table_configuration(),
180+
None,
181+
(),
182+
)?);
183+
184+
let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
185+
186+
// Get the first checkpoint part
187+
let checkpoint_file = &log_segment.checkpoint_parts[0];
188+
let manifest_file = checkpoint_file.location.clone();
189+
190+
let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?;
191+
192+
let mut manifest_phase =
193+
ManifestPhase::new(manifest_file, log_root.clone(), engine.clone(), schema)?;
194+
195+
// Count batches and collect results
196+
let mut batch_count = 0;
197+
let mut file_paths = Vec::new();
198+
199+
while let Some(result) = manifest_phase.next() {
200+
let batch = result?;
201+
let metadata = processor.process_actions_batch(batch)?;
202+
let paths = metadata.visit_scan_files(
203+
vec![],
204+
|ps: &mut Vec<String>, path, _, _, _, _, _| {
205+
ps.push(path.to_string());
206+
},
207+
)?;
208+
file_paths.extend(paths);
209+
batch_count += 1;
210+
}
211+
212+
// For v2 checkpoints with sidecars, the manifest might not contain adds directly.
213+
// In this test table, all adds are in sidecars, so manifest should be empty.
214+
assert_eq!(
215+
batch_count, 1,
216+
"Single manifest file should produce exactly 1 batch"
217+
);
218+
219+
// Verify the manifest itself contains no add files (they're all in sidecars)
220+
file_paths.sort();
221+
assert_eq!(
222+
file_paths.len(), 0,
223+
"For this v2 checkpoint with sidecars, manifest should contain 0 add files (all in sidecars)"
224+
);
225+
226+
Ok(())
227+
}
228+
229+
#[test]
230+
fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> {
231+
let (engine, snapshot, log_root) = load_test_table("v2-checkpoints-json-with-sidecars")?;
232+
let log_segment = snapshot.log_segment();
233+
234+
if log_segment.checkpoint_parts.is_empty() {
235+
println!("Test table has no checkpoint parts, skipping");
236+
return Ok(());
237+
}
238+
239+
let checkpoint_file = &log_segment.checkpoint_parts[0];
240+
let manifest_file = checkpoint_file.location.clone();
241+
242+
let schema = crate::actions::get_commit_schema().project(&[crate::actions::ADD_NAME])?;
243+
244+
let mut manifest_phase =
245+
ManifestPhase::new(manifest_file, log_root.clone(), engine.clone(), schema)?;
246+
247+
// Drain the phase
248+
while manifest_phase.next().is_some() {}
249+
250+
// Check if sidecars were collected
251+
let next = manifest_phase.into_next()?;
252+
253+
match next {
254+
AfterManifest::Sidecars { sidecars } => {
255+
// For the v2-checkpoints-json-with-sidecars test table at version 6,
256+
// there are exactly 2 sidecar files
257+
assert_eq!(
258+
sidecars.len(),
259+
2,
260+
"Should collect exactly 2 sidecars for checkpoint at version 6"
261+
);
262+
263+
// Extract and verify the sidecar paths
264+
let mut collected_paths: Vec<String> = sidecars
265+
.iter()
266+
.map(|fm| {
267+
// Get the filename from the URL path
268+
fm.location
269+
.path_segments()
270+
.and_then(|segments| segments.last())
271+
.unwrap_or("")
272+
.to_string()
273+
})
274+
.collect();
275+
276+
collected_paths.sort();
277+
278+
// Verify they're the expected sidecar files for version 6
279+
assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet");
280+
assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet");
281+
}
282+
AfterManifest::Done => {
283+
panic!("Expected sidecars for v2-checkpoints-json-with-sidecars table");
284+
}
285+
}
286+
287+
Ok(())
288+
}
289+
}

kernel/src/log_reader/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub(crate) mod commit;
2+
pub(crate) mod manifest;

0 commit comments

Comments
 (0)