
Commit 376572c

feat: add metrics for updaterun (#25)
1 parent: 9987f94

8 files changed: 444 additions & 10 deletions


pkg/controllers/updaterun/controller.go

Lines changed: 41 additions & 1 deletion
```diff
@@ -21,8 +21,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strconv"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -42,6 +44,7 @@ import (
 	"github.com/kubefleet-dev/kubefleet/pkg/utils"
 	"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
 	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
+	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller/metrics"
 	"github.com/kubefleet-dev/kubefleet/pkg/utils/informer"
 )
 
@@ -95,6 +98,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
 		return runtime.Result{}, err
 	}
 
+	// Emit the update run status metric based on status conditions in the updateRun.
+	defer emitUpdateRunStatusMetric(&updateRun)
+
 	var updatingStageIndex int
 	var toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding
 	var err error
@@ -169,13 +175,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
 // We delete all the dependent resources, including approvalRequest objects, of the clusterStagedUpdateRun object.
 func (r *Reconciler) handleDelete(ctx context.Context, updateRun *placementv1beta1.ClusterStagedUpdateRun) (bool, time.Duration, error) {
 	runObjRef := klog.KObj(updateRun)
-	// delete all the associated approvalRequests.
+	// Delete all the associated approvalRequests.
 	approvalRequest := &placementv1beta1.ClusterApprovalRequest{}
 	if err := r.Client.DeleteAllOf(ctx, approvalRequest, client.MatchingLabels{placementv1beta1.TargetUpdateRunLabel: updateRun.GetName()}); err != nil {
 		klog.ErrorS(err, "Failed to delete all associated approvalRequests", "clusterStagedUpdateRun", runObjRef)
 		return false, 0, controller.NewAPIServerError(false, err)
 	}
 	klog.V(2).InfoS("Deleted all approvalRequests associated with the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef)
+
+	// Delete the update run status metric.
+	metrics.FleetUpdateRunStatusLastTimestampSeconds.DeletePartialMatch(prometheus.Labels{"name": updateRun.GetName()})
+
 	controllerutil.RemoveFinalizer(updateRun, placementv1beta1.ClusterStagedUpdateRunFinalizer)
 	if err := r.Client.Update(ctx, updateRun); err != nil {
 		klog.ErrorS(err, "Failed to remove updateRun finalizer", "clusterStagedUpdateRun", runObjRef)
@@ -301,3 +311,33 @@ func handleClusterApprovalRequest(oldObj, newObj client.Object, q workqueue.Type
 			NamespacedName: types.NamespacedName{Name: updateRun},
 		})
 }
+
+// emitUpdateRunStatusMetric emits the update run status metric based on status conditions in the updateRun.
+func emitUpdateRunStatusMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
+	generation := updateRun.Generation
+	genStr := strconv.FormatInt(generation, 10)
+
+	succeedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
+	if succeedCond != nil && succeedCond.ObservedGeneration == generation {
+		metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
+			string(placementv1beta1.StagedUpdateRunConditionSucceeded), string(succeedCond.Status), succeedCond.Reason).SetToCurrentTime()
+		return
+	}
+
+	progressingCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))
+	if progressingCond != nil && progressingCond.ObservedGeneration == generation {
+		metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
+			string(placementv1beta1.StagedUpdateRunConditionProgressing), string(progressingCond.Status), progressingCond.Reason).SetToCurrentTime()
+		return
+	}
+
+	initializedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
+	if initializedCond != nil && initializedCond.ObservedGeneration == generation {
+		metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
+			string(placementv1beta1.StagedUpdateRunConditionInitialized), string(initializedCond.Status), initializedCond.Reason).SetToCurrentTime()
+		return
+	}
+
+	// We should rarely reach here, it can only happen when updating updateRun status fails.
+	klog.V(2).InfoS("There's no valid status condition on updateRun, status updating failed possibly", "updateRun", klog.KObj(updateRun))
+}
```
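The handler above uses `metrics.FleetUpdateRunStatusLastTimestampSeconds` without showing its definition; that lives in `pkg/utils/controller/metrics`, one of the changed files not reproduced in this excerpt. As a rough sketch only, a GaugeVec with the metric name and label order implied by the `WithLabelValues` calls above and the test helpers below would look something like this (the Help text and exact declaration are assumptions, not taken from the commit):

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// FleetUpdateRunStatusLastTimestampSeconds tracks, per update run, the last time a
// status condition was observed. The label order matches what emitUpdateRunStatusMetric
// passes to WithLabelValues: name, generation, condition, status, reason.
var FleetUpdateRunStatusLastTimestampSeconds = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "fleet_workload_update_run_status_last_timestamp_seconds",
	Help: "Unix timestamp (in seconds) of the last reported update run status condition.",
}, []string{"name", "generation", "condition", "status", "reason"})
```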

pkg/controllers/updaterun/controller_integration_test.go

Lines changed: 127 additions & 1 deletion
```diff
@@ -22,8 +22,11 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	"github.com/prometheus/client_golang/prometheus"
+	prometheusclientmodel "github.com/prometheus/client_model/go"
 
 	rbacv1 "k8s.io/api/rbac/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -32,6 +35,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
@@ -40,6 +44,8 @@ import (
 	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
 	"github.com/kubefleet-dev/kubefleet/pkg/utils"
 	"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
+	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller/metrics"
+	metricsutils "github.com/kubefleet-dev/kubefleet/test/utils/metrics"
 )
 
 const (
@@ -69,6 +75,7 @@ var (
 	testCROName             string
 	updateRunNamespacedName types.NamespacedName
 	testNamespace           []byte
+	customRegistry          *prometheus.Registry
 )
 
 var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
@@ -80,6 +87,15 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
 		testUpdateStrategyName = "updatestrategy-" + utils.RandStr()
 		testCROName = "cro-" + utils.RandStr()
 		updateRunNamespacedName = types.NamespacedName{Name: testUpdateRunName}
+
+		customRegistry = initializeUpdateRunMetricsRegistry()
+	})
+
+	AfterEach(func() {
+		By("Checking the update run status metrics are removed")
+		// No metrics are emitted as all are removed after updateRun is deleted.
+		validateUpdateRunMetricsEmitted(customRegistry)
+		unregisterUpdateRunMetrics(customRegistry)
 	})
 
 	Context("Test reconciling a clusterStagedUpdateRun", func() {
@@ -223,6 +239,114 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
 	})
 })
 
+func initializeUpdateRunMetricsRegistry() *prometheus.Registry {
+	// Create a test registry
+	customRegistry := prometheus.NewRegistry()
+	Expect(customRegistry.Register(metrics.FleetUpdateRunStatusLastTimestampSeconds)).Should(Succeed())
+	// Reset metrics before each test
+	metrics.FleetUpdateRunStatusLastTimestampSeconds.Reset()
+	return customRegistry
+}
+
+func unregisterUpdateRunMetrics(registry *prometheus.Registry) {
+	Expect(registry.Unregister(metrics.FleetUpdateRunStatusLastTimestampSeconds)).Should(BeTrue())
+}
+
+// validateUpdateRunMetricsEmitted validates the update run status metrics are emitted and are emitted in the correct order.
+func validateUpdateRunMetricsEmitted(registry *prometheus.Registry, wantMetrics ...*prometheusclientmodel.Metric) {
+	Eventually(func() error {
+		metricFamilies, err := registry.Gather()
+		if err != nil {
+			return fmt.Errorf("failed to gather metrics: %w", err)
+		}
+		var gotMetrics []*prometheusclientmodel.Metric
+		for _, mf := range metricFamilies {
+			if mf.GetName() == "fleet_workload_update_run_status_last_timestamp_seconds" {
+				gotMetrics = mf.GetMetric()
+			}
+		}
+
+		if diff := cmp.Diff(gotMetrics, wantMetrics, metricsutils.MetricsCmpOptions...); diff != "" {
+			return fmt.Errorf("update run status metrics mismatch (-got, +want):\n%s", diff)
+		}
+
+		return nil
+	}, timeout, interval).Should(Succeed(), "failed to validate the update run status metrics")
+}
+
+func generateMetricsLabels(
+	updateRun *placementv1beta1.ClusterStagedUpdateRun,
+	condition, status, reason string,
+) []*prometheusclientmodel.LabelPair {
+	return []*prometheusclientmodel.LabelPair{
+		{Name: ptr.To("name"), Value: &updateRun.Name},
+		{Name: ptr.To("generation"), Value: ptr.To(strconv.FormatInt(updateRun.Generation, 10))},
+		{Name: ptr.To("condition"), Value: ptr.To(condition)},
+		{Name: ptr.To("status"), Value: ptr.To(status)},
+		{Name: ptr.To("reason"), Value: ptr.To(reason)},
+	}
+}
+
+func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
+	return &prometheusclientmodel.Metric{
+		Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
+			string(metav1.ConditionFalse), condition.UpdateRunInitializeFailedReason),
+		Gauge: &prometheusclientmodel.Gauge{
+			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
+		},
+	}
+}
+
+func generateProgressingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
+	return &prometheusclientmodel.Metric{
+		Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
+			string(metav1.ConditionTrue), condition.UpdateRunProgressingReason),
+		Gauge: &prometheusclientmodel.Gauge{
+			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
+		},
+	}
+}
+
+func generateWaitingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
+	return &prometheusclientmodel.Metric{
+		Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
+			string(metav1.ConditionFalse), condition.UpdateRunWaitingReason),
+		Gauge: &prometheusclientmodel.Gauge{
+			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
+		},
+	}
+}
+
+func generateStuckMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
+	return &prometheusclientmodel.Metric{
+		Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
+			string(metav1.ConditionFalse), condition.UpdateRunStuckReason),
+		Gauge: &prometheusclientmodel.Gauge{
+			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
+		},
+	}
+}
+
+func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
+	return &prometheusclientmodel.Metric{
+		Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
+			string(metav1.ConditionFalse), condition.UpdateRunFailedReason),
+		Gauge: &prometheusclientmodel.Gauge{
+			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
+		},
+	}
+}
+
+func generateSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
+	return &prometheusclientmodel.Metric{
+		Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
+			string(metav1.ConditionTrue), condition.UpdateRunSucceededReason),
+		Gauge: &prometheusclientmodel.Gauge{
+			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
+		},
+	}
+}
+
 func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateRun {
 	return &placementv1beta1.ClusterStagedUpdateRun{
 		ObjectMeta: metav1.ObjectMeta{
@@ -503,7 +627,7 @@ func generateTrueCondition(obj client.Object, condType any) metav1.Condition {
 		case placementv1beta1.StagedUpdateRunConditionInitialized:
 			reason = condition.UpdateRunInitializeSucceededReason
 		case placementv1beta1.StagedUpdateRunConditionProgressing:
-			reason = condition.UpdateRunStartedReason
+			reason = condition.UpdateRunProgressingReason
 		case placementv1beta1.StagedUpdateRunConditionSucceeded:
 			reason = condition.UpdateRunSucceededReason
 		}
@@ -564,6 +688,8 @@ func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
 			reason = condition.UpdateRunInitializeFailedReason
 		case placementv1beta1.StagedUpdateRunConditionSucceeded:
 			reason = condition.UpdateRunFailedReason
+		case placementv1beta1.StagedUpdateRunConditionProgressing:
+			reason = condition.UpdateRunWaitingReason
 		}
 		typeStr = string(cond)
 	case placementv1beta1.StageUpdatingConditionType:
```
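The individual specs that drive the updateRun through its phases are elided from the hunk above. Purely as a hypothetical illustration of how the new helpers compose inside such a spec (the step names and the scenario are assumptions, not taken from the commit), a spec could assert the metric series like this:

```go
// Hypothetical spec fragment: after forcing initialization to fail, exactly one
// Initialized=False sample is expected in the custom registry.
By("Validating the initialization-failed metric is emitted")
validateUpdateRunMetricsEmitted(customRegistry, generateInitializationFailedMetric(updateRun))

// The AfterEach block shown above then expects an empty series
// (validateUpdateRunMetricsEmitted with no want metrics), which only holds because
// handleDelete removes the run's series via DeletePartialMatch on deletion.
```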

pkg/controllers/updaterun/execution.go

Lines changed: 56 additions & 6 deletions
```diff
@@ -44,6 +44,10 @@ var (
 	// stageUpdatingWaitTime is the time to wait before rechecking the stage update status.
 	// Put it as a variable for convenient testing.
 	stageUpdatingWaitTime = 60 * time.Second
+
+	// updateRunStuckThreshold is the time to wait on a single cluster update before marking the update run as stuck.
+	// TODO(wantjian): make this configurable
+	updateRunStuckThreshold = 5 * time.Minute
 )
 
 // execute executes the update run by updating the clusters in the updating stage specified by updatingStageIndex.
@@ -55,8 +59,10 @@ func (r *Reconciler) execute(
 	updatingStageIndex int,
 	toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
 ) (bool, time.Duration, error) {
-	// Mark the update run as started regardless of whether it's already marked.
-	markUpdateRunStarted(updateRun)
+	// Mark updateRun as progressing if it's not already marked as waiting or stuck.
+	// This avoids triggering an unnecessary in-memory transition from stuck (waiting) -> progressing -> stuck (waiting),
+	// which would update the lastTransitionTime even though the status hasn't effectively changed.
+	markUpdateRunProgressingIfNotWaitingOrStuck(updateRun)
 
 	if updatingStageIndex < len(updateRun.Status.StagesStatus) {
 		updatingStage := &updateRun.Status.StagesStatus[updatingStageIndex]
@@ -172,17 +178,27 @@ func (r *Reconciler) executeUpdatingStage(
 			markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
 			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
 		}
+
 		finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
 		if finished {
 			finishedClusterCount++
+			markUpdateRunProgressing(updateRun)
 			continue
+		} else {
+			// If the cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
+			timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
+			if timeElapsed > updateRunStuckThreshold {
+				klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
+				markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
+			}
 		}
 		// No need to continue as we only support one cluster updating at a time for now.
 		return clusterUpdatingWaitTime, updateErr
 	}
 
 	if finishedClusterCount == len(updatingStageStatus.Clusters) {
 		// All the clusters in the stage have been updated.
+		markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
 		markStageUpdatingWaiting(updatingStageStatus, updateRun.Generation)
 		klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
 		// Check if the after stage tasks are ready.
@@ -191,6 +207,7 @@
 			return 0, err
 		}
 		if approved {
+			markUpdateRunProgressing(updateRun)
 			markStageUpdatingSucceeded(updatingStageStatus, updateRun.Generation)
 			// No need to wait to get to the next stage.
 			return 0, nil
@@ -448,14 +465,47 @@ func checkClusterUpdateResult(
 	return false, nil
 }
 
-// markUpdateRunStarted marks the update run as started in memory.
-func markUpdateRunStarted(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
+// markUpdateRunProgressing marks the update run as progressing in memory.
+func markUpdateRunProgressing(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
 	meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
 		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
 		Status:             metav1.ConditionTrue,
 		ObservedGeneration: updateRun.Generation,
-		Reason:             condition.UpdateRunStartedReason,
-		Message:            "The stages started updating",
+		Reason:             condition.UpdateRunProgressingReason,
+		Message:            "The update run is making progress",
+	})
+}
+
+// markUpdateRunProgressingIfNotWaitingOrStuck marks the update run as progressing in memory if it's not marked as waiting or stuck already.
+func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
+	progressingCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))
+	if condition.IsConditionStatusFalse(progressingCond, updateRun.Generation) &&
+		(progressingCond.Reason == condition.UpdateRunWaitingReason || progressingCond.Reason == condition.UpdateRunStuckReason) {
+		// The updateRun is waiting or stuck, no need to mark it as progressing.
+		return
+	}
+	markUpdateRunProgressing(updateRun)
+}
+
+// markUpdateRunStuck marks the updateRun as stuck in memory.
+func markUpdateRunStuck(updateRun *placementv1beta1.ClusterStagedUpdateRun, stageName, clusterName string) {
+	meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
+		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
+		Status:             metav1.ConditionFalse,
+		ObservedGeneration: updateRun.Generation,
+		Reason:             condition.UpdateRunStuckReason,
+		Message:            fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check crp status for potential errors", clusterName, stageName),
+	})
+}
+
+// markUpdateRunWaiting marks the updateRun as waiting in memory.
+func markUpdateRunWaiting(updateRun *placementv1beta1.ClusterStagedUpdateRun, stageName string) {
+	meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
+		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
+		Status:             metav1.ConditionFalse,
+		ObservedGeneration: updateRun.Generation,
+		Reason:             condition.UpdateRunWaitingReason,
+		Message:            fmt.Sprintf("The updateRun is waiting for after-stage tasks in stage %s to complete", stageName),
 	})
 }
```

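The new comment in execute() turns on how SetStatusCondition treats lastTransitionTime: apimachinery only refreshes that timestamp when the condition's status value actually changes, so blindly re-marking a waiting or stuck run as progressing and then back again would restart the clock that the updateRunStuckThreshold check measures against. A small standalone sketch of that behavior, using illustrative reason strings rather than the Fleet condition constants:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conds []metav1.Condition

	// Mark the run as stuck: Progressing=False with a "stuck" reason.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "Progressing", Status: metav1.ConditionFalse, Reason: "UpdateRunStuck",
	})
	stuckAt := meta.FindStatusCondition(conds, "Progressing").LastTransitionTime

	time.Sleep(50 * time.Millisecond)

	// Re-setting the same False status with a different reason does NOT move
	// lastTransitionTime, so a stuck/waiting timer keeps counting.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "Progressing", Status: metav1.ConditionFalse, Reason: "UpdateRunWaiting",
	})
	fmt.Println("same status, timestamp kept:",
		meta.FindStatusCondition(conds, "Progressing").LastTransitionTime.Equal(&stuckAt))

	// Flipping to True and back to False resets lastTransitionTime, which is the
	// transition markUpdateRunProgressingIfNotWaitingOrStuck avoids.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "Progressing", Status: metav1.ConditionTrue, Reason: "UpdateRunProgressing",
	})
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "Progressing", Status: metav1.ConditionFalse, Reason: "UpdateRunStuck",
	})
	fmt.Println("status flipped, timestamp reset:",
		meta.FindStatusCondition(conds, "Progressing").LastTransitionTime.Time.After(stuckAt.Time))
}
```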