Skip to content

Commit 520e59f

Browse files
authored
[Optimize] optimize sharding_rewrite mod (#387)
* optimize(pisa-proxy, sharding): optimize rewrite sql Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, optimize ShardingIdx struct, add InsertValueRowIdx struct for insert statement. * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, optimize ShardingIdx struct, add InsertValueRowIdx struct for insert statement. * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, add ShardingRewwriteResult struct. * optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, add ShardingRewwriteResult struct. Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, add agg_fields, fix update query id. Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): optimize rewrite sql Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, optimize ShardingIdx struct, add InsertValueRowIdx struct for insert statement. * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, optimize ShardingIdx struct, add InsertValueRowIdx struct for insert statement. * Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, add ShardingRewwriteResult struct. * optimize(pisa-proxy, sharding): WIP: optimize rewrite sql logic to support subquery, add ShardingRewwriteResult struct. Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: simplify code Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: simplify code Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: remove unused code Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: Adjust runtime Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: Adjust runtime Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: fix mix_max handle error Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: fix mix_max handle error Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: remove unused code Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> * optimize(pisa-proxy, sharding): WIP: remove unused code Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com> Signed-off-by: xuanyuan300 <xuanyuan300@gmail.com>
1 parent 28c3caf commit 520e59f

File tree

13 files changed

+956
-1048
lines changed

13 files changed

+956
-1048
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Generated by Cargo
33
# will have compiled files and executables
44
**/target/
5+
pisa-proxy/parser/mysql/examples/
56

67
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
78
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html

pisa-proxy/parser/mysql/src/ast/dml.rs

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -362,20 +362,14 @@ impl UnionOpt {
362362
}
363363

364364
#[derive(Debug, Clone)]
365-
pub enum Items {
366-
Wild(ItemWild),
367-
Items(Vec<Item>),
368-
None,
365+
pub struct Items {
366+
pub span: Span,
367+
pub items: Vec<Item>
369368
}
370369

371-
372370
impl Items {
373371
pub fn format(&self) -> String {
374-
match self {
375-
Self::Wild(_) => "*".to_string(),
376-
Self::None => "".to_string(),
377-
Self::Items(val) => val.iter().map(|x| x.format()).collect::<Vec<String>>().join(","),
378-
}
372+
self.items.iter().map(|x| x.format()).collect::<Vec<_>>().join(",")
379373
}
380374
}
381375

@@ -384,37 +378,31 @@ impl Visitor for Items {
384378
where
385379
T: Transformer,
386380
{
387-
match self {
388-
Self::Items(items) => {
389-
let mut new_items = Vec::with_capacity(items.len());
390-
for v in items {
391-
let mut node = Node::Item(v);
392-
let is_skip = tf.trans(&mut node);
393-
let new_node = node.into_item().unwrap();
394-
395-
if is_skip {
396-
tf.complete(&mut Node::Item(new_node));
397-
new_items.push(new_node.clone());
398-
continue;
399-
}
400-
401-
new_items.push(new_node.visit(tf));
402-
}
403381

404-
Items::Items(new_items)
382+
let mut new_items = Vec::with_capacity(self.items.len());
383+
for v in self.items.iter_mut() {
384+
let mut node = Node::Item(v);
385+
let is_skip = tf.trans(&mut node);
386+
let new_node = node.into_item().unwrap();
387+
388+
if is_skip {
389+
tf.complete(&mut Node::Item(new_node));
390+
new_items.push(new_node.clone());
391+
continue;
405392
}
406-
x => x.clone(),
393+
394+
new_items.push(new_node.visit(tf));
407395
}
408-
}
409-
}
410396

411-
#[derive(Debug, Clone)]
412-
pub struct ItemWild {
413-
pub span: Span
397+
self.items = new_items;
398+
399+
self.clone()
400+
}
414401
}
415402

416403
#[derive(Debug, Clone)]
417404
pub enum Item {
405+
Wild(Span),
418406
TableWild(TableWild),
419407

420408
//clippy::large_enum_variant
@@ -424,6 +412,7 @@ pub enum Item {
424412
impl Item {
425413
pub fn format(&self) -> String {
426414
match self {
415+
Self::Wild(_) => String::from("*"),
427416
Self::TableWild(val) => val.format(),
428417

429418
Self::ItemExpr(val) => val.format(),
@@ -434,6 +423,7 @@ impl Item {
434423
impl Visitor for Item {
435424
fn visit<T: Transformer>(&mut self, tf: &mut T) -> Self {
436425
match self {
426+
Self::Wild(val) => Item::Wild(*val),
437427
Self::TableWild(wild) => {
438428
let mut node = Node::TableWild(wild);
439429
tf.trans(&mut node);

pisa-proxy/parser/mysql/src/grammar.y

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -669,20 +669,23 @@ locked_row_action -> LockedRowAction:
669669
select_items -> Items:
670670
select_items ',' select_item
671671
{
672-
if let Items::Items(mut items) = $1 {
673-
items.push($3);
674-
Items::Items(items)
675-
} else {
676-
$1
677-
}
672+
$1.span = $span;
673+
$1.items.push($3);
674+
$1
678675
}
679676
| select_item
680677
{
681-
Items::Items(vec![$1])
678+
Items {
679+
span: $span,
680+
items: vec![$1],
681+
}
682682
}
683683
| '*'
684684
{
685-
Items::Wild( ItemWild { span: $span } )
685+
Items {
686+
span: $span,
687+
items: vec![Item::Wild($span)],
688+
}
686689
}
687690
;
688691

pisa-proxy/protocol/mysql/src/column.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,30 +44,43 @@ pub fn decode_column<T: AsRef<[u8]>>(buf: T) -> ColumnInfo {
4444
buf.decode_column()
4545
}
4646

47-
pub fn remove_column_by_idx(columns: &mut BytesMut, idx: usize) {
47+
//Remove the column of consecutive indexes in the chunk
48+
pub fn remove_column_by_idx(columns: &mut BytesMut, chunk: &[usize]) {
4849
let mut pos = 0;
49-
for _ in 0..idx {
50-
let length = get_length(columns.as_ref());
50+
let start_idx = chunk.first().unwrap_or_else(|| &0);
51+
52+
for _ in 0..*start_idx {
53+
let length = get_length(&columns[pos..]);
5154
pos += length + 4;
5255
}
5356

54-
let mut first_part = columns.split_off(pos);
57+
let mut remain_part = columns.split_off(pos);
5558
// get column idx length
56-
let idx_length = get_length(columns);
57-
let _ = first_part.split_to(idx_length + 4);
58-
columns.unsplit(first_part);
59+
pos = 0;
60+
for _ in chunk.iter() {
61+
if remain_part.len() <= pos {
62+
continue;
63+
}
64+
65+
let length = get_length(&remain_part[pos..]);
66+
pos += length + 4
67+
}
68+
69+
70+
remain_part.advance(pos);
71+
columns.unsplit(remain_part);
5972
}
6073

6174
pub fn add_column_by_idx<T: AsRef<[u8]>>(columns: &mut BytesMut, idx: usize, add: T) {
6275
let mut pos = 0;
6376
for _ in 0..idx {
64-
let length = get_length(columns.as_ref());
77+
let length = get_length(&columns[pos..]);
6578
pos += length + 4;
6679
}
6780

68-
let first_part = columns.split_off(pos);
81+
let remain_part = columns.split_off(pos);
6982
columns.put_slice(add.as_ref());
70-
columns.unsplit(first_part);
83+
columns.put_slice(&remain_part);
7184
}
7285

7386
/// ColumnBuf trait, Inherit BufExt
@@ -240,29 +253,29 @@ mod test {
240253
fn test_remove_column_by_idx() {
241254
let data = [
242255
1, 0, 0, 0, 1,
243-
1, 0, 0, 0, 2,
256+
2, 0, 0, 0, 2, 3,
244257
1, 0, 0, 0, 3,
245258
];
246259

247260
let expect_data = [
248261
1, 0, 0, 0, 1,
249-
1, 0, 0, 0, 2,
262+
2, 0, 0, 0, 2, 3
250263
];
251264
let mut buf = BytesMut::from(&data[..]);
252-
remove_column_by_idx(&mut buf, 2);
265+
remove_column_by_idx(&mut buf, &[2]);
253266
assert_eq!(&buf[..], expect_data);
254267
}
255268

256269
#[test]
257270
fn test_add_column_by_idx() {
258271
let data = [
259272
1, 0, 0, 0, 1,
260-
1, 0, 0, 0, 2,
273+
2, 0, 0, 0, 2, 3,
261274
];
262275

263276
let expect_data = [
264277
1, 0, 0, 0, 1,
265-
1, 0, 0, 0, 2,
278+
2, 0, 0, 0, 2, 3,
266279
1, 0, 0, 0, 3,
267280
];
268281
let mut buf = BytesMut::from(&data[..]);

pisa-proxy/proxy/endpoint/src/endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use std::fmt;
1616

17-
#[derive(Clone, Default, PartialEq)]
17+
#[derive(Clone, Default, Hash, PartialEq, Eq)]
1818
pub struct Endpoint {
1919
pub weight: i64,
2020
pub name: String,

pisa-proxy/proxy/strategy/src/sharding_rewrite/meta.rs

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,18 @@ struct QueryRequire {
159159
require_query_id: u8,
160160
}
161161

162+
#[derive(Debug, Clone)]
163+
pub struct QueryMeta {
164+
pub query_id: u8,
165+
pub span: mysql_parser::Span,
166+
}
167+
168+
#[derive(Debug, Clone)]
169+
pub struct FieldBlockMeta {
170+
pub query_id: u8,
171+
pub span: mysql_parser::Span,
172+
}
173+
162174
#[derive(Debug)]
163175
pub struct RewriteMetaData {
164176
tables: IndexMap<u8, Vec<TableMeta>>,
@@ -170,6 +182,8 @@ pub struct RewriteMetaData {
170182
on_conds: IndexMap<u8, Vec<JoinOnCond>>,
171183
avgs: IndexMap<u8, Vec<AvgMeta>>,
172184
inserts: IndexMap<u8, Vec<InsertValsMeta>>,
185+
queries: IndexMap<u8, QueryMeta>,
186+
field_blocks: IndexMap<u8, FieldBlockMeta>,
173187
query_id: u8,
174188
state: ScanState,
175189
requires: Vec<QueryRequire>,
@@ -194,6 +208,8 @@ impl RewriteMetaData {
194208
on_conds: IndexMap::new(),
195209
avgs: IndexMap::new(),
196210
inserts: IndexMap::new(),
211+
queries: IndexMap::new(),
212+
field_blocks: IndexMap::new(),
197213
state: ScanState::Empty,
198214
requires: vec![],
199215
prev_expr_type: None,
@@ -212,6 +228,22 @@ impl RewriteMetaData {
212228
None => input[span.start()..span.end()].to_string(),
213229
}
214230
}
231+
232+
fn push_query(&mut self, meta: QueryMeta) {
233+
self.queries.insert(self.query_id, meta);
234+
}
235+
236+
pub fn get_queries(&self) -> &IndexMap<u8, QueryMeta> {
237+
&self.queries
238+
}
239+
240+
fn push_field_block(&mut self, meta: FieldBlockMeta) {
241+
self.field_blocks.insert(self.query_id, meta);
242+
}
243+
244+
pub fn get_field_blocks(&self) -> &IndexMap<u8, FieldBlockMeta> {
245+
&self.field_blocks
246+
}
215247
}
216248

217249
macro_rules! gen_push_func {
@@ -244,8 +276,12 @@ gen_push_func!(push_insert_value, get_inserts, InsertValsMeta, inserts);
244276
impl Transformer for RewriteMetaData {
245277
fn trans(&mut self, node: &mut Node<'_>) -> bool {
246278
match node {
247-
Node::Query(_q) => {
279+
Node::Query(q) => {
248280
self.query_id += 1;
281+
self.push_query(QueryMeta {
282+
query_id: self.query_id,
283+
span: q.span,
284+
})
249285
}
250286

251287
Node::SubQuery(q) => {
@@ -254,15 +290,23 @@ impl Transformer for RewriteMetaData {
254290
return true;
255291
}
256292

257-
Node::SingleTable(t) => {
258-
self.push_table(t.table_name.clone());
293+
Node::UpdateStmt(stmt) => {
294+
self.query_id += 1;
295+
self.push_query(QueryMeta { query_id: self.query_id, span: stmt.span })
259296
}
260297

261-
Node::DeleteStmt(t) => {
262-
if let Some(table) = &t.table_name {
298+
Node::DeleteStmt(stmt) => {
299+
self.query_id += 1;
300+
self.push_query(QueryMeta { query_id: self.query_id, span: stmt.span });
301+
if let Some(table) = &stmt.table_name {
263302
self.push_table(table.clone());
264303
}
265304
}
305+
306+
Node::SingleTable(t) => {
307+
self.push_table(t.table_name.clone());
308+
}
309+
266310

267311
Node::FromClause(_) => {
268312
self.state = ScanState::TableName;
@@ -273,22 +317,17 @@ impl Transformer for RewriteMetaData {
273317
}
274318

275319
Node::Items(t) => {
320+
self.push_field_block(FieldBlockMeta { query_id: self.query_id, span: t.span });
276321
self.state = ScanState::Field(None);
277-
match t {
278-
Items::Items(t) => {
279-
for i in t {
280-
match i {
281-
Item::TableWild(val) => {
282-
self.push_field(FieldMeta::TableWild(val.clone()))
283-
}
284-
285-
_ => {}
286-
}
322+
for item in t.items.iter() {
323+
match item {
324+
Item::TableWild(val) => {
325+
self.push_field(FieldMeta::TableWild(val.clone()))
287326
}
327+
_ => {}
288328
}
289-
290-
_ => {}
291329
}
330+
292331
}
293332

294333
Node::ItemExpr(item) => {

0 commit comments

Comments
 (0)