
Commit 1e40840

serde first draft
1 parent 3df290c commit 1e40840

6 files changed: +413, -45 lines changed

kernel/src/distributed/driver.rs

Lines changed: 212 additions & 9 deletions
@@ -7,6 +7,8 @@
 
 use std::sync::Arc;
 
+use delta_kernel_derive::internal_api;
+
 use crate::actions::get_commit_schema;
 use crate::log_reader::commit::CommitReader;
 use crate::log_reader::manifest::{AfterManifest, ManifestPhase};
@@ -249,13 +251,12 @@ mod tests {
     use crate::scan::state_info::StateInfo;
     use object_store::local::LocalFileSystem;
     use std::path::PathBuf;
-    use std::sync::Arc as StdArc;
 
     fn load_test_table(
         table_name: &str,
     ) -> DeltaResult<(
-        StdArc<DefaultEngine<TokioBackgroundExecutor>>,
-        StdArc<crate::Snapshot>,
+        Arc<DefaultEngine<TokioBackgroundExecutor>>,
+        Arc<crate::Snapshot>,
         url::Url,
     )> {
         let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -268,8 +269,8 @@ mod tests {
         let url = url::Url::from_directory_path(path)
             .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?;
 
-        let store = StdArc::new(LocalFileSystem::new());
-        let engine = StdArc::new(DefaultEngine::new(store));
+        let store = Arc::new(LocalFileSystem::new());
+        let engine = Arc::new(DefaultEngine::new(store));
         let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
 
         Ok((engine, snapshot, url))
@@ -278,9 +279,9 @@ mod tests {
     #[test]
     fn test_driver_v2_with_commits_only() -> DeltaResult<()> {
         let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
-        let log_segment = StdArc::new(snapshot.log_segment().clone());
+        let log_segment = Arc::new(snapshot.log_segment().clone());
 
-        let state_info = StdArc::new(StateInfo::try_new(
+        let state_info = Arc::new(StateInfo::try_new(
            snapshot.schema(),
            snapshot.table_configuration(),
            None,
@@ -336,9 +337,9 @@ mod tests {
     #[test]
     fn test_driver_v2_with_sidecars() -> DeltaResult<()> {
         let (engine, snapshot, _url) = load_test_table("v2-checkpoints-json-with-sidecars")?;
-        let log_segment = StdArc::new(snapshot.log_segment().clone());
+        let log_segment = Arc::new(snapshot.log_segment().clone());
 
-        let state_info = StdArc::new(StateInfo::try_new(
+        let state_info = Arc::new(StateInfo::try_new(
            snapshot.schema(),
            snapshot.table_configuration(),
            None,
@@ -420,4 +421,206 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_distributed_scan_serialization() -> DeltaResult<()> {
+        let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
+        let log_segment = Arc::new(snapshot.log_segment().clone());
+
+        let state_info = Arc::new(StateInfo::try_new(
+            snapshot.schema(),
+            snapshot.table_configuration(),
+            None,
+            (),
+        )?);
+
+        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info.clone())?;
+
+        // Serialize the processor (takes ownership)
+        let (serialized_state, deduplicator) = processor.serialize()?;
+
+        // Verify we can reconstruct the processor
+        let reconstructed = ScanLogReplayProcessor::from_serialized(
+            engine.as_ref(),
+            serialized_state,
+            deduplicator,
+        )?;
+
+        // Verify schemas match (compare against original state_info)
+        assert_eq!(
+            state_info.logical_schema,
+            reconstructed.state_info.logical_schema,
+            "Logical schemas should match after serialization"
+        );
+        assert_eq!(
+            state_info.physical_schema,
+            reconstructed.state_info.physical_schema,
+            "Physical schemas should match after serialization"
+        );
+
+        // Verify transform spec matches
+        match (&state_info.transform_spec, &reconstructed.state_info.transform_spec) {
+            (Some(original), Some(reconstructed)) => {
+                assert_eq!(
+                    **original,
+                    **reconstructed,
+                    "Transform spec should be equal after serialization"
+                );
+            }
+            (None, None) => {
+                // Both None - correct
+            }
+            _ => panic!("Transform spec presence mismatch after serialization"),
+        }
+
+        // Verify column mapping mode matches
+        assert_eq!(
+            state_info.column_mapping_mode,
+            reconstructed.state_info.column_mapping_mode,
+            "Column mapping mode should match after serialization"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_distributed_scan_with_sidecars() -> DeltaResult<()> {
+        let (engine, snapshot, _url) = load_test_table("v2-checkpoints-json-with-sidecars")?;
+
+        // Create a scan
+        let scan = crate::scan::ScanBuilder::new(snapshot.clone())
+            .build()?;
+
+        // Get distributed driver
+        let mut driver = scan.scan_metadata_distributed(engine.clone())?;
+
+        let mut driver_batch_count = 0;
+        let mut driver_file_paths = Vec::new();
+
+        // Process driver-side batches
+        while let Some(result) = driver.next() {
+            let metadata = result?;
+            let paths = metadata.visit_scan_files(
+                vec![],
+                |ps: &mut Vec<String>, path, _, _, _, _, _| {
+                    ps.push(path.to_string());
+                },
+            )?;
+            driver_file_paths.extend(paths);
+            driver_batch_count += 1;
+        }
+
+        // Driver should process commits but find no files (all in sidecars)
+        assert_eq!(
+            driver_file_paths.len(), 0,
+            "Driver should find 0 files (all adds are in checkpoint sidecars)"
+        );
+
+        // Should have executor phase with sidecars
+        let result = driver.finish()?;
+        match result {
+            crate::distributed::DriverPhaseResult::NeedsExecutorPhase {
+                processor,
+                files,
+            } => {
+                assert_eq!(
+                    files.len(),
+                    2,
+                    "Should have exactly 2 sidecar files for distribution"
+                );
+
+                // Serialize processor for distribution
+                let (serialized_state, deduplicator) = processor.serialize()?;
+
+                // Verify the serialized state can be reconstructed
+                let _reconstructed = ScanLogReplayProcessor::from_serialized(
+                    engine.as_ref(),
+                    serialized_state,
+                    deduplicator,
+                )?;
+
+                // Verify sidecar file paths
+                let mut collected_paths: Vec<String> = files
+                    .iter()
+                    .map(|fm| {
+                        fm.location
+                            .path_segments()
+                            .and_then(|segments| segments.last())
+                            .unwrap_or("")
+                            .to_string()
+                    })
+                    .collect();
+
+                collected_paths.sort();
+
+                assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet");
+                assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet");
+            }
+            crate::distributed::DriverPhaseResult::Complete(_processor) => {
+                panic!("Expected NeedsExecutorPhase for table with sidecars");
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_deduplicator_state_preserved() -> DeltaResult<()> {
+        let (engine, snapshot, _url) = load_test_table("v2-checkpoints-json-with-sidecars")?;
+        let log_segment = Arc::new(snapshot.log_segment().clone());
+
+        let state_info = Arc::new(StateInfo::try_new(
+            snapshot.schema(),
+            snapshot.table_configuration(),
+            None,
+            (),
+        )?);
+
+        let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info.clone())?;
+
+        // Process some actions to populate the deduplicator
+        let mut driver = DriverPhase::try_new(processor, log_segment, engine.clone())?;
+
+        // Process all driver batches
+        while let Some(_) = driver.next() {}
+
+        let result = driver.finish()?;
+        let processor = match result {
+            crate::distributed::DriverPhaseResult::Complete(p) => p,
+            crate::distributed::DriverPhaseResult::NeedsExecutorPhase { processor, .. } => processor,
+        };
+
+        let initial_dedup_count = processor.seen_file_keys.len();
+
+        // Serialize and reconstruct (serialize takes ownership)
+        let (serialized_state, deduplicator) = processor.serialize()?;
+        assert_eq!(
+            deduplicator.len(),
+            initial_dedup_count,
+            "Deduplicator size should be preserved during serialization"
+        );
+
+        let reconstructed = ScanLogReplayProcessor::from_serialized(
+            engine.as_ref(),
+            serialized_state,
+            deduplicator.clone(),
+        )?;
+
+        assert_eq!(
+            reconstructed.seen_file_keys.len(),
+            initial_dedup_count,
+            "Reconstructed processor should have same deduplicator size"
+        );
+
+        // Verify the deduplicator contents match (compare against returned deduplicator)
+        for key in &deduplicator {
+            assert!(
+                reconstructed.seen_file_keys.contains(key),
+                "Reconstructed deduplicator should contain key: {:?}",
+                key
+            );
+        }
+
+        Ok(())
+    }
 }
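
Taken together, the new tests exercise the intended driver/executor split: the driver replays the commit portion of the log locally, and when checkpoint sidecars remain it hands off a serialized processor plus the list of sidecar files. The following is a condensed sketch of that flow, using only the APIs exercised in the tests above; it would live inside the kernel crate like the tests do, and the transport that actually moves the serialized state and files to executors is a placeholder, not part of this commit.

fn drive_then_distribute(
    engine: Arc<DefaultEngine<TokioBackgroundExecutor>>,
    snapshot: Arc<crate::Snapshot>,
) -> DeltaResult<()> {
    let log_segment = Arc::new(snapshot.log_segment().clone());
    let state_info = Arc::new(StateInfo::try_new(
        snapshot.schema(),
        snapshot.table_configuration(),
        None,
        (),
    )?);

    // Driver phase: replay the commit files locally.
    let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
    let mut driver = DriverPhase::try_new(processor, log_segment, engine.clone())?;
    while let Some(batch) = driver.next() {
        let _scan_metadata = batch?; // hand each driver-side batch to the planner
    }

    match driver.finish()? {
        // All file actions were resolved from commits; nothing to distribute.
        crate::distributed::DriverPhaseResult::Complete(_processor) => {}
        // Checkpoint sidecars remain: serialize the processor state and ship it,
        // along with the sidecar `files`, to executors.
        crate::distributed::DriverPhaseResult::NeedsExecutorPhase { processor, files } => {
            let (serialized_state, deduplicator) = processor.serialize()?;
            // Transport to an executor elided; on the executor side the processor
            // is rebuilt and each executor reads its share of `files`:
            let _executor_processor = ScanLogReplayProcessor::from_serialized(
                engine.as_ref(),
                serialized_state,
                deduplicator,
            )?;
            let _ = files;
        }
    }
    Ok(())
}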

kernel/src/distributed/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -1 +1,3 @@
-pub(crate) mod driver;
+pub(crate) mod driver;
+
+pub(crate) use driver::{DriverPhase, DriverPhaseResult};
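
The re-export lets the rest of the kernel name the driver types at the module root, which is how the new tests above refer to them. A call site elsewhere in the crate (shown only for illustration, not part of this commit) would import them as:

use crate::distributed::{DriverPhase, DriverPhaseResult};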

kernel/src/log_replay.rs

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ use tracing::debug;
 
 /// The subset of file action fields that uniquely identifies it in the log, used for deduplication
 /// of adds and removes during log replay.
-#[derive(Debug, Hash, Eq, PartialEq)]
+#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
pub(crate) struct FileActionKey {
    pub(crate) path: String,
    pub(crate) dv_unique_id: Option<String>,
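
Deriving Serialize, Deserialize, and Clone on FileActionKey is what allows the deduplicator's seen-key set to travel with the serialized processor. A minimal sketch of the round trip, assuming a serde format such as serde_json is available to the crate (the commit itself does not pin a wire format, and the path below is purely illustrative):

#[test]
fn file_action_key_roundtrip() {
    let key = FileActionKey {
        path: "part-00000-example.parquet".to_string(), // illustrative path only
        dv_unique_id: None,
    };

    // serde_json is used purely for illustration; any serde format would do.
    let bytes = serde_json::to_vec(&key).expect("serialize FileActionKey");
    let restored: FileActionKey = serde_json::from_slice(&bytes).expect("deserialize FileActionKey");

    // Hash + Eq are unchanged, so the restored key still deduplicates correctly.
    assert_eq!(key, restored);
}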
