Skip to content

Commit cefb9ed

Browse files
srhwaralexrom
authored andcommitted
chore(cubestore): Upgrade DF: Push down Sort,fetch at logical plan phase
1 parent 613e282 commit cefb9ed

File tree

2 files changed

+106
-63
lines changed

2 files changed

+106
-63
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 80 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8397,12 +8397,13 @@ async fn build_range_end(service: Box<dyn SqlClient>) {
83978397
]
83988398
);
83998399
}
8400-
async fn assert_limit_pushdown(
8400+
8401+
async fn assert_limit_pushdown_using_search_string(
84018402
service: &Box<dyn SqlClient>,
84028403
query: &str,
84038404
expected_index: Option<&str>,
84048405
is_limit_expected: bool,
8405-
is_tail_limit: bool,
8406+
search_string: &str,
84068407
) -> Result<Vec<Row>, String> {
84078408
let res = service
84088409
.exec_query(&format!("EXPLAIN ANALYZE {}", query))
@@ -8419,11 +8420,7 @@ async fn assert_limit_pushdown(
84198420
));
84208421
}
84218422
}
8422-
let expected_limit = if is_tail_limit {
8423-
"TailLimit"
8424-
} else {
8425-
"GlobalLimit"
8426-
};
8423+
let expected_limit = search_string;
84278424
if is_limit_expected {
84288425
if s.find(expected_limit).is_none() {
84298426
return Err(format!("{} expected but not found", expected_limit));
@@ -8441,6 +8438,27 @@ async fn assert_limit_pushdown(
84418438
Ok(res.get_rows().clone())
84428439
}
84438440

8441+
async fn assert_limit_pushdown(
8442+
service: &Box<dyn SqlClient>,
8443+
query: &str,
8444+
expected_index: Option<&str>,
8445+
is_limit_expected: bool,
8446+
is_tail_limit: bool,
8447+
) -> Result<Vec<Row>, String> {
8448+
assert_limit_pushdown_using_search_string(
8449+
service,
8450+
query,
8451+
expected_index,
8452+
is_limit_expected,
8453+
if is_tail_limit {
8454+
"TailLimit"
8455+
} else {
8456+
"GlobalLimit"
8457+
},
8458+
)
8459+
.await
8460+
}
8461+
84448462
async fn cache_incr(service: Box<dyn SqlClient>) {
84458463
service.note_non_idempotent_migration_test();
84468464
let query = r#"CACHE INCR "prefix:key""#;
@@ -9465,7 +9483,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
94659483
.await
94669484
.unwrap();
94679485
// ====================================
9468-
let res = assert_limit_pushdown(
9486+
let res = assert_limit_pushdown_using_search_string(
94699487
&service,
94709488
"SELECT a aaa, b bbbb, c FROM (
94719489
SELECT * FROM foo.pushdown_where_group1
@@ -9476,39 +9494,46 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
94769494
ORDER BY 2 LIMIT 4",
94779495
Some("ind1"),
94789496
true,
9479-
false,
9497+
"Sort, fetch: 4",
94809498
)
94819499
.await
94829500
.unwrap();
94839501

9484-
assert_eq!(
9485-
res,
9486-
vec![
9487-
Row::new(vec![
9488-
TableValue::Int(12),
9489-
TableValue::Int(20),
9490-
TableValue::Int(4)
9491-
]),
9492-
Row::new(vec![
9493-
TableValue::Int(12),
9494-
TableValue::Int(25),
9495-
TableValue::Int(5)
9496-
]),
9497-
Row::new(vec![
9498-
TableValue::Int(12),
9499-
TableValue::Int(25),
9500-
TableValue::Int(6)
9501-
]),
9502-
Row::new(vec![
9503-
TableValue::Int(12),
9504-
TableValue::Int(30),
9505-
TableValue::Int(7)
9506-
]),
9507-
]
9508-
);
9502+
let mut expected = vec![
9503+
Row::new(vec![
9504+
TableValue::Int(12),
9505+
TableValue::Int(20),
9506+
TableValue::Int(4),
9507+
]),
9508+
Row::new(vec![
9509+
TableValue::Int(12),
9510+
TableValue::Int(25),
9511+
TableValue::Int(5),
9512+
]),
9513+
Row::new(vec![
9514+
TableValue::Int(12),
9515+
TableValue::Int(25),
9516+
TableValue::Int(6),
9517+
]),
9518+
Row::new(vec![
9519+
TableValue::Int(12),
9520+
TableValue::Int(30),
9521+
TableValue::Int(7),
9522+
]),
9523+
];
9524+
if res != expected {
9525+
// Given the query, there are two valid orderings -- (12, 25, 5) and (12, 25, 6) can be swapped.
9526+
9527+
let mut values1 = expected[1].values().clone();
9528+
let mut values2 = expected[2].values().clone();
9529+
std::mem::swap(&mut values1[2], &mut values2[2]);
9530+
expected[1] = Row::new(values1);
9531+
expected[2] = Row::new(values2);
9532+
assert_eq!(res, expected);
9533+
}
95099534

95109535
// ====================================
9511-
let res = assert_limit_pushdown(
9536+
let res = assert_limit_pushdown_using_search_string(
95129537
&service,
95139538
"SELECT a, b, c FROM (
95149539
SELECT * FROM foo.pushdown_where_group1
@@ -9518,7 +9543,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
95189543
ORDER BY 3 LIMIT 3",
95199544
Some("ind2"),
95209545
true,
9521-
false,
9546+
"Sort, fetch: 3",
95229547
)
95239548
.await
95249549
.unwrap();
@@ -9545,7 +9570,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
95459570
);
95469571
//
95479572
// ====================================
9548-
let res = assert_limit_pushdown(
9573+
let res = assert_limit_pushdown_using_search_string(
95499574
&service,
95509575
"SELECT a, b, c FROM (
95519576
SELECT * FROM foo.pushdown_where_group1
@@ -9555,7 +9580,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
95559580
ORDER BY 3 DESC LIMIT 3",
95569581
Some("ind2"),
95579582
true,
9558-
true,
9583+
"Sort, fetch: 3",
95599584
)
95609585
.await
95619586
.unwrap();
@@ -9582,7 +9607,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
95829607
);
95839608
//
95849609
// ====================================
9585-
let res = assert_limit_pushdown(
9610+
let res = assert_limit_pushdown_using_search_string(
95869611
&service,
95879612
"SELECT a, b FROM (SELECT a, b, c FROM (
95889613
SELECT * FROM foo.pushdown_where_group1
@@ -9592,7 +9617,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
95929617
ORDER BY 1, 2 LIMIT 3) x",
95939618
Some("ind1"),
95949619
true,
9595-
false,
9620+
"Sort, fetch: 3",
95969621
)
95979622
.await
95989623
.unwrap();
@@ -9618,7 +9643,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
96189643
]
96199644
);
96209645
// ====================================
9621-
let res = assert_limit_pushdown(
9646+
let res = assert_limit_pushdown_using_search_string(
96229647
&service,
96239648
"SELECT a, b FROM (SELECT a, b, c FROM (
96249649
SELECT * FROM foo.pushdown_where_group1
@@ -9628,7 +9653,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
96289653
ORDER BY 1, 2 LIMIT 2 OFFSET 1) x",
96299654
Some("ind1"),
96309655
true,
9631-
false,
9656+
"Sort, fetch: 3",
96329657
)
96339658
.await
96349659
.unwrap();
@@ -9649,7 +9674,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
96499674
]
96509675
);
96519676
// ====================================
9652-
let res = assert_limit_pushdown(
9677+
let res = assert_limit_pushdown_using_search_string(
96539678
&service,
96549679
"SELECT a, b, c FROM (
96559680
SELECT * FROM foo.pushdown_where_group1
@@ -9660,7 +9685,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
96609685
ORDER BY 1 LIMIT 3",
96619686
Some("ind1"),
96629687
true,
9663-
false,
9688+
"Sort, fetch: 3",
96649689
)
96659690
.await
96669691
.unwrap();
@@ -9681,7 +9706,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
96819706
]
96829707
);
96839708
// ====================================
9684-
let res = assert_limit_pushdown(
9709+
let res = assert_limit_pushdown_using_search_string(
96859710
&service,
96869711
"SELECT a, b, c FROM (
96879712
SELECT * FROM foo.pushdown_where_group1
@@ -9692,7 +9717,7 @@ async fn limit_pushdown_without_group(service: Box<dyn SqlClient>) {
96929717
ORDER BY 1, 3 LIMIT 3",
96939718
Some("ind1"),
96949719
true,
9695-
false,
9720+
"Sort, fetch: 3",
96969721
)
96979722
.await
96989723
.unwrap();
@@ -9755,7 +9780,7 @@ async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
97559780
.await
97569781
.unwrap();
97579782
// ====================================
9758-
let res = assert_limit_pushdown(
9783+
let res = assert_limit_pushdown_using_search_string(
97599784
&service,
97609785
"SELECT a aaa, b bbbb, c FROM (
97619786
SELECT * FROM foo.pushdown_where_group1
@@ -9766,7 +9791,7 @@ async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
97669791
ORDER BY 2 desc LIMIT 4",
97679792
Some("ind1"),
97689793
true,
9769-
true,
9794+
"Sort, fetch: 4",
97709795
)
97719796
.await
97729797
.unwrap();
@@ -9798,7 +9823,7 @@ async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
97989823
);
97999824

98009825
// ====================================
9801-
let res = assert_limit_pushdown(
9826+
let res = assert_limit_pushdown_using_search_string(
98029827
&service,
98039828
"SELECT a aaa, b bbbb, c FROM (
98049829
SELECT * FROM foo.pushdown_where_group1
@@ -9808,7 +9833,7 @@ async fn limit_pushdown_without_group_resort(service: Box<dyn SqlClient>) {
98089833
ORDER BY 1 desc, 2 desc LIMIT 3",
98099834
Some("ind1"),
98109835
true,
9811-
true,
9836+
"Sort, fetch: 3",
98129837
)
98139838
.await
98149839
.unwrap();
@@ -9908,7 +9933,7 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
99089933
.await
99099934
.unwrap();
99109935
// ====================================
9911-
let res = assert_limit_pushdown(
9936+
let res = assert_limit_pushdown_using_search_string(
99129937
&service,
99139938
"SELECT a, b, c FROM (
99149939
SELECT * FROM foo.pushdown_where_group1
@@ -9919,7 +9944,7 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
99199944
ORDER BY 2 LIMIT 4",
99209945
Some("ind1"),
99219946
true,
9922-
false,
9947+
"Sort, fetch: 4",
99239948
)
99249949
.await
99259950
.unwrap();
@@ -9946,7 +9971,7 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
99469971
);
99479972

99489973
// ====================================
9949-
let res = assert_limit_pushdown(
9974+
let res = assert_limit_pushdown_using_search_string(
99509975
&service,
99519976
"SELECT a, b, c FROM (
99529977
SELECT * FROM foo.pushdown_where_group1
@@ -9955,8 +9980,8 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
99559980
) as `tb`
99569981
ORDER BY 3 LIMIT 3",
99579982
Some("ind1"),
9958-
false,
9959-
false,
9983+
true,
9984+
"Sort, fetch: 3",
99609985
)
99619986
.await
99629987
.unwrap();

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1597,20 +1597,38 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result<LogicalPlan, DataFusionErr
15971597
LogicalPlan::Extension { .. } => return Ok(p),
15981598
// These nodes collect results from multiple partitions, return unchanged.
15991599
LogicalPlan::Aggregate { .. }
1600-
| LogicalPlan::Sort { .. }
1601-
| LogicalPlan::Limit { .. }
1602-
| LogicalPlan::Repartition { .. } => return Ok(p),
1600+
| LogicalPlan::Repartition { .. }
1601+
| LogicalPlan::Limit { .. } => return Ok(p),
1602+
// Collects results but let's push sort,fetch underneath the input.
1603+
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
1604+
let Some(send) = try_extract_cluster_send(input) else {
1605+
return Ok(p);
1606+
};
1607+
let Some(fetch) = fetch else {
1608+
return Ok(p);
1609+
};
1610+
let id = send.id;
1611+
snapshots = send.snapshots.clone();
1612+
let under_sort = LogicalPlan::Sort(Sort {
1613+
expr: expr.clone(),
1614+
input: send.input.clone(),
1615+
fetch: Some(*fetch),
1616+
});
1617+
// We discard limit_and_reverse, because we add a Sort node into the plan right here.
1618+
let limit_and_reverse = None;
1619+
let new_send =
1620+
ClusterSendNode::new(id, Arc::new(under_sort), snapshots, limit_and_reverse);
1621+
*input = Arc::new(new_send.into_plan());
1622+
return Ok(p);
1623+
}
16031624
// We can always pull cluster send for these nodes.
16041625
LogicalPlan::Projection(Projection { input, .. })
16051626
| LogicalPlan::Filter(Filter { input, .. })
16061627
| LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. })
16071628
| LogicalPlan::Unnest(Unnest { input, .. }) => {
1608-
let send;
1609-
if let Some(s) = try_extract_cluster_send(input) {
1610-
send = s;
1611-
} else {
1629+
let Some(send) = try_extract_cluster_send(input) else {
16121630
return Ok(p);
1613-
}
1631+
};
16141632
let id = send.id;
16151633
snapshots = send.snapshots.clone();
16161634
let limit = send.limit_and_reverse.clone();

0 commit comments

Comments
 (0)