
Commit 7fc30e6

avoid second call to get manifests
1 parent 48ce682 commit 7fc30e6
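
In outline: planning a scan previously read the table's manifest list twice, once inside the now-removed `DataFusionTable::statistics()` and once inside `table_scan`. This commit derives the statistics from the data files that `table_scan` has already collected, so the manifests are fetched only once. A toy model of the cost being avoided; the `Table` type and read counter below are illustrative stand-ins, not the real `iceberg_rust` API:

use std::cell::Cell;

// Stand-in for an Iceberg table; `manifests()` models the remote read
// whose duplication this commit removes.
struct Table {
    manifest_reads: Cell<u32>,
}

impl Table {
    fn manifests(&self) -> Vec<&'static str> {
        self.manifest_reads.set(self.manifest_reads.get() + 1);
        vec!["manifest-1.avro", "manifest-2.avro"]
    }
}

fn main() {
    // Before: `statistics()` and `table_scan` each listed the manifests.
    let table = Table { manifest_reads: Cell::new(0) };
    let _for_statistics = table.manifests();
    let _for_scan = table.manifests();
    assert_eq!(table.manifest_reads.get(), 2);

    // After: one listing feeds both the file grouping and the statistics.
    let table = Table { manifest_reads: Cell::new(0) };
    let shared = table.manifests();
    assert_eq!(table.manifest_reads.get(), 1);
    assert_eq!(shared.len(), 2);
}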

2 files changed: +37 additions, -83 deletions

datafusion_iceberg/src/statistics.rs

Lines changed: 5 additions & 45 deletions
@@ -1,59 +1,19 @@
-use std::ops::Deref;
-
 use datafusion::{
     common::stats::Precision,
     physical_plan::{ColumnStatistics, Statistics},
     scalar::ScalarValue,
 };
+use iceberg_rust::error::Error;
 use iceberg_rust::spec::{
     manifest::{ManifestEntry, Status},
     schema::Schema,
     values::Value,
 };
-use iceberg_rust::{catalog::tabular::Tabular, error::Error, table::Table};
-use itertools::Itertools;
-
-use super::table::DataFusionTable;
-
-impl DataFusionTable {
-    pub(crate) async fn statistics(&self) -> Result<Statistics, Error> {
-        match self.tabular.read().await.deref() {
-            Tabular::Table(table) => table_statistics(table, &self.snapshot_range).await,
-            Tabular::View(_) => Err(Error::NotSupported("Statistics for views".to_string())),
-            Tabular::MaterializedView(mv) => {
-                let table = mv.storage_table().await?;
-                table_statistics(&table, &self.snapshot_range).await
-            }
-        }
-    }
-}
-
-pub(crate) async fn table_statistics(
-    table: &Table,
-    snapshot_range: &(Option<i64>, Option<i64>),
-) -> Result<Statistics, Error> {
-    let schema = &snapshot_range
-        .1
-        .and_then(|snapshot_id| table.metadata().schema(snapshot_id).ok().cloned())
-        .unwrap_or_else(|| table.current_schema(None).unwrap().clone());
-
-    let sequence_number_range = [snapshot_range.0, snapshot_range.1]
-        .iter()
-        .map(|x| x.and_then(|y| table.metadata().sequence_number(y)))
-        .collect_tuple::<(Option<i64>, Option<i64>)>()
-        .unwrap();
-
-    let manifests = table.manifests(snapshot_range.0, snapshot_range.1).await?;
-    let datafiles = table
-        .datafiles(&manifests, None, sequence_number_range)
-        .await?;
-    Ok(statistics_from_datafiles(
-        schema,
-        &datafiles.collect::<Result<Vec<_>, Error>>()?,
-    ))
-}
 
-fn statistics_from_datafiles(schema: &Schema, datafiles: &[(String, ManifestEntry)]) -> Statistics {
+pub(crate) fn statistics_from_datafiles(
+    schema: &Schema,
+    datafiles: &[(String, ManifestEntry)],
+) -> Statistics {
     datafiles
         .iter()
         .filter(|(_, manifest)| !matches!(manifest.status(), Status::Deleted))
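
With the wrapper methods gone, `statistics_from_datafiles` is the single (now `pub(crate)`) entry point, and the caller is responsible for collecting the data files first. A rough sketch of the aggregation shape: `FileSummary` is a hypothetical stand-in for the `(String, ManifestEntry)` pairs the real function receives, and `num_columns` stands in for the `Schema` parameter that drives the per-column statistics; the real per-column handling and the exact precision it reports are not shown in this diff.

use datafusion::common::stats::Precision;
use datafusion::physical_plan::{ColumnStatistics, Statistics};

// Hypothetical per-file summary; the real function reads record counts
// and sizes out of Iceberg manifest entries.
struct FileSummary {
    record_count: usize,
    file_size_bytes: usize,
    deleted: bool,
}

// Sum row counts and byte sizes over live (non-deleted) files and
// report them as inexact, leaving per-column statistics unknown.
fn statistics_from_summaries(num_columns: usize, files: &[FileSummary]) -> Statistics {
    let (rows, bytes) = files
        .iter()
        .filter(|f| !f.deleted)
        .fold((0, 0), |(r, b), f| (r + f.record_count, b + f.file_size_bytes));
    Statistics {
        num_rows: Precision::Inexact(rows),
        total_byte_size: Precision::Inexact(bytes),
        column_statistics: vec![ColumnStatistics::new_unknown(); num_columns],
    }
}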

datafusion_iceberg/src/table.rs

Lines changed: 32 additions & 38 deletions
@@ -35,6 +35,7 @@ use tokio::sync::{
 };
 use tracing::{instrument, Instrument};
 
+use crate::statistics::statistics_from_datafiles;
 use crate::{
     error::Error as DataFusionIcebergError,
     pruning_statistics::{transform_predicate, PruneDataFiles, PruneManifests},
@@ -73,7 +74,6 @@ use datafusion::{
         projection::ProjectionExec,
         union::UnionExec,
         DisplayAs, DisplayFormatType, ExecutionPlan, PhysicalExpr, SendableRecordBatchStream,
-        Statistics,
     },
     prelude::Expr,
     scalar::ScalarValue,
@@ -259,16 +259,11 @@ impl TableProvider for DataFusionTable {
             }
             Tabular::Table(table) => {
                 let schema = self.schema();
-                let statistics = self
-                    .statistics()
-                    .await
-                    .map_err(DataFusionIcebergError::from)?;
                 table_scan(
                     table,
                     &self.snapshot_range,
                     schema,
                     self.config.as_ref(),
-                    statistics,
                     session_state,
                     projection,
                     filters,
@@ -282,16 +277,11 @@ impl TableProvider for DataFusionTable {
                     .await
                     .map_err(DataFusionIcebergError::from)?;
                 let schema = self.schema();
-                let statistics = self
-                    .statistics()
-                    .await
-                    .map_err(DataFusionIcebergError::from)?;
                 table_scan(
                     &table,
                     &self.snapshot_range,
                     schema,
                     self.config.as_ref(),
-                    statistics,
                     session_state,
                     projection,
                     filters,
@@ -350,7 +340,7 @@ fn fake_object_store_url(table_location_url: &str) -> ObjectStoreUrl {
 }
 
 #[allow(clippy::too_many_arguments)]
-#[instrument(name = "datafusion_iceberg::table_scan", level = "debug", skip(arrow_schema, statistics, session, filters), fields(
+#[instrument(name = "datafusion_iceberg::table_scan", level = "debug", skip(arrow_schema, session, filters), fields(
     table_identifier = %table.identifier(),
     snapshot_range = ?snapshot_range,
     projection = ?projection,
@@ -362,7 +352,6 @@ async fn table_scan(
     snapshot_range: &(Option<i64>, Option<i64>),
     arrow_schema: SchemaRef,
     config: Option<&DataFusionTableConfig>,
-    statistics: Statistics,
     session: &SessionState,
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
@@ -445,7 +434,7 @@ async fn table_scan(
         HashMap::new();
 
     // Prune data & delete file and insert them into the according map
-    if let Some(physical_predicate) = physical_predicate.clone() {
+    let statistics = if let Some(physical_predicate) = physical_predicate.clone() {
         let partition_schema = Arc::new(ArrowSchema::new(table_partition_cols.clone()));
         let partition_column_names = partition_fields
             .iter()
@@ -511,6 +500,8 @@ async fn table_scan(
             &data_files,
         ))?;
 
+        let statistics = statistics_from_datafiles(&schema, &data_files);
+
         data_files
             .into_iter()
             .zip(files_to_prune.into_iter())
@@ -535,40 +526,43 @@ async fn table_scan(
                 }
             };
         });
+        statistics
     } else {
         let manifests = table
             .manifests(snapshot_range.0, snapshot_range.1)
             .await
             .map_err(DataFusionIcebergError::from)?;
-        let mut data_files = table
+        let data_files: Vec<_> = table
             .datafiles(&manifests, None, sequence_number_range)
             .await
+            .map_err(DataFusionIcebergError::from)?
+            .try_collect()
             .map_err(DataFusionIcebergError::from)?;
-        data_files
-            .try_for_each(|manifest| {
-                let manifest = manifest?;
-                if *manifest.1.status() != Status::Deleted {
-                    match manifest.1.data_file().content() {
-                        Content::Data => {
-                            data_file_groups
-                                .entry(manifest.1.data_file().partition().clone())
-                                .or_default()
-                                .push(manifest);
-                        }
-                        Content::EqualityDeletes => {
-                            equality_delete_file_groups
-                                .entry(manifest.1.data_file().partition().clone())
-                                .or_default()
-                                .push(manifest);
-                        }
-                        Content::PositionDeletes => {
-                            panic!("Position deletes not supported.")
-                        }
+
+        let statistics = statistics_from_datafiles(&schema, &data_files);
+
+        data_files.into_iter().for_each(|manifest| {
+            if *manifest.1.status() != Status::Deleted {
+                match manifest.1.data_file().content() {
+                    Content::Data => {
+                        data_file_groups
+                            .entry(manifest.1.data_file().partition().clone())
+                            .or_default()
+                            .push(manifest);
+                    }
+                    Content::EqualityDeletes => {
+                        equality_delete_file_groups
+                            .entry(manifest.1.data_file().partition().clone())
+                            .or_default()
+                            .push(manifest);
+                    }
+                    Content::PositionDeletes => {
+                        panic!("Position deletes not supported.")
                     }
                 }
-                Ok::<_, Error>(())
-            })
-            .map_err(DataFusionIcebergError::from)?;
+            }
+        });
+        statistics
     };
 
     let file_source = Arc::new(
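
Two details of the restructuring are worth noting. First, because Rust's `if`/`else` is an expression, each branch can end with the `statistics` value it computed, which is how the value escapes the conditional without being passed into `table_scan` as a parameter anymore. Second, the else branch now materializes the fallible datafile iterator up front with `itertools`' `try_collect`, so one `Vec` can feed both `statistics_from_datafiles` and the partition grouping. A reduced sketch of both patterns; the `u64` "statistics", the pruning rule, and the hard-coded values are placeholders:

use itertools::Itertools;

fn plan(files: Vec<Result<u64, String>>, pruned: bool) -> Result<u64, String> {
    // `try_collect` turns Iterator<Item = Result<T, E>> into
    // Result<Vec<T>, E>, stopping at the first error.
    let data_files: Vec<u64> = files.into_iter().try_collect()?;

    // Each branch yields the statistics it derived from its own view of
    // the files, so the value flows out of the `if` expression.
    let statistics = if pruned {
        let kept: Vec<u64> = data_files.iter().copied().filter(|n| *n > 10).collect();
        let statistics = kept.iter().sum::<u64>();
        // ... group `kept` into per-partition maps here ...
        statistics
    } else {
        let statistics = data_files.iter().sum::<u64>();
        // ... group `data_files` into per-partition maps here ...
        statistics
    };
    Ok(statistics)
}

fn main() {
    let files = vec![Ok(5u64), Ok(20), Ok(30)];
    assert_eq!(plan(files.clone(), false), Ok(55));
    assert_eq!(plan(files, true), Ok(50));
}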
