Skip to content

Commit 38ad6a5

Browse files
committed
Address comments from Nick and Oussama
1 parent b7219a8 commit 38ad6a5

File tree

7 files changed

+77
-98
lines changed

7 files changed

+77
-98
lines changed

kernel/src/engine/default/filesystem.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler};
1818
///
1919
/// Generic over the inner iterator type and item type.
2020
/// The `event_fn` receives (duration, num_files, bytes_read) to construct the appropriate MetricEvent.
21+
/// Metrics are emitted either when the iterator is exhausted or when dropped.
2122
struct MetricsIterator<I, T>
2223
where
2324
I: Iterator<Item = DeltaResult<T>>,
@@ -155,6 +156,7 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
155156
&self,
156157
path: &Url,
157158
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
159+
let start = Instant::now();
158160
// The offset is used for list-after; the prefix is used to restrict the listing to a specific directory.
159161
// Unfortunately, `Path` provides no easy way to check whether a name is directory-like,
160162
// because it strips trailing /, so we're reduced to manually checking the original URL.
@@ -220,7 +222,6 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
220222
}
221223
});
222224

223-
let start = Instant::now();
224225
let reporter = self.reporter.clone();
225226

226227
if !has_ordered_listing {
@@ -261,8 +262,8 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
261262
&self,
262263
files: Vec<FileSlice>,
263264
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
264-
let store = self.inner.clone();
265265
let start = Instant::now();
266+
let store = self.inner.clone();
266267

267268
// This channel will become the output iterator.
268269
// Because there will already be buffering in the stream, we set the

kernel/src/engine/default/mod.rs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use self::parquet::DefaultParquetHandler;
1919
use super::arrow_conversion::TryFromArrow as _;
2020
use super::arrow_data::ArrowEngineData;
2121
use super::arrow_expression::ArrowEvaluationHandler;
22+
use crate::metrics::MetricsReporter;
2223
use crate::schema::Schema;
2324
use crate::transaction::WriteContext;
2425
use crate::{
@@ -40,7 +41,7 @@ pub struct DefaultEngine<E: TaskExecutor> {
4041
json: Arc<DefaultJsonHandler<E>>,
4142
parquet: Arc<DefaultParquetHandler<E>>,
4243
evaluation: Arc<ArrowEvaluationHandler>,
43-
reporter: Option<Arc<dyn crate::metrics::MetricsReporter>>,
44+
reporter: Option<Arc<dyn MetricsReporter>>,
4445
}
4546

4647
impl DefaultEngine<executor::tokio::TokioBackgroundExecutor> {
@@ -60,18 +61,12 @@ impl DefaultEngine<executor::tokio::TokioBackgroundExecutor> {
6061
}
6162

6263
/// Create a new [`DefaultEngine`] instance with the default executor and metrics reporting.
63-
///
64-
/// Uses `TokioBackgroundExecutor` as the default executor.
65-
/// For custom executors, use [`DefaultEngine::new_with_executor_and_metrics_reporter`].
66-
///
67-
/// # Parameters
68-
///
69-
/// - `object_store`: The object store to use.
70-
/// - `reporter`: Metrics reporter for tracking operation metrics.
64+
/// TODO: Switch to Builder pattern to avoid explosion of constructor methods.
65+
#[allow(dead_code)]
7166
#[internal_api]
7267
pub(crate) fn new_with_metrics_reporter(
7368
object_store: Arc<DynObjectStore>,
74-
reporter: Arc<dyn crate::metrics::MetricsReporter>,
69+
reporter: Arc<dyn MetricsReporter>,
7570
) -> Self {
7671
Self::new_with_executor_and_metrics_reporter(
7772
object_store,
@@ -113,24 +108,13 @@ impl<E: TaskExecutor> DefaultEngine<E> {
113108
}
114109

115110
/// Create a new [`DefaultEngine`] instance with a custom executor and metrics reporting.
116-
///
117-
/// Most users should use [`DefaultEngine::new`] instead. This method is only
118-
/// needed for specialized testing scenarios (e.g., multi-threaded executors) with metrics.
119-
///
120-
/// # Parameters
121-
///
122-
/// - `object_store`: The object store to use.
123-
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
124-
/// - `reporter`: Metrics reporter for tracking operation metrics.
125-
///
126-
/// # TODO
127-
///
128-
/// Switch to Builder pattern to avoid explosion of constructor methods.
111+
/// TODO: Switch to Builder pattern to avoid explosion of constructor methods.
112+
#[allow(dead_code)]
129113
#[internal_api]
130114
pub(crate) fn new_with_executor_and_metrics_reporter(
131115
object_store: Arc<DynObjectStore>,
132116
task_executor: Arc<E>,
133-
reporter: Arc<dyn crate::metrics::MetricsReporter>,
117+
reporter: Arc<dyn MetricsReporter>,
134118
) -> Self {
135119
Self {
136120
storage: Arc::new(ObjectStoreStorageHandler::new(
@@ -194,7 +178,7 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
194178
self.parquet.clone()
195179
}
196180

197-
fn get_metrics_reporter(&self) -> Option<Arc<dyn crate::metrics::MetricsReporter>> {
181+
fn get_metrics_reporter(&self) -> Option<Arc<dyn MetricsReporter>> {
198182
self.reporter.clone()
199183
}
200184
}

kernel/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ pub use engine_data::{EngineData, FilteredEngineData, RowVisitor};
155155
pub use error::{DeltaResult, Error};
156156
pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
157157
pub use log_compaction::{should_compact, LogCompactionWriter};
158+
pub use metrics::MetricsReporter;
158159
pub use snapshot::Snapshot;
159160
pub use snapshot::SnapshotRef;
160161

@@ -717,11 +718,11 @@ pub trait Engine: AsAny {
717718
/// Get the connector provided [`ParquetHandler`].
718719
fn parquet_handler(&self) -> Arc<dyn ParquetHandler>;
719720

720-
/// Get the connector provided [`metrics::MetricsReporter`] for metrics collection.
721+
/// Get the connector provided [`MetricsReporter`] for metrics collection.
721722
///
722723
/// Returns an optional reporter that will receive metric events from Delta operations.
723724
/// The default implementation returns None (no metrics reporting).
724-
fn get_metrics_reporter(&self) -> Option<Arc<dyn crate::metrics::MetricsReporter>> {
725+
fn get_metrics_reporter(&self) -> Option<Arc<dyn MetricsReporter>> {
725726
None
726727
}
727728
}

kernel/src/log_segment.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
use std::num::NonZero;
44
use std::sync::{Arc, LazyLock};
55

6+
use std::time::Instant;
7+
68
use crate::actions::visitors::SidecarVisitor;
79
use crate::actions::{
810
get_commit_schema, schema_contains_file_actions, Metadata, Protocol, Sidecar, METADATA_NAME,
911
PROTOCOL_NAME, SIDECAR_NAME,
1012
};
1113
use crate::last_checkpoint_hint::LastCheckpointHint;
1214
use crate::log_replay::ActionsBatch;
15+
use crate::metrics::{MetricEvent, MetricId, MetricsReporter};
1316
use crate::path::{LogPathFileType, ParsedLogPath};
1417
use crate::schema::{SchemaRef, StructField, ToSchema as _};
1518
use crate::utils::require;
@@ -145,22 +148,46 @@ impl LogSegment {
145148
/// - `time_travel_version`: The version of the log that the Snapshot will be at.
146149
///
147150
/// [`Snapshot`]: crate::snapshot::Snapshot
151+
///
152+
/// Reports metrics: `LogSegmentLoaded`.
148153
#[internal_api]
149154
pub(crate) fn for_snapshot(
150155
storage: &dyn StorageHandler,
151156
log_root: Url,
152157
log_tail: Vec<ParsedLogPath>,
153158
time_travel_version: impl Into<Option<Version>>,
159+
reporter: Option<&Arc<dyn MetricsReporter>>,
160+
operation_id: Option<MetricId>,
154161
) -> DeltaResult<Self> {
162+
let operation_id = operation_id.unwrap_or_default();
163+
let start = Instant::now();
164+
155165
let time_travel_version = time_travel_version.into();
156166
let checkpoint_hint = LastCheckpointHint::try_read(storage, &log_root)?;
157-
Self::for_snapshot_impl(
167+
let result = Self::for_snapshot_impl(
158168
storage,
159169
log_root,
160170
log_tail,
161171
checkpoint_hint,
162172
time_travel_version,
163-
)
173+
);
174+
let log_segment_loading_duration = start.elapsed();
175+
176+
match result {
177+
Ok(log_segment) => {
178+
reporter.inspect(|r| {
179+
r.report(MetricEvent::LogSegmentLoaded {
180+
operation_id,
181+
duration: log_segment_loading_duration,
182+
num_commit_files: log_segment.ascending_commit_files.len() as u64,
183+
num_checkpoint_files: log_segment.checkpoint_parts.len() as u64,
184+
num_compaction_files: log_segment.ascending_compaction_files.len() as u64,
185+
});
186+
});
187+
Ok(log_segment)
188+
}
189+
Err(e) => Err(e),
190+
}
164191
}
165192

166193
// factored out for testing

kernel/src/log_segment/tests.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2110,7 +2110,8 @@ fn test_latest_commit_file_field_is_captured() {
21102110
);
21112111

21122112
let log_segment =
2113-
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None).unwrap();
2113+
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
2114+
.unwrap();
21142115

21152116
// The latest commit should be version 5
21162117
assert_eq!(log_segment.latest_commit_file.unwrap().version, 5);
@@ -2136,7 +2137,8 @@ fn test_latest_commit_file_with_checkpoint_filtering() {
21362137
);
21372138

21382139
let log_segment =
2139-
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None).unwrap();
2140+
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
2141+
.unwrap();
21402142

21412143
// The latest commit should be version 4
21422144
assert_eq!(log_segment.latest_commit_file.unwrap().version, 4);
@@ -2156,7 +2158,8 @@ fn test_latest_commit_file_with_no_commits() {
21562158
);
21572159

21582160
let log_segment =
2159-
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None).unwrap();
2161+
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
2162+
.unwrap();
21602163

21612164
// latest_commit_file should be None when there are no commits
21622165
assert!(log_segment.latest_commit_file.is_none());
@@ -2179,7 +2182,8 @@ fn test_latest_commit_file_with_checkpoint_at_same_version() {
21792182
);
21802183

21812184
let log_segment =
2182-
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None).unwrap();
2185+
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
2186+
.unwrap();
21832187

21842188
// The latest commit should be version 1 (saved before filtering)
21852189
assert_eq!(log_segment.latest_commit_file.unwrap().version, 1);
@@ -2204,7 +2208,8 @@ fn test_latest_commit_file_edge_case_commit_before_checkpoint() {
22042208
);
22052209

22062210
let log_segment =
2207-
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None).unwrap();
2211+
LogSegment::for_snapshot(storage.as_ref(), log_root.clone(), vec![], None, None, None)
2212+
.unwrap();
22082213

22092214
// latest_commit_file should be None since there's no commit at the checkpoint version
22102215
assert!(log_segment.latest_commit_file.is_none());

kernel/src/snapshot.rs

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ impl Snapshot {
267267
) -> DeltaResult<Arc<Self>> {
268268
let operation_id = operation_id.unwrap_or_default();
269269
let reporter = engine.get_metrics_reporter();
270-
let start = Instant::now();
271270

271+
let start = Instant::now();
272272
let result =
273273
Self::try_new_from_impl(existing_snapshot, log_tail, engine, version, operation_id);
274274
let snapshot_duration = start.elapsed();
@@ -298,15 +298,29 @@ impl Snapshot {
298298

299299
/// Implementation of snapshot creation from log segment.
300300
///
301-
/// Calls `build_table_configuration` which reports `ProtocolMetadataLoaded`.
301+
/// Reports metrics: `ProtocolMetadataLoaded`.
302302
fn try_new_from_log_segment_impl(
303303
location: Url,
304304
log_segment: LogSegment,
305305
engine: &dyn Engine,
306306
operation_id: MetricId,
307307
) -> DeltaResult<Self> {
308+
let reporter = engine.get_metrics_reporter();
309+
310+
let start = Instant::now();
311+
let (metadata, protocol) = log_segment.read_metadata(engine)?;
312+
let read_metadata_duration = start.elapsed();
313+
314+
reporter.as_ref().inspect(|r| {
315+
r.report(MetricEvent::ProtocolMetadataLoaded {
316+
operation_id,
317+
duration: read_metadata_duration,
318+
});
319+
});
320+
308321
let table_configuration =
309-
Self::build_table_configuration(&log_segment, engine, location, operation_id)?;
322+
TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?;
323+
310324
Ok(Self {
311325
log_segment,
312326
table_configuration,
@@ -324,8 +338,8 @@ impl Snapshot {
324338
) -> DeltaResult<Self> {
325339
let operation_id = operation_id.unwrap_or_default();
326340
let reporter = engine.get_metrics_reporter();
327-
let start = Instant::now();
328341

342+
let start = Instant::now();
329343
let result =
330344
Self::try_new_from_log_segment_impl(location, log_segment, engine, operation_id);
331345
let snapshot_duration = start.elapsed();
@@ -516,33 +530,6 @@ impl Snapshot {
516530
None => Err(Error::generic("Last commit file not found in log segment")),
517531
}
518532
}
519-
520-
/// Build table configuration from log segment.
521-
///
522-
/// Reports metrics: `ProtocolMetadataLoaded`.
523-
fn build_table_configuration(
524-
log_segment: &LogSegment,
525-
engine: &dyn Engine,
526-
location: Url,
527-
operation_id: MetricId,
528-
) -> DeltaResult<TableConfiguration> {
529-
let start = Instant::now();
530-
let reporter = engine.get_metrics_reporter();
531-
let (metadata, protocol) = log_segment.read_metadata(engine)?;
532-
let read_metadata_duration = start.elapsed();
533-
534-
reporter.as_ref().inspect(|r| {
535-
r.report(MetricEvent::ProtocolMetadataLoaded {
536-
operation_id,
537-
duration: read_metadata_duration,
538-
});
539-
});
540-
541-
let table_configuration =
542-
TableConfiguration::try_new(metadata, protocol, location, log_segment.end_version)?;
543-
544-
Ok(table_configuration)
545-
}
546533
}
547534

548535
#[cfg(test)]

kernel/src/snapshot/builder.rs

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
//! Builder for creating [`Snapshot`] instances.
2-
use std::time::Instant;
3-
42
use crate::log_path::LogPath;
53
use crate::log_segment::LogSegment;
6-
use crate::metrics::{MetricEvent, MetricId};
4+
use crate::metrics::MetricId;
75
use crate::snapshot::SnapshotRef;
86
use crate::{DeltaResult, Engine, Error, Snapshot, Version};
97

@@ -84,44 +82,20 @@ impl SnapshotBuilder {
8482
///
8583
/// - `engine`: Implementation of [`Engine`] apis.
8684
pub fn build(self, engine: &dyn Engine) -> DeltaResult<SnapshotRef> {
87-
let start = Instant::now();
8885
let log_tail = self.log_tail.into_iter().map(Into::into).collect();
8986

9087
let operation_id = MetricId::new();
9188
let reporter = engine.get_metrics_reporter();
9289

9390
if let Some(table_root) = self.table_root {
94-
let log_segment_result = LogSegment::for_snapshot(
91+
let log_segment = LogSegment::for_snapshot(
9592
engine.storage_handler().as_ref(),
9693
table_root.join("_delta_log/")?,
9794
log_tail,
9895
self.version,
99-
);
100-
let log_segment_loading_duration = start.elapsed();
101-
102-
let log_segment = match log_segment_result {
103-
Ok(seg) => {
104-
reporter.as_ref().inspect(|r| {
105-
r.report(MetricEvent::LogSegmentLoaded {
106-
operation_id,
107-
duration: log_segment_loading_duration,
108-
num_commit_files: seg.ascending_commit_files.len() as u64,
109-
num_checkpoint_files: seg.checkpoint_parts.len() as u64,
110-
num_compaction_files: seg.ascending_compaction_files.len() as u64,
111-
});
112-
});
113-
seg
114-
}
115-
Err(e) => {
116-
reporter.as_ref().inspect(|r| {
117-
r.report(MetricEvent::SnapshotFailed {
118-
operation_id,
119-
duration: log_segment_loading_duration,
120-
});
121-
});
122-
return Err(e);
123-
}
124-
};
96+
reporter.as_ref(),
97+
Some(operation_id),
98+
)?;
12599

126100
Ok(Snapshot::try_new_from_log_segment(
127101
table_root,

0 commit comments

Comments
 (0)