Skip to content

Commit 5e966e3

Browse files
benjaminjb and cbandy committed
Parse Postgres and pgAudit logs using the OTel Collector
Postgres can log in two structured formats: CSV, and (since Postgres 15) JSON. The two formats are very similar semantically, so this parses them in a shared OTTL transform processor. Co-authored-by: Chris Bandy <chris.bandy@crunchydata.com> Issue: PGO-2033 Issue: PGO-2065
1 parent eee1b37 commit 5e966e3

File tree

14 files changed

+642
-14
lines changed

14 files changed

+642
-14
lines changed

internal/collector/generated/postgres_logs_transforms.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/collector/instance.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func AddToPod(
4040
volumeMounts []corev1.VolumeMount,
4141
sqlQueryPassword string,
4242
) {
43-
if !feature.Enabled(ctx, feature.OpenTelemetryMetrics) {
43+
if !(feature.Enabled(ctx, feature.OpenTelemetryLogs) || feature.Enabled(ctx, feature.OpenTelemetryMetrics)) {
4444
return
4545
}
4646

@@ -67,10 +67,22 @@ func AddToPod(
6767
container := corev1.Container{
6868
Name: naming.ContainerCollector,
6969

70-
Image: "ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.116.1",
70+
Image: "ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.117.0",
7171
ImagePullPolicy: inCluster.Spec.ImagePullPolicy,
7272
Command: []string{"/otelcol-contrib", "--config", "/etc/otel-collector/config.yaml"},
7373
Env: []corev1.EnvVar{
74+
{
75+
Name: "K8S_POD_NAMESPACE",
76+
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{
77+
FieldPath: "metadata.namespace",
78+
}},
79+
},
80+
{
81+
Name: "K8S_POD_NAME",
82+
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{
83+
FieldPath: "metadata.name",
84+
}},
85+
},
7486
{
7587
Name: "PGPASSWORD",
7688
Value: sqlQueryPassword,
@@ -81,11 +93,13 @@ func AddToPod(
8193
VolumeMounts: append(volumeMounts, configVolumeMount),
8294
}
8395

84-
container.Ports = []corev1.ContainerPort{{
85-
ContainerPort: int32(8889),
86-
Name: "otel-metrics",
87-
Protocol: corev1.ProtocolTCP,
88-
}}
96+
if feature.Enabled(ctx, feature.OpenTelemetryMetrics) {
97+
container.Ports = []corev1.ContainerPort{{
98+
ContainerPort: int32(8889),
99+
Name: "otel-metrics",
100+
Protocol: corev1.ProtocolTCP,
101+
}}
102+
}
89103

90104
outPod.Containers = append(outPod.Containers, container)
91105
outPod.Volumes = append(outPod.Volumes, configVolume)

internal/collector/postgres.go

Lines changed: 187 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,200 @@ package collector
66

77
import (
88
"context"
9+
_ "embed"
10+
"encoding/json"
11+
"fmt"
12+
"slices"
913

14+
"github.com/crunchydata/postgres-operator/internal/feature"
15+
"github.com/crunchydata/postgres-operator/internal/naming"
16+
"github.com/crunchydata/postgres-operator/internal/postgres"
1017
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1118
)
1219

13-
func NewConfigForPostgresPod(ctx context.Context, inCluster *v1beta1.PostgresCluster) *Config {
20+
func NewConfigForPostgresPod(ctx context.Context,
21+
inCluster *v1beta1.PostgresCluster,
22+
outParameters *postgres.Parameters,
23+
) *Config {
1424
config := NewConfig()
1525

1626
EnablePatroniMetrics(ctx, inCluster, config)
27+
EnablePostgresLogging(ctx, inCluster, config, outParameters)
1728

1829
return config
1930
}
31+
32+
// The contents of "postgres_logs_transforms.yaml" as JSON.
33+
// See: https://pkg.go.dev/embed
34+
//
35+
//go:embed "generated/postgres_logs_transforms.json"
36+
var postgresLogsTransforms json.RawMessage
37+
38+
// postgresCSVNames returns, as a single comma-separated string, the names of
// the fields in a CSV log record written by the given Postgres major version.
func postgresCSVNames(version int) string {
	// JSON is the preferred format, so the names below are the JSON ones.
	// https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-JSONLOG

	// Fields present in every version that supports CSV logging.
	// https://www.postgresql.org/docs/8.3/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG
	header := `timestamp,user,dbname,pid` +
		`,connection_from` + // NOTE: this contains the JSON "remote_host" and "remote_port" values
		`,session_id,line_num,ps,session_start,vxid,txid` +
		`,error_severity,state_code,message,detail,hint` +
		`,internal_query,internal_position,context,statement,cursor_position` +
		`,location` // NOTE: this contains the JSON "func_name", "file_name", and "file_line_num" values

	// Fields appended by later versions, in the order they appear in a record.
	additions := []struct {
		since  int
		fields string
	}{
		// https://www.postgresql.org/docs/9.0/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG
		{since: 9, fields: `,application_name`},

		// https://www.postgresql.org/docs/13/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG
		{since: 13, fields: `,backend_type`},

		// https://www.postgresql.org/docs/14/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG
		{since: 14, fields: `,leader_pid,query_id`},
	}

	for _, added := range additions {
		if version >= added.since {
			header += added.fields
		}
	}

	return header
}
68+
69+
func EnablePostgresLogging(
70+
ctx context.Context,
71+
inCluster *v1beta1.PostgresCluster,
72+
outConfig *Config,
73+
outParameters *postgres.Parameters,
74+
) {
75+
if feature.Enabled(ctx, feature.OpenTelemetryLogs) {
76+
directory := postgres.LogDirectory()
77+
78+
// https://www.postgresql.org/docs/current/runtime-config-logging.html
79+
outParameters.Mandatory.Add("logging_collector", "on")
80+
outParameters.Mandatory.Add("log_directory", directory)
81+
82+
// PostgreSQL v8.3 adds support for CSV logging, and
83+
// PostgreSQL v15 adds support for JSON logging. The latter is preferred
84+
// because newlines are escaped as "\n", U+005C + U+006E.
85+
if inCluster.Spec.PostgresVersion < 15 {
86+
outParameters.Mandatory.Add("log_destination", "csvlog")
87+
} else {
88+
outParameters.Mandatory.Add("log_destination", "jsonlog")
89+
}
90+
91+
// Keep seven days of logs named for the day of the week;
92+
// this has been the default produced by `initdb` for some time now.
93+
// NOTE: The automated portions of log_filename are *entirely* based
94+
// on time. There is no spelling that is guaranteed to be unique or
95+
// monotonically increasing.
96+
//
97+
// TODO(logs): Limit the size/bytes of logs without losing messages;
98+
// probably requires another process that deletes the oldest files.
99+
//
100+
// The ".log" suffix is replaced by ".json" for JSON log files.
101+
outParameters.Mandatory.Add("log_filename", "postgresql-%a.log")
102+
outParameters.Mandatory.Add("log_file_mode", "0660")
103+
outParameters.Mandatory.Add("log_rotation_age", "1d")
104+
outParameters.Mandatory.Add("log_rotation_size", "0")
105+
outParameters.Mandatory.Add("log_truncate_on_rotation", "on")
106+
107+
// Log in a timezone that the OpenTelemetry Collector will understand.
108+
outParameters.Mandatory.Add("log_timezone", "UTC")
109+
110+
// Keep track of what log records and files have been processed.
111+
// Use a subdirectory of the logs directory to stay within the same failure domain.
112+
//
113+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/extension/storage/filestorage#readme
114+
outConfig.Extensions["file_storage/postgres_logs"] = map[string]any{
115+
"directory": directory + "/receiver",
116+
"create_directory": true,
117+
"fsync": true,
118+
}
119+
120+
// TODO(postgres-14): We can stop parsing CSV logs when 14 is EOL.
121+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/receiver/filelogreceiver#readme
122+
outConfig.Receivers["filelog/postgres_csvlog"] = map[string]any{
123+
// Read the CSV files and keep track of what has been processed.
124+
"include": []string{directory + "/*.csv"},
125+
"storage": "file_storage/postgres_logs",
126+
127+
// Postgres does not escape newlines in its CSV log format. Search for
128+
// the beginning of every record, starting with an unquoted timestamp.
129+
// The 2nd through 5th fields are optional, so match through to the 7th field.
130+
// This should do a decent job of not matching the middle of some SQL statement.
131+
//
132+
// The number of fields has changed over the years, but the first few
133+
// are always formatted the same way.
134+
//
135+
// NOTE: This regexp is invoked in multi-line mode. https://go.dev/s/re2syntax
136+
"multiline": map[string]string{
137+
"line_start_pattern": `^\d{4}-\d\d-\d\d \d\d:\d\d:\d\d.\d{3} UTC` + // 1st: timestamp
138+
`,(?:"[_\D](?:[^"]|"")*")?` + // 2nd: user name
139+
`,(?:"[_\D](?:[^"]|"")*")?` + // 3rd: database name
140+
`,\d*,(?:"(?:[^"]|"")+")?` + // 4–5th: process id, connection
141+
`,[0-9a-f]+[.][0-9a-f]+,\d+,`, // 6–7th: session id, session line
142+
},
143+
144+
// Differentiate these from the JSON ones below.
145+
"operators": []map[string]any{
146+
{"type": "move", "from": "body", "to": "body.original"},
147+
{"type": "add", "field": "body.format", "value": "csv"},
148+
{"type": "add", "field": "body.headers", "value": postgresCSVNames(inCluster.Spec.PostgresVersion)},
149+
},
150+
}
151+
152+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/receiver/filelogreceiver#readme
153+
outConfig.Receivers["filelog/postgres_jsonlog"] = map[string]any{
154+
// Read the JSON files and keep track of what has been processed.
155+
"include": []string{directory + "/*.json"},
156+
"storage": "file_storage/postgres_logs",
157+
158+
// Differentiate these from the CSV ones above.
159+
// TODO(postgres-14): We can stop parsing CSV logs when 14 is EOL.
160+
"operators": []map[string]any{
161+
{"type": "move", "from": "body", "to": "body.original"},
162+
{"type": "add", "field": "body.format", "value": "json"},
163+
},
164+
}
165+
166+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/resourceprocessor#readme
167+
outConfig.Processors["resource/postgres"] = map[string]any{
168+
"attributes": []map[string]any{
169+
// Container and Namespace names need no escaping because they are DNS labels.
170+
// Pod names need no escaping because they are DNS subdomains.
171+
//
172+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names
173+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/resource/k8s.md
174+
{"action": "insert", "key": "k8s.container.name", "value": naming.ContainerDatabase},
175+
{"action": "insert", "key": "k8s.namespace.name", "value": "${env:K8S_POD_NAMESPACE}"},
176+
{"action": "insert", "key": "k8s.pod.name", "value": "${env:K8S_POD_NAME}"},
177+
178+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/database#readme
179+
{"action": "insert", "key": "db.system", "value": "postgresql"},
180+
{"action": "insert", "key": "db.version", "value": fmt.Sprint(inCluster.Spec.PostgresVersion)},
181+
},
182+
}
183+
184+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme
185+
outConfig.Processors["transform/postgres_logs"] = map[string]any{
186+
"log_statements": slices.Clone(postgresLogsTransforms),
187+
}
188+
189+
outConfig.Pipelines["logs/postgres"] = Pipeline{
190+
Extensions: []ComponentID{"file_storage/postgres_logs"},
191+
// TODO(logs): Choose only one receiver, maybe?
192+
Receivers: []ComponentID{
193+
"filelog/postgres_csvlog",
194+
"filelog/postgres_jsonlog",
195+
},
196+
Processors: []ComponentID{
197+
"resource/postgres",
198+
"transform/postgres_logs",
199+
SubSecondBatchProcessor,
200+
CompactingProcessor,
201+
},
202+
Exporters: []ComponentID{DebugExporter},
203+
}
204+
}
205+
}

0 commit comments

Comments
 (0)