Skip to content

Commit e5b57e1

Browse files
committed
more cleanup
1 parent 9bfb6b9 commit e5b57e1

File tree

3 files changed

+29
-50
lines changed

3 files changed

+29
-50
lines changed

kernel/src/log_reader/commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ mod tests {
5454

5555
#[test]
5656
fn test_commit_phase_processes_commits() -> Result<(), Box<dyn std::error::Error>> {
57-
let (engine, snapshot) = load_extracted_test_table(env!("CARGO_MANIFEST_DIR"), "app-txn-no-checkpoint")?;
57+
let (engine, snapshot) = load_extracted_test_table("app-txn-no-checkpoint")?;
5858
let log_segment = Arc::new(snapshot.log_segment().clone());
5959

6060
let schema = COMMIT_READ_SCHEMA.clone();

kernel/src/log_reader/manifest.rs

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Manifest phase for log replay - processes single-part checkpoint manifest files.
1+
//! Manifest phase for log replay - processes single-part checkpoints and manifest checkpoints.
22
33
use std::sync::{Arc, LazyLock};
44

@@ -14,9 +14,8 @@ use crate::schema::{SchemaRef, StructField, StructType, ToSchema};
1414
use crate::utils::require;
1515
use crate::{DeltaResult, Engine, Error, FileMeta, RowVisitor};
1616

17-
/// Phase that processes single-part checkpoint manifest files.
18-
///
19-
/// Extracts sidecar references while processing the manifest.
17+
/// Phase that processes single-part checkpoint. This also treats the checkpoint as a manifest file
18+
/// and extracts the sidecar actions during iteration.
2019
#[allow(unused)]
2120
pub(crate) struct ManifestPhase {
2221
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
@@ -28,8 +27,8 @@ pub(crate) struct ManifestPhase {
2827
/// Possible transitions after ManifestPhase completes.
2928
#[allow(unused)]
3029
pub(crate) enum AfterManifest {
31-
/// Has sidecars → return sidecar files
32-
Sidecars { sidecars: Vec<FileMeta> },
30+
/// Sidecars extracted from the manifest phase
31+
Sidecars(Vec<FileMeta>),
3332
/// No sidecars
3433
Done,
3534
}
@@ -108,7 +107,7 @@ impl ManifestPhase {
108107
if sidecars.is_empty() {
109108
Ok(AfterManifest::Done)
110109
} else {
111-
Ok(AfterManifest::Sidecars { sidecars })
110+
Ok(AfterManifest::Sidecars(sidecars))
112111
}
113112
}
114113
}
@@ -139,8 +138,10 @@ mod tests {
139138
assert_result_error_with_message, create_engine_and_snapshot_from_path,
140139
load_extracted_test_table,
141140
};
141+
142142
use crate::SnapshotRef;
143143
use std::sync::Arc;
144+
use test_utils::load_test_data;
144145

145146
/// Core helper function to test manifest phase with expected add paths and sidecars
146147
fn verify_manifest_phase(
@@ -154,18 +155,10 @@ mod tests {
154155
use itertools::Itertools;
155156

156157
let log_segment = snapshot.log_segment();
157-
158-
if log_segment.checkpoint_parts.is_empty() {
159-
panic!("Test table has no checkpoint parts");
160-
}
161-
158+
let log_root = log_segment.log_root.clone();
159+
assert_eq!(log_segment.checkpoint_parts.len(), 1);
162160
let checkpoint_file = &log_segment.checkpoint_parts[0];
163-
164-
let mut manifest_phase = ManifestPhase::try_new(
165-
checkpoint_file,
166-
snapshot.log_segment().log_root.clone(),
167-
engine.clone(),
168-
)?;
161+
let mut manifest_phase = ManifestPhase::try_new(checkpoint_file, log_root, engine.clone())?;
169162

170163
// Extract add file paths and verify expectations
171164
let mut file_paths = vec![];
@@ -180,9 +173,6 @@ mod tests {
180173
let record_batch = actions.try_into_record_batch()?;
181174
let add = record_batch.column_by_name("add").unwrap();
182175
let add_struct = add.as_any().downcast_ref::<StructArray>().unwrap();
183-
184-
// If we expect no add paths (they're in sidecars), verify all adds are null
185-
// Extract add paths
186176
let path = add_struct
187177
.column_by_name("path")
188178
.unwrap()
@@ -205,14 +195,14 @@ mod tests {
205195
let next = manifest_phase.finalize()?;
206196

207197
match (next, expected_sidecars) {
208-
(AfterManifest::Sidecars { .. }, []) => {
209-
panic!("Expected to be Done, but found Sidecars")
198+
(AfterManifest::Sidecars(sidecars), []) => {
199+
panic!("Expected to be Done, but found Sidecars: {:?}", sidecars)
210200
}
211201
(AfterManifest::Done, []) => { /* Empty expected sidecars is Done */ }
212202
(AfterManifest::Done, sidecars) => {
213203
panic!("Expected manifest phase to be Done, but got {:?}", sidecars)
214204
}
215-
(AfterManifest::Sidecars { sidecars }, expected_sidecars) => {
205+
(AfterManifest::Sidecars(sidecars), expected_sidecars) => {
216206
assert_eq!(
217207
sidecars.len(),
218208
expected_sidecars.len(),
@@ -224,7 +214,6 @@ mod tests {
224214
let mut collected_paths: Vec<String> = sidecars
225215
.iter()
226216
.map(|fm| {
227-
// Get the filename from the URL path
228217
fm.location
229218
.path_segments()
230219
.and_then(|mut segments| segments.next_back())
@@ -250,19 +239,17 @@ mod tests {
250239
expected_sidecars: &[&str],
251240
) -> DeltaResult<()> {
252241
// Try loading as compressed table first, fall back to extracted
253-
let (engine, snapshot, _tempdir) =
254-
match test_utils::load_test_data("tests/data", table_name) {
255-
Ok(test_dir) => {
256-
let test_path = test_dir.path().join(table_name);
257-
let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?;
258-
(engine, snapshot, Some(test_dir))
259-
}
260-
Err(_) => {
261-
let (engine, snapshot) =
262-
load_extracted_test_table(env!("CARGO_MANIFEST_DIR"), table_name)?;
263-
(engine, snapshot, None)
264-
}
265-
};
242+
let (engine, snapshot, _tempdir) = match load_test_data("tests/data", table_name) {
243+
Ok(test_dir) => {
244+
let test_path = test_dir.path().join(table_name);
245+
let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?;
246+
(engine, snapshot, Some(test_dir))
247+
}
248+
Err(_) => {
249+
let (engine, snapshot) = load_extracted_test_table(table_name)?;
250+
(engine, snapshot, None)
251+
}
252+
};
266253

267254
verify_manifest_phase(engine, snapshot, expected_add_paths, expected_sidecars)
268255
}
@@ -278,23 +265,16 @@ mod tests {
278265

279266
#[test]
280267
fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> {
281-
let (engine, snapshot) = load_extracted_test_table(
282-
env!("CARGO_MANIFEST_DIR"),
283-
"with_checkpoint_no_last_checkpoint",
284-
)?;
268+
let (engine, snapshot) = load_extracted_test_table("with_checkpoint_no_last_checkpoint")?;
285269

286270
let manifest_phase = ManifestPhase::try_new(
287271
&snapshot.log_segment().checkpoint_parts[0],
288272
snapshot.log_segment().log_root.clone(),
289273
engine.clone(),
290274
)?;
291275

292-
// Attempt to finalize without draining the iterator
293276
let result = manifest_phase.finalize();
294-
295-
// Should get an error about not being exhausted
296277
assert_result_error_with_message(result, "not exausted");
297-
298278
Ok(())
299279
}
300280

kernel/src/utils.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ pub(crate) mod test_utils {
161161
use object_store::local::LocalFileSystem;
162162
use object_store::ObjectStore;
163163
use serde::Serialize;
164+
use std::path::PathBuf;
164165
use std::{path::Path, sync::Arc};
165166
use tempfile::TempDir;
166167
use test_utils::delta_path_for_version;
@@ -303,11 +304,9 @@ pub(crate) mod test_utils {
303304
/// Load an already-extracted test table from the filesystem.
304305
/// Returns (engine, snapshot) tuple.
305306
pub(crate) fn load_extracted_test_table(
306-
manifest_dir: &str,
307307
table_name: &str,
308308
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef)> {
309-
use std::path::PathBuf;
310-
309+
let manifest_dir = env!("CARGO_MANIFEST_DIR");
311310
let mut path = PathBuf::from(manifest_dir);
312311
path.push("tests/data");
313312
path.push(table_name);

0 commit comments

Comments
 (0)