71 changes: 51 additions & 20 deletions pkg/controllers/updaterun/execution.go
@@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -68,7 +69,7 @@ func (r *Reconciler) execute(
updateRunStatus := updateRun.GetUpdateRunStatus()
if updatingStageIndex < len(updateRunStatus.StagesStatus) {
updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex]
waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings)
waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, 1)
if errors.Is(execErr, errStagedUpdatedAborted) {
markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error())
return true, waitTime, execErr
@@ -91,6 +92,7 @@ func (r *Reconciler) executeUpdatingStage(
updateRun placementv1beta1.UpdateRunObj,
updatingStageIndex int,
toBeUpdatedBindings []placementv1beta1.BindingObj,
maxConcurrency int,
) (time.Duration, error) {
updateRunStatus := updateRun.GetUpdateRunStatus()
updateRunSpec := updateRun.GetUpdateRunSpec()
@@ -106,23 +108,39 @@ func (r *Reconciler) executeUpdatingStage(
toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding
}
finishedClusterCount := 0
clusterUpdatingCount := 0

// Go through each cluster in the stage and check if it's updated.
for i := range updatingStageStatus.Clusters {
// List of clusters that need to be processed in parallel in this execution.
var clustersToProcess []*placementv1beta1.ClusterUpdatingStatus

// Go through each cluster in the stage and check if it's updating/succeeded/failed.
for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ {
clusterStatus := &updatingStageStatus.Clusters[i]
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster is marked as failed to update.
failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
}
if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster has been updated successfully.
finishedClusterCount++
continue
if clusterUpdateSucceededCond == nil {
// The cluster is either updating or not started yet.
clustersToProcess = append(clustersToProcess, &updatingStageStatus.Clusters[i])
clusterUpdatingCount++
} else {
if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster is marked as failed to update.
failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
}
if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster has been updated successfully.
finishedClusterCount++
continue
}
}
}

var stuckClusterNames []string
// Now go through each cluster that needs to be processed.
for i := range clustersToProcess {
clusterStatus := clustersToProcess[i]
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
// The cluster is either updating or not started yet.
binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) {
@@ -172,8 +190,8 @@ func (r *Reconciler) executeUpdatingStage(
if finishedClusterCount == 0 {
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, nil
// Continue on to the next cluster; up to maxConcurrency clusters are processed in parallel.
continue
}

// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
@@ -194,22 +212,26 @@ func (r *Reconciler) executeUpdatingStage(
}

finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
if updateErr != nil {
return clusterUpdatingWaitTime, updateErr
}
if finished {
finishedClusterCount++
markUpdateRunProgressing(updateRun)
continue
} else {
// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
if timeElapsed > updateRunStuckThreshold {
klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName)
}
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, updateErr
// Continue on to the next cluster; up to maxConcurrency clusters are processed in parallel.
}

// After processing at most maxConcurrency clusters, check if we need to mark the update run as stuck or progressing.
aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount)

if finishedClusterCount == len(updatingStageStatus.Clusters) {
// All the clusters in the stage have been updated.
markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
@@ -235,6 +257,15 @@ func (r *Reconciler) executeUpdatingStage(
return clusterUpdatingWaitTime, nil
}

func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) {
if len(stuckClusterNames) > 0 {
markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
} else if finishedClusterCount > 0 {
// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
markUpdateRunProgressing(updateRun)
}
}

// executeDeleteStage executes the delete stage by deleting the bindings.
func (r *Reconciler) executeDeleteStage(
ctx context.Context,
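
The central change in this file is that executeUpdatingStage now selects up to maxConcurrency clusters per reconcile pass instead of exactly one (the caller in execute still passes 1, so the default behavior stays single-cluster). Below is a minimal, self-contained sketch of that selection pattern; clusterState and selectClustersToProcess are illustrative stand-ins, not the actual Fleet API types or helpers.

package main

import "fmt"

// clusterState is an illustrative stand-in for the per-cluster updating status
// tracked in the stage status; it is not the actual Fleet API type.
type clusterState struct {
	Name      string
	Succeeded bool // the cluster finished updating successfully
	Failed    bool // the cluster update failed
}

// selectClustersToProcess mirrors the selection loop in executeUpdatingStage:
// walk the clusters in order, abort on a failed cluster, count the ones that
// already succeeded, and collect at most maxConcurrency clusters that still
// need work.
func selectClustersToProcess(clusters []clusterState, maxConcurrency int) (toProcess []string, finished int, err error) {
	for i := 0; i < len(clusters) && len(toProcess) < maxConcurrency; i++ {
		c := clusters[i]
		switch {
		case c.Failed:
			return nil, finished, fmt.Errorf("cluster %q has failed", c.Name)
		case c.Succeeded:
			finished++
		default:
			toProcess = append(toProcess, c.Name)
		}
	}
	return toProcess, finished, nil
}

func main() {
	clusters := []clusterState{
		{Name: "member-1", Succeeded: true},
		{Name: "member-2"},
		{Name: "member-3"},
		{Name: "member-4"},
	}
	toProcess, finished, err := selectClustersToProcess(clusters, 2)
	fmt.Println(toProcess, finished, err) // [member-2 member-3] 1 <nil>
}

As in the diff, a failed cluster aborts the stage, already-succeeded clusters only bump the finished count, and the scan stops once maxConcurrency in-flight clusters have been collected.
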
17 changes: 0 additions & 17 deletions pkg/controllers/updaterun/validation.go
@@ -234,23 +234,6 @@ func validateClusterUpdatingStatus(
return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}
updatingStageIndex = curStage
// Collect the updating clusters.
var updatingClusters []string
for j := range stageStatus.Clusters {
clusterStartedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
clusterFinishedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
if condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) &&
!(condition.IsConditionStatusTrue(clusterFinishedCond, updateRun.GetGeneration()) || condition.IsConditionStatusFalse(clusterFinishedCond, updateRun.GetGeneration())) {
updatingClusters = append(updatingClusters, stageStatus.Clusters[j].ClusterName)
}
}
// We don't allow more than one clusters to be updating at the same time.
// TODO(wantjian): support multiple clusters updating at the same time.
if len(updatingClusters) > 1 {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("more than one cluster is updating in the stage `%s`, clusters: %v", stageStatus.StageName, updatingClusters))
klog.ErrorS(unexpectedErr, "Detected more than one updating clusters in the stage", "updateRun", klog.KObj(updateRun))
return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}
}
return updatingStageIndex, lastFinishedStageIndex, nil
}
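
The removed validation above aborted the run whenever more than one cluster in a stage was mid-update, which is incompatible with processing up to maxConcurrency clusters in parallel. For reference, a minimal sketch of the predicate it used to count a cluster as updating; the boolean parameters are stand-ins for the condition lookups the real code performs with meta.FindStatusCondition against the update run's generation.

package main

import "fmt"

// isUpdating reproduces the removed check's notion of a mid-update cluster:
// Started is true while Succeeded has not yet resolved to either true or false.
func isUpdating(startedTrue, succeededTrue, succeededFalse bool) bool {
	return startedTrue && !(succeededTrue || succeededFalse)
}

func main() {
	fmt.Println(isUpdating(true, false, false))  // true: started, not yet resolved
	fmt.Println(isUpdating(true, true, false))   // false: already succeeded
	fmt.Println(isUpdating(false, false, false)) // false: not started
}
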