Skip to content

Commit 2b1e9a7

Browse files
authored
feat: implement initialize and execute states within update run (#346)
1 parent 2e50010 commit 2b1e9a7

File tree

11 files changed

+649
-202
lines changed

11 files changed

+649
-202
lines changed

apis/placement/v1beta1/stageupdate_types.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,14 @@ func (c *ClusterStagedUpdateRun) SetUpdateRunStatus(status UpdateRunStatus) {
152152
type State string
153153

154154
const (
155-
// StateNotStarted describes user intent to initialize but not execute the update run.
155+
// StateInitialized describes user intent to initialize but not execute the update run.
156156
// This is the default state when an update run is created.
157-
StateNotStarted State = "Initialize"
157+
// Users can subsequently set the state to Execute or Abandon.
158+
StateInitialized State = "Initialize"
158159

159-
// StateStarted describes user intent to execute (or resume execution if paused).
160+
// StateExecuted describes user intent to execute (or resume execution if paused).
160161
// Users can subsequently set the state to Pause or Abandon.
161-
StateStarted State = "Execute"
162+
StateExecuted State = "Execute"
162163

163164
// StateStopped describes user intent to pause the update run.
164165
// Users can subsequently set the state to Execute or Abandon.
@@ -426,7 +427,6 @@ const (
426427
// Its condition status can be one of the following:
427428
// - "True": The staged update run is initialized successfully.
428429
// - "False": The staged update run encountered an error during initialization and aborted.
429-
// - "Unknown": The staged update run initialization has started.
430430
StagedUpdateRunConditionInitialized StagedUpdateRunConditionType = "Initialized"
431431

432432
// StagedUpdateRunConditionProgressing indicates whether the staged update run is making progress.

pkg/controllers/updaterun/controller.go

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
104104
// Emit the update run status metric based on status conditions in the updateRun.
105105
defer emitUpdateRunStatusMetric(updateRun)
106106

107+
state := updateRun.GetUpdateRunSpec().State
108+
107109
var updatingStageIndex int
108110
var toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj
109111
updateRunStatus := updateRun.GetUpdateRunStatus()
110112
initCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
111-
if !condition.IsConditionStatusTrue(initCond, updateRun.GetGeneration()) {
112-
if condition.IsConditionStatusFalse(initCond, updateRun.GetGeneration()) {
113+
// Check if initialized regardless of generation.
114+
// The updateRun spec fields are immutable except for the state field. When the state changes,
115+
// the update run generation increments, but we don't need to reinitialize since initialization is a one-time setup.
116+
if !(initCond != nil && initCond.Status == metav1.ConditionTrue) {
117+
// Check if initialization failed for the current generation.
118+
if initCond != nil && initCond.Status == metav1.ConditionFalse {
113119
klog.V(2).InfoS("The updateRun has failed to initialize", "errorMsg", initCond.Message, "updateRun", runObjRef)
114120
return runtime.Result{}, nil
115121
}
122+
123+
// Initialize the updateRun.
116124
var initErr error
117125
if toBeUpdatedBindings, toBeDeletedBindings, initErr = r.initialize(ctx, updateRun); initErr != nil {
118126
klog.ErrorS(initErr, "Failed to initialize the updateRun", "updateRun", runObjRef)
@@ -122,10 +130,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
122130
}
123131
return runtime.Result{}, initErr
124132
}
125-
updatingStageIndex = 0 // start from the first stage.
126-
klog.V(2).InfoS("Initialized the updateRun", "updateRun", runObjRef)
133+
updatingStageIndex = 0 // start from the first stage (typically for Initialize or Execute states).
134+
klog.V(2).InfoS("Initialized the updateRun", "state", state, "updateRun", runObjRef)
127135
} else {
128-
klog.V(2).InfoS("The updateRun is initialized", "updateRun", runObjRef)
136+
klog.V(2).InfoS("The updateRun is initialized", "state", state, "updateRun", runObjRef)
129137
// Check if the updateRun is finished.
130138
finishedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
131139
if condition.IsConditionStatusTrue(finishedCond, updateRun.GetGeneration()) || condition.IsConditionStatusFalse(finishedCond, updateRun.GetGeneration()) {
@@ -151,28 +159,32 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
151159
}
152160

153161
// Execute the updateRun.
154-
klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
155-
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
156-
if errors.Is(execErr, errStagedUpdatedAborted) {
157-
// errStagedUpdatedAborted cannot be retried.
158-
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error())
159-
}
162+
if state == placementv1beta1.StateExecuted {
163+
klog.V(2).InfoS("Continue to execute the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
164+
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
165+
if errors.Is(execErr, errStagedUpdatedAborted) {
166+
// errStagedUpdatedAborted cannot be retried.
167+
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error())
168+
}
160169

161-
if finished {
162-
klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef)
163-
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
164-
}
170+
if finished {
171+
klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef)
172+
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
173+
}
165174

166-
// The execution is not finished yet or it encounters a retriable error.
167-
// We need to record the status and requeue.
168-
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
169-
return runtime.Result{}, updateErr
170-
}
171-
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
172-
if execErr != nil {
173-
return runtime.Result{}, execErr
175+
// The execution is not finished yet or it encounters a retriable error.
176+
// We need to record the status and requeue.
177+
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
178+
return runtime.Result{}, updateErr
179+
}
180+
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
181+
if execErr != nil {
182+
return runtime.Result{}, execErr
183+
}
184+
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
174185
}
175-
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
186+
klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef)
187+
return runtime.Result{}, nil
176188
}
177189

178190
// handleDelete handles the deletion of the updateRun object.

pkg/controllers/updaterun/controller_integration_test.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,16 @@ func generateMetricsLabels(
272272
}
273273
}
274274

275+
func generateInitializationSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
276+
return &prometheusclientmodel.Metric{
277+
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
278+
string(metav1.ConditionTrue), condition.UpdateRunInitializeSucceededReason),
279+
Gauge: &prometheusclientmodel.Gauge{
280+
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
281+
},
282+
}
283+
}
284+
275285
func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
276286
return &prometheusclientmodel.Metric{
277287
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
@@ -341,6 +351,7 @@ func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateR
341351
PlacementName: testCRPName,
342352
ResourceSnapshotIndex: testResourceSnapshotIndex,
343353
StagedUpdateStrategyName: testUpdateStrategyName,
354+
State: placementv1beta1.StateExecuted,
344355
},
345356
}
346357
}
@@ -807,23 +818,8 @@ func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
807818
}
808819
}
809820

810-
func generateFalseProgressingCondition(obj client.Object, condType any, succeeded bool) metav1.Condition {
821+
func generateFalseProgressingCondition(obj client.Object, condType any, reason string) metav1.Condition {
811822
falseCond := generateFalseCondition(obj, condType)
812-
reason := ""
813-
switch condType {
814-
case placementv1beta1.StagedUpdateRunConditionProgressing:
815-
if succeeded {
816-
reason = condition.UpdateRunSucceededReason
817-
} else {
818-
reason = condition.UpdateRunFailedReason
819-
}
820-
case placementv1beta1.StageUpdatingConditionProgressing:
821-
if succeeded {
822-
reason = condition.StageUpdatingSucceededReason
823-
} else {
824-
reason = condition.StageUpdatingFailedReason
825-
}
826-
}
827823
falseCond.Reason = reason
828824
return falseCond
829825
}

pkg/controllers/updaterun/execution.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -285,31 +285,45 @@ func (r *Reconciler) executeUpdatingStage(
285285
}
286286

287287
if finishedClusterCount == len(updatingStageStatus.Clusters) {
288-
// All the clusters in the stage have been updated.
289-
markUpdateRunWaiting(updateRun, fmt.Sprintf(condition.UpdateRunWaitingMessageFmt, "after-stage", updatingStageStatus.StageName))
290-
markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration(), "All clusters in the stage are updated, waiting for after-stage tasks to complete")
291-
klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
292-
// Check if the after stage tasks are ready.
293-
approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
294-
if err != nil {
295-
return 0, err
296-
}
297-
if approved {
298-
markUpdateRunProgressing(updateRun)
299-
markStageUpdatingSucceeded(updatingStageStatus, updateRun.GetGeneration())
300-
// No need to wait to get to the next stage.
301-
return 0, nil
302-
}
303-
// The after stage tasks are not ready yet.
304-
if waitTime < 0 {
305-
waitTime = stageUpdatingWaitTime
306-
}
307-
return waitTime, nil
288+
return r.handleStageCompletion(ctx, updatingStageIndex, updateRun, updatingStageStatus)
308289
}
290+
309291
// Some clusters are still updating.
310292
return clusterUpdatingWaitTime, nil
311293
}
312294

295+
// handleStageCompletion handles the completion logic when all clusters in a stage are finished.
296+
// Returns the wait time and any error encountered.
297+
func (r *Reconciler) handleStageCompletion(
298+
ctx context.Context,
299+
updatingStageIndex int,
300+
updateRun placementv1beta1.UpdateRunObj,
301+
updatingStageStatus *placementv1beta1.StageUpdatingStatus,
302+
) (time.Duration, error) {
303+
updateRunRef := klog.KObj(updateRun)
304+
305+
// All the clusters in the stage have been updated.
306+
markUpdateRunWaiting(updateRun, fmt.Sprintf(condition.UpdateRunWaitingMessageFmt, "after-stage", updatingStageStatus.StageName))
307+
markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration(), "All clusters in the stage are updated, waiting for after-stage tasks to complete")
308+
klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
309+
// Check if the after stage tasks are ready.
310+
approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
311+
if err != nil {
312+
return 0, err
313+
}
314+
if approved {
315+
markUpdateRunProgressing(updateRun)
316+
markStageUpdatingSucceeded(updatingStageStatus, updateRun.GetGeneration())
317+
// No need to wait to get to the next stage.
318+
return 0, nil
319+
}
320+
// The after stage tasks are not ready yet.
321+
if waitTime < 0 {
322+
waitTime = stageUpdatingWaitTime
323+
}
324+
return waitTime, nil
325+
}
326+
313327
// executeDeleteStage executes the delete stage by deleting the bindings.
314328
func (r *Reconciler) executeDeleteStage(
315329
ctx context.Context,

0 commit comments

Comments
 (0)