Skip to content

Commit 540ce54

Browse files
benjaminjb authored and cbandy committed
Add pgBackRest repohost log collector
Issue: PGO-2058
1 parent 5857bcb commit 540ce54

File tree

9 files changed

+403
-13
lines changed

9 files changed

+403
-13
lines changed

internal/collector/generated/pgbackrest_logs_transforms.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/collector/pgbackrest.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package collector
6+
7+
import (
8+
"context"
9+
_ "embed"
10+
"encoding/json"
11+
"fmt"
12+
"slices"
13+
14+
"github.com/crunchydata/postgres-operator/internal/feature"
15+
"github.com/crunchydata/postgres-operator/internal/naming"
16+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
17+
)
18+
19+
// pgBackRestLogsTransforms holds the contents of "pgbackrest_logs_transforms.yaml" as JSON.
// A clone of it is used as the "log_statements" setting of the
// "transform/pgbackrest_logs" processor.
20+
//
21+
//go:embed "generated/pgbackrest_logs_transforms.json"
22+
var pgBackRestLogsTransforms json.RawMessage
23+
24+
func NewConfigForPgBackrestRepoHostPod(
25+
ctx context.Context,
26+
repos []v1beta1.PGBackRestRepo,
27+
) *Config {
28+
config := NewConfig()
29+
30+
if feature.Enabled(ctx, feature.OpenTelemetryLogs) {
31+
32+
var directory string
33+
for _, repo := range repos {
34+
if repo.Volume != nil {
35+
directory = fmt.Sprintf(naming.PGBackRestRepoLogPath, repo.Name)
36+
break
37+
}
38+
}
39+
40+
// We should only enter this function if a PVC is assigned for a dedicated repohost
41+
// but if we don't have one, exit early.
42+
if directory == "" {
43+
return config
44+
}
45+
46+
// Keep track of what log records and files have been processed.
47+
// Use a subdirectory of the logs directory to stay within the same failure domain.
48+
config.Extensions["file_storage/pgbackrest_logs"] = map[string]any{
49+
"directory": directory + "/receiver",
50+
"create_directory": true,
51+
"fsync": true,
52+
}
53+
54+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/receiver/filelogreceiver#readme
55+
config.Receivers["filelog/pgbackrest_log"] = map[string]any{
56+
// Read the files and keep track of what has been processed.
57+
"include": []string{
58+
directory + "/*.log",
59+
},
60+
"storage": "file_storage/pgbackrest_logs",
61+
// pgBackRest prints logs with a log prefix, which includes a timestamp
62+
// as long as the timestamp is not turned off in the configuration.
63+
// When pgBackRest starts a process, it also will print a newline
64+
// (if the file has already been written to) and a process "banner"
65+
// which looks like "-------------------PROCESS START-------------------\n".
66+
// Therefore we break multiline on the timestamp or the 19 dashes that start the banner.
67+
// - https://github.com/pgbackrest/pgbackrest/blob/main/src/common/log.c#L451
68+
"multiline": map[string]string{
69+
"line_start_pattern": `^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}|^-{19}`,
70+
},
71+
}
72+
73+
config.Processors["resource/pgbackrest"] = map[string]any{
74+
"attributes": []map[string]any{
75+
// Container and Namespace names need no escaping because they are DNS labels.
76+
// Pod names need no escaping because they are DNS subdomains.
77+
//
78+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names
79+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/resource/k8s.md
80+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/logs.md
81+
{"action": "insert", "key": "k8s.container.name", "value": naming.PGBackRestRepoContainerName},
82+
{"action": "insert", "key": "k8s.namespace.name", "value": "${env:K8S_POD_NAMESPACE}"},
83+
{"action": "insert", "key": "k8s.pod.name", "value": "${env:K8S_POD_NAME}"},
84+
},
85+
}
86+
87+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme
88+
config.Processors["transform/pgbackrest_logs"] = map[string]any{
89+
"log_statements": slices.Clone(pgBackRestLogsTransforms),
90+
}
91+
92+
config.Pipelines["logs/pgbackrest"] = Pipeline{
93+
Extensions: []ComponentID{"file_storage/pgbackrest_logs"},
94+
Receivers: []ComponentID{"filelog/pgbackrest_log"},
95+
Processors: []ComponentID{
96+
"resource/pgbackrest",
97+
"transform/pgbackrest_logs",
98+
SubSecondBatchProcessor,
99+
CompactingProcessor,
100+
},
101+
Exporters: []ComponentID{DebugExporter},
102+
}
103+
}
104+
return config
105+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# OTTL statements that configure an OTel Transform Processor to parse
# pgBackRest log records.
#
# https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme

- context: log
  statements:
    # Label the instrumentation scope of these records.
    - set(instrumentation_scope.name, "pgbackrest")
    - set(instrumentation_scope.schema_url, "https://opentelemetry.io/schemas/1.29.0")

    # Split every non-empty record into named groups kept in the cache:
    #   timestamp      - YYYY-MM-DD HH:MM:SS.sss
    #   process_id     - `P` followed by 2 or 3 digits
    #   error_severity - the log level (INFO, WARN, etc.)
    #   message        - everything else, newlines included; this works because
    #                    the receiver already groups multiline entries
    - 'merge_maps(cache, ExtractPatterns(body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) (?<process_id>P\\d{2,3})\\s*(?<error_severity>\\S*): (?<message>(?s).*)$"), "insert") where Len(body) > 0'

    # The "error_severity" group carries the log severity.
    # https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitytext
    # https://pgbackrest.org/configuration.html#section-log/option-log-level-file
    - set(severity_text, cache["error_severity"]) where IsString(cache["error_severity"])
    - set(severity_number, SEVERITY_NUMBER_TRACE) where severity_text == "TRACE"
    - set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "DEBUG"
    - set(severity_number, SEVERITY_NUMBER_DEBUG2) where severity_text == "DETAIL"
    - set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "INFO"
    - set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARN"
    - set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"

    # https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-timestamp
    - set(time, Time(cache["timestamp"], "%Y-%m-%d %H:%M:%S.%L")) where IsString(cache["timestamp"])

    # https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/attributes-registry/process.md
    - set(attributes["process.pid"], cache["process_id"])

    # Preserve the unparsed record in a standard attribute, then replace the
    # record body with the extracted message.
    # https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/logs.md
    - set(attributes["log.record.original"], body)
    - set(body, cache["message"])
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package collector
6+
7+
import (
8+
"context"
9+
"testing"
10+
11+
"gotest.tools/v3/assert"
12+
13+
"github.com/crunchydata/postgres-operator/internal/feature"
14+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
15+
)
16+
17+
// TestNewConfigForPgBackrestRepoHostPod renders the configuration returned by
// NewConfigForPgBackrestRepoHostPod to YAML and compares it with an expected
// document.
func TestNewConfigForPgBackrestRepoHostPod(t *testing.T) {
18+
// "Enabled" covers the case where the OpenTelemetryLogs feature gate is on.
t.Run("Enabled", func(t *testing.T) {
19+
// Build a context that carries a gate with OpenTelemetryLogs set to true.
gate := feature.NewGate()
20+
assert.NilError(t, gate.SetFromMap(map[string]bool{
21+
feature.OpenTelemetryLogs: true,
22+
}))
23+
ctx := feature.NewContext(context.Background(), gate)
24+
// A single repo named "repo1" with a non-nil Volume.
repos := []v1beta1.PGBackRestRepo{
25+
{
26+
Name: "repo1",
27+
Volume: new(v1beta1.RepoPVC),
28+
},
29+
}
30+
31+
config := NewConfigForPgBackrestRepoHostPod(ctx, repos)
32+
33+
// Rendering must succeed and match the full expected document below.
result, err := config.ToYAML()
34+
assert.NilError(t, err)
35+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
36+
# Your changes will not be saved.
37+
exporters:
38+
debug:
39+
verbosity: detailed
40+
extensions:
41+
file_storage/pgbackrest_logs:
42+
create_directory: true
43+
directory: /pgbackrest/repo1/log/receiver
44+
fsync: true
45+
processors:
46+
batch/1s:
47+
timeout: 1s
48+
batch/200ms:
49+
timeout: 200ms
50+
groupbyattrs/compact: {}
51+
resource/pgbackrest:
52+
attributes:
53+
- action: insert
54+
key: k8s.container.name
55+
value: pgbackrest
56+
- action: insert
57+
key: k8s.namespace.name
58+
value: ${env:K8S_POD_NAMESPACE}
59+
- action: insert
60+
key: k8s.pod.name
61+
value: ${env:K8S_POD_NAME}
62+
transform/pgbackrest_logs:
63+
log_statements:
64+
- context: log
65+
statements:
66+
- set(instrumentation_scope.name, "pgbackrest")
67+
- set(instrumentation_scope.schema_url, "https://opentelemetry.io/schemas/1.29.0")
68+
- 'merge_maps(cache, ExtractPatterns(body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}
69+
\\d{2}:\\d{2}:\\d{2}\\.\\d{3}) (?<process_id>P\\d{2,3})\\s*(?<error_severity>\\S*):
70+
(?<message>(?s).*)$"), "insert") where Len(body) > 0'
71+
- set(severity_text, cache["error_severity"]) where IsString(cache["error_severity"])
72+
- set(severity_number, SEVERITY_NUMBER_TRACE) where severity_text == "TRACE"
73+
- set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "DEBUG"
74+
- set(severity_number, SEVERITY_NUMBER_DEBUG2) where severity_text == "DETAIL"
75+
- set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "INFO"
76+
- set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARN"
77+
- set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"
78+
- set(time, Time(cache["timestamp"], "%Y-%m-%d %H:%M:%S.%L")) where IsString(cache["timestamp"])
79+
- set(attributes["process.pid"], cache["process_id"])
80+
- set(attributes["log.record.original"], body)
81+
- set(body, cache["message"])
82+
receivers:
83+
filelog/pgbackrest_log:
84+
include:
85+
- /pgbackrest/repo1/log/*.log
86+
multiline:
87+
line_start_pattern: ^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}|^-{19}
88+
storage: file_storage/pgbackrest_logs
89+
service:
90+
extensions:
91+
- file_storage/pgbackrest_logs
92+
pipelines:
93+
logs/pgbackrest:
94+
exporters:
95+
- debug
96+
processors:
97+
- resource/pgbackrest
98+
- transform/pgbackrest_logs
99+
- batch/200ms
100+
- groupbyattrs/compact
101+
receivers:
102+
- filelog/pgbackrest_log
103+
`)
104+
})
105+
}

internal/collector/postgres.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,5 +202,59 @@ func EnablePostgresLogging(
202202
},
203203
Exporters: []ComponentID{DebugExporter},
204204
}
205+
206+
// pgBackRest pipeline
207+
outConfig.Extensions["file_storage/pgbackrest_logs"] = map[string]any{
208+
"directory": naming.PGBackRestPGDataLogPath + "/receiver",
209+
"create_directory": true,
210+
"fsync": true,
211+
}
212+
213+
outConfig.Receivers["filelog/pgbackrest_log"] = map[string]any{
214+
"include": []string{naming.PGBackRestPGDataLogPath + "/*.log"},
215+
"storage": "file_storage/pgbackrest_logs",
216+
217+
// pgBackRest prints logs with a log prefix, which includes a timestamp
218+
// as long as the timestamp is not turned off in the configuration.
219+
// When pgBackRest starts a process, it also will print a newline
220+
// (if the file has already been written to) and a process "banner"
221+
// which looks like "-------------------PROCESS START-------------------\n".
222+
// Therefore we break multiline on the timestamp or the 19 dashes that start the banner.
223+
// - https://github.com/pgbackrest/pgbackrest/blob/main/src/common/log.c#L451
224+
"multiline": map[string]string{
225+
"line_start_pattern": `^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}|^-{19}`,
226+
},
227+
}
228+
229+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/resourceprocessor#readme
230+
outConfig.Processors["resource/pgbackrest"] = map[string]any{
231+
"attributes": []map[string]any{
232+
// Container and Namespace names need no escaping because they are DNS labels.
233+
// Pod names need no escaping because they are DNS subdomains.
234+
//
235+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names
236+
// https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/resource/k8s.md
237+
{"action": "insert", "key": "k8s.container.name", "value": naming.ContainerDatabase},
238+
{"action": "insert", "key": "k8s.namespace.name", "value": "${env:K8S_POD_NAMESPACE}"},
239+
{"action": "insert", "key": "k8s.pod.name", "value": "${env:K8S_POD_NAME}"},
240+
},
241+
}
242+
243+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/-/processor/transformprocessor#readme
244+
outConfig.Processors["transform/pgbackrest_logs"] = map[string]any{
245+
"log_statements": slices.Clone(pgBackRestLogsTransforms),
246+
}
247+
248+
outConfig.Pipelines["logs/pgbackrest"] = Pipeline{
249+
Extensions: []ComponentID{"file_storage/pgbackrest_logs"},
250+
Receivers: []ComponentID{"filelog/pgbackrest_log"},
251+
Processors: []ComponentID{
252+
"resource/pgbackrest",
253+
"transform/pgbackrest_logs",
254+
SubSecondBatchProcessor,
255+
CompactingProcessor,
256+
},
257+
Exporters: []ComponentID{DebugExporter},
258+
}
205259
}
206260
}

0 commit comments

Comments
 (0)