Skip to content

Commit 1316e09

Browse files
committed
instrument datafusion create physical plan
1 parent 3d92880 commit 1316e09

File tree

1 file changed

+28
-23
lines changed

1 file changed

+28
-23
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use tokio::sync::{
3333
mpsc::{self},
3434
RwLock, RwLockWriteGuard,
3535
};
36-
use tracing::instrument;
36+
use tracing::{instrument, Instrument};
3737

3838
use crate::{
3939
error::Error as DataFusionIcebergError,
@@ -540,33 +540,35 @@ async fn table_scan(
540540
.manifests(snapshot_range.0, snapshot_range.1)
541541
.await
542542
.map_err(DataFusionIcebergError::from)?;
543-
let data_files: Vec<(ManifestPath, ManifestEntry)> = table
543+
let mut data_files = table
544544
.datafiles(&manifests, None, sequence_number_range)
545545
.await
546-
.map_err(DataFusionIcebergError::from)?
547-
.try_collect()
548546
.map_err(DataFusionIcebergError::from)?;
549-
data_files.into_iter().for_each(|manifest| {
550-
if *manifest.1.status() != Status::Deleted {
551-
match manifest.1.data_file().content() {
552-
Content::Data => {
553-
data_file_groups
554-
.entry(manifest.1.data_file().partition().clone())
555-
.or_default()
556-
.push(manifest);
557-
}
558-
Content::EqualityDeletes => {
559-
equality_delete_file_groups
560-
.entry(manifest.1.data_file().partition().clone())
561-
.or_default()
562-
.push(manifest);
563-
}
564-
Content::PositionDeletes => {
565-
panic!("Position deletes not supported.")
547+
data_files
548+
.try_for_each(|manifest| {
549+
let manifest = manifest?;
550+
if *manifest.1.status() != Status::Deleted {
551+
match manifest.1.data_file().content() {
552+
Content::Data => {
553+
data_file_groups
554+
.entry(manifest.1.data_file().partition().clone())
555+
.or_default()
556+
.push(manifest);
557+
}
558+
Content::EqualityDeletes => {
559+
equality_delete_file_groups
560+
.entry(manifest.1.data_file().partition().clone())
561+
.or_default()
562+
.push(manifest);
563+
}
564+
Content::PositionDeletes => {
565+
panic!("Position deletes not supported.")
566+
}
566567
}
567568
}
568-
}
569-
});
569+
Ok::<_, Error>(())
570+
})
571+
.map_err(DataFusionIcebergError::from)?;
570572
};
571573

572574
let file_source = Arc::new(
@@ -862,6 +864,9 @@ async fn table_scan(
862864

863865
let other_plan = ParquetFormat::default()
864866
.create_physical_plan(session, file_scan_config)
867+
.instrument(tracing::debug_span!(
868+
"datafusion_iceberg::create_physical_plan_scan_data_files"
869+
))
865870
.await?;
866871

867872
plans.push(other_plan);

0 commit comments

Comments
 (0)