Skip to content

Commit df4b383

Browse files
committed
refactor table statistics
1 parent 1316e09 commit df4b383

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

datafusion_iceberg/src/statistics.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use datafusion::{
55
physical_plan::{ColumnStatistics, Statistics},
66
scalar::ScalarValue,
77
};
8-
use futures::{future, stream, TryFutureExt, TryStreamExt};
98
use iceberg_rust::spec::{
109
manifest::{ManifestEntry, Status},
1110
schema::Schema,
@@ -48,9 +47,17 @@ pub(crate) async fn table_statistics(
4847
let datafiles = table
4948
.datafiles(&manifests, None, sequence_number_range)
5049
.await?;
51-
stream::iter(datafiles)
52-
.try_filter(|manifest| future::ready(!matches!(manifest.1.status(), Status::Deleted)))
53-
.try_fold(
50+
Ok(statistics_from_datafiles(
51+
schema,
52+
&datafiles.collect::<Result<Vec<_>, Error>>()?,
53+
))
54+
}
55+
56+
fn statistics_from_datafiles(schema: &Schema, datafiles: &[(String, ManifestEntry)]) -> Statistics {
57+
datafiles
58+
.iter()
59+
.filter(|(_, manifest)| !matches!(manifest.status(), Status::Deleted))
60+
.fold(
5461
Statistics {
5562
num_rows: Precision::Exact(0),
5663
total_byte_size: Precision::Exact(0),
@@ -65,14 +72,14 @@ pub(crate) async fn table_statistics(
6572
schema.fields().len()
6673
],
6774
},
68-
|acc, manifest| async move {
69-
let column_stats = column_statistics(schema, &manifest.1);
70-
Ok(Statistics {
75+
|acc, (_, manifest)| {
76+
let column_stats = column_statistics(schema, manifest);
77+
Statistics {
7178
num_rows: acc.num_rows.add(&Precision::Exact(
72-
*manifest.1.data_file().record_count() as usize,
79+
*manifest.data_file().record_count() as usize,
7380
)),
7481
total_byte_size: acc.total_byte_size.add(&Precision::Exact(
75-
*manifest.1.data_file().file_size_in_bytes() as usize,
82+
*manifest.data_file().file_size_in_bytes() as usize,
7683
)),
7784
column_statistics: acc
7885
.column_statistics
@@ -86,11 +93,9 @@ pub(crate) async fn table_statistics(
8693
sum_value: acc.sum_value.add(&x.sum_value),
8794
})
8895
.collect(),
89-
})
96+
}
9097
},
9198
)
92-
.map_err(Error::from)
93-
.await
9499
}
95100

96101
fn column_statistics<'a>(

0 commit comments

Comments
 (0)