Skip to content

Commit a9dae18

Browse files
committed
Flush OpenTelemetry data with a deadline
Issue: PGO-1954
1 parent 58351d3 commit a9dae18

File tree

4 files changed

+78
-35
lines changed

4 files changed

+78
-35
lines changed

cmd/postgres-operator/main.go

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
"fmt"
1010
"net/http"
1111
"os"
12+
"os/signal"
1213
"strconv"
1314
"strings"
15+
"syscall"
1416
"time"
1517
"unicode"
1618

@@ -58,8 +60,8 @@ func initLogging() {
5860

5961
//+kubebuilder:rbac:groups="coordination.k8s.io",resources="leases",verbs={get,create,update,watch}
6062

61-
func initManager() (runtime.Options, error) {
62-
log := logging.FromContext(context.Background())
63+
func initManager(ctx context.Context) (runtime.Options, error) {
64+
log := logging.FromContext(ctx)
6365

6466
options := runtime.Options{}
6567
options.Cache.SyncPeriod = initialize.Pointer(time.Hour)
@@ -120,39 +122,63 @@ func initManager() (runtime.Options, error) {
120122
}
121123

122124
func main() {
123-
// This context is canceled by SIGINT, SIGTERM, or by calling shutdown.
124-
ctx, shutdown := context.WithCancel(runtime.SignalHandler())
125+
starting, stopStarting := context.WithCancel(context.Background())
126+
defer stopStarting()
125127

126-
otelFlush, err := initOpenTelemetry()
127-
assertNoError(err)
128-
defer otelFlush()
128+
running, stopRunning := context.WithCancel(context.Background())
129+
defer stopRunning()
129130

130131
initLogging()
131-
132-
log := logging.FromContext(ctx)
132+
log := logging.FromContext(starting)
133133
log.V(1).Info("debug flag set to true")
134134

135+
// Start a goroutine that waits for SIGINT or SIGTERM.
136+
{
137+
signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
138+
receive := make(chan os.Signal, len(signals))
139+
signal.Notify(receive, signals...)
140+
go func() {
141+
// Wait for a signal then immediately restore the default signal handlers.
142+
// After this, a SIGHUP, SIGINT, or SIGTERM causes the program to exit.
143+
// - https://pkg.go.dev/os/signal#hdr-Default_behavior_of_signals_in_Go_programs
144+
s := <-receive
145+
signal.Stop(receive)
146+
147+
log.Info("received signal from OS", "signal", s.String())
148+
149+
// Cancel the "starting" and "running" contexts.
150+
stopStarting()
151+
stopRunning()
152+
}()
153+
}
154+
135155
features := feature.NewGate()
136156
assertNoError(features.Set(os.Getenv("PGO_FEATURE_GATES")))
137157
log.Info("feature gates enabled", "PGO_FEATURE_GATES", features.String())
138158

159+
// Initialize OpenTelemetry and flush data when there is a panic.
160+
otelFinish, err := initOpenTelemetry(starting)
161+
assertNoError(err)
162+
defer otelFinish(starting)
163+
139164
cfg, err := runtime.GetConfig()
140165
assertNoError(err)
141166

142167
cfg.Wrap(otelTransportWrapper())
143168

169+
// TODO(controller-runtime): Set config.WarningHandler instead after v0.19.0.
144170
// Configure client-go to suppress warnings when warning headers are encountered. This prevents
145171
// warnings from being logged over and over again during reconciliation (e.g. this will suppress
146172
// deprecation warnings when using an older version of a resource for backwards compatibility).
147173
rest.SetDefaultWarningHandler(rest.NoWarnings{})
148174

149175
k8s, err := kubernetes.NewDiscoveryRunner(cfg)
150176
assertNoError(err)
151-
assertNoError(k8s.Read(ctx))
177+
assertNoError(k8s.Read(starting))
152178

153-
log.Info("Connected to Kubernetes", "api", k8s.Version().String(), "openshift", k8s.IsOpenShift())
179+
log.Info("connected to Kubernetes", "api", k8s.Version().String(), "openshift", k8s.IsOpenShift())
154180

155-
options, err := initManager()
181+
options, err := initManager(starting)
156182
assertNoError(err)
157183

158184
// Add to the Context that Manager passes to Reconciler.Start, Runnable.Start,
@@ -168,7 +194,7 @@ func main() {
168194
assertNoError(err)
169195
assertNoError(mgr.Add(k8s))
170196

171-
registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), shutdown)
197+
registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), stopRunning)
172198
assertNoError(err)
173199
assertNoError(mgr.Add(registrar))
174200
token, _ := registrar.CheckToken()
@@ -206,10 +232,30 @@ func main() {
206232
assertNoError(mgr.AddHealthzCheck("health", healthz.Ping))
207233
assertNoError(mgr.AddReadyzCheck("check", healthz.Ping))
208234

209-
log.Info("starting controller runtime manager and will wait for signal to exit")
235+
// Start the manager and wait for its context to be canceled.
236+
stopped := make(chan error, 1)
237+
go func() { stopped <- mgr.Start(running) }()
238+
<-running.Done()
239+
240+
// Set a deadline for graceful termination.
241+
log.Info("shutting down")
242+
stopping, cancel := context.WithTimeout(context.Background(), 20*time.Second)
243+
defer cancel()
244+
245+
// Wait for the manager to return or the deadline to pass.
246+
select {
247+
case err = <-stopped:
248+
case <-stopping.Done():
249+
err = stopping.Err()
250+
}
210251

211-
assertNoError(mgr.Start(ctx))
212-
log.Info("signal received, exiting")
252+
// Flush any telemetry with the remaining time we have.
253+
otelFinish(stopping)
254+
if err != nil {
255+
log.Error(err, "shutdown failed")
256+
} else {
257+
log.Info("shutdown complete")
258+
}
213259
}
214260

215261
// addControllersToManager adds all PostgreSQL Operator controllers to the provided controller

cmd/postgres-operator/main_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package main
66

77
import (
8+
"context"
89
"reflect"
910
"testing"
1011
"time"
@@ -14,8 +15,10 @@ import (
1415
)
1516

1617
func TestInitManager(t *testing.T) {
18+
ctx := context.Background()
19+
1720
t.Run("Defaults", func(t *testing.T) {
18-
options, err := initManager()
21+
options, err := initManager(ctx)
1922
assert.NilError(t, err)
2023

2124
if assert.Check(t, options.Cache.SyncPeriod != nil) {
@@ -48,7 +51,7 @@ func TestInitManager(t *testing.T) {
4851
t.Run("Invalid", func(t *testing.T) {
4952
t.Setenv("PGO_CONTROLLER_LEASE_NAME", "INVALID_NAME")
5053

51-
options, err := initManager()
54+
options, err := initManager(ctx)
5255
assert.ErrorContains(t, err, "PGO_CONTROLLER_LEASE_NAME")
5356
assert.ErrorContains(t, err, "invalid")
5457

@@ -59,7 +62,7 @@ func TestInitManager(t *testing.T) {
5962
t.Run("Valid", func(t *testing.T) {
6063
t.Setenv("PGO_CONTROLLER_LEASE_NAME", "valid-name")
6164

62-
options, err := initManager()
65+
options, err := initManager(ctx)
6366
assert.NilError(t, err)
6467
assert.Assert(t, options.LeaderElection == true)
6568
assert.Equal(t, options.LeaderElectionNamespace, "test-namespace")
@@ -70,7 +73,7 @@ func TestInitManager(t *testing.T) {
7073
t.Run("PGO_TARGET_NAMESPACE", func(t *testing.T) {
7174
t.Setenv("PGO_TARGET_NAMESPACE", "some-such")
7275

73-
options, err := initManager()
76+
options, err := initManager(ctx)
7477
assert.NilError(t, err)
7578
assert.Assert(t, cmp.Len(options.Cache.DefaultNamespaces, 1),
7679
"expected only one configured namespace")
@@ -81,7 +84,7 @@ func TestInitManager(t *testing.T) {
8184
t.Run("PGO_TARGET_NAMESPACES", func(t *testing.T) {
8285
t.Setenv("PGO_TARGET_NAMESPACES", "some-such,another-one")
8386

84-
options, err := initManager()
87+
options, err := initManager(ctx)
8588
assert.NilError(t, err)
8689
assert.Assert(t, cmp.Len(options.Cache.DefaultNamespaces, 2),
8790
"expect two configured namespaces")
@@ -95,7 +98,7 @@ func TestInitManager(t *testing.T) {
9598
for _, v := range []string{"-3", "0", "3.14"} {
9699
t.Setenv("PGO_WORKERS", v)
97100

98-
options, err := initManager()
101+
options, err := initManager(ctx)
99102
assert.NilError(t, err)
100103
assert.DeepEqual(t, options.Controller.GroupKindConcurrency,
101104
map[string]int{
@@ -107,7 +110,7 @@ func TestInitManager(t *testing.T) {
107110
t.Run("Valid", func(t *testing.T) {
108111
t.Setenv("PGO_WORKERS", "19")
109112

110-
options, err := initManager()
113+
options, err := initManager(ctx)
111114
assert.NilError(t, err)
112115
assert.DeepEqual(t, options.Controller.GroupKindConcurrency,
113116
map[string]int{

cmd/postgres-operator/open_telemetry.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"go.opentelemetry.io/otel/sdk/trace"
2020
)
2121

22-
func initOpenTelemetry() (func(), error) {
22+
func initOpenTelemetry(ctx context.Context) (func(context.Context), error) {
2323
// At the time of this writing, the SDK (go.opentelemetry.io/otel@v1.2.0)
2424
// does not automatically initialize any exporter. We import the OTLP and
2525
// stdout exporters and configure them below. Much of the OTLP exporter can
@@ -49,8 +49,8 @@ func initOpenTelemetry() (func(), error) {
4949
}
5050

5151
provider := trace.NewTracerProvider(trace.WithBatcher(exporter))
52-
flush := func() {
53-
_ = provider.Shutdown(context.TODO())
52+
flush := func(ctx context.Context) {
53+
_ = provider.Shutdown(ctx)
5454
if closer != nil {
5555
_ = closer.Close()
5656
}
@@ -67,8 +67,8 @@ func initOpenTelemetry() (func(), error) {
6767
}
6868

6969
provider := trace.NewTracerProvider(trace.WithBatcher(exporter))
70-
flush := func() {
71-
_ = provider.Shutdown(context.TODO())
70+
flush := func(ctx context.Context) {
71+
_ = provider.Shutdown(ctx)
7272
}
7373

7474
otel.SetTracerProvider(provider)
@@ -78,7 +78,7 @@ func initOpenTelemetry() (func(), error) {
7878
// $OTEL_TRACES_EXPORTER is unset or unknown, so no TracerProvider has been assigned.
7979
// The default at this time is a single "no-op" tracer.
8080

81-
return func() {}, nil
81+
return func(context.Context) {}, nil
8282
}
8383

8484
// otelTransportWrapper creates a function that wraps the provided net/http.RoundTripper

internal/controller/runtime/runtime.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@
55
package runtime
66

77
import (
8-
"context"
9-
108
"k8s.io/apimachinery/pkg/runtime"
119
"k8s.io/client-go/kubernetes/scheme"
1210
"k8s.io/client-go/rest"
1311
"sigs.k8s.io/controller-runtime/pkg/cache"
1412
"sigs.k8s.io/controller-runtime/pkg/client/config"
1513
"sigs.k8s.io/controller-runtime/pkg/log"
1614
"sigs.k8s.io/controller-runtime/pkg/manager"
17-
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
1815

1916
"github.com/crunchydata/postgres-operator/internal/logging"
2017
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
@@ -71,6 +68,3 @@ func NewManager(config *rest.Config, options manager.Options) (manager.Manager,
7168

7269
// SetLogger assigns the default Logger used by [sigs.k8s.io/controller-runtime].
7370
func SetLogger(logger logging.Logger) { log.SetLogger(logger) }
74-
75-
// SignalHandler returns a Context that is canceled on SIGINT or SIGTERM.
76-
func SignalHandler() context.Context { return signals.SetupSignalHandler() }

0 commit comments

Comments
 (0)