@@ -4,6 +4,7 @@ use datafusion::{
44 scalar:: ScalarValue ,
55} ;
66use iceberg_rust:: error:: Error ;
7+ use iceberg_rust:: file_format:: parquet:: estimate_distinct_count;
78use iceberg_rust:: spec:: {
89 manifest:: { ManifestEntry , Status } ,
910 schema:: Schema ,
@@ -45,12 +46,16 @@ pub(crate) fn statistics_from_datafiles(
4546 . column_statistics
4647 . into_iter ( )
4748 . zip ( column_stats)
48- . map ( |( acc, x) | ColumnStatistics {
49- null_count : acc. null_count . add ( & x. null_count ) ,
50- max_value : acc. max_value . max ( & x. max_value ) ,
51- min_value : acc. min_value . min ( & x. min_value ) ,
52- distinct_count : acc. distinct_count . add ( & x. distinct_count ) ,
53- sum_value : acc. sum_value . add ( & x. sum_value ) ,
49+ . map ( |( acc, x) | {
50+ let new_distinct_count = new_distinct_count ( & acc, & x) ;
51+
52+ ColumnStatistics {
53+ null_count : acc. null_count . add ( & x. null_count ) ,
54+ max_value : acc. max_value . max ( & x. max_value ) ,
55+ min_value : acc. min_value . min ( & x. min_value ) ,
56+ distinct_count : new_distinct_count,
57+ sum_value : acc. sum_value . add ( & x. sum_value ) ,
58+ }
5459 } )
5560 . collect ( ) ,
5661 }
@@ -134,3 +139,49 @@ fn convert_value_to_scalar_value(value: Value) -> Result<ScalarValue, Error> {
134139 ) ) ,
135140 }
136141}
142+
143+ fn new_distinct_count ( acc : & ColumnStatistics , x : & ColumnStatistics ) -> Precision < usize > {
144+ match (
145+ & acc. distinct_count ,
146+ & x. distinct_count ,
147+ & acc. min_value ,
148+ & acc. max_value ,
149+ & x. min_value ,
150+ & x. max_value ,
151+ ) {
152+ (
153+ Precision :: Exact ( old_count) ,
154+ Precision :: Exact ( new_count) ,
155+ Precision :: Exact ( ScalarValue :: Int32 ( Some ( old_min) ) ) ,
156+ Precision :: Exact ( ScalarValue :: Int32 ( Some ( old_max) ) ) ,
157+ Precision :: Exact ( ScalarValue :: Int32 ( Some ( new_min) ) ) ,
158+ Precision :: Exact ( ScalarValue :: Int32 ( Some ( new_max) ) ) ,
159+ ) => {
160+ let estimated = estimate_distinct_count (
161+ & [ old_min, old_max] ,
162+ & [ new_min, new_max] ,
163+ * old_count as i64 ,
164+ * new_count as i64 ,
165+ ) ;
166+ Precision :: Inexact ( * old_count + estimated as usize )
167+ }
168+ (
169+ Precision :: Exact ( old_count) ,
170+ Precision :: Exact ( new_count) ,
171+ Precision :: Exact ( ScalarValue :: Int64 ( Some ( old_min) ) ) ,
172+ Precision :: Exact ( ScalarValue :: Int64 ( Some ( old_max) ) ) ,
173+ Precision :: Exact ( ScalarValue :: Int64 ( Some ( new_min) ) ) ,
174+ Precision :: Exact ( ScalarValue :: Int64 ( Some ( new_max) ) ) ,
175+ ) => {
176+ let estimated = estimate_distinct_count (
177+ & [ old_min, old_max] ,
178+ & [ new_min, new_max] ,
179+ * old_count as i64 ,
180+ * new_count as i64 ,
181+ ) ;
182+ Precision :: Inexact ( * old_count + estimated as usize )
183+ }
184+ ( Precision :: Absent , Precision :: Exact ( _) , _, _, _, _) => x. distinct_count ,
185+ _ => acc. distinct_count . add ( & x. distinct_count ) ,
186+ }
187+ }
0 commit comments