Skip to content

Commit a33bb2c

Browse files
committed
Add receive timeout
1 parent d5f9868 commit a33bb2c

File tree

8 files changed

+25
-1
lines changed

8 files changed

+25
-1
lines changed

cmd/run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ var runCmd = &cobra.Command{
407407
// Fill the missing and zero values with the default ones.
408408
clients[name].TCPKeepAlivePeriod = clients[name].GetTCPKeepAlivePeriod()
409409
clients[name].ReceiveDeadline = clients[name].GetReceiveDeadline()
410+
clients[name].ReceiveTimeout = clients[name].GetReceiveTimeout()
410411
clients[name].SendDeadline = clients[name].GetSendDeadline()
411412
clients[name].ReceiveChunkSize = clients[name].GetReceiveChunkSize()
412413

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func (c *Config) LoadDefaults(ctx context.Context) {
113113
TCPKeepAlivePeriod: DefaultTCPKeepAlivePeriod,
114114
ReceiveChunkSize: DefaultChunkSize,
115115
ReceiveDeadline: DefaultReceiveDeadline,
116+
ReceiveTimeout: DefaultReceiveTimeout,
116117
SendDeadline: DefaultSendDeadline,
117118
}
118119

config/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ const (
101101
DefaultSendDeadline = 0
102102
DefaultTCPKeepAlivePeriod = 30 * time.Second
103103
DefaultTCPKeepAlive = false
104+
DefaultReceiveTimeout = 0
104105

105106
// Pool constants.
106107
EmptyPoolCapacity = 0

config/getters.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ func (c Client) GetReceiveDeadline() time.Duration {
126126
return c.ReceiveDeadline
127127
}
128128

129+
// GetReceiveTimeout returns the receive timeout from config file or default value.
130+
func (c Client) GetReceiveTimeout() time.Duration {
131+
if c.ReceiveTimeout <= 0 {
132+
return DefaultReceiveTimeout
133+
}
134+
return c.ReceiveTimeout
135+
}
136+
129137
// GetSendDeadline returns the send deadline from config file or default value.
130138
func (c Client) GetSendDeadline() time.Duration {
131139
if c.SendDeadline <= 0 {

config/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type Client struct {
4444
TCPKeepAlivePeriod time.Duration `json:"tcpKeepAlivePeriod" jsonschema:"oneof_type=string;integer"`
4545
ReceiveChunkSize int `json:"receiveChunkSize"`
4646
ReceiveDeadline time.Duration `json:"receiveDeadline" jsonschema:"oneof_type=string;integer"`
47+
ReceiveTimeout time.Duration `json:"receiveTimeout" jsonschema:"oneof_type=string;integer"`
4748
SendDeadline time.Duration `json:"sendDeadline" jsonschema:"oneof_type=string;integer"`
4849
}
4950

gatewayd.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ clients:
3333
tcpKeepAlivePeriod: 30s # duration
3434
receiveChunkSize: 8192
3535
receiveDeadline: 0s # duration, 0ms/0s means no deadline
36+
receiveTimeout: 0s # duration, 0ms/0s means no timeout
3637
sendDeadline: 0s # duration, 0ms/0s means no deadline
3738

3839
pools:

network/client.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Client struct {
3434
ReceiveChunkSize int
3535
ReceiveDeadline time.Duration
3636
SendDeadline time.Duration
37+
ReceiveTimeout time.Duration
3738
ID string
3839
Network string // tcp/udp/unix
3940
Address string
@@ -168,12 +169,21 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) {
168169
_, span := otel.Tracer(config.TracerName).Start(c.ctx, "Receive")
169170
defer span.End()
170171

172+
var ctx context.Context
173+
var cancel context.CancelFunc
174+
if c.ReceiveTimeout > 0 {
175+
ctx, cancel = context.WithTimeout(c.ctx, c.ReceiveTimeout)
176+
defer cancel()
177+
} else {
178+
ctx = context.Background()
179+
}
180+
171181
var received int
172182
buffer := bytes.NewBuffer(nil)
173183
// Read the data in chunks.
174184
select { //nolint:gosimple
175185
case <-time.After(time.Millisecond):
176-
for {
186+
for ctx.Err() == nil {
177187
chunk := make([]byte, c.ReceiveChunkSize)
178188
read, err := c.Conn.Read(chunk)
179189
if err != nil {

network/client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func CreateNewClient(t *testing.T) *Client {
3030
Address: "localhost:5432",
3131
ReceiveChunkSize: config.DefaultChunkSize,
3232
ReceiveDeadline: config.DefaultReceiveDeadline,
33+
ReceiveTimeout: config.DefaultReceiveTimeout,
3334
SendDeadline: config.DefaultSendDeadline,
3435
TCPKeepAlive: false,
3536
TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod,

0 commit comments

Comments
 (0)