Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

* New types for MessagePack extensions compatible with go-option (#459).
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
* Method `Release` for `Future` and `Response` interface:
`Get` and `GetTyped` are calling `Release` by defer.
`GetResult` and `GetTypedResult` just return result without `Release` (#493).

### Changed

Expand Down
39 changes: 27 additions & 12 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr net.Addr
dialer Dialer
c Conn
mutex sync.Mutex
cond *sync.Cond
addr net.Addr
dialer Dialer
c Conn
mutex sync.Mutex
cond *sync.Cond
slicePool *sync.Pool
// schemaResolver contains a SchemaResolver implementation.
schemaResolver SchemaResolver
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -373,7 +374,12 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
}

conn.cond = sync.NewCond(&conn.mutex)

conn.slicePool = &sync.Pool{
New: func() any {
buf := make([]byte, 0, 4096)
return &buf
},
}
if conn.opts.Reconnect > 0 {
// We don't need these mutex.Lock()/mutex.Unlock() here, but
// runReconnects() expects mutex.Lock() to be set, so it's
Expand Down Expand Up @@ -848,8 +854,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {

go conn.eventer(events)

buf := smallBuf{}
for atomic.LoadUint32(&conn.state) != connClosed {
respBytes, err := read(r, conn.lenbuf[:])
respBytes, err := read(r, conn.lenbuf[:], conn)
if err != nil {
err = ClientError{
ErrIoError,
Expand All @@ -858,7 +865,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
conn.reconnect(err, c)
return
}
buf := smallBuf{b: respBytes}
buf.b, buf.p = respBytes, 0
header, code, err := decodeHeader(conn.dec, &buf)
if err != nil {
err = ClientError{
Expand Down Expand Up @@ -925,7 +932,7 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {

func (conn *Connection) newFuture(req Request) (fut *Future) {
ctx := req.Ctx()
fut = NewFuture(req)
fut = NewFuture(req, conn)
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
case conn.rlimit <- struct{}{}:
Expand Down Expand Up @@ -1187,7 +1194,7 @@ func (conn *Connection) timeouts() {
}
}

func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
func read(r io.Reader, lenbuf []byte, conn ...*Connection) (response []byte, err error) {
var length uint64

if _, err = io.ReadFull(r, lenbuf); err != nil {
Expand All @@ -1211,7 +1218,15 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
return
}

response = make([]byte, length)
if len(conn) == 0 {
response = make([]byte, length)
} else {
response = *conn[0].slicePool.Get().(*[]byte)
if cap(response) < int(length) {
response = make([]byte, length)
}
response = response[:length]
}
_, err = io.ReadFull(r, response)

return
Expand All @@ -1232,7 +1247,7 @@ func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
func (conn *Connection) Do(req Request) *Future {
if connectedReq, ok := req.(ConnectedRequest); ok {
if connectedReq.Conn() != conn {
fut := NewFuture(req)
fut := NewFuture(req, conn)
fut.SetError(errUnknownRequest)
return fut
}
Expand Down
3 changes: 2 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,7 @@ func ExampleConnection_Do_failure() {

// We got a future, the request actually not performed yet.
future := conn.Do(req)
defer future.Release()

// When the future receives the response, the result of the Future is set
// and becomes available. We could wait for that moment with Future.Get(),
Expand All @@ -1305,7 +1306,7 @@ func ExampleConnection_Do_failure() {
fmt.Printf("Response error: %s\n", resp.Header().Error)
}

data, err := future.Get()
data, err := future.GetResult()
if err != nil {
fmt.Printf("Data: %v\n", data)
}
Expand Down
48 changes: 47 additions & 1 deletion future.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Future struct {
req Request
next *Future
timeout time.Duration
pool *sync.Pool
mutex sync.Mutex
resp Response
err error
Expand All @@ -38,10 +39,15 @@ func (fut *Future) isDone() bool {
}

// NewFuture creates a new empty Future for a given Request.
func NewFuture(req Request) (fut *Future) {
func NewFuture(req Request, conn ...*Connection) (fut *Future) {
fut = &Future{}
fut.done = make(chan struct{})
fut.req = req
if len(conn) == 0 {
fut.pool = nil
} else {
fut.pool = conn[0].slicePool
}
return fut
}

Expand Down Expand Up @@ -89,12 +95,28 @@ func (fut *Future) GetResponse() (Response, error) {
}

// Get waits for Future to be filled and returns the data of the Response and error.
// Also Release Future's data. After this, Future becomes invalid.
//
// The data will be []interface{}, so if you want more performance, use GetTyped method.
//
// "error" could be Error, if it is error returned by Tarantool,
// or ClientError, if something bad happens in a client process.
func (fut *Future) Get() ([]interface{}, error) {
defer fut.Release()
fut.wait()
if fut.err != nil {
return nil, fut.err
}
return fut.resp.Decode()
}

// Get waits for Future to be filled and returns the data of the Response and error.
//
// The data will be []interface{}, so if you want more performance, use GetTyped method.
//
// "error" could be Error, if it is error returned by Tarantool,
// or ClientError, if something bad happens in a client process.
func (fut *Future) GetResult() ([]interface{}, error) {
fut.wait()
if fut.err != nil {
return nil, fut.err
Expand All @@ -105,8 +127,23 @@ func (fut *Future) Get() ([]interface{}, error) {
// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
// It is could be much faster than Get() function.
//
// Also Release Future's data. After this, Future becomes invalid.
//
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
func (fut *Future) GetTyped(result interface{}) error {
defer fut.Release()
fut.wait()
if fut.err != nil {
return fut.err
}
return fut.resp.DecodeTyped(result)
}

// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
// It is could be much faster than Get() function.
//
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
func (fut *Future) GetTypedResult(result interface{}) error {
fut.wait()
if fut.err != nil {
return fut.err
Expand All @@ -127,3 +164,12 @@ func (fut *Future) WaitChan() <-chan struct{} {
}
return fut.done
}

// Release is freeing the Future resources.
// After this, using this Future becomes invalid.
func (fut *Future) Release() {
if fut.pool != nil && fut.resp != nil {
ptr := fut.resp.Release()
fut.pool.Put(ptr)
}
}
5 changes: 5 additions & 0 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (resp *futureMockResponse) Header() Header {
return resp.header
}

func (resp *futureMockResponse) Release() *[]byte {
// Releasing futureMockResponse data.
return &resp.data
}

func (resp *futureMockResponse) Decode() ([]interface{}, error) {
resp.decodeCnt++

Expand Down
4 changes: 2 additions & 2 deletions pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,12 +1744,12 @@ func TestDoWithExecuteRequest(t *testing.T) {
mem := []Member{}

fut := connPool.Do(tarantool.NewExecuteRequest(request).Args([]interface{}{}), pool.ANY)
data, err := fut.Get()
data, err := fut.GetResult()
require.Nilf(t, err, "failed to Do with ExecuteRequest")
require.NotNilf(t, data, "response is nil after Execute")
require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Do with ExecuteRequest")
require.Equalf(t, len(data[0].([]interface{})), 2, "unexpected response")
err = fut.GetTyped(&mem)
err = fut.GetTypedResult(&mem)
require.Nilf(t, err, "Unable to GetTyped of fut")
require.Equalf(t, len(mem), 1, "wrong count of result")
}
Expand Down
4 changes: 2 additions & 2 deletions prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
if err != nil {
return nil, err
}
return &PrepareResponse{baseResponse: baseResp}, nil
return &PrepareResponse{baseResponse: *baseResp}, nil
}

// UnprepareRequest helps you to create an unprepare request object for
Expand Down Expand Up @@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
if err != nil {
return nil, err
}
return &ExecuteResponse{baseResponse: baseResp}, nil
return &ExecuteResponse{baseResponse: *baseResp}, nil
}
6 changes: 3 additions & 3 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {

// Response creates a response for the SelectRequest.
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
baseResp, err := createBaseResponse(header, body)
SelectResp, err := createSelectResponse(header, body)
if err != nil {
return nil, err
}
return &SelectResponse{baseResponse: baseResp}, nil
return SelectResp, nil
}

// InsertRequest helps you to create an insert request object for execution
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
if err != nil {
return nil, err
}
return &ExecuteResponse{baseResponse: baseResp}, nil
return &ExecuteResponse{baseResponse: *baseResp}, nil
}

// WatchOnceRequest synchronously fetches the value currently associated with a
Expand Down
Loading
Loading