diff --git a/pkg/controllers/workapplier/availability_tracker.go b/pkg/controllers/workapplier/availability_tracker.go
index 021c7babd..725ca3a9e 100644
--- a/pkg/controllers/workapplier/availability_tracker.go
+++ b/pkg/controllers/workapplier/availability_tracker.go
@@ -35,8 +35,8 @@ import (
 	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
 )
 
-// trackInMemberClusterObjAvailability tracks the availability of an applied objects in the member cluster.
-func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) {
+// trackInMemberClusterObjAvailability tracks the availability of applied objects in the member cluster.
+func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) error {
 	// Track the availability of all the applied objects in the member cluster in parallel.
 	//
 	// This is concurrency-safe as the bundles slice has been pre-allocated.
@@ -83,6 +83,17 @@ func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bu
 
 	// Run the availability check in parallel.
 	r.parallelizer.ParallelizeUntil(childCtx, len(bundles), doWork, "trackInMemberClusterObjAvailability")
+
+	// Unlike some other steps in the reconciliation loop, the availability checking step does not end
+	// with a contextual API call; consequently, if the context has been cancelled during this step,
+	// some checks might not run at all, and passing such bundles to the next step may trigger
+	// unexpected behaviors. To address this, at the end of this step the work applier checks for context
+	// cancellation directly.
+	if err := ctx.Err(); err != nil {
+		klog.V(2).InfoS("availability checking has been interrupted as the main context has been cancelled")
+		return fmt.Errorf("availability checking has been interrupted: %w", err)
+	}
+	return nil
 }
 
 // trackInMemberClusterObjAvailabilityByGVR tracks the availability of an object in the member cluster based
diff --git a/pkg/controllers/workapplier/availability_tracker_test.go b/pkg/controllers/workapplier/availability_tracker_test.go
index 03a212f83..1bc43c328 100644
--- a/pkg/controllers/workapplier/availability_tracker_test.go
+++ b/pkg/controllers/workapplier/availability_tracker_test.go
@@ -1126,7 +1126,10 @@ func TestTrackInMemberClusterObjAvailability(t *testing.T) {
 				parallelizer: parallelizer.NewParallelizer(2),
 			}
 
-			r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef)
+			if err := r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef); err != nil {
+				// Normally this would never occur.
+				t.Fatalf("trackInMemberClusterObjAvailability() = %v, want no error", err)
+			}
 
 			// A special less func to sort the bundles by their ordinal.
 			lessFuncManifestProcessingBundle := func(i, j *manifestProcessingBundle) bool {
diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go
index 393867554..c4e4df323 100644
--- a/pkg/controllers/workapplier/controller.go
+++ b/pkg/controllers/workapplier/controller.go
@@ -479,18 +479,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	// c) report configuration differences if applicable;
 	// d) check for configuration drifts if applicable;
 	// e) apply each manifest.
-	r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef)
+	if err := r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef); err != nil {
+		klog.ErrorS(err, "Failed to process the manifests", "work", workRef)
+		return ctrl.Result{}, err
+	}
 
 	// Track the availability information.
-	r.trackInMemberClusterObjAvailability(ctx, bundles, workRef)
+	if err := r.trackInMemberClusterObjAvailability(ctx, bundles, workRef); err != nil {
+		klog.ErrorS(err, "Failed to check for object availability", "work", workRef)
+		return ctrl.Result{}, err
+	}
 
 	// Refresh the status of the Work object.
 	if err := r.refreshWorkStatus(ctx, work, bundles); err != nil {
+		klog.ErrorS(err, "Failed to refresh work object status", "work", workRef)
 		return ctrl.Result{}, err
 	}
 
 	// Refresh the status of the AppliedWork object.
 	if err := r.refreshAppliedWorkStatus(ctx, appliedWork, bundles); err != nil {
+		klog.ErrorS(err, "Failed to refresh appliedWork object status", "appliedWork", klog.KObj(appliedWork))
 		return ctrl.Result{}, err
 	}
 
diff --git a/pkg/controllers/workapplier/process.go b/pkg/controllers/workapplier/process.go
index 82dc5ce18..1dba64f55 100644
--- a/pkg/controllers/workapplier/process.go
+++ b/pkg/controllers/workapplier/process.go
@@ -36,7 +36,7 @@ func (r *Reconciler) processManifests(
 	bundles []*manifestProcessingBundle,
 	work *fleetv1beta1.Work,
 	expectedAppliedWorkOwnerRef *metav1.OwnerReference,
-) {
+) error {
 	// Process all manifests in parallel.
 	//
 	// There are cases where certain groups of manifests should not be processed in parallel with
@@ -58,7 +58,17 @@ func (r *Reconciler) processManifests(
 
 		}
 		r.parallelizer.ParallelizeUntil(ctx, len(bundles), doWork, "processingManifestsInReportDiffMode")
-		return
+
+		// Unlike some other steps in the reconciliation loop, the manifest processing step does not end
+		// with a contextual API call; consequently, if the context has been cancelled during this step,
+		// some manifests might not get processed at all, and passing such bundles to the next step may trigger
+		// unexpected behaviors. To address this, at the end of this step the work applier checks for context
+		// cancellation directly.
+		if err := ctx.Err(); err != nil {
+			klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
+			return fmt.Errorf("manifest processing has been interrupted: %w", err)
+		}
+		return nil
 	}
 
 	// Organize the bundles into different waves of bundles for parallel processing based on their
@@ -83,7 +93,18 @@ func (r *Reconciler) processManifests(
 
 		}
 		r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx))
+
+		// Unlike some other steps in the reconciliation loop, the manifest processing step does not end
+		// with a contextual API call; consequently, if the context has been cancelled during this step,
+		// some manifests might not get processed at all, and passing such bundles to the next step may trigger
+		// unexpected behaviors. To address this, at the end of this step the work applier checks for context
+		// cancellation directly.
+		if err := ctx.Err(); err != nil {
+			klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
+			return fmt.Errorf("manifest processing has been interrupted: %w", err)
+		}
 	}
+	return nil
 }
 
 // processOneManifest processes a manifest (in the JSON format) embedded in the Work object.
diff --git a/pkg/utils/parallelizer/parallelizer.go b/pkg/utils/parallelizer/parallelizer.go
index 4c26fb43d..ca473e171 100644
--- a/pkg/utils/parallelizer/parallelizer.go
+++ b/pkg/utils/parallelizer/parallelizer.go
@@ -56,4 +56,13 @@ func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork
 	}
 
 	workqueue.ParallelizeUntil(ctx, p.numOfWorkers, pieces, doWorkWithLogs)
+
+	// Note (chenyu1): this ParallelizeUntil method is essentially a thin wrapper around the
+	// workqueue.ParallelizeUntil method, which currently does not return any error; it simply
+	// returns when all pieces are done or when the context is cancelled, possibly deliberately.
+	// Some of the KubeFleet code makes use of this to facilitate a fail-fast pattern (i.e., pass
+	// in a child context to the parallelizer; if one worker must abort, it cancels the child
+	// context and consequently the whole parallelization). As only the caller knows why a context
+	// has been cancelled (deliberately by a worker or not), we leave it to the caller to inspect
+	// the context after this method returns rather than trying to do it here.
 }
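For readers unfamiliar with the fail-fast pattern referenced in the parallelizer note above, the following is a minimal sketch, not KubeFleet code: runPieces and processPiece are hypothetical names, and the worker count and failing piece are picked arbitrarily for illustration. It shows a caller passing a child context to workqueue.ParallelizeUntil, a worker cancelling that context on a fatal error, and the caller inspecting the contexts itself after the call returns.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

// runPieces fans out work over a child context; any worker that hits a fatal error cancels
// the child context so the remaining pieces are skipped, and the caller inspects the parent
// context afterwards to tell a deliberate cancellation apart from an external one.
func runPieces(parent context.Context, pieces int) error {
	childCtx, cancel := context.WithCancel(parent)
	defer cancel()

	var once sync.Once
	var firstErr error
	doWork := func(piece int) {
		if err := processPiece(piece); err != nil {
			once.Do(func() {
				firstErr = err
				// Fail fast: cancelling the child context stops the whole parallelization.
				cancel()
			})
		}
	}
	workqueue.ParallelizeUntil(childCtx, 2, pieces, doWork)

	// ParallelizeUntil itself reports nothing; only the caller can tell why the context ended.
	// A cancelled parent means the run was interrupted from the outside; otherwise any recorded
	// worker error is the reason.
	if err := parent.Err(); err != nil {
		return fmt.Errorf("run interrupted: %w", err)
	}
	return firstErr
}

// processPiece stands in for per-piece work; piece 3 fails to exercise the fail-fast path.
func processPiece(piece int) error {
	if piece == 3 {
		return errors.New("piece 3 failed")
	}
	return nil
}

func main() {
	fmt.Println(runPieces(context.Background(), 10))
}

This mirrors why the wrapper leaves context inspection to its callers: from inside ParallelizeUntil there is no way to distinguish a deliberate, worker-initiated cancellation from an interrupted parent context.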