Skip to content

Commit a468cb9

Browse files
dsessler7 authored and cbandy committed
Parse PgBouncer logs using the OTel Collector
Issue: PGO-2056
1 parent 81a44e5 commit a468cb9

File tree

5 files changed

+220
-1
lines changed

5 files changed

+220
-1
lines changed

internal/collector/pgbouncer.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"slices"
1313

1414
"github.com/crunchydata/postgres-operator/internal/feature"
15+
"github.com/crunchydata/postgres-operator/internal/naming"
1516
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1617
)
1718

@@ -32,11 +33,120 @@ func NewConfigForPgBouncerPod(
3233

3334
config := NewConfig()
3435

36+
EnablePgBouncerLogging(ctx, cluster, config)
3537
EnablePgBouncerMetrics(ctx, config, sqlQueryUsername)
3638

3739
return config
3840
}
3941

42+
// EnablePgBouncerLogging adds the configuration the collector needs to collect
43+
// logs from pgBouncer when the OpenTelemetryLogs feature gate is enabled.
//
// With the gate enabled it registers, on outConfig, a file_storage extension,
// a filelog receiver, a resource processor, a transform processor, and the
// "logs/pgbouncer" pipeline that ties them together. With the gate disabled,
// outConfig is left untouched.
//
// inCluster is accepted but is not read by this function.
44+
func EnablePgBouncerLogging(ctx context.Context,
45+
inCluster *v1beta1.PostgresCluster,
46+
outConfig *Config) {
47+
if feature.Enabled(ctx, feature.OpenTelemetryLogs) {
48+
directory := naming.PGBouncerLogPath
49+
50+
// Keep track of what log records and files have been processed.
51+
// Use a subdirectory of the logs directory to stay within the same failure domain.
52+
//
53+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/extension/storage/filestorage#readme
54+
outConfig.Extensions["file_storage/pgbouncer_logs"] = map[string]any{
55+
"directory": directory + "/receiver",
56+
"create_directory": true,
57+
"fsync": true,
58+
}
59+
60+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/receiver/filelogreceiver#readme
61+
outConfig.Receivers["filelog/pgbouncer_log"] = map[string]any{
62+
// Read every "*.log" file in the log directory and record progress in the
// file_storage extension registered above.
63+
"include": []string{directory + "/*.log"},
64+
"storage": "file_storage/pgbouncer_logs",
65+
}
66+
67+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/resourceprocessor#readme
68+
outConfig.Processors["resource/pgbouncer"] = map[string]any{
69+
"attributes": []map[string]any{
70+
// Container and Namespace names need no escaping because they are DNS labels.
71+
// Pod names need no escaping because they are DNS subdomains.
72+
//
73+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names
74+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/resource/k8s.md
75+
{"action": "insert", "key": "k8s.container.name", "value": naming.ContainerPGBouncer},
76+
{"action": "insert", "key": "k8s.namespace.name", "value": "${env:K8S_POD_NAMESPACE}"},
77+
{"action": "insert", "key": "k8s.pod.name", "value": "${env:K8S_POD_NAME}"},
78+
},
79+
}
80+
81+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme
82+
outConfig.Processors["transform/pgbouncer_logs"] = map[string]any{
83+
"log_statements": []map[string]any{{
84+
"context": "log",
85+
"statements": []string{
86+
// Name the instrumentation scope "pgbouncer".
87+
`set(instrumentation_scope.name, "pgbouncer")`,
88+
89+
// Extract timestamp, pid, log level, and message and store them in cache.
// NOTE(review): the pattern only accepts a three-letter uppercase timezone
// abbreviation; confirm pgBouncer never logs another zone form here.
90+
`merge_maps(cache, ExtractPatterns(body, ` +
91+
`"^(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) ` +
92+
`\\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+) (?<msg>.*$)"), "insert")`,
93+
94+
// https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitytext
95+
`set(severity_text, cache["log_level"])`,
96+
97+
// Map pgBouncer (libusual) "logging levels" to OpenTelemetry severity levels.
98+
//
99+
// https://github.com/libusual/libusual/blob/master/usual/logging.c
100+
// https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
101+
// https://opentelemetry.io/docs/specs/otel/logs/data-model-appendix/#appendix-b-severitynumber-example-mappings
102+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/pkg/ottl/contexts/ottllog#enums
103+
`set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "NOISE" or severity_text == "DEBUG"`,
104+
`set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "LOG"`,
105+
`set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARNING"`,
106+
`set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"`,
107+
`set(severity_number, SEVERITY_NUMBER_FATAL) where severity_text == "FATAL"`,
108+
109+
// Parse the timestamp.
110+
// The format is neither RFC 3339 nor ISO 8601:
111+
//
112+
// The date and time are separated by a single space U+0020; the seconds are
113+
// followed by a dot U+002E and milliseconds, then another space U+0020 and
114+
// a timezone abbreviation.
115+
//
116+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/pkg/stanza/docs/types/timestamp.md
117+
`set(time, Time(cache["timestamp"], "%F %T.%L %Z"))`,
118+
119+
// Keep the unparsed log record in a standard attribute before the body
120+
// is overwritten with the message field further below.
121+
//
122+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/logs.md
123+
`set(attributes["log.record.original"], body)`,
124+
125+
// Record the pgBouncer process ID captured by the pattern above.
// NOTE(review): the captured value is presumably a string, while the
// semantic conventions describe process.pid as an integer — confirm.
126+
`set(attributes["process.pid"], cache["pid"])`,
127+
128+
// Replace the log record body with the extracted message.
129+
`set(body, cache["msg"])`,
130+
},
131+
}},
132+
}
133+
134+
outConfig.Pipelines["logs/pgbouncer"] = Pipeline{
135+
Extensions: []ComponentID{"file_storage/pgbouncer_logs"},
136+
Receivers: []ComponentID{"filelog/pgbouncer_log"},
137+
Processors: []ComponentID{
138+
"resource/pgbouncer",
139+
"transform/pgbouncer_logs",
140+
SubSecondBatchProcessor,
141+
CompactingProcessor,
142+
},
143+
Exporters: []ComponentID{DebugExporter},
144+
}
145+
}
146+
}
147+
148+
// EnablePgBouncerMetrics adds necessary configuration to the collector config to scrape
149+
// metrics from pgBouncer when the OpenTelemetryMetrics feature flag is enabled.
40150
func EnablePgBouncerMetrics(ctx context.Context, config *Config, sqlQueryUsername string) {
41151
if feature.Enabled(ctx, feature.OpenTelemetryMetrics) {
42152
// Add Prometheus exporter
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package collector
6+
7+
import (
8+
"context"
9+
"testing"
10+
11+
"gotest.tools/v3/assert"
12+
13+
"github.com/crunchydata/postgres-operator/internal/feature"
14+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
15+
)
16+
17+
// TestEnablePgBouncerLogging checks the collector configuration produced by
// EnablePgBouncerLogging. Its single "Enabled" subtest turns the
// OpenTelemetryLogs feature gate on, applies the function to a fresh Config,
// and compares the rendered YAML against the expected document.
func TestEnablePgBouncerLogging(t *testing.T) {
18+
t.Run("Enabled", func(t *testing.T) {
19+
gate := feature.NewGate()
20+
assert.NilError(t, gate.SetFromMap(map[string]bool{
21+
feature.OpenTelemetryLogs: true,
22+
}))
23+
ctx := feature.NewContext(context.Background(), gate)
24+
25+
config := NewConfig()
26+
27+
// An empty PostgresCluster is passed as the cluster argument.
EnablePgBouncerLogging(ctx, new(v1beta1.PostgresCluster), config)
28+
29+
// The comparison is against the full rendered YAML, so entries that come
// from NewConfig (exporters, batch processors) appear in it as well.
result, err := config.ToYAML()
30+
assert.NilError(t, err)
31+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
32+
# Your changes will not be saved.
33+
exporters:
34+
debug:
35+
verbosity: detailed
36+
extensions:
37+
file_storage/pgbouncer_logs:
38+
create_directory: true
39+
directory: /tmp/receiver
40+
fsync: true
41+
processors:
42+
batch/1s:
43+
timeout: 1s
44+
batch/200ms:
45+
timeout: 200ms
46+
groupbyattrs/compact: {}
47+
resource/pgbouncer:
48+
attributes:
49+
- action: insert
50+
key: k8s.container.name
51+
value: pgbouncer
52+
- action: insert
53+
key: k8s.namespace.name
54+
value: ${env:K8S_POD_NAMESPACE}
55+
- action: insert
56+
key: k8s.pod.name
57+
value: ${env:K8S_POD_NAME}
58+
transform/pgbouncer_logs:
59+
log_statements:
60+
- context: log
61+
statements:
62+
- set(instrumentation_scope.name, "pgbouncer")
63+
- merge_maps(cache, ExtractPatterns(body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}
64+
\\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) \\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+)
65+
(?<msg>.*$)"), "insert")
66+
- set(severity_text, cache["log_level"])
67+
- set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "NOISE"
68+
or severity_text == "DEBUG"
69+
- set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "LOG"
70+
- set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARNING"
71+
- set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"
72+
- set(severity_number, SEVERITY_NUMBER_FATAL) where severity_text == "FATAL"
73+
- set(time, Time(cache["timestamp"], "%F %T.%L %Z"))
74+
- set(attributes["log.record.original"], body)
75+
- set(attributes["process.pid"], cache["pid"])
76+
- set(body, cache["msg"])
77+
receivers:
78+
filelog/pgbouncer_log:
79+
include:
80+
- /tmp/*.log
81+
storage: file_storage/pgbouncer_logs
82+
service:
83+
extensions:
84+
- file_storage/pgbouncer_logs
85+
pipelines:
86+
logs/pgbouncer:
87+
exporters:
88+
- debug
89+
processors:
90+
- resource/pgbouncer
91+
- transform/pgbouncer_logs
92+
- batch/200ms
93+
- groupbyattrs/compact
94+
receivers:
95+
- filelog/pgbouncer_log
96+
`)
97+
})
98+
}

internal/controller/postgrescluster/pgbouncer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ func (r *Reconciler) generatePGBouncerDeployment(
465465
pgbouncer.Pod(ctx, cluster, configmap, primaryCertificate, secret, &deploy.Spec.Template.Spec)
466466
}
467467

468+
// Add tmp directory and volume for log files
469+
addTMPEmptyDir(&deploy.Spec.Template)
470+
468471
return deploy, true, err
469472
}
470473

internal/naming/names.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ const (
155155
// dedicated repo host, if configured.
156156
PGBackRestRepoLogPath = "/pgbackrest/%s/log"
157157

158+
// PGBouncerLogPath is the directory that holds the pgBouncer log file
159+
PGBouncerLogPath = "/tmp"
160+
158161
// suffix used with postgrescluster name for associated configmap.
159162
// for instance, if the cluster is named 'mycluster', the
160163
// configmap will be named 'mycluster-pgbackrest-config'

internal/pgbouncer/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ func clusterINI(ctx context.Context, cluster *v1beta1.PostgresCluster) string {
126126
"unix_socket_dir": "",
127127
}
128128

129-
// When OTel metrics are enabled, allow pgbouncer's postgres user
129+
// If OpenTelemetryLogs feature is enabled, enable logging to file
130+
if feature.Enabled(ctx, feature.OpenTelemetryLogs) {
131+
global["logfile"] = naming.PGBouncerLogPath + "/pgbouncer.log"
132+
}
133+
134+
// When OTel metrics are enabled, allow pgBouncer's postgres user
130135
// to run read-only console queries on pgBouncer's virtual db
131136
if feature.Enabled(ctx, feature.OpenTelemetryMetrics) {
132137
global["stats_users"] = PostgresqlUser

0 commit comments

Comments
 (0)