Skip to content

Commit 69298a8

Browse files
authored
Use the result of notification hooks (#616)
* Check for nil or empty result * Use the result of notification hooks by running actions * Fix linter errors
1 parent 8effd0f commit 69298a8

File tree

4 files changed

+82
-18
lines changed

4 files changed

+82
-18
lines changed

act/registry.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ func runActionWithTimeout(
407407

408408
// RunAll run all the actions in the outputs and returns the end result.
409409
func (r *Registry) RunAll(result map[string]any) map[string]any {
410+
if len(result) == 0 {
411+
return result
412+
}
413+
410414
if _, exists := result[sdkAct.Outputs]; !exists {
411415
r.Logger.Debug().Msg("Outputs key is not present, returning the result as-is")
412416
return result

cmd/run.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func StopGracefully(
109109
defer cancel()
110110

111111
//nolint:contextcheck
112-
_, err := pluginRegistry.Run(
112+
result, err := pluginRegistry.Run(
113113
pluginTimeoutCtx,
114114
map[string]any{"signal": currentSignal},
115115
v1.HookName_HOOK_NAME_ON_SIGNAL,
@@ -118,6 +118,9 @@ func StopGracefully(
118118
logger.Error().Err(err).Msg("Failed to run OnSignal hooks")
119119
span.RecordError(err)
120120
}
121+
if result != nil {
122+
_ = pluginRegistry.ActRegistry.RunAll(result) //nolint:contextcheck
123+
}
121124
}
122125

123126
logger.Info().Msg("GatewayD is shutting down")
@@ -434,6 +437,9 @@ var runCmd = &cobra.Command{
434437
logger.Error().Err(err).Msg("Failed to run OnConfigLoaded hooks")
435438
span.RecordError(err)
436439
}
440+
if updatedGlobalConfig != nil {
441+
updatedGlobalConfig = pluginRegistry.ActRegistry.RunAll(updatedGlobalConfig)
442+
}
437443

438444
// If the config was modified by the plugins, merge it with the one loaded from the file.
439445
// Only global configuration is merged, which means that plugins cannot modify the plugin
@@ -606,12 +612,15 @@ var runCmd = &cobra.Command{
606612
defer cancel()
607613

608614
if data, ok := conf.GlobalKoanf.Get("loggers").(map[string]any); ok {
609-
_, err = pluginRegistry.Run(
615+
result, err := pluginRegistry.Run(
610616
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_LOGGER)
611617
if err != nil {
612618
logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks")
613619
span.RecordError(err)
614620
}
621+
if result != nil {
622+
_ = pluginRegistry.ActRegistry.RunAll(result)
623+
}
615624
} else {
616625
logger.Error().Msg("Failed to get loggers from config")
617626
}
@@ -767,12 +776,15 @@ var runCmd = &cobra.Command{
767776
"backoffMultiplier": clientConfig.BackoffMultiplier,
768777
"disableBackoffCaps": clientConfig.DisableBackoffCaps,
769778
}
770-
_, err := pluginRegistry.Run(
779+
result, err := pluginRegistry.Run(
771780
pluginTimeoutCtx, clientCfg, v1.HookName_HOOK_NAME_ON_NEW_CLIENT)
772781
if err != nil {
773782
logger.Error().Err(err).Msg("Failed to run OnNewClient hooks")
774783
span.RecordError(err)
775784
}
785+
if result != nil {
786+
_ = pluginRegistry.ActRegistry.RunAll(result)
787+
}
776788

777789
err = pools[configGroupName][configBlockName].Put(client.ID, client)
778790
if err != nil {
@@ -822,14 +834,17 @@ var runCmd = &cobra.Command{
822834
context.Background(), conf.Plugin.Timeout)
823835
defer cancel()
824836

825-
_, err = pluginRegistry.Run(
837+
result, err := pluginRegistry.Run(
826838
pluginTimeoutCtx,
827839
map[string]any{"name": configBlockName, "size": currentPoolSize},
828840
v1.HookName_HOOK_NAME_ON_NEW_POOL)
829841
if err != nil {
830842
logger.Error().Err(err).Msg("Failed to run OnNewPool hooks")
831843
span.RecordError(err)
832844
}
845+
if result != nil {
846+
_ = pluginRegistry.ActRegistry.RunAll(result)
847+
}
833848
}
834849
}
835850

@@ -877,12 +892,15 @@ var runCmd = &cobra.Command{
877892
defer cancel()
878893

879894
if data, ok := conf.GlobalKoanf.Get("proxies").(map[string]any); ok {
880-
_, err = pluginRegistry.Run(
895+
result, err := pluginRegistry.Run(
881896
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_PROXY)
882897
if err != nil {
883898
logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks")
884899
span.RecordError(err)
885900
}
901+
if result != nil {
902+
_ = pluginRegistry.ActRegistry.RunAll(result)
903+
}
886904
} else {
887905
logger.Error().Msg("Failed to get proxy from config")
888906
}
@@ -948,12 +966,15 @@ var runCmd = &cobra.Command{
948966
defer cancel()
949967

950968
if data, ok := conf.GlobalKoanf.Get("servers").(map[string]any); ok {
951-
_, err = pluginRegistry.Run(
969+
result, err := pluginRegistry.Run(
952970
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_SERVER)
953971
if err != nil {
954972
logger.Error().Err(err).Msg("Failed to run OnNewServer hooks")
955973
span.RecordError(err)
956974
}
975+
if result != nil {
976+
_ = pluginRegistry.ActRegistry.RunAll(result)
977+
}
957978
} else {
958979
logger.Error().Msg("Failed to get the servers configuration")
959980
}

network/proxy.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate
446446
defer cancel()
447447

448448
// Run the OnTrafficToServer hooks.
449-
_, err = pr.PluginRegistry.Run(
449+
result, err = pr.PluginRegistry.Run(
450450
pluginTimeoutCtx,
451451
trafficData(
452452
conn.Conn(),
@@ -463,8 +463,11 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate
463463
pr.Logger.Error().Err(err).Msg("Error running hook")
464464
span.RecordError(err)
465465
}
466-
span.AddEvent("Ran the OnTrafficToServer hooks")
466+
if result != nil {
467+
_ = pr.PluginRegistry.ActRegistry.RunAll(result)
468+
}
467469

470+
span.AddEvent("Ran the OnTrafficToServer hooks")
468471
metrics.ProxyPassThroughsToServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc()
469472

470473
return nil
@@ -558,6 +561,9 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate
558561
pr.Logger.Error().Err(err).Msg("Error running hook")
559562
span.RecordError(err)
560563
}
564+
if result != nil {
565+
result = pr.PluginRegistry.ActRegistry.RunAll(result)
566+
}
561567
span.AddEvent("Ran the OnTrafficFromServer hooks")
562568

563569
// If the hook modified the response, use the modified response.
@@ -575,7 +581,7 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate
575581
pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), pr.PluginTimeout)
576582
defer cancel()
577583

578-
_, err = pr.PluginRegistry.Run(
584+
result, err = pr.PluginRegistry.Run(
579585
pluginTimeoutCtx,
580586
trafficData(
581587
conn.Conn(),
@@ -597,6 +603,10 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate
597603
pr.Logger.Error().Err(err).Msg("Error running hook")
598604
span.RecordError(err)
599605
}
606+
if result != nil {
607+
_ = pr.PluginRegistry.ActRegistry.RunAll(result)
608+
}
609+
span.AddEvent("Ran the OnTrafficToClient hooks")
600610

601611
if errVerdict != nil {
602612
span.RecordError(errVerdict)

network/server.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,17 @@ func (s *Server) OnBoot() Action {
9898
pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
9999
defer cancel()
100100
// Run the OnBooting hooks.
101-
_, err := s.PluginRegistry.Run(
101+
result, err := s.PluginRegistry.Run(
102102
pluginTimeoutCtx,
103103
map[string]any{"status": fmt.Sprint(s.Status)},
104104
v1.HookName_HOOK_NAME_ON_BOOTING)
105105
if err != nil {
106106
s.Logger.Error().Err(err).Msg("Failed to run OnBooting hook")
107107
span.RecordError(err)
108108
}
109+
if result != nil {
110+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
111+
}
109112
span.AddEvent("Ran the OnBooting hooks")
110113

111114
// Set the server status to running.
@@ -117,14 +120,17 @@ func (s *Server) OnBoot() Action {
117120
pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), s.PluginTimeout)
118121
defer cancel()
119122

120-
_, err = s.PluginRegistry.Run(
123+
result, err = s.PluginRegistry.Run(
121124
pluginTimeoutCtx,
122125
map[string]any{"status": fmt.Sprint(s.Status)},
123126
v1.HookName_HOOK_NAME_ON_BOOTED)
124127
if err != nil {
125128
s.Logger.Error().Err(err).Msg("Failed to run OnBooted hook")
126129
span.RecordError(err)
127130
}
131+
if result != nil {
132+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
133+
}
128134
span.AddEvent("Ran the OnBooted hooks")
129135

130136
s.Logger.Debug().Msg("GatewayD booted")
@@ -150,12 +156,15 @@ func (s *Server) OnOpen(conn *ConnWrapper) ([]byte, Action) {
150156
"remote": RemoteAddr(conn.Conn()),
151157
},
152158
}
153-
_, err := s.PluginRegistry.Run(
159+
result, err := s.PluginRegistry.Run(
154160
pluginTimeoutCtx, onOpeningData, v1.HookName_HOOK_NAME_ON_OPENING)
155161
if err != nil {
156162
s.Logger.Error().Err(err).Msg("Failed to run OnOpening hook")
157163
span.RecordError(err)
158164
}
165+
if result != nil {
166+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
167+
}
159168
span.AddEvent("Ran the OnOpening hooks")
160169

161170
// Attempt to retrieve the next proxy.
@@ -195,12 +204,15 @@ func (s *Server) OnOpen(conn *ConnWrapper) ([]byte, Action) {
195204
"remote": RemoteAddr(conn.Conn()),
196205
},
197206
}
198-
_, err = s.PluginRegistry.Run(
207+
result, err = s.PluginRegistry.Run(
199208
pluginTimeoutCtx, onOpenedData, v1.HookName_HOOK_NAME_ON_OPENED)
200209
if err != nil {
201210
s.Logger.Error().Err(err).Msg("Failed to run OnOpened hook")
202211
span.RecordError(err)
203212
}
213+
if result != nil {
214+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
215+
}
204216
span.AddEvent("Ran the OnOpened hooks")
205217

206218
metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Inc()
@@ -231,12 +243,15 @@ func (s *Server) OnClose(conn *ConnWrapper, err error) Action {
231243
if err != nil {
232244
data["error"] = err.Error()
233245
}
234-
_, gatewaydErr := s.PluginRegistry.Run(
246+
result, gatewaydErr := s.PluginRegistry.Run(
235247
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_CLOSING)
236248
if gatewaydErr != nil {
237249
s.Logger.Error().Err(gatewaydErr).Msg("Failed to run OnClosing hook")
238250
span.RecordError(gatewaydErr)
239251
}
252+
if result != nil {
253+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
254+
}
240255
span.AddEvent("Ran the OnClosing hooks")
241256

242257
// Shutdown the server if there are no more connections and the server is stopped.
@@ -291,12 +306,15 @@ func (s *Server) OnClose(conn *ConnWrapper, err error) Action {
291306
if err != nil {
292307
data["error"] = err.Error()
293308
}
294-
_, gatewaydErr = s.PluginRegistry.Run(
309+
result, gatewaydErr = s.PluginRegistry.Run(
295310
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_CLOSED)
296311
if gatewaydErr != nil {
297312
s.Logger.Error().Err(gatewaydErr).Msg("Failed to run OnClosed hook")
298313
span.RecordError(gatewaydErr)
299314
}
315+
if result != nil {
316+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
317+
}
300318
span.AddEvent("Ran the OnClosed hooks")
301319

302320
metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec()
@@ -320,12 +338,15 @@ func (s *Server) OnTraffic(conn *ConnWrapper, stopConnection chan struct{}) Acti
320338
"remote": RemoteAddr(conn.Conn()),
321339
},
322340
}
323-
_, err := s.PluginRegistry.Run(
341+
result, err := s.PluginRegistry.Run(
324342
pluginTimeoutCtx, onTrafficData, v1.HookName_HOOK_NAME_ON_TRAFFIC)
325343
if err != nil {
326344
s.Logger.Error().Err(err).Msg("Failed to run OnTraffic hook")
327345
span.RecordError(err)
328346
}
347+
if result != nil {
348+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
349+
}
329350
span.AddEvent("Ran the OnTraffic hooks")
330351

331352
stack := NewStack()
@@ -391,14 +412,17 @@ func (s *Server) OnShutdown() {
391412
pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
392413
defer cancel()
393414
// Run the OnShutdown hooks.
394-
_, err := s.PluginRegistry.Run(
415+
result, err := s.PluginRegistry.Run(
395416
pluginTimeoutCtx,
396417
map[string]any{"connections": s.CountConnections()},
397418
v1.HookName_HOOK_NAME_ON_SHUTDOWN)
398419
if err != nil {
399420
s.Logger.Error().Err(err).Msg("Failed to run OnShutdown hook")
400421
span.RecordError(err)
401422
}
423+
if result != nil {
424+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
425+
}
402426
span.AddEvent("Ran the OnShutdown hooks")
403427

404428
// Shutdown proxies.
@@ -424,14 +448,17 @@ func (s *Server) OnTick() (time.Duration, Action) {
424448
pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
425449
defer cancel()
426450
// Run the OnTick hooks.
427-
_, err := s.PluginRegistry.Run(
451+
result, err := s.PluginRegistry.Run(
428452
pluginTimeoutCtx,
429453
map[string]any{"connections": s.CountConnections()},
430454
v1.HookName_HOOK_NAME_ON_TICK)
431455
if err != nil {
432456
s.Logger.Error().Err(err).Msg("Failed to run OnTick hook")
433457
span.RecordError(err)
434458
}
459+
if result != nil {
460+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
461+
}
435462
span.AddEvent("Ran the OnTick hooks")
436463

437464
// TODO: Investigate whether to move schedulers here or not
@@ -474,6 +501,8 @@ func (s *Server) Run() *gerr.GatewayDError {
474501
span.AddEvent("Ran the OnRun hooks")
475502

476503
if result != nil {
504+
_ = s.PluginRegistry.ActRegistry.RunAll(result)
505+
477506
if errMsg, ok := result["error"].(string); ok && errMsg != "" {
478507
s.Logger.Error().Str("error", errMsg).Msg("Error in hook")
479508
}

0 commit comments

Comments
 (0)