Skip to content

Commit 28c3caf

Browse files
wbtlbwangbo
andauthored
fix(sharding): fix min max error (#386)
Signed-off-by: wangbo <wangbo@sphere-ex.com> Signed-off-by: wangbo <wangbo@sphere-ex.com> Co-authored-by: wangbo <wangbo@sphere-ex.com>
1 parent b12b167 commit 28c3caf

File tree

2 files changed

+10
-18
lines changed

2 files changed

+10
-18
lines changed

pisa-proxy/protocol/mysql/src/row.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub enum RowDataTyp<T: AsRef<[u8]>> {
3434
Binary(RowDataBinary<T>),
3535
}
3636

37+
#[derive(Clone, Debug)]
3738
pub struct RowPartData {
3839
pub data: Box<[u8]>,
3940
pub start_idx: usize,

pisa-proxy/runtime/mysql/src/server/executor.rs

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -209,17 +209,10 @@ where
209209
while let Some(chunk) = stream.next().await {
210210
let mut chunk = chunk
211211
.into_par_iter().map(|x| x.unwrap()).collect::<Result<Vec<_>, _>>().map_err(ErrorKind::from)?;
212-
213-
for i in chunk.iter() {
214-
println!("or min_max {:?}", &i[..]);
215-
}
216212

217213
let ro = &req.rewrite_outputs[0];
218214
Self::handle_min_max(ro, &mut chunk, row_data.clone(), is_binary)?;
219-
for i in chunk.iter() {
220-
println!("min_max {:?}", &i[..]);
221-
}
222-
215+
223216
let avg_change = ro.changes.iter().find_map(|x| {
224217
if let RewriteChange::AvgChange(change) = x {
225218
Some(change)
@@ -233,7 +226,6 @@ where
233226
let sum_field = avg.target.get(AVG_SUM).unwrap();
234227

235228
let (count_data, sum_data): (Vec<_>, Vec<_>) = chunk.par_iter().map(|x| -> Result<(u64, u64), Error> {
236-
println!("xxx {:?}", &x[..]);
237229
let mut row_data = row_data.clone();
238230
row_data.with_buf(&x[4..]);
239231
let count = decode_with_name::<&[u8], u64>(&mut row_data, &count_field, is_binary).map_err(|e| ErrorKind::Runtime(e))?.unwrap_or_else(|| 0);
@@ -254,7 +246,6 @@ where
254246

255247
let count: u64 = count_data.par_iter().sum();
256248
let sum: u64 = sum_data.par_iter().sum();
257-
println!("count {:?}, sum {:?}", count, sum);
258249

259250
chunk.par_iter_mut().for_each(|x| {
260251
let mut row_data = row_data.clone();
@@ -301,10 +292,8 @@ where
301292
let count_sum = chunk.par_iter().map(|x| {
302293
let mut row_data = row_data.clone();
303294
row_data.with_buf(&x[4..]);
304-
println!("count x {:?}", &x[4..]);
305295
decode_with_name::<&[u8], u64>(&mut row_data, &count_field.name, is_binary).unwrap().unwrap()
306296
}).sum::<u64>();
307-
println!("count_sun {:?}", count_sum);
308297

309298
let chunk_data = &chunk[0];
310299
let mut row_data = row_data.clone();
@@ -338,7 +327,7 @@ where
338327
}
339328
}
340329

341-
if chunk.par_iter().min() == chunk.par_iter().max() {
330+
if chunk.par_iter().map(|x| &x[4..]).min() == chunk.par_iter().map(|x| &x[4..]).max() {
342331
let _ = req
343332
.framed
344333
.codec_mut()
@@ -347,7 +336,6 @@ where
347336
}
348337

349338
for row in chunk.iter() {
350-
println!("end row {:?}", &row[..]);
351339
let _ = req
352340
.framed
353341
.codec_mut()
@@ -367,7 +355,6 @@ where
367355
let (a, b) = get_min_max_value(&mut row_data, is_binary, &mmf.name, a, b);
368356
b.cmp(&a)
369357
});
370-
371358
}
372359
FieldWrapFunc::Min => {
373360
chunk.par_sort_unstable_by(|a, b| {
@@ -381,11 +368,16 @@ where
381368
}
382369

383370
let chunk_data = &chunk[0];
371+
let ori_row_data = row_data.clone();
384372
let mut row_data = row_data.clone();
385373
row_data.with_buf(&chunk_data[4..]);
386374
let row_part_data = row_data.get_row_data_with_name(&mmf.name).map_err(|e| ErrorKind::Runtime(e))?.unwrap();
387375
chunk.par_iter_mut().for_each(|x| {
388-
row_data_cut_merge(x, &row_part_data, |data: &mut BytesMut| {
376+
let mut row_data = ori_row_data.clone();
377+
row_data.with_buf(&x[4..]);
378+
let ori_row_part_data = row_data.get_row_data_with_name(&mmf.name).unwrap().unwrap();
379+
380+
row_data_cut_merge(x, &ori_row_part_data, |data: &mut BytesMut| {
389381
if is_binary {
390382
data.extend_from_slice(&row_part_data.data);
391383
} else {
@@ -650,9 +642,8 @@ where F: FnOnce(&mut BytesMut)
650642
{
651643
let mut data = ori_data.split_off(4);
652644
let mut data_remain = data.split_off(row_part_data.start_idx);
653-
654645
f(&mut data);
655-
646+
656647
let _ = data_remain.split_to(row_part_data.part_encode_length + row_part_data.part_data_length);
657648
data.extend_from_slice(&data_remain);
658649
ori_data.extend_from_slice(&data);

0 commit comments

Comments
 (0)