Skip to content

Commit a534df5

Browse files
committed
simplify
1 parent 1bcc46e commit a534df5

File tree

3 files changed

+52
-67
lines changed

3 files changed

+52
-67
lines changed

kernel/src/log_reader/checkpoint_manifest.rs

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ impl CheckpointManifestReader {
3737
/// - `engine`: Engine for reading files
3838
#[allow(unused)]
3939
pub(crate) fn try_new(
40+
engine: Arc<dyn Engine>,
4041
manifest: &ParsedLogPath,
4142
log_root: Url,
42-
engine: Arc<dyn Engine>,
4343
) -> DeltaResult<Self> {
4444
static MANIFEST_READ_SCHMEA: LazyLock<SchemaRef> = LazyLock::new(|| {
4545
Arc::new(StructType::new_unchecked([
@@ -123,17 +123,13 @@ mod tests {
123123
use super::*;
124124
use crate::arrow::array::{Array, StringArray, StructArray};
125125
use crate::engine::arrow_data::EngineDataArrowExt as _;
126-
use crate::utils::test_utils::{
127-
assert_result_error_with_message, create_engine_and_snapshot_from_path,
128-
load_extracted_test_table,
129-
};
126+
use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
130127
use crate::SnapshotRef;
131128

132129
use itertools::Itertools;
133130
use std::sync::Arc;
134-
use test_utils::load_test_data;
135131

136-
/// Core helper function to test manifest phase with expected add paths and sidecars
132+
/// Helper function to test manifest phase with expected add paths and sidecars
137133
fn verify_manifest_phase(
138134
engine: Arc<dyn Engine>,
139135
snapshot: SnapshotRef,
@@ -145,7 +141,7 @@ mod tests {
145141
assert_eq!(log_segment.checkpoint_parts.len(), 1);
146142
let checkpoint_file = &log_segment.checkpoint_parts[0];
147143
let mut manifest_phase =
148-
CheckpointManifestReader::try_new(checkpoint_file, log_root, engine.clone())?;
144+
CheckpointManifestReader::try_new(engine.clone(), checkpoint_file, log_root)?;
149145

150146
// Extract add file paths and verify expectations
151147
let mut file_paths = vec![];
@@ -207,46 +203,25 @@ mod tests {
207203
Ok(())
208204
}
209205

210-
/// Helper function to test manifest phase with expected add paths and sidecars.
211-
/// Works with both compressed (tar.zst) and already-extracted test tables.
212-
fn test_manifest_phase(
213-
table_name: &str,
214-
expected_add_paths: &[&str],
215-
expected_sidecars: &[&str],
216-
) -> DeltaResult<()> {
217-
// Try loading as compressed table first, fall back to extracted
218-
let (engine, snapshot, _tempdir) = match load_test_data("tests/data", table_name) {
219-
Ok(test_dir) => {
220-
let test_path = test_dir.path().join(table_name);
221-
let (engine, snapshot) = create_engine_and_snapshot_from_path(&test_path)?;
222-
(engine, snapshot, Some(test_dir))
223-
}
224-
Err(_) => {
225-
let (engine, snapshot) = load_extracted_test_table(table_name)?;
226-
(engine, snapshot, None)
227-
}
228-
};
229-
230-
verify_manifest_phase(engine, snapshot, expected_add_paths, expected_sidecars)
231-
}
232-
233206
#[test]
234207
fn test_manifest_phase_extracts_file_paths() -> DeltaResult<()> {
235-
test_manifest_phase(
236-
"with_checkpoint_no_last_checkpoint",
208+
let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
209+
verify_manifest_phase(
210+
engine,
211+
snapshot,
237212
&["part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet"],
238-
&[], // No sidecars
213+
&[],
239214
)
240215
}
241216

242217
#[test]
243218
fn test_manifest_phase_early_finalize_error() -> DeltaResult<()> {
244-
let (engine, snapshot) = load_extracted_test_table("with_checkpoint_no_last_checkpoint")?;
219+
let (engine, snapshot, _tempdir) = load_test_table("with_checkpoint_no_last_checkpoint")?;
245220

246221
let manifest_phase = CheckpointManifestReader::try_new(
222+
engine.clone(),
247223
&snapshot.log_segment().checkpoint_parts[0],
248224
snapshot.log_segment().log_root.clone(),
249-
engine.clone(),
250225
)?;
251226

252227
let result = manifest_phase.extract_sidecars();
@@ -259,9 +234,11 @@ mod tests {
259234

260235
#[test]
261236
fn test_manifest_phase_collects_sidecars() -> DeltaResult<()> {
262-
test_manifest_phase(
263-
"v2-checkpoints-json-with-sidecars",
264-
&[], // No add paths in manifest (they're in sidecars)
237+
let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-json-with-sidecars")?;
238+
verify_manifest_phase(
239+
engine,
240+
snapshot,
241+
&[],
265242
&[
266243
"00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet",
267244
"00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet",
@@ -271,9 +248,11 @@ mod tests {
271248

272249
#[test]
273250
fn test_manifest_phase_collects_sidecars_parquet() -> DeltaResult<()> {
274-
test_manifest_phase(
275-
"v2-checkpoints-parquet-with-sidecars",
276-
&[], // No add paths in manifest (they're in sidecars)
251+
let (engine, snapshot, _tempdir) = load_test_table("v2-checkpoints-parquet-with-sidecars")?;
252+
verify_manifest_phase(
253+
engine,
254+
snapshot,
255+
&[],
277256
&[
278257
"00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet",
279258
"00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet",

kernel/src/log_reader/commit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ mod tests {
4848
use crate::arrow::array::{StringArray, StructArray};
4949
use crate::engine::arrow_data::EngineDataArrowExt as _;
5050
use crate::scan::COMMIT_READ_SCHEMA;
51-
use crate::utils::test_utils::load_extracted_test_table;
51+
use crate::utils::test_utils::load_test_table;
5252
use itertools::Itertools;
5353
use std::sync::Arc;
5454

5555
#[test]
5656
fn test_commit_phase_processes_commits() -> Result<(), Box<dyn std::error::Error>> {
57-
let (engine, snapshot) = load_extracted_test_table("app-txn-no-checkpoint")?;
57+
let (engine, snapshot, _tempdir) = load_test_table("app-txn-no-checkpoint")?;
5858
let log_segment = Arc::new(snapshot.log_segment().clone());
5959

6060
let schema = COMMIT_READ_SCHEMA.clone();

kernel/src/utils.rs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ pub(crate) mod test_utils {
165165
use std::{path::Path, sync::Arc};
166166
use tempfile::TempDir;
167167
use test_utils::delta_path_for_version;
168+
use test_utils::load_test_data;
168169
use url::Url;
169170

170171
#[derive(Serialize)]
@@ -287,34 +288,39 @@ pub(crate) mod test_utils {
287288
}
288289
}
289290

290-
/// Helper to create engine and snapshot from a path.
291-
/// Returns (engine, snapshot) tuple.
292-
pub(crate) fn create_engine_and_snapshot_from_path(
293-
path: &Path,
294-
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef)> {
295-
let url = Url::from_directory_path(path)
291+
/// Load a test table from the `tests/data` directory.
292+
/// Tries the compressed (tar.zst) archive first, then falls back to an already-extracted table directory.
293+
/// Returns (engine, snapshot, optional tempdir). The TempDir must be kept alive
294+
/// for the duration of the test to prevent premature cleanup of extracted files.
295+
pub(crate) fn load_test_table(
296+
table_name: &str,
297+
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef, Option<TempDir>)> {
298+
// Try loading compressed table first, fall back to extracted
299+
let (path, tempdir) = match load_test_data("tests/data", table_name) {
300+
Ok(test_dir) => {
301+
let test_path = test_dir.path().join(table_name);
302+
(test_path, Some(test_dir))
303+
}
304+
Err(_) => {
305+
// Fall back to already-extracted table
306+
let manifest_dir = env!("CARGO_MANIFEST_DIR");
307+
let mut path = PathBuf::from(manifest_dir);
308+
path.push("tests/data");
309+
path.push(table_name);
310+
let path = std::fs::canonicalize(path)
311+
.map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?;
312+
(path, None)
313+
}
314+
};
315+
316+
// Create engine and snapshot from the resolved path
317+
let url = Url::from_directory_path(&path)
296318
.map_err(|_| Error::Generic("Failed to create URL from path".to_string()))?;
297319

298320
let store = Arc::new(LocalFileSystem::new());
299321
let engine = Arc::new(DefaultEngine::new(store));
300322
let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
301-
Ok((engine, snapshot))
302-
}
303-
304-
/// Load an already-extracted test table from the filesystem.
305-
/// Returns (engine, snapshot) tuple.
306-
pub(crate) fn load_extracted_test_table(
307-
table_name: &str,
308-
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef)> {
309-
let manifest_dir = env!("CARGO_MANIFEST_DIR");
310-
let mut path = PathBuf::from(manifest_dir);
311-
path.push("tests/data");
312-
path.push(table_name);
313-
314-
let path = std::fs::canonicalize(path)
315-
.map_err(|e| Error::Generic(format!("Failed to canonicalize path: {}", e)))?;
316-
317-
create_engine_and_snapshot_from_path(&path)
323+
Ok((engine, snapshot, tempdir))
318324
}
319325
}
320326

0 commit comments

Comments
 (0)