diff --git a/internal/collector/postgres.go b/internal/collector/postgres.go index a279be33c..5bfdb03bc 100644 --- a/internal/collector/postgres.go +++ b/internal/collector/postgres.go @@ -54,13 +54,19 @@ func PostgreSQLParameters(ctx context.Context, // https://www.postgresql.org/docs/current/runtime-config-logging.html outParameters.Mandatory.Add("logging_collector", "on") - // PostgreSQL v8.3 adds support for CSV logging, and - // PostgreSQL v15 adds support for JSON logging. - // The latter is preferred because newlines are escaped as "\n", U+005C + U+006E. - if version >= 15 { - outParameters.Mandatory.Add("log_destination", "jsonlog") - } else { + // Enable structured logging. This setting is combined with any specified on the cluster. + // + // The JSON format of PostgreSQL v15 delimits messages with newline U+000A + // and escapes newlines in message content as "\n", U+005C + U+006E. + // JSON keys take up space on disk, but newline-delimited is easy to parse. + outParameters.Mandatory.Add("log_destination", "jsonlog") + + // The only structured format prior to PostgreSQL v15 is the CSV format, added in PostgreSQL v8.3. + // This format does *not* escape newlines, so the Collector must search for the beginning of each message. + // Forcing the UTC timezone ensures a consistent beginning to each message. + if version < 15 { outParameters.Mandatory.Add("log_destination", "csvlog") + outParameters.Mandatory.Add("log_timezone", "UTC") } // Log in a timezone the OpenTelemetry Collector understands. @@ -211,7 +217,7 @@ func EnablePostgresLogging( // https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/database#readme {"action": "insert", "key": "db.system", "value": "postgresql"}, - {"action": "insert", "key": "db.version", "value": fmt.Sprint(inCluster.Spec.PostgresVersion)}, + {"action": "insert", "key": "db.version", "value": fmt.Sprint(version)}, }, } @@ -227,13 +233,15 @@ func EnablePostgresLogging( exporters = slices.Clone(spec.Exporters) } + // JSON logs are preferable since PostgreSQL v15. These are enabled in [PostgreSQLParameters]. + receivers := []ComponentID{"filelog/postgres_jsonlog"} + if version < 15 { + receivers = []ComponentID{"filelog/postgres_csvlog"} + } + outConfig.Pipelines["logs/postgres"] = Pipeline{ Extensions: []ComponentID{"file_storage/postgres_logs"}, - // TODO(logs): Choose only one receiver, maybe? - Receivers: []ComponentID{ - "filelog/postgres_csvlog", - "filelog/postgres_jsonlog", - }, + Receivers: receivers, Processors: []ComponentID{ "resource/postgres", "transform/postgres_logs", diff --git a/internal/collector/postgres_test.go b/internal/collector/postgres_test.go index 378e74feb..907fcdd95 100644 --- a/internal/collector/postgres_test.go +++ b/internal/collector/postgres_test.go @@ -276,7 +276,6 @@ service: - batch/logs - groupbyattrs/compact receivers: - - filelog/postgres_csvlog - filelog/postgres_jsonlog `) }) @@ -542,7 +541,6 @@ service: - batch/logs - groupbyattrs/compact receivers: - - filelog/postgres_csvlog - filelog/postgres_jsonlog `) }) @@ -678,6 +676,75 @@ service: - sqlquery/5s - sqlquery/300s `) + }) +} + +func TestPostgresParameters(t *testing.T) { + t.Run("NoInstrumentation", func(t *testing.T) { + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.PostgresVersion = 99 + + before := postgres.NewParameters() + params := postgres.NewParameters() + PostgreSQLParameters(t.Context(), cluster, ¶ms) + + assert.DeepEqual(t, before, params) + }) + + t.Run("Specified", func(t *testing.T) { + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.PostgresVersion = 99 + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`) + + // Feature disabled + { + before := postgres.NewParameters() + params := postgres.NewParameters() + PostgreSQLParameters(t.Context(), cluster, ¶ms) + + assert.DeepEqual(t, before, params) + } + + // Feature enabled + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(t.Context(), gate) + + params := postgres.NewParameters() + PostgreSQLParameters(ctx, cluster, ¶ms) + + assert.Equal(t, params.Mandatory.Value("log_destination"), "jsonlog") + assert.Assert(t, params.Mandatory.Value("log_filename") != "") + assert.Assert(t, params.Mandatory.Value("log_rotation_age") != "") + assert.Assert(t, params.Mandatory.Value("log_rotation_size") != "") + assert.Equal(t, params.Mandatory.Value("log_timezone"), "UTC") + assert.Equal(t, params.Mandatory.Value("log_truncate_on_rotation"), "on") + assert.Equal(t, params.Mandatory.Value("logging_collector"), "on") + }) + + t.Run("OldPostgres", func(t *testing.T) { + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.PostgresVersion = 10 + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`) + + // Feature enabled + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(t.Context(), gate) + + params := postgres.NewParameters() + PostgreSQLParameters(ctx, cluster, ¶ms) + assert.Equal(t, params.Mandatory.Value("log_destination"), "csvlog") + assert.Assert(t, params.Mandatory.Value("log_filename") != "") + assert.Assert(t, params.Mandatory.Value("log_rotation_age") != "") + assert.Assert(t, params.Mandatory.Value("log_rotation_size") != "") + assert.Equal(t, params.Mandatory.Value("log_timezone"), "UTC") + assert.Equal(t, params.Mandatory.Value("log_truncate_on_rotation"), "on") + assert.Equal(t, params.Mandatory.Value("logging_collector"), "on") }) } diff --git a/internal/controller/postgrescluster/postgres.go b/internal/controller/postgrescluster/postgres.go index 33907043f..f24720012 100644 --- a/internal/controller/postgrescluster/postgres.go +++ b/internal/controller/postgrescluster/postgres.go @@ -153,8 +153,9 @@ func (r *Reconciler) generatePostgresParameters( // Overwrite the above with mandatory values. if builtin.Mandatory != nil { - // This parameter is a comma-separated list. Rather than overwrite the + // These parameters are comma-separated lists. Rather than overwrite the // user-defined value, we want to combine it with the mandatory one. + destination := result.Value("log_destination") preload := result.Value("shared_preload_libraries") for k, v := range builtin.Mandatory.AsMap() { @@ -162,6 +163,9 @@ func (r *Reconciler) generatePostgresParameters( if k == "shared_preload_libraries" && len(v) > 0 && len(preload) > 0 { v = v + "," + preload } + if k == "log_destination" && len(v) > 0 && len(destination) > 0 { + v = v + "," + destination + } result.Add(k, v) } diff --git a/internal/controller/postgrescluster/postgres_test.go b/internal/controller/postgrescluster/postgres_test.go index 7754f73c4..7c45ecab9 100644 --- a/internal/controller/postgrescluster/postgres_test.go +++ b/internal/controller/postgrescluster/postgres_test.go @@ -162,7 +162,7 @@ func TestGeneratePostgresHBAs(t *testing.T) { } func TestGeneratePostgresParameters(t *testing.T) { - ctx := context.Background() + ctx := t.Context() reconciler := &Reconciler{} builtin := reconciler.generatePostgresParameters(ctx, v1beta1.NewPostgresCluster(), false) @@ -244,6 +244,39 @@ func TestGeneratePostgresParameters(t *testing.T) { assert.Equal(t, result.Value("jit"), "on") // Config }) + t.Run("log_destination", func(t *testing.T) { + t.Run("passthrough without instrumentation", func(t *testing.T) { + cluster := v1beta1.NewPostgresCluster() + require.UnmarshalInto(t, &cluster.Spec.Config, `{ + parameters: { + log_destination: stderr + }, + }`) + + result := reconciler.generatePostgresParameters(ctx, cluster, false) + assert.Equal(t, result.Value("log_destination"), "stderr") + }) + + t.Run("combine with intrumentation", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(t.Context(), gate) + + cluster := v1beta1.NewPostgresCluster() + require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`) + require.UnmarshalInto(t, &cluster.Spec.Config, `{ + parameters: { + log_destination: stderr + }, + }`) + + result := reconciler.generatePostgresParameters(ctx, cluster, false) + assert.Equal(t, result.Value("log_destination"), "csvlog,stderr") + }) + }) + t.Run("shared_preload_libraries", func(t *testing.T) { t.Run("NumericIncluded", func(t *testing.T) { cluster := v1beta1.NewPostgresCluster() diff --git a/internal/postgres/parameters.go b/internal/postgres/parameters.go index 79080bfc6..fabc99790 100644 --- a/internal/postgres/parameters.go +++ b/internal/postgres/parameters.go @@ -106,6 +106,17 @@ func (ps *ParameterSet) DeepCopy() *ParameterSet { } } +func (ps *ParameterSet) Equal(other *ParameterSet) bool { + if ps == nil && other == nil { + return true + } + if ps == nil || other == nil { + return false + } + + return maps.Equal(ps.values, other.values) +} + // Add sets parameter name to value. func (ps *ParameterSet) Add(name, value string) { ps.values[ps.normalize(name)] = value diff --git a/internal/postgres/parameters_test.go b/internal/postgres/parameters_test.go index 54d299f3c..8f0f8c246 100644 --- a/internal/postgres/parameters_test.go +++ b/internal/postgres/parameters_test.go @@ -97,3 +97,43 @@ func TestParameterSetAppendToList(t *testing.T) { ps.AppendToList("full", "a", "cd", `"e"`) assert.Equal(t, ps.Value("full"), `a,b,a,cd,"e"`) } + +func TestParameterSetEqual(t *testing.T) { + var Nil *ParameterSet + ps1 := NewParameterSet() + ps2 := NewParameterSet() + + // nil equals nil, and empty does not equal nil + assert.Assert(t, Nil.Equal(nil)) + assert.Assert(t, !Nil.Equal(ps1)) + assert.Assert(t, !ps1.Equal(nil)) + + // empty equals empty + assert.Assert(t, ps1.Equal(ps2)) + assert.Assert(t, ps2.Equal(ps1)) + + // different keys are not equal + ps1.Add("a", "b") + assert.Assert(t, !ps1.Equal(nil)) + assert.Assert(t, !Nil.Equal(ps1)) + assert.Assert(t, !ps1.Equal(ps2)) + assert.Assert(t, !ps2.Equal(ps1)) + + // different values are not equal + ps2.Add("a", "c") + assert.Assert(t, !ps1.Equal(ps2)) + assert.Assert(t, !ps2.Equal(ps1)) + + // normalized keys+values are equal + ps1.Add("A", "c") + assert.Assert(t, ps1.Equal(ps2)) + assert.Assert(t, ps2.Equal(ps1)) + + // [assert.DeepEqual] can only compare exported fields. + // When present, the `(T) Equal(T) bool` method is used instead. + // + // https://pkg.go.dev/github.com/google/go-cmp/cmp#Equal + t.Run("DeepEqual", func(t *testing.T) { + assert.DeepEqual(t, NewParameterSet(), NewParameterSet()) + }) +}