Skip to content

Commit ffb1442

Browse files
authored
Merge pull request #372 from gatewayd-io/circuit-breaking-all-the-way
Circuit breaking all the way
2 parents d55fd53 + 8a2172c commit ffb1442

19 files changed

+438
-55
lines changed

api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestGetProxies(t *testing.T) {
180180
Network: config.DefaultNetwork,
181181
Address: config.DefaultAddress,
182182
}
183-
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{})
183+
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
184184
newPool := pool.NewPool(context.TODO(), 1)
185185
assert.Nil(t, newPool.Put(client.ID, client))
186186

@@ -225,7 +225,7 @@ func TestGetServers(t *testing.T) {
225225
Network: config.DefaultNetwork,
226226
Address: config.DefaultAddress,
227227
}
228-
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{})
228+
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
229229
newPool := pool.NewPool(context.TODO(), 1)
230230
assert.Nil(t, newPool.Put(client.ID, client))
231231

cmd/run.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ var runCmd = &cobra.Command{
248248
)
249249

250250
// Load plugins and register their hooks.
251-
pluginRegistry.LoadPlugins(runCtx, conf.Plugin.Plugins)
251+
pluginRegistry.LoadPlugins(runCtx, conf.Plugin.Plugins, conf.Plugin.StartTimeout)
252252

253253
// Start the metrics merger if enabled.
254254
var metricsMerger *metrics.Merger
@@ -295,7 +295,7 @@ var runCmd = &cobra.Command{
295295
logger.Info().Str("name", pluginId.Name).Msg("Reloading crashed plugin")
296296
pluginConfig := conf.Plugin.GetPlugins(pluginId.Name)
297297
if pluginConfig != nil {
298-
pluginRegistry.LoadPlugins(runCtx, pluginConfig)
298+
pluginRegistry.LoadPlugins(runCtx, pluginConfig, conf.Plugin.StartTimeout)
299299
}
300300
} else {
301301
logger.Trace().Str("name", pluginId.Name).Msg("Successfully pinged plugin")
@@ -415,7 +415,15 @@ var runCmd = &cobra.Command{
415415

416416
// Check if the metrics server is already running before registering the handler.
417417
if _, err = http.Get(address); err != nil { //nolint:gosec
418-
mux.Handle(metricsConfig.Path, gziphandler.GzipHandler(handler))
418+
// The timeout handler limits the nested handlers from running for too long.
419+
mux.Handle(
420+
metricsConfig.Path,
421+
http.TimeoutHandler(
422+
gziphandler.GzipHandler(handler),
423+
metricsConfig.GetTimeout(),
424+
"The request timed out while fetching the metrics",
425+
),
426+
)
419427
} else {
420428
logger.Warn().Msg("Metrics server is already running, consider changing the port")
421429
span.RecordError(err)
@@ -426,9 +434,16 @@ var runCmd = &cobra.Command{
426434
Addr: metricsConfig.Address,
427435
Handler: mux,
428436
ReadHeaderTimeout: metricsConfig.GetReadHeaderTimeout(),
437+
ReadTimeout: metricsConfig.GetTimeout(),
438+
WriteTimeout: metricsConfig.GetTimeout(),
439+
IdleTimeout: metricsConfig.GetTimeout(),
429440
}
430441

431-
logger.Info().Str("address", address).Msg("Metrics are exposed")
442+
logger.Info().Fields(map[string]interface{}{
443+
"address": address,
444+
"timeout": metricsConfig.GetTimeout().String(),
445+
"readHeaderTimeout": metricsConfig.GetReadHeaderTimeout().String(),
446+
}).Msg("Metrics are exposed")
432447

433448
if metricsConfig.CertFile != "" && metricsConfig.KeyFile != "" {
434449
// Set up TLS.
@@ -507,11 +522,21 @@ var runCmd = &cobra.Command{
507522
clients[name].ReceiveTimeout = clients[name].GetReceiveTimeout()
508523
clients[name].SendDeadline = clients[name].GetSendDeadline()
509524
clients[name].ReceiveChunkSize = clients[name].GetReceiveChunkSize()
525+
clients[name].DialTimeout = clients[name].GetDialTimeout()
510526

511527
// Add clients to the pool.
512528
for i := 0; i < cfg.GetSize(); i++ {
513529
clientConfig := clients[name]
514-
client := network.NewClient(runCtx, clientConfig, logger)
530+
client := network.NewClient(
531+
runCtx, clientConfig, logger,
532+
network.NewRetry(
533+
clientConfig.Retries,
534+
clientConfig.GetBackoff(),
535+
clientConfig.BackoffMultiplier,
536+
clientConfig.DisableBackoffCaps,
537+
loggers[name],
538+
),
539+
)
515540

516541
if client != nil {
517542
eventOptions := trace.WithAttributes(
@@ -522,10 +547,15 @@ var runCmd = &cobra.Command{
522547
attribute.String("receiveDeadline", client.ReceiveDeadline.String()),
523548
attribute.String("receiveTimeout", client.ReceiveTimeout.String()),
524549
attribute.String("sendDeadline", client.SendDeadline.String()),
550+
attribute.String("dialTimeout", client.DialTimeout.String()),
525551
attribute.Bool("tcpKeepAlive", client.TCPKeepAlive),
526552
attribute.String("tcpKeepAlivePeriod", client.TCPKeepAlivePeriod.String()),
527553
attribute.String("localAddress", client.LocalAddr()),
528554
attribute.String("remoteAddress", client.RemoteAddr()),
555+
attribute.Int("retries", clientConfig.Retries),
556+
attribute.String("backoff", clientConfig.GetBackoff().String()),
557+
attribute.Float64("backoffMultiplier", clientConfig.BackoffMultiplier),
558+
attribute.Bool("disableBackoffCaps", clientConfig.DisableBackoffCaps),
529559
)
530560
if client.ID != "" {
531561
eventOptions = trace.WithAttributes(
@@ -547,8 +577,15 @@ var runCmd = &cobra.Command{
547577
"receiveDeadline": client.ReceiveDeadline.String(),
548578
"receiveTimeout": client.ReceiveTimeout.String(),
549579
"sendDeadline": client.SendDeadline.String(),
580+
"dialTimeout": client.DialTimeout.String(),
550581
"tcpKeepAlive": client.TCPKeepAlive,
551582
"tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
583+
"localAddress": client.LocalAddr(),
584+
"remoteAddress": client.RemoteAddr(),
585+
"retries": clientConfig.Retries,
586+
"backoff": clientConfig.GetBackoff().String(),
587+
"backoffMultiplier": clientConfig.BackoffMultiplier,
588+
"disableBackoffCaps": clientConfig.DisableBackoffCaps,
552589
}
553590
_, err := pluginRegistry.Run(
554591
pluginTimeoutCtx, clientCfg, v1.HookName_HOOK_NAME_ON_NEW_CLIENT)

config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ func (c *Config) LoadDefaults(ctx context.Context) {
116116
ReceiveDeadline: DefaultReceiveDeadline,
117117
ReceiveTimeout: DefaultReceiveTimeout,
118118
SendDeadline: DefaultSendDeadline,
119+
DialTimeout: DefaultDialTimeout,
120+
Retries: DefaultRetries,
121+
Backoff: DefaultBackoff,
122+
BackoffMultiplier: DefaultBackoffMultiplier,
123+
DisableBackoffCaps: DefaultDisableBackoffCaps,
119124
}
120125

121126
defaultPool := Pool{
@@ -210,6 +215,7 @@ func (c *Config) LoadDefaults(ctx context.Context) {
210215
HealthCheckPeriod: DefaultPluginHealthCheckPeriod,
211216
ReloadOnCrash: true,
212217
Timeout: DefaultPluginTimeout,
218+
StartTimeout: DefaultPluginStartTimeout,
213219
}
214220

215221
if c.GlobalKoanf != nil {

config/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ const (
9292
DefaultMetricsMergerPeriod = 5 * time.Second
9393
DefaultPluginHealthCheckPeriod = 5 * time.Second
9494
DefaultPluginTimeout = 30 * time.Second
95+
DefaultPluginStartTimeout = 1 * time.Minute
9596

9697
// Client constants.
9798
DefaultNetwork = "tcp"
@@ -102,6 +103,11 @@ const (
102103
DefaultTCPKeepAlivePeriod = 30 * time.Second
103104
DefaultTCPKeepAlive = false
104105
DefaultReceiveTimeout = 0
106+
DefaultDialTimeout = 60 * time.Second
107+
DefaultRetries = 3
108+
DefaultBackoff = 1 * time.Second
109+
DefaultBackoffMultiplier = 2.0
110+
DefaultDisableBackoffCaps = false
105111

106112
// Pool constants.
107113
EmptyPoolCapacity = 0

config/getters.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,31 +106,31 @@ func (p PluginConfig) GetTerminationPolicy() TerminationPolicy {
106106

107107
// GetTCPKeepAlivePeriod returns the TCP keep alive period from config file or default value.
108108
func (c Client) GetTCPKeepAlivePeriod() time.Duration {
109-
if c.TCPKeepAlivePeriod <= 0 {
109+
if c.TCPKeepAlivePeriod < 0 {
110110
return DefaultTCPKeepAlivePeriod
111111
}
112112
return c.TCPKeepAlivePeriod
113113
}
114114

115115
// GetReceiveDeadline returns the receive deadline from config file or default value.
116116
func (c Client) GetReceiveDeadline() time.Duration {
117-
if c.ReceiveDeadline <= 0 {
117+
if c.ReceiveDeadline < 0 {
118118
return DefaultReceiveDeadline
119119
}
120120
return c.ReceiveDeadline
121121
}
122122

123123
// GetReceiveTimeout returns the receive timeout from config file or default value.
124124
func (c Client) GetReceiveTimeout() time.Duration {
125-
if c.ReceiveTimeout <= 0 {
125+
if c.ReceiveTimeout < 0 {
126126
return DefaultReceiveTimeout
127127
}
128128
return c.ReceiveTimeout
129129
}
130130

131131
// GetSendDeadline returns the send deadline from config file or default value.
132132
func (c Client) GetSendDeadline() time.Duration {
133-
if c.SendDeadline <= 0 {
133+
if c.SendDeadline < 0 {
134134
return DefaultSendDeadline
135135
}
136136
return c.SendDeadline
@@ -144,6 +144,22 @@ func (c Client) GetReceiveChunkSize() int {
144144
return c.ReceiveChunkSize
145145
}
146146

147+
// GetDialTimeout returns the dial timeout from config file or default value.
148+
func (c Client) GetDialTimeout() time.Duration {
149+
if c.DialTimeout < 0 {
150+
return DefaultDialTimeout
151+
}
152+
return c.DialTimeout
153+
}
154+
155+
// GetBackoff returns the backoff from config file or default value.
156+
func (c Client) GetBackoff() time.Duration {
157+
if c.Backoff < 0 {
158+
return DefaultBackoff
159+
}
160+
return c.Backoff
161+
}
162+
147163
// GetHealthCheckPeriod returns the health check period from config file or default value.
148164
func (pr Proxy) GetHealthCheckPeriod() time.Duration {
149165
if pr.HealthCheckPeriod <= 0 {
@@ -154,7 +170,7 @@ func (pr Proxy) GetHealthCheckPeriod() time.Duration {
154170

155171
// GetTickInterval returns the tick interval from config file or default value.
156172
func (s Server) GetTickInterval() time.Duration {
157-
if s.TickInterval <= 0 {
173+
if s.TickInterval < 0 {
158174
return DefaultTickInterval
159175
}
160176
return s.TickInterval
@@ -247,20 +263,23 @@ func GetDefaultConfigFilePath(filename string) string {
247263
return filepath.Join("./", filename)
248264
}
249265

266+
// GetReadHeaderTimeout returns the read header timeout from config file or default value.
250267
func (m Metrics) GetReadHeaderTimeout() time.Duration {
251-
if m.ReadHeaderTimeout <= 0 {
268+
if m.ReadHeaderTimeout < 0 {
252269
return DefaultReadHeaderTimeout
253270
}
254271
return m.ReadHeaderTimeout
255272
}
256273

274+
// GetTimeout returns the metrics server timeout from config file or default value.
257275
func (m Metrics) GetTimeout() time.Duration {
258-
if m.Timeout <= 0 {
276+
if m.Timeout < 0 {
259277
return DefaultMetricsServerTimeout
260278
}
261279
return m.Timeout
262280
}
263281

282+
// Filter returns a filtered global config based on the group name.
264283
func (gc GlobalConfig) Filter(groupName string) *GlobalConfig {
265284
if _, ok := gc.Servers[groupName]; !ok {
266285
return nil

config/getters_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestGetTerminationPolicy(t *testing.T) {
3636
// TestGetTCPKeepAlivePeriod tests the GetTCPKeepAlivePeriod function.
3737
func TestGetTCPKeepAlivePeriod(t *testing.T) {
3838
client := Client{}
39-
assert.Equal(t, DefaultTCPKeepAlivePeriod, client.GetTCPKeepAlivePeriod())
39+
assert.Equal(t, client.GetTCPKeepAlivePeriod(), time.Duration(0))
4040
}
4141

4242
// TestGetReceiveDeadline tests the GetReceiveDeadline function.
@@ -72,7 +72,7 @@ func TestGetHealthCheckPeriod(t *testing.T) {
7272
// TestGetTickInterval tests the GetTickInterval function.
7373
func TestGetTickInterval(t *testing.T) {
7474
server := Server{}
75-
assert.Equal(t, DefaultTickInterval, server.GetTickInterval())
75+
assert.Equal(t, server.GetTickInterval(), time.Duration(0))
7676
}
7777

7878
// TestGetSize tests the GetSize function.
@@ -120,13 +120,13 @@ func TestGetDefaultConfigFilePath(t *testing.T) {
120120
// TestGetReadTimeout tests the GetReadTimeout function.
121121
func TestGetReadHeaderTimeout(t *testing.T) {
122122
metrics := Metrics{}
123-
assert.Equal(t, DefaultReadHeaderTimeout, metrics.GetReadHeaderTimeout())
123+
assert.Equal(t, metrics.GetReadHeaderTimeout(), time.Duration(0))
124124
}
125125

126126
// TestGetTimeout tests the GetTimeout function of the metrics server.
127127
func TestGetTimeout(t *testing.T) {
128128
metrics := Metrics{}
129-
assert.Equal(t, DefaultMetricsServerTimeout, metrics.GetTimeout())
129+
assert.Equal(t, metrics.GetTimeout(), time.Duration(0))
130130
}
131131

132132
// TestFilter tests the Filter function.

config/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type PluginConfig struct {
2424
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
2525
ReloadOnCrash bool `json:"reloadOnCrash"`
2626
Timeout time.Duration `json:"timeout" jsonschema:"oneof_type=string;integer"`
27+
StartTimeout time.Duration `json:"startTimeout" jsonschema:"oneof_type=string;integer"`
2728
Plugins []Plugin `json:"plugins"`
2829
}
2930

@@ -36,6 +37,11 @@ type Client struct {
3637
ReceiveDeadline time.Duration `json:"receiveDeadline" jsonschema:"oneof_type=string;integer"`
3738
ReceiveTimeout time.Duration `json:"receiveTimeout" jsonschema:"oneof_type=string;integer"`
3839
SendDeadline time.Duration `json:"sendDeadline" jsonschema:"oneof_type=string;integer"`
40+
DialTimeout time.Duration `json:"dialTimeout" jsonschema:"oneof_type=string;integer"`
41+
Retries int `json:"retries"`
42+
Backoff time.Duration `json:"backoff" jsonschema:"oneof_type=string;integer"`
43+
BackoffMultiplier float64 `json:"backoffMultiplier"`
44+
DisableBackoffCaps bool `json:"disableBackoffCaps"`
3945
}
4046

4147
type Logger struct {

gatewayd.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ loggers:
77
noColor: False
88
timeFormat: "unix" # unixms, unixmicro and unixnano
99
consoleTimeFormat: "RFC3339" # Go time format string
10-
# If output is file, the following fields are used.
10+
# If the output contains "file", the following fields are used:
1111
fileName: "gatewayd.log"
1212
maxSize: 500 # MB
13+
# If maxBackups and maxAge are both 0, no old log files will be deleted.
1314
maxBackups: 5
1415
maxAge: 30 # days
1516
compress: True
@@ -39,6 +40,12 @@ clients:
3940
receiveDeadline: 0s # duration, 0ms/0s means no deadline
4041
receiveTimeout: 0s # duration, 0ms/0s means no timeout
4142
sendDeadline: 0s # duration, 0ms/0s means no deadline
43+
dialTimeout: 60s # duration
44+
# Retry configuration
45+
retries: 3 # 0 means no retry
46+
backoff: 1s # duration
47+
backoffMultiplier: 2.0 # 0 means no backoff
48+
disableBackoffCaps: false
4249

4350
pools:
4451
default:

gatewayd_plugins.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ reloadOnCrash: True
5959
# The timeout controls how long to wait for a plugin to respond to a request before timing out.
6060
timeout: 30s
6161

62+
# The start timeout controls how long to wait for a plugin to start before timing out.
63+
startTimeout: 1m
64+
6265
# The plugin configuration is a list of plugins to load. Each plugin is defined by a name,
6366
# a path to the plugin's executable, and a list of arguments to pass to the plugin. The
6467
# plugin's executable is expected to be a Go plugin that implements the GatewayD plugin

metrics/merger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ func (m *Merger) Start() {
259259
m.scheduler.StartAsync()
260260
m.Logger.Info().Fields(
261261
map[string]interface{}{
262-
"startDelay": startDelay,
262+
"startDelay": startDelay.Format(time.RFC3339),
263263
"metricsMergerPeriod": m.MetricsMergerPeriod.String(),
264264
},
265265
).Msg("Started the metrics merger scheduler")

0 commit comments

Comments
 (0)