Skip to content

Commit 1a2c145

Browse files
committed
fix(cubestore): Allowing window functions in cluster send planning
1 parent 2aa7838 commit 1a2c145

File tree

2 files changed

+7
-10
lines changed

2 files changed

+7
-10
lines changed

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1597,6 +1597,7 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result<LogicalPlan, DataFusionErr
15971597
LogicalPlan::Extension { .. } => return Ok(p),
15981598
// These nodes collect results from multiple partitions, return unchanged.
15991599
LogicalPlan::Aggregate { .. }
1600+
| LogicalPlan::Window { .. }
16001601
| LogicalPlan::Repartition { .. }
16011602
| LogicalPlan::Limit { .. } => return Ok(p),
16021603
// Collects results but let's push sort,fetch underneath the input.

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -710,11 +710,9 @@ impl CubeTable {
710710
let mut options = TableParquetOptions::new();
711711
options.global = state.config_options().execution.parquet.clone();
712712

713-
let parquet_source = ParquetSource::new(
714-
options,
715-
get_reader_options_customizer(state.config()),
716-
)
717-
.with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
713+
let parquet_source =
714+
ParquetSource::new(options, get_reader_options_customizer(state.config()))
715+
.with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
718716
let parquet_source = if let Some(phys_pred) = &physical_predicate {
719717
parquet_source.with_predicate(index_schema.clone(), phys_pred.clone())
720718
} else {
@@ -792,11 +790,9 @@ impl CubeTable {
792790

793791
let mut options = TableParquetOptions::new();
794792
options.global = state.config_options().execution.parquet.clone();
795-
let parquet_source = ParquetSource::new(
796-
options,
797-
get_reader_options_customizer(state.config()),
798-
)
799-
.with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
793+
let parquet_source =
794+
ParquetSource::new(options, get_reader_options_customizer(state.config()))
795+
.with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
800796
let parquet_source = if let Some(phys_pred) = &physical_predicate {
801797
parquet_source.with_predicate(index_schema.clone(), phys_pred.clone())
802798
} else {

0 commit comments

Comments
 (0)