Skip to content

Commit 2ef135f

Browse files
authored
Merge pull request #344 from gatewayd-io/refactor-server-and-proxy
Refactor server and proxy and remove `gnet/v2`
2 parents 38bbb86 + fc41aed commit 2ef135f

29 files changed

+1190
-637
lines changed

.github/workflows/test.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ jobs:
6969
name: Test GatewayD Plugins
7070
runs-on: ubuntu-latest
7171
needs: test
72+
timeout-minutes: 3
7273
services:
7374
postgres:
7475
image: postgres

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dist/
2222

2323
# Editor files
2424
.vscode
25+
.idea
2526

2627
# Logs
2728
*.log

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ clean:
110110
test:
111111
@go test -v ./...
112112

113+
test-race:
114+
@go test -race -v ./...
115+
113116
benchmark:
114117
@go test -bench=. -benchmem -run=^# ./...
115118

cmd/run.go

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
usage "github.com/gatewayd-io/gatewayd/usagereport/v1"
3131
"github.com/getsentry/sentry-go"
3232
"github.com/go-co-op/gocron"
33-
"github.com/panjf2000/gnet/v2"
3433
"github.com/prometheus/client_golang/prometheus"
3534
"github.com/prometheus/client_golang/prometheus/promhttp"
3635
"github.com/rs/zerolog"
@@ -71,7 +70,6 @@ var (
7170

7271
func StopGracefully(
7372
runCtx context.Context,
74-
pluginTimeoutCtx context.Context,
7573
sig os.Signal,
7674
metricsMerger *metrics.Merger,
7775
metricsServer *http.Server,
@@ -88,6 +86,10 @@ func StopGracefully(
8886

8987
logger.Info().Msg("Notifying the plugins that the server is shutting down")
9088
if pluginRegistry != nil {
89+
pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), conf.Plugin.Timeout)
90+
defer cancel()
91+
92+
//nolint:contextcheck
9193
_, err := pluginRegistry.Run(
9294
pluginTimeoutCtx,
9395
map[string]interface{}{"signal": signal},
@@ -99,11 +101,12 @@ func StopGracefully(
99101
}
100102
}
101103

102-
logger.Info().Msg("Stopping GatewayD")
103-
span.AddEvent("Stopping GatewayD", trace.WithAttributes(
104+
logger.Info().Msg("GatewayD is shutting down")
105+
span.AddEvent("GatewayD is shutting down", trace.WithAttributes(
104106
attribute.String("signal", signal),
105107
))
106108
if healthCheckScheduler != nil {
109+
healthCheckScheduler.Stop()
107110
healthCheckScheduler.Clear()
108111
logger.Info().Msg("Stopped health check scheduler")
109112
span.AddEvent("Stopped health check scheduler")
@@ -269,7 +272,7 @@ var runCmd = &cobra.Command{
269272
startDelay := time.Now().Add(conf.Plugin.HealthCheckPeriod)
270273
if _, err := healthCheckScheduler.Every(
271274
conf.Plugin.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do(func() {
272-
_, span = otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
275+
_, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
273276
defer span.End()
274277

275278
var plugins []string
@@ -461,6 +464,9 @@ var runCmd = &cobra.Command{
461464
}(conf.Global.Metrics[config.Default], logger)
462465

463466
// This is a notification hook, so we don't care about the result.
467+
pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), conf.Plugin.Timeout)
468+
defer cancel()
469+
464470
if data, ok := conf.GlobalKoanf.Get("loggers").(map[string]interface{}); ok {
465471
_, err = pluginRegistry.Run(
466472
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_LOGGER)
@@ -527,6 +533,10 @@ var runCmd = &cobra.Command{
527533

528534
span.AddEvent("Create client", eventOptions)
529535

536+
pluginTimeoutCtx, cancel = context.WithTimeout(
537+
context.Background(), conf.Plugin.Timeout)
538+
defer cancel()
539+
530540
clientCfg := map[string]interface{}{
531541
"id": client.ID,
532542
"network": client.Network,
@@ -571,6 +581,10 @@ var runCmd = &cobra.Command{
571581
os.Exit(gerr.FailedToInitializePool)
572582
}
573583

584+
pluginTimeoutCtx, cancel = context.WithTimeout(
585+
context.Background(), conf.Plugin.Timeout)
586+
defer cancel()
587+
574588
_, err = pluginRegistry.Run(
575589
pluginTimeoutCtx,
576590
map[string]interface{}{"name": name, "size": cfg.GetSize()},
@@ -610,6 +624,10 @@ var runCmd = &cobra.Command{
610624
attribute.String("healthCheckPeriod", cfg.HealthCheckPeriod.String()),
611625
))
612626

627+
pluginTimeoutCtx, cancel = context.WithTimeout(
628+
context.Background(), conf.Plugin.Timeout)
629+
defer cancel()
630+
613631
if data, ok := conf.GlobalKoanf.Get("proxies").(map[string]interface{}); ok {
614632
_, err = pluginRegistry.Run(
615633
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_PROXY)
@@ -633,30 +651,9 @@ var runCmd = &cobra.Command{
633651
cfg.Network,
634652
cfg.Address,
635653
cfg.GetTickInterval(),
636-
[]gnet.Option{
637-
// Scheduling options
638-
gnet.WithMulticore(cfg.MultiCore),
639-
gnet.WithLockOSThread(cfg.LockOSThread),
640-
// NumEventLoop overrides Multicore option.
641-
// gnet.WithNumEventLoop(1),
642-
654+
network.Option{
643655
// Can be used to send keepalive messages to the client.
644-
gnet.WithTicker(cfg.EnableTicker),
645-
646-
// Internal event-loop load balancing options
647-
gnet.WithLoadBalancing(cfg.GetLoadBalancer()),
648-
649-
// Buffer options
650-
gnet.WithReadBufferCap(cfg.ReadBufferCap),
651-
gnet.WithWriteBufferCap(cfg.WriteBufferCap),
652-
gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer),
653-
gnet.WithSocketSendBuffer(cfg.SocketSendBuffer),
654-
655-
// TCP options
656-
gnet.WithReuseAddr(cfg.ReuseAddress),
657-
gnet.WithReusePort(cfg.ReusePort),
658-
gnet.WithTCPKeepAlive(cfg.TCPKeepAlive),
659-
gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()),
656+
EnableTicker: cfg.EnableTicker,
660657
},
661658
proxies[name],
662659
logger,
@@ -669,21 +666,13 @@ var runCmd = &cobra.Command{
669666
attribute.String("network", cfg.Network),
670667
attribute.String("address", cfg.Address),
671668
attribute.String("tickInterval", cfg.TickInterval.String()),
672-
attribute.Bool("multiCore", cfg.MultiCore),
673-
attribute.Bool("lockOSThread", cfg.LockOSThread),
674-
attribute.Bool("enableTicker", cfg.EnableTicker),
675-
attribute.String("loadBalancer", cfg.LoadBalancer),
676-
attribute.Int("readBufferCap", cfg.ReadBufferCap),
677-
attribute.Int("writeBufferCap", cfg.WriteBufferCap),
678-
attribute.Int("socketRecvBuffer", cfg.SocketRecvBuffer),
679-
attribute.Int("socketSendBuffer", cfg.SocketSendBuffer),
680-
attribute.Bool("reuseAddress", cfg.ReuseAddress),
681-
attribute.Bool("reusePort", cfg.ReusePort),
682-
attribute.String("tcpKeepAlive", cfg.TCPKeepAlive.String()),
683-
attribute.Bool("tcpNoDelay", cfg.TCPNoDelay),
684669
attribute.String("pluginTimeout", conf.Plugin.Timeout.String()),
685670
))
686671

672+
pluginTimeoutCtx, cancel = context.WithTimeout(
673+
context.Background(), conf.Plugin.Timeout)
674+
defer cancel()
675+
687676
if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok {
688677
_, err = pluginRegistry.Run(
689678
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_SERVER)
@@ -786,13 +775,15 @@ var runCmd = &cobra.Command{
786775
go func(pluginRegistry *plugin.Registry,
787776
logger zerolog.Logger,
788777
servers map[string]*network.Server,
778+
metricsMerger *metrics.Merger,
779+
metricsServer *http.Server,
780+
stopChan chan struct{},
789781
) {
790782
for sig := range signalsCh {
791783
for _, s := range signals {
792784
if sig != s {
793785
StopGracefully(
794786
runCtx,
795-
pluginTimeoutCtx,
796787
sig,
797788
metricsMerger,
798789
metricsServer,
@@ -805,13 +796,14 @@ var runCmd = &cobra.Command{
805796
}
806797
}
807798
}
808-
}(pluginRegistry, logger, servers)
799+
}(pluginRegistry, logger, servers, metricsMerger, metricsServer, stopChan)
809800

810801
_, span = otel.Tracer(config.TracerName).Start(runCtx, "Start servers")
811802
// Start the server.
812803
for name, server := range servers {
813804
logger := loggers[name]
814805
go func(
806+
span trace.Span,
815807
server *network.Server,
816808
logger zerolog.Logger,
817809
healthCheckScheduler *gocron.Scheduler,
@@ -831,7 +823,7 @@ var runCmd = &cobra.Command{
831823
pluginRegistry.Shutdown()
832824
os.Exit(gerr.FailedToStartServer)
833825
}
834-
}(server, logger, healthCheckScheduler, metricsMerger, pluginRegistry)
826+
}(span, server, logger, healthCheckScheduler, metricsMerger, pluginRegistry)
835827
}
836828
span.End()
837829

cmd/run_test.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cmd
22

33
import (
4+
"context"
45
"os"
56
"sync"
67
"testing"
@@ -29,8 +30,7 @@ func Test_runCmd(t *testing.T) {
2930
time.Sleep(100 * time.Millisecond)
3031

3132
StopGracefully(
32-
runCmd.Context(),
33-
runCmd.Context(),
33+
context.Background(),
3434
nil,
3535
nil,
3636
nil,
@@ -88,8 +88,7 @@ func Test_runCmdWithMultiTenancy(t *testing.T) {
8888
time.Sleep(500 * time.Millisecond)
8989

9090
StopGracefully(
91-
runCmd.Context(),
92-
runCmd.Context(),
91+
context.Background(),
9392
nil,
9493
nil,
9594
nil,
@@ -165,11 +164,10 @@ func Test_runCmdWithCachePlugin(t *testing.T) {
165164
var waitGroup sync.WaitGroup
166165
waitGroup.Add(1)
167166
go func(waitGroup *sync.WaitGroup) {
168-
time.Sleep(500 * time.Millisecond)
167+
time.Sleep(time.Second)
169168

170169
StopGracefully(
171-
runCmd.Context(),
172-
runCmd.Context(),
170+
context.Background(),
173171
nil,
174172
nil,
175173
nil,

config/config.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,21 +129,10 @@ func (c *Config) LoadDefaults(ctx context.Context) {
129129
}
130130

131131
defaultServer := Server{
132-
Network: DefaultListenNetwork,
133-
Address: DefaultListenAddress,
134-
EnableTicker: false,
135-
TickInterval: DefaultTickInterval,
136-
MultiCore: true,
137-
LockOSThread: false,
138-
ReuseAddress: true,
139-
ReusePort: true,
140-
LoadBalancer: DefaultLoadBalancer,
141-
ReadBufferCap: DefaultBufferSize,
142-
WriteBufferCap: DefaultBufferSize,
143-
SocketRecvBuffer: DefaultBufferSize,
144-
SocketSendBuffer: DefaultBufferSize,
145-
TCPKeepAlive: DefaultTCPKeepAliveDuration,
146-
TCPNoDelay: DefaultTCPNoDelay,
132+
Network: DefaultListenNetwork,
133+
Address: DefaultListenAddress,
134+
EnableTicker: false,
135+
TickInterval: DefaultTickInterval,
147136
}
148137

149138
c.globalDefaults = GlobalConfig{

config/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ const (
117117
DefaultTCPKeepAliveDuration = 3 * time.Second
118118
DefaultLoadBalancer = "roundrobin"
119119
DefaultTCPNoDelay = true
120+
DefaultEngineStopTimeout = 5 * time.Second
120121

121122
// Utility constants.
122123
DefaultSeed = 1000

config/getters.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"path/filepath"
66
"time"
77

8-
"github.com/panjf2000/gnet/v2"
98
"github.com/rs/zerolog"
109
)
1110

@@ -28,11 +27,6 @@ var (
2827
"continue": Continue,
2928
"stop": Stop,
3029
}
31-
loadBalancers = map[string]gnet.LoadBalancing{
32-
"roundrobin": gnet.RoundRobin,
33-
"leastconnections": gnet.LeastConnections,
34-
"sourceaddrhash": gnet.SourceAddrHash,
35-
}
3630
logOutputs = map[string]LogOutput{
3731
"console": Console,
3832
"stdout": Stdout,
@@ -166,23 +160,6 @@ func (s Server) GetTickInterval() time.Duration {
166160
return s.TickInterval
167161
}
168162

169-
// GetLoadBalancer returns the load balancing algorithm to use.
170-
func (s Server) GetLoadBalancer() gnet.LoadBalancing {
171-
if lb, ok := loadBalancers[s.LoadBalancer]; ok {
172-
return lb
173-
}
174-
return gnet.RoundRobin
175-
}
176-
177-
// GetTCPNoDelay returns the TCP no delay option from config file.
178-
func (s Server) GetTCPNoDelay() gnet.TCPSocketOpt {
179-
if s.TCPNoDelay {
180-
return gnet.TCPNoDelay
181-
}
182-
183-
return gnet.TCPDelay
184-
}
185-
186163
// GetSize returns the pool size from config file.
187164
func (p Pool) GetSize() int {
188165
if p.Size == 0 {

config/getters_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"testing"
55
"time"
66

7-
"github.com/panjf2000/gnet/v2"
87
"github.com/rs/zerolog"
98
"github.com/stretchr/testify/assert"
109
)
@@ -75,18 +74,6 @@ func TestGetTickInterval(t *testing.T) {
7574
assert.Equal(t, DefaultTickInterval, server.GetTickInterval())
7675
}
7776

78-
// TestGetLoadBalancer tests the GetLoadBalancer function.
79-
func TestGetLoadBalancer(t *testing.T) {
80-
server := Server{}
81-
assert.Equal(t, gnet.RoundRobin, server.GetLoadBalancer())
82-
}
83-
84-
// TestGetTCPNoDelay tests the GetTCPNoDelay function.
85-
func TestGetTCPNoDelay(t *testing.T) {
86-
server := Server{}
87-
assert.Equal(t, gnet.TCPDelay, server.GetTCPNoDelay())
88-
}
89-
9077
// TestGetSize tests the GetSize function.
9178
func TestGetSize(t *testing.T) {
9279
pool := Pool{}

0 commit comments

Comments
 (0)