Skip to content

Commit 2e59c1b

Browse files
dsessler7cbandy
authored andcommitted
Parse PgBouncer logs using the OTel Collector
Issue: PGO-2056
1 parent 08ab9a4 commit 2e59c1b

File tree

5 files changed

+220
-1
lines changed

5 files changed

+220
-1
lines changed

internal/collector/pgbouncer.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"slices"
1313

1414
"github.com/crunchydata/postgres-operator/internal/feature"
15+
"github.com/crunchydata/postgres-operator/internal/naming"
1516
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1617
)
1718

@@ -33,11 +34,120 @@ func NewConfigForPgBouncerPod(
3334

3435
config := NewConfig()
3536

37+
EnablePgBouncerLogging(ctx, cluster, config)
3638
EnablePgBouncerMetrics(ctx, config, sqlQueryUsername)
3739

3840
return config
3941
}
4042

43+
// EnablePgBouncerLogging adds necessary configuration to the collector config to collect
44+
// logs from pgBouncer when the OpenTelemetryLogging feature flag is enabled.
45+
func EnablePgBouncerLogging(ctx context.Context,
46+
inCluster *v1beta1.PostgresCluster,
47+
outConfig *Config) {
48+
if feature.Enabled(ctx, feature.OpenTelemetryLogs) {
49+
directory := naming.PGBouncerLogPath
50+
51+
// Keep track of what log records and files have been processed.
52+
// Use a subdirectory of the logs directory to stay within the same failure domain.
53+
//
54+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/extension/storage/filestorage#readme
55+
outConfig.Extensions["file_storage/pgbouncer_logs"] = map[string]any{
56+
"directory": directory + "/receiver",
57+
"create_directory": true,
58+
"fsync": true,
59+
}
60+
61+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/receiver/filelogreceiver#readme
62+
outConfig.Receivers["filelog/pgbouncer_log"] = map[string]any{
63+
// Read the log files and keep track of what has been processed.
64+
"include": []string{directory + "/*.log"},
65+
"storage": "file_storage/pgbouncer_logs",
66+
}
67+
68+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/resourceprocessor#readme
69+
outConfig.Processors["resource/pgbouncer"] = map[string]any{
70+
"attributes": []map[string]any{
71+
// Container and Namespace names need no escaping because they are DNS labels.
72+
// Pod names need no escaping because they are DNS subdomains.
73+
//
74+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names
75+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/resource/k8s.md
76+
{"action": "insert", "key": "k8s.container.name", "value": naming.ContainerPGBouncer},
77+
{"action": "insert", "key": "k8s.namespace.name", "value": "${env:K8S_POD_NAMESPACE}"},
78+
{"action": "insert", "key": "k8s.pod.name", "value": "${env:K8S_POD_NAME}"},
79+
},
80+
}
81+
82+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme
83+
outConfig.Processors["transform/pgbouncer_logs"] = map[string]any{
84+
"log_statements": []map[string]any{{
85+
"context": "log",
86+
"statements": []string{
87+
// Set instrumentation scope
88+
`set(instrumentation_scope.name, "pgbouncer")`,
89+
90+
// Extract timestamp, pid, log level, and message and store in cache.
91+
`merge_maps(cache, ExtractPatterns(body, ` +
92+
`"^(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) ` +
93+
`\\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+) (?<msg>.*$)"), "insert")`,
94+
95+
// https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitytext
96+
`set(severity_text, cache["log_level"])`,
97+
98+
// Map pgBouncer (libusual) "logging levels" to OpenTelemetry severity levels.
99+
//
100+
// https://github.com/libusual/libusual/blob/master/usual/logging.c
101+
// https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
102+
// https://opentelemetry.io/docs/specs/otel/logs/data-model-appendix/#appendix-b-severitynumber-example-mappings
103+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/pkg/ottl/contexts/ottllog#enums
104+
`set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "NOISE" or severity_text == "DEBUG"`,
105+
`set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "LOG"`,
106+
`set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARNING"`,
107+
`set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"`,
108+
`set(severity_number, SEVERITY_NUMBER_FATAL) where severity_text == "FATAL"`,
109+
110+
// Parse the timestamp.
111+
// The format is neither RFC 3339 nor ISO 8601:
112+
//
113+
// The date and time are separated by a single space U+0020,
114+
// followed by a dot U+002E, milliseconds, another space U+0020,
115+
// then a timezone abbreviation.
116+
//
117+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/pkg/stanza/docs/types/timestamp.md
118+
`set(time, Time(cache["timestamp"], "%F %T.%L %Z"))`,
119+
120+
// Keep the unparsed log record in a standard attribute, and replace
121+
// the log record body with the message field.
122+
//
123+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/logs.md
124+
`set(attributes["log.record.original"], body)`,
125+
126+
// Set pid as attribute
127+
`set(attributes["process.pid"], cache["pid"])`,
128+
129+
// Set the log message to body.
130+
`set(body, cache["msg"])`,
131+
},
132+
}},
133+
}
134+
135+
outConfig.Pipelines["logs/pgbouncer"] = Pipeline{
136+
Extensions: []ComponentID{"file_storage/pgbouncer_logs"},
137+
Receivers: []ComponentID{"filelog/pgbouncer_log"},
138+
Processors: []ComponentID{
139+
"resource/pgbouncer",
140+
"transform/pgbouncer_logs",
141+
SubSecondBatchProcessor,
142+
CompactingProcessor,
143+
},
144+
Exporters: []ComponentID{DebugExporter},
145+
}
146+
}
147+
}
148+
149+
// EnablePgBouncerMetrics adds necessary configuration to the collector config to scrape
150+
// metrics from pgBouncer when the OpenTelemetryMetrics feature flag is enabled.
41151
func EnablePgBouncerMetrics(ctx context.Context, config *Config, sqlQueryUsername string) {
42152
if feature.Enabled(ctx, feature.OpenTelemetryMetrics) {
43153
// Add Prometheus exporter
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package collector
6+
7+
import (
8+
"context"
9+
"testing"
10+
11+
"gotest.tools/v3/assert"
12+
13+
"github.com/crunchydata/postgres-operator/internal/feature"
14+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
15+
)
16+
17+
func TestEnablePgBouncerLogging(t *testing.T) {
18+
t.Run("Enabled", func(t *testing.T) {
19+
gate := feature.NewGate()
20+
assert.NilError(t, gate.SetFromMap(map[string]bool{
21+
feature.OpenTelemetryLogs: true,
22+
}))
23+
ctx := feature.NewContext(context.Background(), gate)
24+
25+
config := NewConfig()
26+
27+
EnablePgBouncerLogging(ctx, new(v1beta1.PostgresCluster), config)
28+
29+
result, err := config.ToYAML()
30+
assert.NilError(t, err)
31+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
32+
# Your changes will not be saved.
33+
exporters:
34+
debug:
35+
verbosity: detailed
36+
extensions:
37+
file_storage/pgbouncer_logs:
38+
create_directory: true
39+
directory: /tmp/receiver
40+
fsync: true
41+
processors:
42+
batch/1s:
43+
timeout: 1s
44+
batch/200ms:
45+
timeout: 200ms
46+
groupbyattrs/compact: {}
47+
resource/pgbouncer:
48+
attributes:
49+
- action: insert
50+
key: k8s.container.name
51+
value: pgbouncer
52+
- action: insert
53+
key: k8s.namespace.name
54+
value: ${env:K8S_POD_NAMESPACE}
55+
- action: insert
56+
key: k8s.pod.name
57+
value: ${env:K8S_POD_NAME}
58+
transform/pgbouncer_logs:
59+
log_statements:
60+
- context: log
61+
statements:
62+
- set(instrumentation_scope.name, "pgbouncer")
63+
- merge_maps(cache, ExtractPatterns(body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}
64+
\\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) \\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+)
65+
(?<msg>.*$)"), "insert")
66+
- set(severity_text, cache["log_level"])
67+
- set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "NOISE"
68+
or severity_text == "DEBUG"
69+
- set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "LOG"
70+
- set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARNING"
71+
- set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"
72+
- set(severity_number, SEVERITY_NUMBER_FATAL) where severity_text == "FATAL"
73+
- set(time, Time(cache["timestamp"], "%F %T.%L %Z"))
74+
- set(attributes["log.record.original"], body)
75+
- set(attributes["process.pid"], cache["pid"])
76+
- set(body, cache["msg"])
77+
receivers:
78+
filelog/pgbouncer_log:
79+
include:
80+
- /tmp/*.log
81+
storage: file_storage/pgbouncer_logs
82+
service:
83+
extensions:
84+
- file_storage/pgbouncer_logs
85+
pipelines:
86+
logs/pgbouncer:
87+
exporters:
88+
- debug
89+
processors:
90+
- resource/pgbouncer
91+
- transform/pgbouncer_logs
92+
- batch/200ms
93+
- groupbyattrs/compact
94+
receivers:
95+
- filelog/pgbouncer_log
96+
`)
97+
})
98+
}

internal/controller/postgrescluster/pgbouncer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ func (r *Reconciler) generatePGBouncerDeployment(
465465
pgbouncer.Pod(ctx, cluster, configmap, primaryCertificate, secret, &deploy.Spec.Template.Spec)
466466
}
467467

468+
// Add tmp directory and volume for log files
469+
addTMPEmptyDir(&deploy.Spec.Template)
470+
468471
return deploy, true, err
469472
}
470473

internal/naming/names.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ const (
155155
// dedicated repo host, if configured.
156156
PGBackRestRepoLogPath = "/pgbackrest/%s/log"
157157

158+
// PGBouncerLogPath is the pgBouncer default log path configuration
159+
PGBouncerLogPath = "/tmp"
160+
158161
// suffix used with postgrescluster name for associated configmap.
159162
// for instance, if the cluster is named 'mycluster', the
160163
// configmap will be named 'mycluster-pgbackrest-config'

internal/pgbouncer/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ func clusterINI(ctx context.Context, cluster *v1beta1.PostgresCluster) string {
126126
"unix_socket_dir": "",
127127
}
128128

129-
// When OTel metrics are enabled, allow pgbouncer's postgres user
129+
// If OpenTelemetryLogs feature is enabled, enable logging to file
130+
if feature.Enabled(ctx, feature.OpenTelemetryLogs) {
131+
global["logfile"] = naming.PGBouncerLogPath + "/pgbouncer.log"
132+
}
133+
134+
// When OTel metrics are enabled, allow pgBouncer's postgres user
130135
// to run read-only console queries on pgBouncer's virtual db
131136
if feature.Enabled(ctx, feature.OpenTelemetryMetrics) {
132137
global["stats_users"] = PostgresqlUser

0 commit comments

Comments
 (0)