diff --git a/pkg/controllers/updaterun/controller.go b/pkg/controllers/updaterun/controller.go index efd1ced2c..46722f437 100644 --- a/pkg/controllers/updaterun/controller.go +++ b/pkg/controllers/updaterun/controller.go @@ -105,10 +105,18 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim defer emitUpdateRunStatusMetric(updateRun) state := updateRun.GetUpdateRunSpec().State + updateRunStatus := updateRun.GetUpdateRunStatus() + if state == placementv1beta1.StateAbandoned { + succeedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded)) + if succeedCond != nil && succeedCond.Reason == condition.UpdateRunAbandonedReason { + // Terminal state reached as updateRun cannot be restarted after being abandoned. + klog.V(2).InfoS("The updateRun is abandoned, terminating", "state", state, "updateRun", runObjRef) + return runtime.Result{}, nil + } + } var updatingStageIndex int var toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj - updateRunStatus := updateRun.GetUpdateRunStatus() initCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized)) // Check if initialized regardless of generation. // The updateRun spec fields are immutable except for the state field. When the state changes, @@ -158,35 +166,60 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun) } - // Execute the updateRun. - if state == placementv1beta1.StateExecuted { - klog.V(2).InfoS("Continue to execute the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef) + switch state { + case placementv1beta1.StateInitialized: + klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef) + return runtime.Result{}, nil + case placementv1beta1.StateExecuted: + // Execute the updateRun. + klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef) finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings) if errors.Is(execErr, errStagedUpdatedAborted) { // errStagedUpdatedAborted cannot be retried. return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error()) } - if finished { klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef) return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun) } - - // The execution is not finished yet or it encounters a retriable error. - // We need to record the status and requeue. - if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil { - return runtime.Result{}, updateErr + return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, execErr, state, runObjRef) + case placementv1beta1.StateAbandoned: + // Abandon the updateRun. + klog.V(2).InfoS("Abandoning the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef) + finished, waitTime, execErr := r.abandon(updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings) + if errors.Is(execErr, errStagedUpdatedAborted) { + // errStagedUpdatedAborted cannot be retried. 
+ return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error()) } - klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef) - if execErr != nil { - return runtime.Result{}, execErr + if finished { + klog.V(2).InfoS("The updateRun is abandoned", "updateRun", runObjRef) + return runtime.Result{}, r.recordUpdateRunAbandoned(ctx, updateRun) } - return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil + return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, execErr, state, runObjRef) } - klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef) return runtime.Result{}, nil } +func (r *Reconciler) handleIncompleteUpdateRun(ctx context.Context, updateRun placementv1beta1.UpdateRunObj, waitTime time.Duration, execErr error, state placementv1beta1.State, runObjRef klog.ObjectRef) (runtime.Result, error) { + // The execution is not finished yet or it encounters a retriable error. + // We need to record the status and requeue. + if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil { + return runtime.Result{}, updateErr + } + + switch state { + case placementv1beta1.StateExecuted: + klog.V(2).InfoS("The updateRun is not finished yet", "state", state, "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef) + case placementv1beta1.StateAbandoned: + klog.V(2).InfoS("The updateRun is not finished abandoning yet", "state", state, "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef) + } + + if execErr != nil { + return runtime.Result{}, execErr + } + return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil +} + // handleDelete handles the deletion of the updateRun object. // We delete all the dependent resources, including approvalRequest objects, of the updateRun object. func (r *Reconciler) handleDelete(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) (bool, time.Duration, error) { @@ -277,6 +310,32 @@ func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun placem return nil } +// recordUpdateRunAbandoned records the succeeded and progressing condition as abandoned in the updateRun status. +func (r *Reconciler) recordUpdateRunAbandoned(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error { + updateRunStatus := updateRun.GetUpdateRunStatus() + meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{ + Type: string(placementv1beta1.StagedUpdateRunConditionProgressing), + Status: metav1.ConditionFalse, + ObservedGeneration: updateRun.GetGeneration(), + Reason: condition.UpdateRunAbandonedReason, + Message: "The update run has been abandoned", + }) + meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{ + Type: string(placementv1beta1.StagedUpdateRunConditionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: updateRun.GetGeneration(), + Reason: condition.UpdateRunAbandonedReason, + Message: "The update run has been abandoned", + }) + + if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil { + klog.ErrorS(updateErr, "Failed to update the updateRun status as abandoned", "updateRun", klog.KObj(updateRun)) + // updateErr can be retried. + return controller.NewUpdateIgnoreConflictError(updateErr) + } + return nil +} + // recordUpdateRunStatus records the updateRun status. 
func (r *Reconciler) recordUpdateRunStatus(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error { if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil { diff --git a/pkg/controllers/updaterun/controller_integration_test.go b/pkg/controllers/updaterun/controller_integration_test.go index e017d170e..e6da46896 100644 --- a/pkg/controllers/updaterun/controller_integration_test.go +++ b/pkg/controllers/updaterun/controller_integration_test.go @@ -312,6 +312,26 @@ func generateWaitingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) * } } +func generateAbandoningMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric { + return &prometheusclientmodel.Metric{ + Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing), + string(metav1.ConditionFalse), condition.UpdateRunAbandoningReason), + Gauge: &prometheusclientmodel.Gauge{ + Value: ptr.To(float64(time.Now().UnixNano()) / 1e9), + }, + } +} + +func generateAbandonedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric { + return &prometheusclientmodel.Metric{ + Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded), + string(metav1.ConditionFalse), condition.UpdateRunAbandonedReason), + Gauge: &prometheusclientmodel.Gauge{ + Value: ptr.To(float64(time.Now().UnixNano()) / 1e9), + }, + } +} + func generateStuckMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric { return &prometheusclientmodel.Metric{ Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing), @@ -823,3 +843,9 @@ func generateFalseProgressingCondition(obj client.Object, condType any, reason s falseCond.Reason = reason return falseCond } + +func generateFalseSucceededCondition(obj client.Object, condType any, reason string) metav1.Condition { + falseCond := generateFalseCondition(obj, condType) + falseCond.Reason = reason + return falseCond +} diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 1180ae34a..69543c212 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -564,8 +564,14 @@ func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName if len(stuckClusterNames) > 0 { markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", ")) } else { - // If there is no stuck cluster but some progress has been made, mark the update run as progressing. - markUpdateRunProgressing(updateRun) + switch updateRun.GetUpdateRunSpec().State { + case placementv1beta1.StateAbandoned: + // If the update run is being abandoned, mark it as abandoning. + markUpdateRunAbandoning(updateRun) + default: + // If there is no stuck cluster but some progress has been made, mark the update run as progressing. + markUpdateRunProgressing(updateRun) + } } } @@ -669,7 +675,7 @@ func markUpdateRunProgressing(updateRun placementv1beta1.UpdateRunObj) { }) } -// markUpdateRunProgressingIfNotWaitingOrStuck marks the update run as proegressing in memory if it's not marked as waiting or stuck already. +// markUpdateRunProgressingIfNotWaitingOrStuck marks the update run as progressing in memory if it's not marked as waiting or stuck already. 
func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.UpdateRunObj) { updateRunStatus := updateRun.GetUpdateRunStatus() progressingCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing)) diff --git a/pkg/controllers/updaterun/execution_integration_test.go b/pkg/controllers/updaterun/execution_integration_test.go index 3d2af3627..f3cc27d23 100644 --- a/pkg/controllers/updaterun/execution_integration_test.go +++ b/pkg/controllers/updaterun/execution_integration_test.go @@ -702,6 +702,123 @@ var _ = Describe("UpdateRun execution tests - double stages", func() { validateUpdateRunMetricsEmitted(generateWaitingMetric(updateRun), generateProgressingMetric(updateRun), generateStuckMetric(updateRun), generateFailedMetric(updateRun)) }) }) + + Context("Cluster staged update run should finish current updating clusters when abandoned", Ordered, func() { + var wantApprovalRequest *placementv1beta1.ClusterApprovalRequest + var wantMetrics []*promclient.Metric + BeforeAll(func() { + By("Creating a new clusterStagedUpdateRun") + updateRun.Spec.State = placementv1beta1.StateExecuted + Expect(k8sClient.Create(ctx, updateRun)).To(Succeed()) + + By("Validating the initialization succeeded and the execution has not started") + initialized := generateSucceededInitializationStatus(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy, clusterResourceOverride) + wantStatus = generateExecutionNotStartedStatus(updateRun, initialized) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Validating the first beforeStage approvalRequest has been created") + wantApprovalRequest = &placementv1beta1.ClusterApprovalRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: updateRun.Status.StagesStatus[0].BeforeStageTaskStatus[0].ApprovalRequestName, + Labels: map[string]string{ + placementv1beta1.TargetUpdatingStageNameLabel: updateRun.Status.StagesStatus[0].StageName, + placementv1beta1.TargetUpdateRunLabel: updateRun.Name, + placementv1beta1.IsLatestUpdateRunApprovalLabel: "true", + }, + }, + Spec: placementv1beta1.ApprovalRequestSpec{ + TargetUpdateRun: updateRun.Name, + TargetStage: updateRun.Status.StagesStatus[0].StageName, + }, + } + validateApprovalRequestCreated(wantApprovalRequest) + + By("Checking update run status metrics are emitted") + wantMetrics = []*promclient.Metric{generateWaitingMetric(updateRun)} + validateUpdateRunMetricsEmitted(wantMetrics...) + }) + + It("Should accept the approval request and start to rollout 1st stage", func() { + By("Approving the approvalRequest") + approveClusterApprovalRequest(ctx, wantApprovalRequest.Name) + + By("Validating the approvalRequest has ApprovalAccepted status") + Eventually(func() (bool, error) { + var approvalRequest placementv1beta1.ClusterApprovalRequest + if err := k8sClient.Get(ctx, types.NamespacedName{Name: wantApprovalRequest.Name}, &approvalRequest); err != nil { + return false, err + } + return condition.IsConditionStatusTrue(meta.FindStatusCondition(approvalRequest.Status.Conditions, string(placementv1beta1.ApprovalRequestConditionApprovalAccepted)), approvalRequest.Generation), nil + }, timeout, interval).Should(BeTrue(), "failed to validate the approvalRequest approval accepted") + // Approval task has been approved. 
+ wantStatus.StagesStatus[0].BeforeStageTaskStatus[0].Conditions = append(wantStatus.StagesStatus[0].BeforeStageTaskStatus[0].Conditions, + generateTrueCondition(updateRun, placementv1beta1.StageTaskConditionApprovalRequestApproved)) + }) + + It("Should mark the 1st cluster in the 1st stage as succeeded after marking the binding available", func() { + By("Validating the 1st clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-1] // cluster-9 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 1st clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + // 1st stage started. + wantStatus = generateExecutionStartedStatus(updateRun, wantStatus) + + By("Validating the 1st cluster has succeeded and 2nd cluster has started") + wantStatus.StagesStatus[0].Clusters[0].Conditions = append(wantStatus.StagesStatus[0].Clusters[0].Conditions, generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[0].Clusters[1].Conditions = append(wantStatus.StagesStatus[0].Clusters[1].Conditions, generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Validating the 1st stage has startTime set") + Expect(updateRun.Status.StagesStatus[0].StartTime).ShouldNot(BeNil()) + + By("Checking update run status metrics are emitted") + wantMetrics = append(wantMetrics, generateProgressingMetric(updateRun)) + validateUpdateRunMetricsEmitted(wantMetrics...) + }) + + It("Should start abandoning the update run when state is Abandon", func() { + By("Updating updateRun state to Abandon") + updateRun.Spec.State = placementv1beta1.StateAbandoned + Expect(k8sClient.Update(ctx, updateRun)).Should(Succeed(), "failed to update the updateRun state") + + By("Validating update run is abandoning") + // Mark updateRun progressing condition as false with abandoning reason. + meta.SetStatusCondition(&wantStatus.StagesStatus[0].Conditions, generateFalseProgressingCondition(updateRun, placementv1beta1.StageUpdatingConditionProgressing, condition.StageUpdatingAbandoningReason)) + meta.SetStatusCondition(&wantStatus.Conditions, generateFalseProgressingCondition(updateRun, placementv1beta1.StagedUpdateRunConditionProgressing, condition.UpdateRunAbandoningReason)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Checking update run status metrics are emitted") + wantMetrics = append(wantMetrics, generateAbandoningMetric(updateRun)) + validateUpdateRunMetricsEmitted(wantMetrics...) 
+ }) + + It("Should mark the 2nd cluster in the 1st stage as succeeded after abandoning the updateRun", func() { + By("Validating the 2nd clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-3] // cluster-7 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 2nd clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 2nd cluster has succeeded") + // Mark 2nd cluster as succeeded. + wantStatus.StagesStatus[0].Clusters[1].Conditions = append(wantStatus.StagesStatus[0].Clusters[1].Conditions, generateTrueCondition(updateRun, placementv1beta1.ClusterUpdatingConditionSucceeded)) + // Mark updateRun as abandoned. + meta.SetStatusCondition(&wantStatus.StagesStatus[0].Conditions, generateFalseProgressingCondition(updateRun, placementv1beta1.StageUpdatingConditionProgressing, condition.StageUpdatingAbandonedReason)) + meta.SetStatusCondition(&wantStatus.Conditions, generateFalseProgressingCondition(updateRun, placementv1beta1.StagedUpdateRunConditionProgressing, condition.UpdateRunAbandonedReason)) + meta.SetStatusCondition(&wantStatus.Conditions, generateFalseSucceededCondition(updateRun, placementv1beta1.StagedUpdateRunConditionSucceeded, condition.UpdateRunAbandonedReason)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Checking update run status metrics are emitted") + wantMetrics = append(wantMetrics, generateAbandonedMetric(updateRun)) + validateUpdateRunMetricsEmitted(wantMetrics...) + }) + }) }) var _ = Describe("UpdateRun execution tests - single stage", func() { diff --git a/pkg/controllers/updaterun/halt.go b/pkg/controllers/updaterun/halt.go new file mode 100644 index 000000000..909e0dee5 --- /dev/null +++ b/pkg/controllers/updaterun/halt.go @@ -0,0 +1,240 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package updaterun + +import ( + "errors" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" + "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" + "github.com/kubefleet-dev/kubefleet/pkg/utils/controller" +) + +// haltUpdateRun handles the abandoning or pausing of the update run. 
+func (r *Reconciler) haltUpdateRun( + updateRun placementv1beta1.UpdateRunObj, + updatingStageIndex int, + toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj, +) (bool, time.Duration, error) { + if updateRun.GetUpdateRunSpec().State == placementv1beta1.StateAbandoned { + markUpdateRunAbandoning(updateRun) + } + + updateRunStatus := updateRun.GetUpdateRunStatus() + if updatingStageIndex < len(updateRunStatus.StagesStatus) { + maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex) + if err != nil { + return false, 0, err + } + updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex] + finished, waitTime, execErr := r.haltUpdatingStage(updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency) + if errors.Is(execErr, errStagedUpdatedAborted) { + markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error()) + return true, waitTime, execErr + } + return finished, waitTime, execErr + } + // All the stages have finished, halt the delete stage. + finished, execErr := r.haltDeleteStage(updateRun, toBeDeletedBindings) + if errors.Is(execErr, errStagedUpdatedAborted) { + markStageUpdatingFailed(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration(), execErr.Error()) + return true, 0, execErr + } + return finished, clusterUpdatingWaitTime, execErr +} + +// haltUpdatingStage halts the updating stage by letting the updating bindings finish and not starting new updates. +func (r *Reconciler) haltUpdatingStage( + updateRun placementv1beta1.UpdateRunObj, + updatingStageIndex int, + toBeUpdatedBindings []placementv1beta1.BindingObj, + maxConcurrency int, +) (bool, time.Duration, error) { + updateRunStatus := updateRun.GetUpdateRunStatus() + updatingStageStatus := &updateRunStatus.StagesStatus[updatingStageIndex] + updateRunRef := klog.KObj(updateRun) + // Create the map of the toBeUpdatedBindings. + toBeUpdatedBindingsMap := make(map[string]placementv1beta1.BindingObj, len(toBeUpdatedBindings)) + for _, binding := range toBeUpdatedBindings { + bindingSpec := binding.GetBindingSpec() + toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding + } + + clusterUpdatingCount := 0 + clusterUpdated := false + var stuckClusterNames []string + var clusterUpdateErrors []error + // Go through each cluster in the stage and check if it's updating/succeeded/failed/not started. + for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ { + clusterStatus := &updatingStageStatus.Clusters[i] + clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) + if clusterStartedCond == nil { + // Cluster has not started updating, therefore no need to do anything. + continue + } + + clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) + if clusterUpdateSucceededCond != nil && (clusterUpdateSucceededCond.Status == metav1.ConditionFalse || clusterUpdateSucceededCond.Status == metav1.ConditionTrue) { + // The cluster update has already finished (either succeeded or failed).
+ continue + } + + clusterUpdatingCount++ + + binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] + finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun) + if updateErr != nil { + clusterUpdateErrors = append(clusterUpdateErrors, updateErr) + } + if finished { + // The cluster has finished successfully, we can process another cluster in this round. + clusterUpdated = true + clusterUpdatingCount-- + } else { + // If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck. + timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time) + if timeElapsed > updateRunStuckThreshold { + klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) + stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName) + } + } + } + + // Mark the update run as stuck if there are stuck clusters; otherwise mark it as abandoning or progressing based on the run state. + aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames) + + // Aggregate and return errors. + if len(clusterUpdateErrors) > 0 { + // Even though we aggregate errors, we can still check if one of the errors is a staged update aborted error by using errors.Is in the caller. + return false, 0, utilerrors.NewAggregate(clusterUpdateErrors) + } + + state := updateRun.GetUpdateRunSpec().State + if clusterUpdatingCount == 0 && clusterUpdated { + // All the clusters in the stage have either finished updating or never started. + if state == placementv1beta1.StateAbandoned { + markStageUpdatingAbandoned(updatingStageStatus, updateRun.GetGeneration()) + } + klog.V(2).InfoS("The stage has finished all clusters updating", "state", state, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) + return true, 0, nil + } else if clusterUpdatingCount == 0 && !clusterUpdated { + // No clusters needed to be updated in this round, meaning all remaining clusters either have not started or have already finished. + klog.V(2).InfoS("No clusters needed to be updated", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) + return true, 0, nil + } + // Some clusters are still updating. + markStageUpdatingAbandoning(updatingStageStatus, updateRun.GetGeneration()) + return false, clusterUpdatingWaitTime, nil +} + +// haltDeleteStage halts the delete stage by letting the deleting bindings finish. +func (r *Reconciler) haltDeleteStage( + updateRun placementv1beta1.UpdateRunObj, + toBeDeletedBindings []placementv1beta1.BindingObj, +) (bool, error) { + updateRunRef := klog.KObj(updateRun) + updateRunStatus := updateRun.GetUpdateRunStatus() + existingDeleteStageStatus := updateRunStatus.DeletionStageStatus + existingDeleteStageClusterMap := make(map[string]*placementv1beta1.ClusterUpdatingStatus, len(existingDeleteStageStatus.Clusters)) + for i := range existingDeleteStageStatus.Clusters { + existingDeleteStageClusterMap[existingDeleteStageStatus.Clusters[i].ClusterName] = &existingDeleteStageStatus.Clusters[i] + } + // Mark the delete stage as abandoning in case it's not. + markStageUpdatingAbandoning(existingDeleteStageStatus, updateRun.GetGeneration()) + for _, binding := range toBeDeletedBindings { + bindingSpec := binding.GetBindingSpec() + curCluster, exist := existingDeleteStageClusterMap[bindingSpec.TargetCluster] + if !exist { + // The cluster is not in the delete stage.
This can happen when the update run is abandoned as the delete stage starts. + continue + } + // In validation, we already check the binding must exist in the status. + delete(existingDeleteStageClusterMap, bindingSpec.TargetCluster) + if condition.IsConditionStatusTrue(meta.FindStatusCondition(curCluster.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)), updateRun.GetGeneration()) { + // The cluster status is marked as deleted. + continue + } + if condition.IsConditionStatusTrue(meta.FindStatusCondition(curCluster.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)), updateRun.GetGeneration()) { + // The cluster status is marked as being deleted. + if binding.GetDeletionTimestamp().IsZero() { + // The cluster is marked as deleting but the binding is not deleting. + unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` in the deleting stage is marked as deleting but its corresponding binding is not deleting", curCluster.ClusterName)) + klog.ErrorS(unexpectedErr, "The binding should be deleting before we mark a cluster deleting", "clusterStatus", curCluster, "updateRun", updateRunRef) + return false, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) + } + return false, nil + } + } + klog.V(2).InfoS("The delete stage is abandoning", "numberOfDeletingClusters", len(toBeDeletedBindings), "updateRun", updateRunRef) + if len(toBeDeletedBindings) == 0 { + markStageUpdatingAbandoned(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration()) + } + return len(toBeDeletedBindings) == 0, nil +} + +// markUpdateRunAbandoning marks the update run as abandoning in memory. +func markUpdateRunAbandoning(updateRun placementv1beta1.UpdateRunObj) { + updateRunStatus := updateRun.GetUpdateRunStatus() + meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{ + Type: string(placementv1beta1.StagedUpdateRunConditionProgressing), + Status: metav1.ConditionFalse, + ObservedGeneration: updateRun.GetGeneration(), + Reason: condition.UpdateRunAbandoningReason, + Message: "The update run is in the process of abandoning", + }) +} + +// markStageUpdatingAbandoning marks the stage updating status as abandoning in memory. +func markStageUpdatingAbandoning(stageUpdatingStatus *placementv1beta1.StageUpdatingStatus, generation int64) { + meta.SetStatusCondition(&stageUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1beta1.StageUpdatingConditionProgressing), + Status: metav1.ConditionFalse, + ObservedGeneration: generation, + Reason: condition.StageUpdatingAbandoningReason, + Message: "Waiting for all the updating clusters to finish updating before completing the abandoning process", + }) +} + +// markStageUpdatingAbandoned marks the stage updating status as abandoned in memory. +func markStageUpdatingAbandoned(stageUpdatingStatus *placementv1beta1.StageUpdatingStatus, generation int64) { + if stageUpdatingStatus.EndTime == nil { + stageUpdatingStatus.EndTime = &metav1.Time{Time: time.Now()} + } + meta.SetStatusCondition(&stageUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1beta1.StageUpdatingConditionProgressing), + Status: metav1.ConditionFalse, + ObservedGeneration: generation, + Reason: condition.StageUpdatingAbandonedReason, + Message: "All the updating clusters have finished updating and no new updates will be started", + }) +} + +// abandon is a wrapper function for backward compatibility that calls haltUpdateRun.
+func (r *Reconciler) abandon( + updateRun placementv1beta1.UpdateRunObj, + updatingStageIndex int, + toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj, +) (bool, time.Duration, error) { + return r.haltUpdateRun(updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings) +} diff --git a/pkg/utils/condition/reason.go b/pkg/utils/condition/reason.go index 9566ee42e..eccada48c 100644 --- a/pkg/utils/condition/reason.go +++ b/pkg/utils/condition/reason.go @@ -173,6 +173,12 @@ const ( // UpdateRunSucceededReason is the reason string of condition if the staged update run succeeded. UpdateRunSucceededReason = "UpdateRunSucceeded" + // UpdateRunAbandoningReason is the reason string of condition if the staged update run is in the process of being abandoned. + UpdateRunAbandoningReason = "UpdateRunAbandoning" + + // UpdateRunAbandonedReason is the reason string of condition if the staged update run is abandoned. + UpdateRunAbandonedReason = "UpdateRunAbandoned" + // StageUpdatingStartedReason is the reason string of condition if the stage updating has started. StageUpdatingStartedReason = "StageUpdatingStarted" @@ -182,6 +188,12 @@ const ( // StageUpdatingFailedReason is the reason string of condition if the stage updating failed. StageUpdatingFailedReason = "StageUpdatingFailed" + // StageUpdatingAbandoningReason is the reason string of condition if the stage updating is abandoning. + StageUpdatingAbandoningReason = "StageUpdatingAbandoning" + // StageUpdatingAbandonedReason is the reason string of condition if the stage updating is abandoned. + StageUpdatingAbandonedReason = "StageUpdatingAbandoned" + // StageUpdatingSucceededReason is the reason string of condition if the stage updating succeeded. StageUpdatingSucceededReason = "StageUpdatingSucceeded"
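
Usage note (not part of the patch): the new abandon path is driven entirely by the spec state field, exactly as the integration test above does with updateRun.Spec.State = placementv1beta1.StateAbandoned. Below is a minimal sketch of a caller flipping an in-flight ClusterStagedUpdateRun into the Abandoned state; the helper name and the run name are hypothetical, and the client calls assume a standard controller-runtime client.

// abandon_example.go -- hypothetical sketch, not part of this patch.
package example

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
)

// abandonClusterStagedUpdateRun flips an in-flight update run to the Abandoned state.
// The updaterun controller then takes the haltUpdateRun path added in this patch:
// in-flight clusters finish, no new clusters start, and the run is marked with the
// UpdateRunAbandoned reason, which is terminal (the run cannot be restarted).
func abandonClusterStagedUpdateRun(ctx context.Context, c client.Client, name string) error {
	updateRun := &placementv1beta1.ClusterStagedUpdateRun{}
	if err := c.Get(ctx, types.NamespacedName{Name: name}, updateRun); err != nil {
		return err
	}
	// State is the only mutable spec field; the rest of the spec is immutable.
	updateRun.Spec.State = placementv1beta1.StateAbandoned
	return c.Update(ctx, updateRun)
}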