42 changes: 41 additions & 1 deletion pkg/controllers/updaterun/controller.go
@@ -21,8 +21,10 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -42,6 +44,7 @@ import (
"github.com/kubefleet-dev/kubefleet/pkg/utils"
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller/metrics"
"github.com/kubefleet-dev/kubefleet/pkg/utils/informer"
)

@@ -95,6 +98,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
return runtime.Result{}, err
}

// Emit the update run status metric based on status conditions in the updateRun.
defer emitUpdateRunStatusMetric(&updateRun)

var updatingStageIndex int
var toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding
var err error
@@ -169,13 +175,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// We delete all the dependent resources, including approvalRequest objects, of the clusterStagedUpdateRun object.
func (r *Reconciler) handleDelete(ctx context.Context, updateRun *placementv1beta1.ClusterStagedUpdateRun) (bool, time.Duration, error) {
runObjRef := klog.KObj(updateRun)
// delete all the associated approvalRequests.
// Delete all the associated approvalRequests.
approvalRequest := &placementv1beta1.ClusterApprovalRequest{}
if err := r.Client.DeleteAllOf(ctx, approvalRequest, client.MatchingLabels{placementv1beta1.TargetUpdateRunLabel: updateRun.GetName()}); err != nil {
klog.ErrorS(err, "Failed to delete all associated approvalRequests", "clusterStagedUpdateRun", runObjRef)
return false, 0, controller.NewAPIServerError(false, err)
}
klog.V(2).InfoS("Deleted all approvalRequests associated with the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef)

// Delete the update run status metric.
metrics.FleetUpdateRunStatusLastTimestampSeconds.DeletePartialMatch(prometheus.Labels{"name": updateRun.GetName()})

controllerutil.RemoveFinalizer(updateRun, placementv1beta1.ClusterStagedUpdateRunFinalizer)
if err := r.Client.Update(ctx, updateRun); err != nil {
klog.ErrorS(err, "Failed to remove updateRun finalizer", "clusterStagedUpdateRun", runObjRef)
@@ -301,3 +311,33 @@ func handleClusterApprovalRequest(oldObj, newObj client.Object, q workqueue.Type
NamespacedName: types.NamespacedName{Name: updateRun},
})
}

// emitUpdateRunStatusMetric emits the update run status metric based on status conditions in the updateRun.
func emitUpdateRunStatusMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
generation := updateRun.Generation
genStr := strconv.FormatInt(generation, 10)

succeedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
if succeedCond != nil && succeedCond.ObservedGeneration == generation {
metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
string(placementv1beta1.StagedUpdateRunConditionSucceeded), string(succeedCond.Status), succeedCond.Reason).SetToCurrentTime()
return
}

progressingCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))
if progressingCond != nil && progressingCond.ObservedGeneration == generation {
metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
string(placementv1beta1.StagedUpdateRunConditionProgressing), string(progressingCond.Status), progressingCond.Reason).SetToCurrentTime()
return
}

initializedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
if initializedCond != nil && initializedCond.ObservedGeneration == generation {
metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
string(placementv1beta1.StagedUpdateRunConditionInitialized), string(initializedCond.Status), initializedCond.Reason).SetToCurrentTime()
return
}

// We should rarely reach here; it can only happen when updating the updateRun status fails.
klog.V(2).InfoS("No valid status condition found on the updateRun, possibly because the status update failed", "updateRun", klog.KObj(updateRun))
}
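
Note: the gauge used above is not part of this diff. Below is a minimal sketch of what FleetUpdateRunStatusLastTimestampSeconds is assumed to look like, inferred from the WithLabelValues/DeletePartialMatch calls here and from the metric name and labels used in the integration test below; the real definition lives in pkg/utils/controller/metrics and may differ.

import "github.com/prometheus/client_golang/prometheus"

// Assumed sketch: a gauge vector keyed by the labels the controller sets above.
var FleetUpdateRunStatusLastTimestampSeconds = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "fleet_workload_update_run_status_last_timestamp_seconds",
		Help: "Unix timestamp of the last observed update run status condition.",
	},
	[]string{"name", "generation", "condition", "status", "reason"},
)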
128 changes: 127 additions & 1 deletion pkg/controllers/updaterun/controller_integration_test.go
@@ -22,8 +22,11 @@ import (
"strconv"
"time"

"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
prometheusclientmodel "github.com/prometheus/client_model/go"

rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -32,6 +35,7 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

@@ -40,6 +44,8 @@
placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
"github.com/kubefleet-dev/kubefleet/pkg/utils"
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller/metrics"
metricsutils "github.com/kubefleet-dev/kubefleet/test/utils/metrics"
)

const (
@@ -69,6 +75,7 @@ var (
testCROName string
updateRunNamespacedName types.NamespacedName
testNamespace []byte
customRegistry *prometheus.Registry
)

var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
@@ -80,6 +87,15 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
testUpdateStrategyName = "updatestrategy-" + utils.RandStr()
testCROName = "cro-" + utils.RandStr()
updateRunNamespacedName = types.NamespacedName{Name: testUpdateRunName}

customRegistry = initializeUpdateRunMetricsRegistry()
})

AfterEach(func() {
By("Checking the update run status metrics are removed")
// No metrics are emitted as all are removed after updateRun is deleted.
validateUpdateRunMetricsEmitted(customRegistry)
unregisterUpdateRunMetrics(customRegistry)
})

Context("Test reconciling a clusterStagedUpdateRun", func() {
@@ -223,6 +239,114 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
})
})

func initializeUpdateRunMetricsRegistry() *prometheus.Registry {
// Create a test registry
customRegistry := prometheus.NewRegistry()
Expect(customRegistry.Register(metrics.FleetUpdateRunStatusLastTimestampSeconds)).Should(Succeed())
// Reset metrics before each test
metrics.FleetUpdateRunStatusLastTimestampSeconds.Reset()
return customRegistry
}

func unregisterUpdateRunMetrics(registry *prometheus.Registry) {
Expect(registry.Unregister(metrics.FleetUpdateRunStatusLastTimestampSeconds)).Should(BeTrue())
}

// validateUpdateRunMetricsEmitted validates that the update run status metrics are emitted in the expected order.
func validateUpdateRunMetricsEmitted(registry *prometheus.Registry, wantMetrics ...*prometheusclientmodel.Metric) {
Eventually(func() error {
metricFamilies, err := registry.Gather()
if err != nil {
return fmt.Errorf("failed to gather metrics: %w", err)
}
var gotMetrics []*prometheusclientmodel.Metric
for _, mf := range metricFamilies {
if mf.GetName() == "fleet_workload_update_run_status_last_timestamp_seconds" {
gotMetrics = mf.GetMetric()
}
}

if diff := cmp.Diff(gotMetrics, wantMetrics, metricsutils.MetricsCmpOptions...); diff != "" {
return fmt.Errorf("update run status metrics mismatch (-got, +want):\n%s", diff)
}

return nil
}, timeout, interval).Should(Succeed(), "failed to validate the update run status metrics")
}

func generateMetricsLabels(
updateRun *placementv1beta1.ClusterStagedUpdateRun,
condition, status, reason string,
) []*prometheusclientmodel.LabelPair {
return []*prometheusclientmodel.LabelPair{
{Name: ptr.To("name"), Value: &updateRun.Name},
{Name: ptr.To("generation"), Value: ptr.To(strconv.FormatInt(updateRun.Generation, 10))},
{Name: ptr.To("condition"), Value: ptr.To(condition)},
{Name: ptr.To("status"), Value: ptr.To(status)},
{Name: ptr.To("reason"), Value: ptr.To(reason)},
}
}

func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
string(metav1.ConditionFalse), condition.UpdateRunInitializeFailedReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateProgressingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
string(metav1.ConditionTrue), condition.UpdateRunProgressingReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateWaitingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
string(metav1.ConditionFalse), condition.UpdateRunWaitingReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateStuckMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
string(metav1.ConditionFalse), condition.UpdateRunStuckReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
string(metav1.ConditionFalse), condition.UpdateRunFailedReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
string(metav1.ConditionTrue), condition.UpdateRunSucceededReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}
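
Usage sketch (hypothetical, not part of this diff): a spec in this suite could combine these helpers to assert the metric history after driving the updateRun, for example for a run that made progress and then got stuck on a cluster; the actual specs may use different combinations.

// Hypothetical assertion inside an It block, after reconciling the updateRun.
validateUpdateRunMetricsEmitted(customRegistry,
	generateProgressingMetric(updateRun),
	generateStuckMetric(updateRun),
)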

func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateRun {
return &placementv1beta1.ClusterStagedUpdateRun{
ObjectMeta: metav1.ObjectMeta{
@@ -503,7 +627,7 @@ func generateTrueCondition(obj client.Object, condType any) metav1.Condition {
case placementv1beta1.StagedUpdateRunConditionInitialized:
reason = condition.UpdateRunInitializeSucceededReason
case placementv1beta1.StagedUpdateRunConditionProgressing:
reason = condition.UpdateRunStartedReason
reason = condition.UpdateRunProgressingReason
case placementv1beta1.StagedUpdateRunConditionSucceeded:
reason = condition.UpdateRunSucceededReason
}
@@ -564,6 +688,8 @@ func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
reason = condition.UpdateRunInitializeFailedReason
case placementv1beta1.StagedUpdateRunConditionSucceeded:
reason = condition.UpdateRunFailedReason
case placementv1beta1.StagedUpdateRunConditionProgressing:
reason = condition.UpdateRunWaitingReason
}
typeStr = string(cond)
case placementv1beta1.StageUpdatingConditionType:
62 changes: 56 additions & 6 deletions pkg/controllers/updaterun/execution.go
@@ -44,6 +44,10 @@ var (
// stageUpdatingWaitTime is the time to wait before rechecking the stage update status.
// Put it as a variable for convenient testing.
stageUpdatingWaitTime = 60 * time.Second

// updateRunStuckThreshold is the time to wait on a single cluster update before marking the update run as stuck.
// TODO(wantjian): make this configurable
updateRunStuckThreshold = 5 * time.Minute
)

// execute executes the update run by updating the clusters in the updating stage specified by updatingStageIndex.
Expand All @@ -55,8 +59,10 @@ func (r *Reconciler) execute(
updatingStageIndex int,
toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
) (bool, time.Duration, error) {
// Mark the update run as started regardless of whether it's already marked.
markUpdateRunStarted(updateRun)
// Mark updateRun as progressing if it's not already marked as waiting or stuck.
// This avoids triggering an unnecessary in-memory transition from stuck (waiting) -> progressing -> stuck (waiting),
// which would update the lastTransitionTime even though the status hasn't effectively changed.
markUpdateRunProgressingIfNotWaitingOrStuck(updateRun)

if updatingStageIndex < len(updateRun.Status.StagesStatus) {
updatingStage := &updateRun.Status.StagesStatus[updatingStageIndex]
@@ -172,17 +178,27 @@ func (r *Reconciler) executeUpdatingStage(
markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}

finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
if finished {
finishedClusterCount++
markUpdateRunProgressing(updateRun)
continue
} else {
// If the cluster update has been running for longer than updateRunStuckThreshold, mark the update run as stuck.
timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
if timeElapsed > updateRunStuckThreshold {
klog.V(2).InfoS("Time waiting for the cluster update to finish exceeds the threshold, marking the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
}
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, updateErr
}

if finishedClusterCount == len(updatingStageStatus.Clusters) {
// All the clusters in the stage have been updated.
markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
markStageUpdatingWaiting(updatingStageStatus, updateRun.Generation)
klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
// Check if the after stage tasks are ready.
Expand All @@ -191,6 +207,7 @@ func (r *Reconciler) executeUpdatingStage(
return 0, err
}
if approved {
markUpdateRunProgressing(updateRun)
markStageUpdatingSucceeded(updatingStageStatus, updateRun.Generation)
// No need to wait to get to the next stage.
return 0, nil
@@ -448,14 +465,47 @@ func checkClusterUpdateResult(
return false, nil
}

// markUpdateRunStarted marks the update run as started in memory.
func markUpdateRunStarted(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
// markUpdateRunProgressing marks the update run as progressing in memory.
func markUpdateRunProgressing(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionTrue,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunStartedReason,
Message: "The stages started updating",
Reason: condition.UpdateRunProgressingReason,
Message: "The update run is making progress",
})
}

// markUpdateRunProgressingIfNotWaitingOrStuck marks the update run as progressing in memory if it's not already marked as waiting or stuck.
func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
progressingCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))
if condition.IsConditionStatusFalse(progressingCond, updateRun.Generation) &&
(progressingCond.Reason == condition.UpdateRunWaitingReason || progressingCond.Reason == condition.UpdateRunStuckReason) {
// The updateRun is waiting or stuck; no need to mark it as progressing.
return
}
markUpdateRunProgressing(updateRun)
}

// markUpdateRunStuck marks the updateRun as stuck in memory.
func markUpdateRunStuck(updateRun *placementv1beta1.ClusterStagedUpdateRun, stageName, clusterName string) {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunStuckReason,
Message: fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check crp status for potential errors", clusterName, stageName),
})
}

// markUpdateRunWaiting marks the updateRun as waiting in memory.
func markUpdateRunWaiting(updateRun *placementv1beta1.ClusterStagedUpdateRun, stageName string) {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunWaitingReason,
Message: fmt.Sprintf("The updateRun is waiting for after-stage tasks in stage %s to complete", stageName),
})
}
