Skip to content

Commit 2af7649

Browse files
committed
better
1 parent 8ffc299 commit 2af7649

File tree

2 files changed

+89
-43
lines changed

2 files changed

+89
-43
lines changed

kernel/src/snapshot.rs

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,13 @@ impl Snapshot {
100100
}
101101
}
102102

103-
/// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you
104-
/// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the
105-
/// snapshot to a later version.
106-
fn try_new_from(
103+
/// Implementation of snapshot creation from existing snapshot.
104+
fn try_new_from_impl(
107105
existing_snapshot: Arc<Snapshot>,
108106
log_tail: Vec<ParsedLogPath>,
109107
engine: &dyn Engine,
110108
version: impl Into<Option<Version>>,
109+
operation_id: MetricId,
111110
) -> DeltaResult<Arc<Self>> {
112111
let old_log_segment = &existing_snapshot.log_segment;
113112
let old_version = existing_snapshot.version();
@@ -181,11 +180,11 @@ impl Snapshot {
181180

182181
if new_log_segment.checkpoint_version.is_some() {
183182
// we have a checkpoint in the new LogSegment, just construct a new snapshot from that
184-
let snapshot = Self::try_new_from_log_segment(
183+
let snapshot = Self::try_new_from_log_segment_impl(
185184
existing_snapshot.table_root().clone(),
186185
new_log_segment,
187186
engine,
188-
None,
187+
operation_id,
189188
);
190189
return Ok(Arc::new(snapshot?));
191190
}
@@ -254,7 +253,69 @@ impl Snapshot {
254253
)))
255254
}
256255

256+
/// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you
257+
/// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the
258+
/// snapshot to a later version.
259+
///
260+
/// Reports metrics: `SnapshotCompleted` or `SnapshotFailed`.
261+
fn try_new_from(
262+
existing_snapshot: Arc<Snapshot>,
263+
log_tail: Vec<ParsedLogPath>,
264+
engine: &dyn Engine,
265+
version: impl Into<Option<Version>>,
266+
operation_id: Option<MetricId>,
267+
) -> DeltaResult<Arc<Self>> {
268+
let operation_id = operation_id.unwrap_or_default();
269+
let reporter = engine.get_metrics_reporter();
270+
let start = Instant::now();
271+
272+
let result =
273+
Self::try_new_from_impl(existing_snapshot, log_tail, engine, version, operation_id);
274+
let snapshot_duration = start.elapsed();
275+
276+
match result {
277+
Ok(snapshot) => {
278+
reporter.as_ref().inspect(|r| {
279+
r.report(MetricEvent::SnapshotCompleted {
280+
operation_id,
281+
version: snapshot.version(),
282+
total_duration: snapshot_duration,
283+
});
284+
});
285+
Ok(snapshot)
286+
}
287+
Err(e) => {
288+
reporter.as_ref().inspect(|r| {
289+
r.report(MetricEvent::SnapshotFailed {
290+
operation_id,
291+
duration: snapshot_duration,
292+
});
293+
});
294+
Err(e)
295+
}
296+
}
297+
}
298+
299+
/// Implementation of snapshot creation from log segment.
300+
///
301+
/// Calls `build_table_configuration` which reports `ProtocolMetadataLoaded`.
302+
fn try_new_from_log_segment_impl(
303+
location: Url,
304+
log_segment: LogSegment,
305+
engine: &dyn Engine,
306+
operation_id: MetricId,
307+
) -> DeltaResult<Self> {
308+
let table_configuration =
309+
Self::build_table_configuration(&log_segment, engine, location, operation_id)?;
310+
Ok(Self {
311+
log_segment,
312+
table_configuration,
313+
})
314+
}
315+
257316
/// Create a new [`Snapshot`] instance.
317+
///
318+
/// Reports metrics: `SnapshotCompleted` or `SnapshotFailed`.
258319
pub(crate) fn try_new_from_log_segment(
259320
location: Url,
260321
log_segment: LogSegment,
@@ -265,23 +326,20 @@ impl Snapshot {
265326
let reporter = engine.get_metrics_reporter();
266327
let start = Instant::now();
267328

268-
let table_configuration =
269-
Self::build_table_configuration(&log_segment, engine, location, operation_id);
329+
let result =
330+
Self::try_new_from_log_segment_impl(location, log_segment, engine, operation_id);
270331
let snapshot_duration = start.elapsed();
271332

272-
match table_configuration {
273-
Ok(table_configuration) => {
333+
match result {
334+
Ok(snapshot) => {
274335
reporter.as_ref().inspect(|r| {
275336
r.report(MetricEvent::SnapshotCompleted {
276337
operation_id,
277-
version: table_configuration.version(),
338+
version: snapshot.table_configuration.version(),
278339
total_duration: snapshot_duration,
279340
});
280341
});
281-
Ok(Self {
282-
log_segment,
283-
table_configuration,
284-
})
342+
Ok(snapshot)
285343
}
286344
Err(e) => {
287345
reporter.as_ref().inspect(|r| {
@@ -459,6 +517,9 @@ impl Snapshot {
459517
}
460518
}
461519

520+
/// Build table configuration from log segment.
521+
///
522+
/// Reports metrics: `ProtocolMetadataLoaded`.
462523
fn build_table_configuration(
463524
log_segment: &LogSegment,
464525
engine: &dyn Engine,
@@ -1574,7 +1635,7 @@ mod tests {
15741635
.build(&engine)?;
15751636

15761637
// Test with empty log tail - should return same snapshot
1577-
let result = Snapshot::try_new_from(base_snapshot.clone(), vec![], &engine, None)?;
1638+
let result = Snapshot::try_new_from(base_snapshot.clone(), vec![], &engine, None, None)?;
15781639
assert_eq!(result, base_snapshot);
15791640

15801641
Ok(())
@@ -1642,7 +1703,7 @@ mod tests {
16421703

16431704
// Create new snapshot from base to version 2 using try_new_from directly
16441705
let new_snapshot =
1645-
Snapshot::try_new_from(base_snapshot.clone(), log_tail, &engine, Some(2))?;
1706+
Snapshot::try_new_from(base_snapshot.clone(), log_tail, &engine, Some(2), None)?;
16461707

16471708
// Latest commit should now be version 2
16481709
assert_eq!(
@@ -1691,11 +1752,13 @@ mod tests {
16911752
.build(&engine)?;
16921753

16931754
// Test requesting same version - should return same snapshot
1694-
let same_version = Snapshot::try_new_from(base_snapshot.clone(), vec![], &engine, Some(1))?;
1755+
let same_version =
1756+
Snapshot::try_new_from(base_snapshot.clone(), vec![], &engine, Some(1), None)?;
16951757
assert!(Arc::ptr_eq(&same_version, &base_snapshot));
16961758

16971759
// Test requesting older version - should error
1698-
let older_version = Snapshot::try_new_from(base_snapshot.clone(), vec![], &engine, Some(0));
1760+
let older_version =
1761+
Snapshot::try_new_from(base_snapshot.clone(), vec![], &engine, Some(0), None);
16991762
assert!(matches!(
17001763
older_version,
17011764
Err(Error::Generic(msg)) if msg.contains("older than snapshot hint version")

kernel/src/snapshot/builder.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -137,30 +137,13 @@ impl SnapshotBuilder {
137137
)
138138
})?;
139139

140-
let result = Snapshot::try_new_from(existing_snapshot, log_tail, engine, self.version);
141-
let snapshot_building_duration = start.elapsed();
142-
143-
match result {
144-
Ok(snapshot) => {
145-
reporter.as_ref().inspect(|r| {
146-
r.report(MetricEvent::SnapshotCompleted {
147-
operation_id,
148-
version: snapshot.version(),
149-
total_duration: snapshot_building_duration,
150-
});
151-
});
152-
Ok(snapshot)
153-
}
154-
Err(e) => {
155-
reporter.as_ref().inspect(|r| {
156-
r.report(MetricEvent::SnapshotFailed {
157-
operation_id,
158-
duration: snapshot_building_duration,
159-
});
160-
});
161-
Err(e)
162-
}
163-
}
140+
Snapshot::try_new_from(
141+
existing_snapshot,
142+
log_tail,
143+
engine,
144+
self.version,
145+
Some(operation_id),
146+
)
164147
}
165148
}
166149
}

0 commit comments

Comments
 (0)