diff --git a/datafusion/functions-nested/benches/array_reverse.rs b/datafusion/functions-nested/benches/array_reverse.rs index d4a63e36403a..92a65128fe6b 100644 --- a/datafusion/functions-nested/benches/array_reverse.rs +++ b/datafusion/functions-nested/benches/array_reverse.rs @@ -24,7 +24,7 @@ use std::{hint::black_box, sync::Arc}; use crate::criterion::Criterion; use arrow::{ array::{ArrayRef, FixedSizeListArray, Int32Array, ListArray, ListViewArray}, - buffer::{OffsetBuffer, ScalarBuffer}, + buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field}, }; use datafusion_functions_nested::reverse::array_reverse_inner; @@ -34,44 +34,80 @@ fn array_reverse(array: &ArrayRef) -> ArrayRef { } fn criterion_benchmark(c: &mut Criterion) { - // Construct large arrays for benchmarking - let array_len = 100000; - let step_size: usize = 1000; - let offsets: Vec = (0..array_len as i32).step_by(step_size).collect(); + // Create array sizes with step size of 100, starting from 100. + let number_of_arrays = 1000; + let sizes = (0..number_of_arrays) + .map(|i| 100 + i * 100) + .collect::>(); + + // Calculate the total number of values + let total_values = sizes.iter().sum::(); + + // Calculate sizes and offsets from array lengths + let offsets = sizes + .iter() + .scan(0, |acc, &x| { + let offset = *acc; + *acc += x; + Some(offset) + }) + .collect::>(); let offsets = ScalarBuffer::from(offsets); - let sizes: Vec = vec![step_size as i32; array_len / step_size]; - let values = (0..array_len as i32).collect::>(); + // Set every 10th array to null + let nulls = (0..number_of_arrays) + .map(|i| i % 10 != 0) + .collect::>(); + + let values = (0..total_values).collect::>(); + let values = Arc::new(Int32Array::from(values)); + + // Create ListArray and ListViewArray + let nulls_list_array = Some(NullBuffer::from( + nulls[..((number_of_arrays as usize) - 1)].to_vec(), + )); let list_array: ArrayRef = Arc::new(ListArray::new( Arc::new(Field::new("a", DataType::Int32, false)), OffsetBuffer::new(offsets.clone()), - Arc::new(Int32Array::from(values.clone())), - None, + values.clone(), + nulls_list_array, )); - let fixed_size_list_array: ArrayRef = Arc::new(FixedSizeListArray::new( - Arc::new(Field::new("a", DataType::Int32, false)), - step_size as i32, - Arc::new(Int32Array::from(values.clone())), - None, + let nulls_list_view_array = Some(NullBuffer::from( + nulls[..(number_of_arrays as usize)].to_vec(), )); let list_view_array: ArrayRef = Arc::new(ListViewArray::new( Arc::new(Field::new("a", DataType::Int32, false)), offsets, ScalarBuffer::from(sizes), - Arc::new(Int32Array::from(values)), - None, + values.clone(), + nulls_list_view_array, )); c.bench_function("array_reverse_list", |b| { b.iter(|| array_reverse(&list_array)) }); - c.bench_function("array_reverse_fixed_size_list", |b| { - b.iter(|| array_reverse(&fixed_size_list_array)) - }); - c.bench_function("array_reverse_list_view", |b| { b.iter(|| array_reverse(&list_view_array)) }); + + // Create FixedSizeListArray + let array_len = 1000; + let num_arrays = 5000; + let total_values = num_arrays * array_len; + let values = (0..total_values).collect::>(); + let values = Arc::new(Int32Array::from(values)); + // Set every 10th array to null + let nulls = (0..num_arrays).map(|i| i % 10 != 0).collect::>(); + let nulls = Some(NullBuffer::from(nulls)); + let fixed_size_list_array: ArrayRef = Arc::new(FixedSizeListArray::new( + Arc::new(Field::new("a", DataType::Int32, false)), + array_len, + values.clone(), + nulls.clone(), + )); + c.bench_function("array_reverse_fixed_size_list", |b| { + b.iter(|| array_reverse(&fixed_size_list_array)) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/functions-nested/src/reverse.rs b/datafusion/functions-nested/src/reverse.rs index 635f23967a19..df873ade798d 100644 --- a/datafusion/functions-nested/src/reverse.rs +++ b/datafusion/functions-nested/src/reverse.rs @@ -19,8 +19,8 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, Capacities, FixedSizeListArray, GenericListArray, - GenericListViewArray, MutableArrayData, OffsetSizeTrait, UInt32Array, + Array, ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray, + OffsetSizeTrait, UInt32Array, UInt64Array, }; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::compute::take; @@ -155,11 +155,8 @@ fn general_array_reverse( field: &FieldRef, ) -> Result { let values = array.values(); - let original_data = values.to_data(); - let capacity = Capacities::Array(original_data.len()); let mut offsets = vec![O::usize_as(0)]; - let mut mutable = - MutableArrayData::with_capacities(vec![&original_data], false, capacity); + let mut indices: Vec = Vec::with_capacity(values.len()); for (row_index, (&start, &end)) in array.offsets().iter().tuple_windows().enumerate() { @@ -171,18 +168,34 @@ fn general_array_reverse( let mut index = end - O::one(); while index >= start { - mutable.extend(0, index.to_usize().unwrap(), index.to_usize().unwrap() + 1); + indices.push(index); index = index - O::one(); } let size = end - start; offsets.push(offsets[row_index] + size); } - let data = mutable.freeze(); + // Materialize values from underlying array with take + let indices_array: ArrayRef = if O::IS_LARGE { + Arc::new(UInt64Array::from( + indices + .iter() + .map(|i| i.as_usize() as u64) + .collect::>(), + )) + } else { + Arc::new(UInt32Array::from( + indices + .iter() + .map(|i| i.as_usize() as u32) + .collect::>(), + )) + }; + let values = take(&values, &indices_array, None)?; Ok(Arc::new(GenericListArray::::try_new( Arc::clone(field), OffsetBuffer::::new(offsets.into()), - arrow::array::make_array(data), + values, array.nulls().cloned(), )?)) } @@ -231,7 +244,7 @@ fn list_view_reverse( // Materialize values from underlying array with take let indices_array: ArrayRef = if O::IS_LARGE { - Arc::new(arrow::array::UInt64Array::from( + Arc::new(UInt64Array::from( indices .iter() .map(|i| i.as_usize() as u64) @@ -245,13 +258,12 @@ fn list_view_reverse( .collect::>(), )) }; - let values_reversed = take(&values, &indices_array, None)?; - + let values = take(&values, &indices_array, None)?; Ok(Arc::new(GenericListViewArray::::try_new( Arc::clone(field), ScalarBuffer::from(new_offsets), ScalarBuffer::from(new_sizes), - values_reversed, + values, array.nulls().cloned(), )?)) } @@ -260,42 +272,34 @@ fn fixed_size_array_reverse( array: &FixedSizeListArray, field: &FieldRef, ) -> Result { - let values = array.values(); - let original_data = values.to_data(); - let capacity = Capacities::Array(original_data.len()); - let mut mutable = - MutableArrayData::with_capacities(vec![&original_data], false, capacity); - let value_length = array.value_length() as usize; + let values: &Arc = array.values(); - for row_index in 0..array.len() { - // skip the null value - if array.is_null(row_index) { - mutable.extend(0, 0, value_length); - continue; - } - let start = row_index * value_length; - let end = start + value_length; - for idx in (start..end).rev() { - mutable.extend(0, idx, idx + 1); - } + // Since each fixed size list in the physical array is the same size and we keep the order + // of the fixed size lists, we can reverse the indices for each fixed size list. + let mut indices: Vec = (0..values.len() as u64).collect(); + for chunk in indices.chunks_mut(array.value_length() as usize) { + chunk.reverse(); } - let data = mutable.freeze(); + // Materialize values from underlying array with take + let indices_array: ArrayRef = Arc::new(UInt64Array::from(indices)); + let values = take(&values, &indices_array, None)?; + Ok(Arc::new(FixedSizeListArray::try_new( Arc::clone(field), array.value_length(), - arrow::array::make_array(data), + values, array.nulls().cloned(), )?)) } #[cfg(test)] mod tests { - use crate::reverse::list_view_reverse; + use crate::reverse::{fixed_size_array_reverse, list_view_reverse}; use arrow::{ array::{ - AsArray, GenericListViewArray, Int32Array, LargeListViewArray, ListViewArray, - OffsetSizeTrait, + AsArray, FixedSizeListArray, GenericListViewArray, Int32Array, + LargeListViewArray, ListViewArray, OffsetSizeTrait, }, buffer::{NullBuffer, ScalarBuffer}, datatypes::{DataType, Field, Int32Type}, @@ -312,6 +316,13 @@ mod tests { .collect() } + fn fixed_size_list_values(array: &FixedSizeListArray) -> Vec>> { + array + .iter() + .map(|x| x.map(|x| x.as_primitive::().values().to_vec())) + .collect() + } + #[test] fn test_reverse_list_view() -> Result<()> { let field = Arc::new(Field::new("a", DataType::Int32, false)); @@ -450,4 +461,40 @@ mod tests { assert_eq!(expected, reversed); Ok(()) } + + #[test] + fn test_reverse_fixed_size_list() -> Result<()> { + let field = Arc::new(Field::new("a", DataType::Int32, false)); + let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + let result = fixed_size_array_reverse( + &FixedSizeListArray::new( + field, + 3, + values, + Some(NullBuffer::from(vec![true, false, true])), + ), + &Arc::new(Field::new("test", DataType::Int32, true)), + )?; + let reversed = fixed_size_list_values(result.as_fixed_size_list()); + let expected = vec![Some(vec![3, 2, 1]), None, Some(vec![9, 8, 7])]; + assert_eq!(expected, reversed); + Ok(()) + } + + #[test] + fn test_reverse_fixed_size_list_empty() -> Result<()> { + let field = Arc::new(Field::new("a", DataType::Int32, false)); + let empty_array: Vec = vec![]; + let values = Arc::new(Int32Array::from(empty_array)); + let nulls = None; + let fixed_size_list = FixedSizeListArray::new(field, 3, values, nulls); + let result = fixed_size_array_reverse( + &fixed_size_list, + &Arc::new(Field::new("test", DataType::Int32, true)), + )?; + let reversed = fixed_size_list_values(result.as_fixed_size_list()); + let expected: Vec>> = vec![]; + assert_eq!(expected, reversed); + Ok(()) + } }