Skip to content

Commit 7ffb599

Browse files
authored
[ISSUE #1112] feat: optimize producer send async (#1111)
* feat: optimize producer send async * fix: fix mq override bug
1 parent 7ae83c4 commit 7ffb599

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

internal/remote/remote_client.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R
112112

113113
// InvokeAsync send request without blocking, just return immediately.
114114
func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
115-
conn, err := c.connect(ctx, addr)
116-
if err != nil {
117-
return err
118-
}
119-
120115
resp := NewResponseFuture(ctx, request.Opaque, callback)
121116
c.responseTable.Store(resp.Opaque, resp)
122117

123-
err = c.sendRequest(ctx, conn, request)
124-
if err != nil {
125-
c.responseTable.Delete(request.Opaque)
126-
return err
127-
}
128-
129118
go primitive.WithRecover(func() {
119+
defer resp.executeInvokeCallback()
120+
defer c.responseTable.Delete(request.Opaque)
121+
122+
conn, err := c.connect(ctx, addr)
123+
if err != nil {
124+
resp.Err = err
125+
return
126+
}
127+
err = c.sendRequest(ctx, conn, request)
128+
if err != nil {
129+
resp.Err = err
130+
return
131+
}
130132
c.receiveAsync(resp)
131-
c.responseTable.Delete(request.Opaque)
132133
})
133134

134135
return nil

producer/producer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
329329
if mq != nil {
330330
lastBrokerName = mq.BrokerName
331331
}
332-
mq := p.selectMessageQueue(msg, lastBrokerName)
332+
mq = p.selectMessageQueue(msg, lastBrokerName)
333333
if mq == nil {
334334
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
335335
continue

0 commit comments

Comments
 (0)