Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions internal/collector/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ func PostgreSQLParameters(ctx context.Context,
// https://www.postgresql.org/docs/current/runtime-config-logging.html
outParameters.Mandatory.Add("logging_collector", "on")

// PostgreSQL v8.3 adds support for CSV logging, and
// PostgreSQL v15 adds support for JSON logging.
// The latter is preferred because newlines are escaped as "\n", U+005C + U+006E.
if version >= 15 {
outParameters.Mandatory.Add("log_destination", "jsonlog")
} else {
// Enable structured logging. This setting is combined with any specified on the cluster.
//
// The JSON format of PostgreSQL v15 delimits messages with newline U+000A
// and escapes newlines in message content as "\n", U+005C + U+006E.
// JSON keys take up space on disk, but newline-delimited is easy to parse.
outParameters.Mandatory.Add("log_destination", "jsonlog")

// The only structured format prior to PostgreSQL v15 is the CSV format, added in PostgreSQL v8.3.
// This format does *not* escape newlines, so the Collector must search for the beginning of each message.
// Forcing the UTC timezone ensures a consistent beginning to each message.
if version < 15 {
outParameters.Mandatory.Add("log_destination", "csvlog")
outParameters.Mandatory.Add("log_timezone", "UTC")
}

// Log in a timezone the OpenTelemetry Collector understands.
Expand Down Expand Up @@ -211,7 +217,7 @@ func EnablePostgresLogging(

// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/database#readme
{"action": "insert", "key": "db.system", "value": "postgresql"},
{"action": "insert", "key": "db.version", "value": fmt.Sprint(inCluster.Spec.PostgresVersion)},
{"action": "insert", "key": "db.version", "value": fmt.Sprint(version)},
},
}

Expand All @@ -227,13 +233,15 @@ func EnablePostgresLogging(
exporters = slices.Clone(spec.Exporters)
}

// JSON logs are preferable since PostgreSQL v15. These are enabled in [PostgreSQLParameters].
receivers := []ComponentID{"filelog/postgres_jsonlog"}
if version < 15 {
receivers = []ComponentID{"filelog/postgres_csvlog"}
}

outConfig.Pipelines["logs/postgres"] = Pipeline{
Extensions: []ComponentID{"file_storage/postgres_logs"},
// TODO(logs): Choose only one receiver, maybe?
Receivers: []ComponentID{
"filelog/postgres_csvlog",
"filelog/postgres_jsonlog",
},
Receivers: receivers,
Processors: []ComponentID{
"resource/postgres",
"transform/postgres_logs",
Expand Down
71 changes: 69 additions & 2 deletions internal/collector/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ service:
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/postgres_csvlog
- filelog/postgres_jsonlog
`)
})
Expand Down Expand Up @@ -542,7 +541,6 @@ service:
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/postgres_csvlog
- filelog/postgres_jsonlog
`)
})
Expand Down Expand Up @@ -678,6 +676,75 @@ service:
- sqlquery/5s
- sqlquery/300s
`)
})
}

func TestPostgresParameters(t *testing.T) {
t.Run("NoInstrumentation", func(t *testing.T) {
cluster := new(v1beta1.PostgresCluster)
cluster.Spec.PostgresVersion = 99

before := postgres.NewParameters()
params := postgres.NewParameters()
PostgreSQLParameters(t.Context(), cluster, &params)

assert.DeepEqual(t, before, params)
})

t.Run("Specified", func(t *testing.T) {
cluster := new(v1beta1.PostgresCluster)
cluster.Spec.PostgresVersion = 99
require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`)

// Feature disabled
{
before := postgres.NewParameters()
params := postgres.NewParameters()
PostgreSQLParameters(t.Context(), cluster, &params)

assert.DeepEqual(t, before, params)
}

// Feature enabled
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))
ctx := feature.NewContext(t.Context(), gate)

params := postgres.NewParameters()
PostgreSQLParameters(ctx, cluster, &params)

assert.Equal(t, params.Mandatory.Value("log_destination"), "jsonlog")
assert.Assert(t, params.Mandatory.Value("log_filename") != "")
assert.Assert(t, params.Mandatory.Value("log_rotation_age") != "")
assert.Assert(t, params.Mandatory.Value("log_rotation_size") != "")
assert.Equal(t, params.Mandatory.Value("log_timezone"), "UTC")
assert.Equal(t, params.Mandatory.Value("log_truncate_on_rotation"), "on")
assert.Equal(t, params.Mandatory.Value("logging_collector"), "on")
})

t.Run("OldPostgres", func(t *testing.T) {
cluster := new(v1beta1.PostgresCluster)
cluster.Spec.PostgresVersion = 10
require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`)

// Feature enabled
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))
ctx := feature.NewContext(t.Context(), gate)

params := postgres.NewParameters()
PostgreSQLParameters(ctx, cluster, &params)

assert.Equal(t, params.Mandatory.Value("log_destination"), "csvlog")
assert.Assert(t, params.Mandatory.Value("log_filename") != "")
assert.Assert(t, params.Mandatory.Value("log_rotation_age") != "")
assert.Assert(t, params.Mandatory.Value("log_rotation_size") != "")
assert.Equal(t, params.Mandatory.Value("log_timezone"), "UTC")
assert.Equal(t, params.Mandatory.Value("log_truncate_on_rotation"), "on")
assert.Equal(t, params.Mandatory.Value("logging_collector"), "on")
})
}
6 changes: 5 additions & 1 deletion internal/controller/postgrescluster/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,19 @@ func (r *Reconciler) generatePostgresParameters(

// Overwrite the above with mandatory values.
if builtin.Mandatory != nil {
// This parameter is a comma-separated list. Rather than overwrite the
// These parameters are comma-separated lists. Rather than overwrite the
// user-defined value, we want to combine it with the mandatory one.
destination := result.Value("log_destination")
preload := result.Value("shared_preload_libraries")

for k, v := range builtin.Mandatory.AsMap() {
// Load mandatory libraries ahead of user-defined libraries.
if k == "shared_preload_libraries" && len(v) > 0 && len(preload) > 0 {
v = v + "," + preload
}
if k == "log_destination" && len(v) > 0 && len(destination) > 0 {
v = v + "," + destination
}

result.Add(k, v)
}
Expand Down
35 changes: 34 additions & 1 deletion internal/controller/postgrescluster/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestGeneratePostgresHBAs(t *testing.T) {
}

func TestGeneratePostgresParameters(t *testing.T) {
ctx := context.Background()
ctx := t.Context()
reconciler := &Reconciler{}

builtin := reconciler.generatePostgresParameters(ctx, v1beta1.NewPostgresCluster(), false)
Expand Down Expand Up @@ -244,6 +244,39 @@ func TestGeneratePostgresParameters(t *testing.T) {
assert.Equal(t, result.Value("jit"), "on") // Config
})

t.Run("log_destination", func(t *testing.T) {
t.Run("passthrough without instrumentation", func(t *testing.T) {
cluster := v1beta1.NewPostgresCluster()
require.UnmarshalInto(t, &cluster.Spec.Config, `{
parameters: {
log_destination: stderr
},
}`)

result := reconciler.generatePostgresParameters(ctx, cluster, false)
assert.Equal(t, result.Value("log_destination"), "stderr")
})

t.Run("combine with intrumentation", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))
ctx := feature.NewContext(t.Context(), gate)

cluster := v1beta1.NewPostgresCluster()
require.UnmarshalInto(t, &cluster.Spec.Instrumentation, `{}`)
require.UnmarshalInto(t, &cluster.Spec.Config, `{
parameters: {
log_destination: stderr
},
}`)

result := reconciler.generatePostgresParameters(ctx, cluster, false)
assert.Equal(t, result.Value("log_destination"), "csvlog,stderr")
})
})

t.Run("shared_preload_libraries", func(t *testing.T) {
t.Run("NumericIncluded", func(t *testing.T) {
cluster := v1beta1.NewPostgresCluster()
Expand Down
11 changes: 11 additions & 0 deletions internal/postgres/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ func (ps *ParameterSet) DeepCopy() *ParameterSet {
}
}

func (ps *ParameterSet) Equal(other *ParameterSet) bool {
if ps == nil && other == nil {
return true
}
if ps == nil || other == nil {
return false
}

return maps.Equal(ps.values, other.values)
}

// Add sets parameter name to value.
func (ps *ParameterSet) Add(name, value string) {
ps.values[ps.normalize(name)] = value
Expand Down
40 changes: 40 additions & 0 deletions internal/postgres/parameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,43 @@ func TestParameterSetAppendToList(t *testing.T) {
ps.AppendToList("full", "a", "cd", `"e"`)
assert.Equal(t, ps.Value("full"), `a,b,a,cd,"e"`)
}

func TestParameterSetEqual(t *testing.T) {
var Nil *ParameterSet
ps1 := NewParameterSet()
ps2 := NewParameterSet()

// nil equals nil, and empty does not equal nil
assert.Assert(t, Nil.Equal(nil))
assert.Assert(t, !Nil.Equal(ps1))
assert.Assert(t, !ps1.Equal(nil))

// empty equals empty
assert.Assert(t, ps1.Equal(ps2))
assert.Assert(t, ps2.Equal(ps1))

// different keys are not equal
ps1.Add("a", "b")
assert.Assert(t, !ps1.Equal(nil))
assert.Assert(t, !Nil.Equal(ps1))
assert.Assert(t, !ps1.Equal(ps2))
assert.Assert(t, !ps2.Equal(ps1))

// different values are not equal
ps2.Add("a", "c")
assert.Assert(t, !ps1.Equal(ps2))
assert.Assert(t, !ps2.Equal(ps1))

// normalized keys+values are equal
ps1.Add("A", "c")
assert.Assert(t, ps1.Equal(ps2))
assert.Assert(t, ps2.Equal(ps1))

// [assert.DeepEqual] can only compare exported fields.
// When present, the `(T) Equal(T) bool` method is used instead.
//
// https://pkg.go.dev/github.com/google/go-cmp/cmp#Equal
t.Run("DeepEqual", func(t *testing.T) {
assert.DeepEqual(t, NewParameterSet(), NewParameterSet())
})
}
Loading