|
4 | 4 |
|
5 | 5 | use std::{ |
6 | 6 | collections::{hash_map::Entry, HashMap}, |
| 7 | + ops::Sub, |
7 | 8 | sync::Arc, |
8 | 9 | }; |
9 | 10 |
|
@@ -89,18 +90,59 @@ pub fn parquet_to_datafile( |
89 | 90 | .and_modify(|x| *x += null_count as i64) |
90 | 91 | .or_insert(null_count as i64); |
91 | 92 | } |
92 | | - if let Some(distinct_count) = statistics.distinct_count_opt() { |
93 | | - distinct_counts |
94 | | - .entry(id) |
95 | | - .and_modify(|x| *x += distinct_count as i64) |
96 | | - .or_insert(distinct_count as i64); |
97 | | - } |
| 93 | + |
98 | 94 | let data_type = &schema |
99 | 95 | .fields() |
100 | 96 | .get(id as usize) |
101 | 97 | .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))? |
102 | 98 | .field_type; |
103 | 99 |
|
| 100 | + if let (Some(distinct_count), Some(min_bytes), Some(max_bytes)) = ( |
| 101 | + statistics.distinct_count_opt(), |
| 102 | + statistics.min_bytes_opt(), |
| 103 | + statistics.max_bytes_opt(), |
| 104 | + ) { |
| 105 | + let min = Value::try_from_bytes(min_bytes, data_type)?; |
| 106 | + let max = Value::try_from_bytes(max_bytes, data_type)?; |
| 107 | + let current_min = lower_bounds.get(&id); |
| 108 | + let current_max = upper_bounds.get(&id); |
| 109 | + match (min, max, current_min, current_max) { |
| 110 | + ( |
| 111 | + Value::Int(min), |
| 112 | + Value::Int(max), |
| 113 | + Some(Value::Int(current_min)), |
| 114 | + Some(Value::Int(current_max)), |
| 115 | + ) => { |
| 116 | + let overlap = |
| 117 | + range_overlap(&[current_min, current_max], &[&min, &max]).max(0); |
| 118 | + distinct_counts |
| 119 | + .entry(id) |
| 120 | + .and_modify(|x| { |
| 121 | + *x += ((1 - overlap as i64 / (max - min) as i64) |
| 122 | + * distinct_count as i64) |
| 123 | + as i64 |
| 124 | + }) |
| 125 | + .or_insert(distinct_count as i64); |
| 126 | + } |
| 127 | + ( |
| 128 | + Value::LongInt(_min), |
| 129 | + Value::LongInt(_max), |
| 130 | + Some(Value::LongInt(_current_min)), |
| 131 | + Some(Value::LongInt(_current_max)), |
| 132 | + ) => (), |
| 133 | + (_, _, None, None) => { |
| 134 | + distinct_counts.entry(id).or_insert(distinct_count as i64); |
| 135 | + } |
| 136 | + _ => (), |
| 137 | + } |
| 138 | + if let Type::Primitive(_) = &data_type { |
| 139 | + distinct_counts |
| 140 | + .entry(id) |
| 141 | + .and_modify(|x| *x += distinct_count as i64) |
| 142 | + .or_insert(distinct_count as i64); |
| 143 | + } |
| 144 | + } |
| 145 | + |
104 | 146 | if let Some(min_bytes) = statistics.min_bytes_opt() { |
105 | 147 | if let Type::Primitive(_) = &data_type { |
106 | 148 | let new = Value::try_from_bytes(min_bytes, data_type)?; |
@@ -275,3 +317,12 @@ pub fn thrift_size<T: TSerializable>(metadata: &T) -> Result<usize, Error> { |
275 | 317 | metadata.write_to_out_protocol(&mut protocol)?; |
276 | 318 | Ok(buffer.bytes_written()) |
277 | 319 | } |
| 320 | + |
| 321 | +fn range_overlap<T: Ord + Sub + Copy>( |
| 322 | + old_range: &[&T; 2], |
| 323 | + new_range: &[&T; 2], |
| 324 | +) -> <T as Sub>::Output { |
| 325 | + let overlap_start = (*old_range[0]).max(*new_range[0]); |
| 326 | + let overlap_end = (*old_range[1]).min(*new_range[1]); |
| 327 | + overlap_end - overlap_start |
| 328 | +} |
0 commit comments