Skip to content

Commit a3b0337

Browse files
committed
Create initial API for OTel instrumentation. Allow users to configure exporters via API and add them to logs pipelines.
1 parent 286a94f commit a3b0337

26 files changed

+1761
-38
lines changed

config/crd/bases/postgres-operator.crunchydata.com_pgadmins.yaml

Lines changed: 421 additions & 0 deletions
Large diffs are not rendered by default.

config/crd/bases/postgres-operator.crunchydata.com_postgresclusters.yaml

Lines changed: 421 additions & 0 deletions
Large diffs are not rendered by default.

internal/collector/config.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,19 @@ package collector
77
import (
88
"k8s.io/apimachinery/pkg/util/sets"
99
"sigs.k8s.io/yaml"
10+
11+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1012
)
1113

1214
// ComponentID represents a component identifier within an OpenTelemetry
1315
// Collector YAML configuration. Each value is a "type" followed by an optional
1416
// slash-then-name: `type[/name]`
15-
type ComponentID string
17+
type ComponentID = string
18+
19+
// PipelineID represents a pipeline identifier within an OpenTelemetry Collector
20+
// YAML configuration. Each value is a signal followed by an optional
21+
// slash-then-name: `signal[/name]`
22+
type PipelineID = string
1623

1724
// Config represents an OpenTelemetry Collector YAML configuration.
1825
// See: https://opentelemetry.io/docs/collector/configuration
@@ -35,11 +42,6 @@ type Pipeline struct {
3542
Receivers []ComponentID
3643
}
3744

38-
// PipelineID represents a pipeline identifier within an OpenTelemetry Collector
39-
// YAML configuration. Each value is a signal followed by an optional
40-
// slash-then-name: `signal[/name]`
41-
type PipelineID string
42-
4345
func (c *Config) ToYAML() (string, error) {
4446
const yamlGeneratedWarning = "" +
4547
"# Generated by postgres-operator. DO NOT EDIT.\n" +
@@ -71,8 +73,8 @@ func (c *Config) ToYAML() (string, error) {
7173
}
7274

7375
// NewConfig creates a base config for an OTel collector container
74-
func NewConfig() *Config {
75-
return &Config{
76+
func NewConfig(spec *v1beta1.InstrumentationSpec) *Config {
77+
config := &Config{
7678
Exporters: map[ComponentID]any{
7779
// TODO: Do we want a DebugExporter outside of development?
7880
// https://pkg.go.dev/go.opentelemetry.io/collector/exporter/debugexporter#section-readme
@@ -90,4 +92,13 @@ func NewConfig() *Config {
9092
Receivers: map[ComponentID]any{},
9193
Pipelines: map[PipelineID]Pipeline{},
9294
}
95+
96+
// If there are exporters defined in the spec, add them to the config.
97+
if spec != nil && spec.Config != nil && spec.Config.Exporters != nil {
98+
for k, v := range spec.Config.Exporters {
99+
config.Exporters[k] = v
100+
}
101+
}
102+
103+
return config
93104
}

internal/collector/config_test.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ import (
1111
)
1212

1313
func TestConfigToYAML(t *testing.T) {
14-
result, err := NewConfig().ToYAML()
15-
assert.NilError(t, err)
16-
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
14+
t.Run("NilInstrumentationSpec", func(t *testing.T) {
15+
result, err := NewConfig(nil).ToYAML()
16+
assert.NilError(t, err)
17+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
1718
# Your changes will not be saved.
1819
exporters:
1920
debug:
@@ -30,4 +31,33 @@ service:
3031
extensions: []
3132
pipelines: {}
3233
`)
34+
})
35+
36+
t.Run("InstrumentationSpecDefined", func(t *testing.T) {
37+
spec := testInstrumentationSpec()
38+
39+
result, err := NewConfig(spec).ToYAML()
40+
assert.NilError(t, err)
41+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
42+
# Your changes will not be saved.
43+
exporters:
44+
debug:
45+
verbosity: detailed
46+
googlecloud:
47+
log:
48+
default_log_name: opentelemetry.io/collector-exported-log
49+
project: google-project-name
50+
extensions: {}
51+
processors:
52+
batch/1s:
53+
timeout: 1s
54+
batch/200ms:
55+
timeout: 200ms
56+
groupbyattrs/compact: {}
57+
receivers: {}
58+
service:
59+
extensions: []
60+
pipelines: {}
61+
`)
62+
})
3363
}

internal/collector/helpers_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package collector
6+
7+
import (
8+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
9+
)
10+
11+
func testInstrumentationSpec() *v1beta1.InstrumentationSpec {
12+
spec := v1beta1.InstrumentationSpec{
13+
Config: &v1beta1.InstrumentationConfigSpec{
14+
Exporters: map[string]any{
15+
"googlecloud": map[string]any{
16+
"log": map[string]any{
17+
"default_log_name": "opentelemetry.io/collector-exported-log",
18+
},
19+
"project": "google-project-name",
20+
},
21+
},
22+
},
23+
Logs: &v1beta1.InstrumentationLogsSpec{
24+
Exporters: []string{"googlecloud"},
25+
},
26+
}
27+
28+
return spec.DeepCopy()
29+
}

internal/collector/instance.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/crunchydata/postgres-operator/internal/feature"
1313
"github.com/crunchydata/postgres-operator/internal/initialize"
1414
"github.com/crunchydata/postgres-operator/internal/naming"
15+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1516
)
1617

1718
// AddToConfigMap populates the shared ConfigMap with fields needed to run the Collector.
@@ -33,6 +34,7 @@ func AddToConfigMap(
3334
// AddToPod adds the OpenTelemetry collector container to a given Pod
3435
func AddToPod(
3536
ctx context.Context,
37+
spec *v1beta1.InstrumentationSpec,
3638
pullPolicy corev1.PullPolicy,
3739
inInstanceConfigMap *corev1.ConfigMap,
3840
outPod *corev1.PodSpec,
@@ -63,6 +65,11 @@ func AddToPod(
6365
}},
6466
}
6567

68+
// If the user has specified files to be mounted in the spec, add them to the projected config volume
69+
if spec != nil && spec.Config != nil && spec.Config.Files != nil {
70+
configVolume.Projected.Sources = append(configVolume.Projected.Sources, spec.Config.Files...)
71+
}
72+
6673
container := corev1.Container{
6774
Name: naming.ContainerCollector,
6875
Image: "ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.117.0",

internal/collector/patroni.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ func EnablePatroniLogging(ctx context.Context,
100100
}},
101101
}
102102

103+
// If there are exporters to be added to the logs pipelines defined in
104+
// the spec, add them to the pipeline. Otherwise, add the DebugExporter.
105+
var exporters []ComponentID
106+
if inCluster.Spec.Instrumentation != nil &&
107+
inCluster.Spec.Instrumentation.Logs != nil &&
108+
inCluster.Spec.Instrumentation.Logs.Exporters != nil {
109+
exporters = inCluster.Spec.Instrumentation.Logs.Exporters
110+
} else {
111+
exporters = []ComponentID{DebugExporter}
112+
}
113+
103114
outConfig.Pipelines["logs/patroni"] = Pipeline{
104115
Extensions: []ComponentID{"file_storage/patroni_logs"},
105116
Receivers: []ComponentID{"filelog/patroni_jsonlog"},
@@ -109,7 +120,7 @@ func EnablePatroniLogging(ctx context.Context,
109120
SubSecondBatchProcessor,
110121
CompactingProcessor,
111122
},
112-
Exporters: []ComponentID{DebugExporter},
123+
Exporters: exporters,
113124
}
114125
}
115126
}

internal/collector/patroni_test.go

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ import (
1515
)
1616

1717
func TestEnablePatroniLogging(t *testing.T) {
18-
t.Run("Enabled", func(t *testing.T) {
18+
t.Run("NilInstrumentationSpec", func(t *testing.T) {
1919
gate := feature.NewGate()
2020
assert.NilError(t, gate.SetFromMap(map[string]bool{
2121
feature.OpenTelemetryLogs: true,
2222
}))
2323
ctx := feature.NewContext(context.Background(), gate)
2424

25-
config := NewConfig()
25+
config := NewConfig(nil)
2626

2727
EnablePatroniLogging(ctx, new(v1beta1.PostgresCluster), config)
2828

@@ -93,6 +93,93 @@ service:
9393
- groupbyattrs/compact
9494
receivers:
9595
- filelog/patroni_jsonlog
96+
`)
97+
})
98+
99+
t.Run("InstrumentationSpecDefined", func(t *testing.T) {
100+
gate := feature.NewGate()
101+
assert.NilError(t, gate.SetFromMap(map[string]bool{
102+
feature.OpenTelemetryLogs: true,
103+
}))
104+
ctx := feature.NewContext(context.Background(), gate)
105+
106+
cluster := new(v1beta1.PostgresCluster)
107+
cluster.Spec.Instrumentation = testInstrumentationSpec()
108+
config := NewConfig(cluster.Spec.Instrumentation)
109+
110+
EnablePatroniLogging(ctx, cluster, config)
111+
112+
result, err := config.ToYAML()
113+
assert.NilError(t, err)
114+
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
115+
# Your changes will not be saved.
116+
exporters:
117+
debug:
118+
verbosity: detailed
119+
googlecloud:
120+
log:
121+
default_log_name: opentelemetry.io/collector-exported-log
122+
project: google-project-name
123+
extensions:
124+
file_storage/patroni_logs:
125+
create_directory: true
126+
directory: /pgdata/patroni/log/receiver
127+
fsync: true
128+
processors:
129+
batch/1s:
130+
timeout: 1s
131+
batch/200ms:
132+
timeout: 200ms
133+
groupbyattrs/compact: {}
134+
resource/patroni:
135+
attributes:
136+
- action: insert
137+
key: k8s.container.name
138+
value: database
139+
- action: insert
140+
key: k8s.namespace.name
141+
value: ${env:K8S_POD_NAMESPACE}
142+
- action: insert
143+
key: k8s.pod.name
144+
value: ${env:K8S_POD_NAME}
145+
transform/patroni_logs:
146+
log_statements:
147+
- context: log
148+
statements:
149+
- set(instrumentation_scope.name, "patroni")
150+
- set(cache, ParseJSON(body["original"]))
151+
- set(severity_text, cache["levelname"])
152+
- set(severity_number, SEVERITY_NUMBER_DEBUG) where severity_text == "DEBUG"
153+
- set(severity_number, SEVERITY_NUMBER_INFO) where severity_text == "INFO"
154+
- set(severity_number, SEVERITY_NUMBER_WARN) where severity_text == "WARNING"
155+
- set(severity_number, SEVERITY_NUMBER_ERROR) where severity_text == "ERROR"
156+
- set(severity_number, SEVERITY_NUMBER_FATAL) where severity_text == "CRITICAL"
157+
- set(time, Time(cache["asctime"], "%F %T,%L"))
158+
- set(attributes["log.record.original"], body["original"])
159+
- set(body, cache["message"])
160+
receivers:
161+
filelog/patroni_jsonlog:
162+
include:
163+
- /pgdata/patroni/log/*.log
164+
operators:
165+
- from: body
166+
to: body.original
167+
type: move
168+
storage: file_storage/patroni_logs
169+
service:
170+
extensions:
171+
- file_storage/patroni_logs
172+
pipelines:
173+
logs/patroni:
174+
exporters:
175+
- googlecloud
176+
processors:
177+
- resource/patroni
178+
- transform/patroni_logs
179+
- batch/200ms
180+
- groupbyattrs/compact
181+
receivers:
182+
- filelog/patroni_jsonlog
96183
`)
97184
})
98185
}

internal/collector/pgadmin.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@ import (
1111

1212
"github.com/crunchydata/postgres-operator/internal/feature"
1313
"github.com/crunchydata/postgres-operator/internal/naming"
14+
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
1415
)
1516

16-
func EnablePgAdminLogging(ctx context.Context, configmap *corev1.ConfigMap) error {
17+
func EnablePgAdminLogging(ctx context.Context, spec *v1beta1.InstrumentationSpec,
18+
configmap *corev1.ConfigMap,
19+
) error {
1720
if !feature.Enabled(ctx, feature.OpenTelemetryLogs) {
1821
return nil
1922
}
20-
otelConfig := NewConfig()
23+
otelConfig := NewConfig(spec)
2124
otelConfig.Extensions["file_storage/pgadmin"] = map[string]any{
2225
"directory": "/var/log/pgadmin/receiver",
2326
"create_directory": true,
@@ -28,6 +31,7 @@ func EnablePgAdminLogging(ctx context.Context, configmap *corev1.ConfigMap) erro
2831
"create_directory": true,
2932
"fsync": true,
3033
}
34+
3135
otelConfig.Receivers["filelog/pgadmin"] = map[string]any{
3236
"include": []string{"/var/lib/pgadmin/logs/pgadmin.log"},
3337
"storage": "file_storage/pgadmin",
@@ -70,6 +74,15 @@ func EnablePgAdminLogging(ctx context.Context, configmap *corev1.ConfigMap) erro
7074
},
7175
}
7276

77+
// If there are exporters to be added to the logs pipelines defined in
78+
// the spec, add them to the pipeline. Otherwise, add the DebugExporter.
79+
var exporters []ComponentID
80+
if spec != nil && spec.Logs != nil && spec.Logs.Exporters != nil {
81+
exporters = spec.Logs.Exporters
82+
} else {
83+
exporters = []ComponentID{DebugExporter}
84+
}
85+
7386
otelConfig.Pipelines["logs/pgadmin"] = Pipeline{
7487
Extensions: []ComponentID{"file_storage/pgadmin"},
7588
Receivers: []ComponentID{"filelog/pgadmin"},
@@ -79,7 +92,7 @@ func EnablePgAdminLogging(ctx context.Context, configmap *corev1.ConfigMap) erro
7992
SubSecondBatchProcessor,
8093
CompactingProcessor,
8194
},
82-
Exporters: []ComponentID{DebugExporter},
95+
Exporters: exporters,
8396
}
8497

8598
otelConfig.Pipelines["logs/gunicorn"] = Pipeline{
@@ -91,7 +104,7 @@ func EnablePgAdminLogging(ctx context.Context, configmap *corev1.ConfigMap) erro
91104
SubSecondBatchProcessor,
92105
CompactingProcessor,
93106
},
94-
Exporters: []ComponentID{DebugExporter},
107+
Exporters: exporters,
95108
}
96109

97110
otelYAML, err := otelConfig.ToYAML()

0 commit comments

Comments
 (0)