Skip to content

Commit 46fd53d

Browse files
committed
rename
1 parent a46775c commit 46fd53d

File tree

3 files changed

+149
-184
lines changed

3 files changed

+149
-184
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
//! Sidecar phase for log replay - processes sidecar/leaf parquet files.
2+
3+
use std::sync::Arc;
4+
5+
use crate::log_replay::ActionsBatch;
6+
use crate::schema::SchemaRef;
7+
use crate::{DeltaResult, Engine, FileMeta};
8+
9+
/// Phase that processes sidecar or leaf parquet files.
10+
///
11+
/// This phase is distributable - you can partition `files` and create multiple
12+
/// instances on different executors.
13+
#[allow(unused)]
14+
pub(crate) struct CheckpointLeafReader {
15+
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
16+
}
17+
18+
impl CheckpointLeafReader {
19+
/// Create a new sidecar phase from file list.
20+
///
21+
/// # Distributability
22+
///
23+
/// This phase is designed to be distributable. To distribute:
24+
/// 1. Partition `files` across N executors/threads
25+
/// 2. Create N `CheckpointLeafReader` instances, one per executor with its file partition
26+
///
27+
/// # Parameters
28+
/// - `files`: Sidecar/leaf files to process
29+
/// - `engine`: Engine for reading files
30+
/// - `schema`: Schema to use when reading sidecar files (projected based on processor requirements)
31+
#[allow(unused)]
32+
pub(crate) fn try_new(
33+
engine: Arc<dyn Engine>,
34+
files: Vec<FileMeta>,
35+
schema: SchemaRef,
36+
) -> DeltaResult<Self> {
37+
let actions = engine
38+
.parquet_handler()
39+
.read_parquet_files(&files, schema, None)?
40+
.map(|batch| batch.map(|b| ActionsBatch::new(b, false)));
41+
42+
Ok(Self {
43+
actions: Box::new(actions),
44+
})
45+
}
46+
}
47+
48+
impl Iterator for CheckpointLeafReader {
49+
type Item = DeltaResult<ActionsBatch>;
50+
51+
fn next(&mut self) -> Option<Self::Item> {
52+
self.actions.next()
53+
}
54+
}
55+
56+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actions::{get_commit_schema, ADD_NAME};
    use crate::arrow::array::{Array, StringArray, StructArray};
    use crate::engine::arrow_data::EngineDataArrowExt as _;
    use crate::log_reader::checkpoint_manifest::CheckpointManifestReader;
    use crate::utils::test_utils::load_test_table_with_data;
    use itertools::Itertools;
    use std::sync::Arc;

    /// Reads the sidecar files referenced by a V2 checkpoint manifest and checks
    /// that every `add.path` in them is surfaced by `CheckpointLeafReader`.
    #[test]
    fn test_sidecar_phase_processes_files() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) =
            load_test_table_with_data("tests/data", "v2-checkpoints-json-with-sidecars")?;
        let log_segment = snapshot.log_segment();

        // The manifest phase must run first to discover the sidecar files. Fail
        // loudly if the fixture has no checkpoint: silently returning here would
        // let the test pass without exercising the reader at all.
        assert!(
            !log_segment.checkpoint_parts.is_empty(),
            "Test table must contain at least one checkpoint part"
        );

        // Get the first checkpoint part
        let checkpoint_file = &log_segment.checkpoint_parts[0];

        let mut manifest_phase = CheckpointManifestReader::try_new(
            checkpoint_file,
            log_segment.log_root.clone(),
            engine.clone(),
        )?;

        // Drain the manifest phase; `by_ref` keeps the reader usable afterwards so
        // the sidecars can be extracted from it.
        for batch in manifest_phase.by_ref() {
            batch?;
        }

        let sidecars = manifest_phase.extract_sidecars()?;

        println!("Testing with {} sidecar files", sidecars.len());

        let schema = get_commit_schema().project(&[ADD_NAME])?;

        // The `for` loop below consumes the reader by value, so no `mut` is needed.
        let sidecar_phase = CheckpointLeafReader::try_new(engine.clone(), sidecars, schema)?;

        let mut sidecar_file_paths = Vec::new();

        for result in sidecar_phase {
            let ActionsBatch {
                actions,
                is_log_batch,
            } = result?;
            assert!(!is_log_batch, "Sidecars should not be log batches");

            let record_batch = actions.try_into_record_batch()?;
            let add = record_batch.column_by_name(ADD_NAME).unwrap();
            let add_struct = add.as_any().downcast_ref::<StructArray>().unwrap();
            let path = add_struct
                .column_by_name("path")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            // `flatten` skips null paths.
            let batch_paths = path.iter().flatten().map(ToString::to_string).collect_vec();
            sidecar_file_paths.extend(batch_paths);
        }

        // Sort so the comparison below does not depend on file/batch read order.
        sidecar_file_paths.sort();

        assert_eq!(
            sidecar_file_paths.len(),
            101,
            "CheckpointLeafReader should find exactly 101 files from sidecars"
        );

        // Verify first few files match expected (sampling to keep test readable)
        let expected_first_files = [
            "test%25file%25prefix-part-00000-01086c52-1b86-48d0-8889-517fe626849d-c000.snappy.parquet",
            "test%25file%25prefix-part-00000-0fd71c0e-fd08-4685-87d6-aae77532d3ea-c000.snappy.parquet",
            "test%25file%25prefix-part-00000-2710dd7f-9fa5-429d-b3fb-c005ba16e062-c000.snappy.parquet",
        ];

        assert_eq!(
            &sidecar_file_paths[..3],
            &expected_first_files[..],
            "Sorted paths from CheckpointLeafReader should start with the expected files"
        );

        Ok(())
    }
}

kernel/src/log_reader/leaf.rs

Lines changed: 0 additions & 183 deletions
This file was deleted.

kernel/src/log_reader/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1+
pub(crate) mod checkpoint_leaf;
12
pub(crate) mod checkpoint_manifest;
23
pub(crate) mod commit;
3-
pub(crate) mod leaf;

0 commit comments

Comments
 (0)