@@ -650,6 +650,17 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
650650 timer := time .NewTimer (waitTime )
651651 defer timer .Stop ()
652652
653+ // Wait for signal on the timer.C channel, and error accordingly
654+ eg .Go (func () (err error ) {
655+ <- timer .C
656+ if h .readTimeout != 0 {
657+ // Cancel and return so Vitess can call the CloseConnection callback
658+ ctx .GetLogger ().Tracef ("connection timeout" )
659+ return ErrRowTimeout .New ()
660+ }
661+ return nil
662+ })
663+
653664 // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
654665 // clean out rows that have already been spooled.
655666 // A server-side cursor allows the caller to fetch results cached on the server-side,
@@ -779,6 +790,29 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
779790 }
780791 })
781792
793+ // Read sqltypes.Result from resChan and send to client
794+ eg .Go (func () (err error ) {
795+ defer pan2err (& err )
796+ defer cancelF ()
797+ defer wg .Done ()
798+ for {
799+ select {
800+ case <- ctx .Done ():
801+ return context .Cause (ctx )
802+
803+ case r , ok := <- resChan :
804+ if ! ok {
805+ return nil
806+ }
807+ processedAtLeastOneBatch = true
808+ err = callback (r , more )
809+ if err != nil {
810+ return err
811+ }
812+ }
813+ }
814+ })
815+
782816 // Close() kills this PID in the process list,
783817 // wait until all rows have be sent over the wire
784818 eg .Go (func () (err error ) {
@@ -965,7 +999,9 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
965999 return nil , false , err
9661000 }
9671001
968- res .Rows = res .Rows [:res .RowsAffected ]
1002+ if res != nil {
1003+ res .Rows = res .Rows [:res .RowsAffected ]
1004+ }
9691005 return res , processedAtLeastOneBatch , err
9701006}
9711007
0 commit comments