Skip to content

Commit 6ef2a9b

Browse files
author
James Cor
committed
fix timer code
1 parent 58a65bb commit 6ef2a9b

File tree

1 file changed

+78
-60
lines changed

1 file changed

+78
-60
lines changed

server/handler.go

Lines changed: 78 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -650,17 +650,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
650650
timer := time.NewTimer(waitTime)
651651
defer timer.Stop()
652652

653-
// Wait for signal on the timer.C channel, and error accordingly
654-
eg.Go(func() (err error) {
655-
<-timer.C
656-
if h.readTimeout != 0 {
657-
// Cancel and return so Vitess can call the CloseConnection callback
658-
ctx.GetLogger().Tracef("connection timeout")
659-
return ErrRowTimeout.New()
660-
}
661-
return nil
662-
})
663-
664653
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
665654
// clean out rows that have already been spooled.
666655
// A server-side cursor allows the caller to fetch results cached on the server-side,
@@ -676,7 +665,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
676665
wg := sync.WaitGroup{}
677666
wg.Add(3)
678667

679-
var processedAtLeastOneBatch bool
680668
iter, projs := GetDeferredProjections(iter)
681669

682670
// Read rows off the row iterator and send them to the row channel.
@@ -706,8 +694,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
706694
}
707695
})
708696

709-
// TODO: remember to deal with last result differently
710-
// Read rows off the row channel and convert to wire format.
697+
// Drain rows from rowChan, convert to wire format, and send to resChan
711698
var resChan = make(chan *sqltypes.Result, 4)
712699
var res *sqltypes.Result
713700
eg.Go(func() (err error) {
@@ -749,26 +736,29 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
749736
res.Rows = append(res.Rows, outRow)
750737
res.RowsAffected++
751738

752-
// timer has fired, so send on the timer channel
753-
if !timer.Stop() {
754-
<-timer.C
755-
}
756-
757739
if res.RowsAffected == rowsBatch {
758740
select {
759-
case resChan <- res:
760-
res = nil
761741
case <-ctx.Done():
762742
return context.Cause(ctx)
743+
case resChan <- res:
744+
res = nil
763745
}
764746
}
765747
}
766748

767-
timer.Reset(waitTime)
749+
// timer has gone off
750+
if !timer.Reset(waitTime) {
751+
if h.readTimeout != 0 {
752+
// Cancel and return so Vitess can call the CloseConnection callback
753+
ctx.GetLogger().Tracef("connection timeout")
754+
return ErrRowTimeout.New()
755+
}
756+
}
768757
}
769758
})
770759

771-
// Read sqltypes.Result from resChan and send to client
760+
// Drain sqltypes.Result from resChan and call callback (send to client and reset buffer)
761+
var processedAtLeastOneBatch bool
772762
eg.Go(func() (err error) {
773763
defer pan2err(&err)
774764
defer cancelF()
@@ -777,7 +767,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
777767
select {
778768
case <-ctx.Done():
779769
return context.Cause(ctx)
780-
781770
case r, ok := <-resChan:
782771
if !ok {
783772
return nil
@@ -841,22 +830,22 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
841830
timer := time.NewTimer(waitTime)
842831
defer timer.Stop()
843832

844-
wg := sync.WaitGroup{}
845-
wg.Add(2)
846-
847833
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
848834
// clean out rows that have already been spooled.
849-
resetCallback := func(r *sqltypes.Result, more bool) error {
850-
// A server-side cursor allows the caller to fetch results cached on the server-side,
851-
// so if a cursor exists, we can't release the buffer memory yet.
852-
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
853-
defer buf.Reset()
835+
// A server-side cursor allows the caller to fetch results cached on the server-side,
836+
// so if a cursor exists, we can't release the buffer memory yet.
837+
resetCallback := callback
838+
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
839+
resetCallback = func(r *sqltypes.Result, more bool) error {
840+
buf.Reset()
841+
return callback(r, more)
854842
}
855-
return callback(r, more)
856843
}
857844

858-
// TODO: send results instead of rows?
859-
// Read rows from iter and send them off
845+
wg := sync.WaitGroup{}
846+
wg.Add(3)
847+
848+
// Drain rows from iter and send to rowsChan
860849
var rowChan = make(chan sql.ValueRow, 512)
861850
eg.Go(func() (err error) {
862851
defer pan2err(&err)
@@ -867,12 +856,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
867856
case <-ctx.Done():
868857
return context.Cause(ctx)
869858
default:
870-
row, err := iter.NextValueRow(ctx)
871-
if err == io.EOF {
859+
row, iErr := iter.NextValueRow(ctx)
860+
if iErr == io.EOF {
872861
return nil
873862
}
874-
if err != nil {
875-
return err
863+
if iErr != nil {
864+
return iErr
876865
}
877866
select {
878867
case rowChan <- row:
@@ -883,53 +872,81 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
883872
}
884873
})
885874

875+
// Drain rows from rowChan, convert to wire format, and send to resChan
876+
var resChan = make(chan *sqltypes.Result, 4)
886877
var res *sqltypes.Result
887-
var processedAtLeastOneBatch bool
888878
eg.Go(func() (err error) {
889879
defer pan2err(&err)
890-
defer cancelF()
880+
defer close(resChan)
891881
defer wg.Done()
882+
892883
for {
893884
if res == nil {
894885
res = &sqltypes.Result{
895886
Fields: resultFields,
896-
Rows: make([][]sqltypes.Value, 0, rowsBatch),
897-
}
898-
}
899-
if res.RowsAffected == rowsBatch {
900-
if err := resetCallback(res, more); err != nil {
901-
return err
887+
Rows: make([][]sqltypes.Value, rowsBatch),
902888
}
903-
res = nil
904-
processedAtLeastOneBatch = true
905-
continue
906889
}
907890

908891
select {
909892
case <-ctx.Done():
910893
return context.Cause(ctx)
911-
case <-timer.C:
894+
895+
case row, ok := <-rowChan:
896+
if !ok {
897+
return nil
898+
}
899+
900+
outRow, sqlErr := RowValueToSQLValues(ctx, schema, row, buf)
901+
if sqlErr != nil {
902+
return sqlErr
903+
}
904+
905+
ctx.GetLogger().Tracef("spooling result row %s", outRow)
906+
res.Rows[res.RowsAffected] = outRow
907+
res.RowsAffected++
908+
909+
if res.RowsAffected == rowsBatch {
910+
select {
911+
case <-ctx.Done():
912+
return context.Cause(ctx)
913+
case resChan <- res:
914+
res = nil
915+
}
916+
}
917+
}
918+
919+
// timer has gone off
920+
if !timer.Reset(waitTime) {
912921
if h.readTimeout != 0 {
913922
// Cancel and return so Vitess can call the CloseConnection callback
914923
ctx.GetLogger().Tracef("connection timeout")
915924
return ErrRowTimeout.New()
916925
}
917-
case row, ok := <-rowChan:
926+
}
927+
}
928+
})
929+
930+
// Drain sqltypes.Result from resChan and call callback (send to client and reset buffer)
931+
var processedAtLeastOneBatch bool
932+
eg.Go(func() (err error) {
933+
defer pan2err(&err)
934+
defer cancelF()
935+
defer wg.Done()
936+
for {
937+
select {
938+
case <-ctx.Done():
939+
return context.Cause(ctx)
940+
case r, ok := <-resChan:
918941
if !ok {
919942
return nil
920943
}
921-
resRow, err := RowValueToSQLValues(ctx, schema, row, buf)
944+
processedAtLeastOneBatch = true
945+
err = resetCallback(r, more)
922946
if err != nil {
923947
return err
924948
}
925-
ctx.GetLogger().Tracef("spooling result row %s", resRow)
926-
res.Rows = append(res.Rows, resRow)
927-
res.RowsAffected++
928-
if !timer.Stop() {
929-
<-timer.C
930-
}
931949
}
932-
timer.Reset(waitTime)
933950
}
934951
})
935952

@@ -950,6 +967,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
950967
return nil, false, err
951968
}
952969

970+
res.Rows = res.Rows[:res.RowsAffected]
953971
return res, processedAtLeastOneBatch, nil
954972
}
955973

0 commit comments

Comments
 (0)