Skip to content

Commit fb70489

Browse files
author
James Cor
committed
fix timer code
1 parent a48dfeb commit fb70489

File tree

1 file changed

+13
-38
lines changed

1 file changed

+13
-38
lines changed

server/handler.go

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -650,17 +650,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
650650
timer := time.NewTimer(waitTime)
651651
defer timer.Stop()
652652

653-
// Wait for signal on the timer.C channel, and error accordingly
654-
eg.Go(func() (err error) {
655-
<-timer.C
656-
if h.readTimeout != 0 {
657-
// Cancel and return so Vitess can call the CloseConnection callback
658-
ctx.GetLogger().Tracef("connection timeout")
659-
return ErrRowTimeout.New()
660-
}
661-
return nil
662-
})
663-
664653
// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
665654
// clean out rows that have already been spooled.
666655
// A server-side cursor allows the caller to fetch results cached on the server-side,
@@ -677,6 +666,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
677666
wg := sync.WaitGroup{}
678667
wg.Add(3)
679668

669+
iter, projs := GetDeferredProjections(iter)
670+
680671
// Read rows off the row iterator and send them to the row channel.
681672
var rowChan = make(chan sql.Row, 512)
682673
eg.Go(func() (err error) {
@@ -767,7 +758,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
767758
}
768759
})
769760

770-
// Drain sqltypes.Result from resChan and call callback (send to client and potentially reset buffer)
761+
// Drain sqltypes.Result from resChan and call callback (send to client and reset buffer)
771762
var processedAtLeastOneBatch bool
772763
eg.Go(func() (err error) {
773764
defer pan2err(&err)
@@ -790,29 +781,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
790781
}
791782
})
792783

793-
// Read sqltypes.Result from resChan and send to client
794-
eg.Go(func() (err error) {
795-
defer pan2err(&err)
796-
defer cancelF()
797-
defer wg.Done()
798-
for {
799-
select {
800-
case <-ctx.Done():
801-
return context.Cause(ctx)
802-
803-
case r, ok := <-resChan:
804-
if !ok {
805-
return nil
806-
}
807-
processedAtLeastOneBatch = true
808-
err = callback(r, more)
809-
if err != nil {
810-
return err
811-
}
812-
}
813-
}
814-
})
815-
816784
// Close() kills this PID in the process list,
817785
// wait until all rows have be sent over the wire
818786
eg.Go(func() (err error) {
@@ -955,7 +923,14 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
955923
}
956924
}
957925

958-
timer.Reset(waitTime)
926+
// timer has gone off
927+
if !timer.Reset(waitTime) {
928+
if h.readTimeout != 0 {
929+
// Cancel and return so Vitess can call the CloseConnection callback
930+
ctx.GetLogger().Warn("connection timeout")
931+
return ErrRowTimeout.New()
932+
}
933+
}
959934
}
960935
})
961936

@@ -974,7 +949,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
974949
return nil
975950
}
976951
processedAtLeastOneBatch = true
977-
err = callback(r, more)
952+
err = resetCallback(r, more)
978953
if err != nil {
979954
return err
980955
}
@@ -1002,7 +977,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
1002977
if res != nil {
1003978
res.Rows = res.Rows[:res.RowsAffected]
1004979
}
1005-
return res, processedAtLeastOneBatch, err
980+
return res, processedAtLeastOneBatch, nil
1006981
}
1007982

1008983
// See https://dev.mysql.com/doc/internals/en/status-flags.html

0 commit comments

Comments
 (0)