Skip to content

Commit 8f26e19

Browse files
committed
Remove postgres metrics that are specified in the instrumentation API.
1 parent dfa361d commit 8f26e19

File tree

2 files changed

+215
-0
lines changed

2 files changed

+215
-0
lines changed

internal/collector/postgres_metrics.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func EnablePostgresMetrics(ctx context.Context, inCluster *v1beta1.PostgresClust
4242
// We must create a copy of the fiveSecondMetrics variable, otherwise we
4343
// will continually append to it and blow up our ConfigMap
4444
fiveSecondMetricsClone := slices.Clone(fiveSecondMetrics)
45+
fiveMinuteMetricsClone := slices.Clone(fiveMinuteMetrics)
4546

4647
if inCluster.Spec.PostgresVersion >= 17 {
4748
fiveSecondMetricsClone, _ = appendToJSONArray(fiveSecondMetricsClone, gtePG17)
@@ -55,6 +56,46 @@ func EnablePostgresMetrics(ctx context.Context, inCluster *v1beta1.PostgresClust
5556
fiveSecondMetricsClone, _ = appendToJSONArray(fiveSecondMetricsClone, ltPG16)
5657
}
5758

59+
// Remove any queries that user has specified in the spec
60+
if inCluster.Spec.Instrumentation != nil &&
61+
inCluster.Spec.Instrumentation.Metrics != nil &&
62+
inCluster.Spec.Instrumentation.Metrics.CustomQueries != nil &&
63+
inCluster.Spec.Instrumentation.Metrics.CustomQueries.Remove != nil {
64+
// Convert json to array of maps
65+
var fiveSecondMetricsArr []map[string]any
66+
err := json.Unmarshal(fiveSecondMetricsClone, &fiveSecondMetricsArr)
67+
if err != nil {
68+
// TODO: handle this error better
69+
panic(err)
70+
}
71+
// Remove any specified metrics from the five second metrics
72+
fiveSecondMetricsArr = removeMetricsFromQueries(inCluster.Spec.Instrumentation.Metrics.CustomQueries.Remove, fiveSecondMetricsArr)
73+
74+
// Convert json to array of maps
75+
var fiveMinuteMetricsArr []map[string]any
76+
err = json.Unmarshal(fiveMinuteMetricsClone, &fiveMinuteMetricsArr)
77+
if err != nil {
78+
// TODO: handle this error better
79+
panic(err)
80+
}
81+
// Remove any specified metrics from the five minute metrics
82+
fiveMinuteMetricsArr = removeMetricsFromQueries(inCluster.Spec.Instrumentation.Metrics.CustomQueries.Remove, fiveMinuteMetricsArr)
83+
84+
// Convert back to json data
85+
fiveSecondMetricsClone, err = json.Marshal(fiveSecondMetricsArr)
86+
if err != nil {
87+
// TODO: handle this error better
88+
panic(err)
89+
}
90+
91+
// Convert back to json data
92+
fiveMinuteMetricsClone, err = json.Marshal(fiveMinuteMetricsArr)
93+
if err != nil {
94+
// TODO: handle this error better
95+
panic(err)
96+
}
97+
}
98+
5899
// Add Prometheus exporter
59100
config.Exporters[Prometheus] = map[string]any{
60101
"endpoint": "0.0.0.0:" + strconv.Itoa(PrometheusPort),
@@ -136,3 +177,51 @@ func appendToJSONArray(a1, a2 json.RawMessage) (json.RawMessage, error) {
136177

137178
return merged, nil
138179
}
180+
181+
// removeMetricsFromQueries deletes each metric named in metricsToRemove from
// queryMetricsArr, a decoded JSON array of query/metrics maps. The slice is
// mutated in place and the (possibly shortened) result is returned. When a
// query's "metrics" array becomes empty, that query map is dropped entirely.
// Removal uses swap-with-last, so ordering is not preserved; that is fine
// because a metric name appears at most once, and the search for a name stops
// at its first (only) match.
func removeMetricsFromQueries(metricsToRemove []string,
	queryMetricsArr []map[string]any,
) []map[string]any {
	for _, name := range metricsToRemove {
		found := false
		// Scan query maps until the metric is located or the slice is exhausted.
		for idx := 0; idx < len(queryMetricsArr) && !found; idx++ {
			entry := queryMetricsArr[idx]

			// The "metrics" key is expected to hold a JSON array.
			metricsList, ok := entry["metrics"].([]any)
			if !ok {
				fmt.Println("data is not a []any")
				continue
			}

			for pos := range metricsList {
				// Each element of the array is expected to be a JSON object.
				metricMap, ok := metricsList[pos].(map[string]any)
				if !ok {
					fmt.Println("data is not a map[string]any")
					continue
				}
				if metricMap["metric_name"] != name {
					continue
				}

				// Swap-remove the matched metric from the array.
				last := len(metricsList) - 1
				metricsList[pos] = metricsList[last]
				metricsList = metricsList[:last]
				entry["metrics"] = metricsList

				// A query with no metrics left is useless; swap-remove the
				// whole query map as well.
				if len(metricsList) == 0 {
					qLast := len(queryMetricsArr) - 1
					queryMetricsArr[idx] = queryMetricsArr[qLast]
					queryMetricsArr = queryMetricsArr[:qLast]
				}

				// Name found and removed; move on to the next name.
				found = true
				break
			}
		}
	}

	return queryMetricsArr
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package collector
6+
7+
import (
8+
"encoding/json"
9+
"testing"
10+
11+
"gotest.tools/v3/assert"
12+
)
13+
14+
func TestRemoveMetricsFromQueries(t *testing.T) {
15+
// Convert json to map
16+
var fiveMinuteMetricsArr []map[string]any
17+
err := json.Unmarshal(fiveMinuteMetrics, &fiveMinuteMetricsArr)
18+
assert.NilError(t, err)
19+
20+
assert.Equal(t, len(fiveMinuteMetricsArr), 3)
21+
newArr := removeMetricsFromQueries([]string{"ccp_database_size_bytes"}, fiveMinuteMetricsArr)
22+
assert.Equal(t, len(newArr), 2)
23+
24+
t.Run("DeleteOneMetric", func(t *testing.T) {
25+
sqlMetricsData := `[
26+
{
27+
"metrics": [
28+
{
29+
"description": "Count of sequences that have reached greater than or equal to 75% of their max available numbers.\nFunction monitor.sequence_status() can provide more details if run directly on system.\n",
30+
"metric_name": "ccp_sequence_exhaustion_count",
31+
"static_attributes": { "server": "localhost:5432" },
32+
"value_column": "count"
33+
}
34+
],
35+
"sql": "SELECT count(*) AS count FROM (\n SELECT CEIL((s.max_value-min_value::NUMERIC+1)/s.increment_by::NUMERIC) AS slots\n , CEIL((COALESCE(s.last_value,s.min_value)-s.min_value::NUMERIC+1)/s.increment_by::NUMERIC) AS used\n FROM pg_catalog.pg_sequences s\n) x WHERE (ROUND(used/slots*100)::int) \u003e 75;\n"
36+
},
37+
{
38+
"metrics": [
39+
{
40+
"attribute_columns": ["dbname"],
41+
"description": "Number of times disk blocks were found already in the buffer cache, so that a read was not necessary",
42+
"metric_name": "ccp_stat_database_blks_hit",
43+
"static_attributes": { "server": "localhost:5432" },
44+
"value_column": "blks_hit"
45+
},
46+
{
47+
"attribute_columns": ["dbname"],
48+
"description": "Number of disk blocks read in this database",
49+
"metric_name": "ccp_stat_database_blks_read",
50+
"static_attributes": { "server": "localhost:5432" },
51+
"value_column": "blks_read"
52+
}
53+
],
54+
"sql": "SELECT s.datname AS dbname , s.xact_commit , s.xact_rollback , s.blks_read , s.blks_hit , s.tup_returned , s.tup_fetched , s.tup_inserted , s.tup_updated , s.tup_deleted , s.conflicts , s.temp_files , s.temp_bytes , s.deadlocks FROM pg_catalog.pg_stat_database s JOIN pg_catalog.pg_database d ON d.datname = s.datname WHERE d.datistemplate = false;\n"
55+
}
56+
]`
57+
var sqlMetricsArr []map[string]any
58+
err := json.Unmarshal([]byte(sqlMetricsData), &sqlMetricsArr)
59+
assert.NilError(t, err)
60+
61+
assert.Equal(t, len(sqlMetricsArr), 2)
62+
metricsArr, ok := sqlMetricsArr[1]["metrics"].([]any)
63+
assert.Equal(t, ok, true)
64+
assert.Equal(t, len(metricsArr), 2)
65+
66+
refinedSqlMetricsArr := removeMetricsFromQueries([]string{"ccp_stat_database_blks_hit"}, sqlMetricsArr)
67+
assert.Equal(t, len(refinedSqlMetricsArr), 2)
68+
metricsArr, ok = refinedSqlMetricsArr[1]["metrics"].([]any)
69+
assert.Equal(t, ok, true)
70+
assert.Equal(t, len(metricsArr), 1)
71+
remainingMetric, ok := metricsArr[0].(map[string]any)
72+
assert.Equal(t, ok, true)
73+
assert.Equal(t, remainingMetric["metric_name"], "ccp_stat_database_blks_read")
74+
})
75+
76+
t.Run("DeleteQueryMetricSet", func(t *testing.T) {
77+
sqlMetricsData := `[
78+
{
79+
"metrics": [
80+
{
81+
"description": "Count of sequences that have reached greater than or equal to 75% of their max available numbers.\nFunction monitor.sequence_status() can provide more details if run directly on system.\n",
82+
"metric_name": "ccp_sequence_exhaustion_count",
83+
"static_attributes": { "server": "localhost:5432" },
84+
"value_column": "count"
85+
}
86+
],
87+
"sql": "SELECT count(*) AS count FROM (\n SELECT CEIL((s.max_value-min_value::NUMERIC+1)/s.increment_by::NUMERIC) AS slots\n , CEIL((COALESCE(s.last_value,s.min_value)-s.min_value::NUMERIC+1)/s.increment_by::NUMERIC) AS used\n FROM pg_catalog.pg_sequences s\n) x WHERE (ROUND(used/slots*100)::int) \u003e 75;\n"
88+
},
89+
{
90+
"metrics": [
91+
{
92+
"attribute_columns": ["dbname"],
93+
"description": "Number of times disk blocks were found already in the buffer cache, so that a read was not necessary",
94+
"metric_name": "ccp_stat_database_blks_hit",
95+
"static_attributes": { "server": "localhost:5432" },
96+
"value_column": "blks_hit"
97+
},
98+
{
99+
"attribute_columns": ["dbname"],
100+
"description": "Number of disk blocks read in this database",
101+
"metric_name": "ccp_stat_database_blks_read",
102+
"static_attributes": { "server": "localhost:5432" },
103+
"value_column": "blks_read"
104+
}
105+
],
106+
"sql": "SELECT s.datname AS dbname , s.xact_commit , s.xact_rollback , s.blks_read , s.blks_hit , s.tup_returned , s.tup_fetched , s.tup_inserted , s.tup_updated , s.tup_deleted , s.conflicts , s.temp_files , s.temp_bytes , s.deadlocks FROM pg_catalog.pg_stat_database s JOIN pg_catalog.pg_database d ON d.datname = s.datname WHERE d.datistemplate = false;\n"
107+
}
108+
]`
109+
var sqlMetricsArr []map[string]any
110+
err := json.Unmarshal([]byte(sqlMetricsData), &sqlMetricsArr)
111+
assert.NilError(t, err)
112+
113+
assert.Equal(t, len(sqlMetricsArr), 2)
114+
metricsArr, ok := sqlMetricsArr[1]["metrics"].([]any)
115+
assert.Equal(t, ok, true)
116+
assert.Equal(t, len(metricsArr), 2)
117+
118+
refinedSqlMetricsArr := removeMetricsFromQueries([]string{"ccp_stat_database_blks_hit",
119+
"ccp_stat_database_blks_read"}, sqlMetricsArr)
120+
assert.Equal(t, len(refinedSqlMetricsArr), 1)
121+
metricsArr, ok = sqlMetricsArr[0]["metrics"].([]any)
122+
assert.Equal(t, ok, true)
123+
assert.Equal(t, len(metricsArr), 1)
124+
})
125+
126+
}

0 commit comments

Comments
 (0)