Skip to content

Commit 3df290c

Browse files
committed
driver
1 parent 6c1aeea commit 3df290c

File tree

1 file changed

+60
-39
lines changed

1 file changed

+60
-39
lines changed

kernel/src/distributed/driver.rs

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ enum DriverState {
7171
Done,
7272
}
7373

74+
/// Outcome of the driver phase, returned by `DriverPhase::finish` once the iterator is exhausted.
75+
pub(crate) enum DriverPhaseResult<P> {
76+
/// All processing complete on driver - no executor phase needed.
77+
Complete(P),
78+
/// Executor phase needed - distribute files to executors for parallel processing.
79+
NeedsExecutorPhase { processor: P, files: Vec<FileMeta> },
80+
}
81+
7482
impl<P: LogReplayProcessor> DriverPhase<P> {
7583
/// Create a new driver-side log replay.
7684
///
@@ -206,21 +214,26 @@ impl<P: LogReplayProcessor> DriverPhase<P> {
206214
/// Must be called after the iterator is exhausted.
207215
///
208216
/// # Returns
209-
/// - `Some((processor, files))`: Executor phase needed - distribute files to executors
210-
/// - `None`: No executor phase needed - all processing complete on driver
217+
/// - `Complete`: All processing done on driver - no executor phase needed
218+
/// - `NeedsExecutorPhase`: Executor phase needed - distribute files to executors
211219
///
212220
/// # Errors
213221
/// Returns an error if called before the iterator is exhausted, or if the driver is left in an unexpected state after exhaustion.
214-
pub(crate) fn finish(self) -> DeltaResult<Option<(P, Vec<FileMeta>)>> {
222+
pub(crate) fn finish(self) -> DeltaResult<DriverPhaseResult<P>> {
215223
if !self.is_finished {
216224
return Err(Error::generic(
217225
"Must exhaust iterator before calling finish()",
218226
));
219227
}
220228

221229
match self.state {
222-
Some(DriverState::ExecutorPhase { files }) => Ok(Some((self.processor, files))),
223-
Some(DriverState::Done) | None => Ok(None),
230+
Some(DriverState::ExecutorPhase { files }) => {
231+
Ok(DriverPhaseResult::NeedsExecutorPhase {
232+
processor: self.processor,
233+
files,
234+
})
235+
}
236+
Some(DriverState::Done) | None => Ok(DriverPhaseResult::Complete(self.processor)),
224237
_ => Err(Error::generic("Unexpected state after iterator exhaustion")),
225238
}
226239
}
@@ -308,10 +321,14 @@ mod tests {
308321

309322
// No executor phase needed for commits-only table
310323
let result = driver.finish()?;
311-
assert!(
312-
result.is_none(),
313-
"DriverPhase should return None for commits-only table (no executor phase needed)"
314-
);
324+
match result {
325+
DriverPhaseResult::Complete(_processor) => {
326+
// Expected - no executor phase needed
327+
}
328+
DriverPhaseResult::NeedsExecutorPhase { .. } => {
329+
panic!("Expected Complete, but got NeedsExecutorPhase for commits-only table");
330+
}
331+
}
315332

316333
Ok(())
317334
}
@@ -365,36 +382,40 @@ mod tests {
365382

366383
// Should have executor phase with sidecars from the checkpoint
367384
let result = driver.finish()?;
368-
assert!(
369-
result.is_some(),
370-
"DriverPhase should return Some for table with sidecars (executor phase needed)"
371-
);
372-
373-
let (_processor, files) = result.unwrap();
374-
assert_eq!(
375-
files.len(),
376-
2,
377-
"DriverPhase should collect exactly 2 sidecar files from checkpoint for distribution"
378-
);
379-
380-
// Extract and verify the sidecar file paths
381-
let mut collected_paths: Vec<String> = files
382-
.iter()
383-
.map(|fm| {
384-
// Get the filename from the URL path
385-
fm.location
386-
.path_segments()
387-
.and_then(|segments| segments.last())
388-
.unwrap_or("")
389-
.to_string()
390-
})
391-
.collect();
392-
393-
collected_paths.sort();
394-
395-
// Verify they're the expected sidecar files for version 6
396-
assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet");
397-
assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet");
385+
match result {
386+
DriverPhaseResult::NeedsExecutorPhase {
387+
processor: _processor,
388+
files,
389+
} => {
390+
assert_eq!(
391+
files.len(),
392+
2,
393+
"DriverPhase should collect exactly 2 sidecar files from checkpoint for distribution"
394+
);
395+
396+
// Extract and verify the sidecar file paths
397+
let mut collected_paths: Vec<String> = files
398+
.iter()
399+
.map(|fm| {
400+
// Get the filename from the URL path
401+
fm.location
402+
.path_segments()
403+
.and_then(|segments| segments.last())
404+
.unwrap_or("")
405+
.to_string()
406+
})
407+
.collect();
408+
409+
collected_paths.sort();
410+
411+
// Verify they're the expected sidecar files for version 6
412+
assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet");
413+
assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet");
414+
}
415+
DriverPhaseResult::Complete(_processor) => {
416+
panic!("Expected NeedsExecutorPhase for table with sidecars");
417+
}
418+
}
398419

399420
Ok(())
400421
}

0 commit comments

Comments
 (0)