Merged
15 changes: 13 additions & 2 deletions pkg/controllers/workapplier/availability_tracker.go
@@ -35,8 +35,8 @@ import (
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
)

-// trackInMemberClusterObjAvailability tracks the availability of an applied objects in the member cluster.
-func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) {
+// trackInMemberClusterObjAvailability tracks the availability of applied objects in the member cluster.
+func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) error {
// Track the availability of all the applied objects in the member cluster in parallel.
//
// This is concurrency-safe as the bundles slice has been pre-allocated.
@@ -83,6 +83,17 @@ func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) error {

// Run the availability check in parallel.
r.parallelizer.ParallelizeUntil(childCtx, len(bundles), doWork, "trackInMemberClusterObjAvailability")

+// Unlike some other steps in the reconciliation loop, the availability checking step does not end
+// with a contextual API call; consequently, if the context has been cancelled during this step,
+// some checks might not run at all, and passing such bundles to the next step may trigger
+// unexpected behaviors. To address this, at the end of this step the work applier checks for context
+// cancellation directly.
+if err := ctx.Err(); err != nil {
Collaborator:
Why not do this check in framework.go and preprocess.go?

If this is applicable everywhere, maybe do this check and return an error from

func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string)

whenever ctx is cancelled?

Collaborator:
+1, ParallelizeUntil is an interface we own. We can probably return an error from there.

Collaborator @weng271190436 (Nov 20, 2025):
The reason I raised this originally was: can we make it easier for the next person using ParallelizeUntil, so that they won't forget to check for context cancellation, by encapsulating the handling inside the parallelizer package? But it seems that there are some nuances, so I'm okay with this approach.

For example, in framework.go we are already handling the context cancellation error separately.

Collaborator (Author) @michaelawyu (Nov 20, 2025):
Hi Wei and Wantong! Yeah, actually that was my first attempt as well. The concern I had was that there are cases where we run ParallelizeUntil with its own child context (primarily so that the child context can be cancelled on its own to terminate the parallelization promptly); and if we do the checking in our wrapper, e.g.,

func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) {
	...
	workqueue.ParallelizeUntil(ctx, p.numOfWorkers, pieces, doWorkWithLogs)

	if err := ctx.Err(); err != nil {...}
}

two possibilities exist:

a) the child context is cancelled willingly by us, and the method returns an error;
b) the parent context gets cancelled by factors outside our control, which in turn cancels the child context, and we also receive an error.

The complication is that we can't really tell the two possibilities apart by looking at the error, so we would still need to inspect the parent context at the caller level to find out why things failed, which kind of defeats the purpose of putting the logic inside the wrapper. This is why, at the moment, the check in this PR is done at the caller level instead. If there's any concern or anything that I've missed, please let me know 🙏
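
To make the two possibilities concrete, here is a minimal sketch of the fail-fast pattern described above (checkAllItems and itemFailed are hypothetical names, not actual KubeFleet code):

func checkAllItems(ctx context.Context, p *parallelizer, pieces int) error {
	// Run the work under a child context so that any worker can terminate
	// the whole parallelization early (the fail-fast pattern).
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	doWork := func(piece int) {
		if itemFailed(piece) {
			// Possibility a): the child context is cancelled willingly by us.
			cancel()
		}
	}
	p.ParallelizeUntil(childCtx, pieces, doWork, "checkAllItems")

	// childCtx.Err() is context.Canceled under both possibility a) and
	// possibility b); only the parent context tells them apart.
	if err := ctx.Err(); err != nil {
		// Possibility b): the parent context was cancelled by factors
		// outside our control.
		return err
	}
	return nil
}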

+klog.V(2).InfoS("availability checking has been interrupted as the main context has been cancelled")
+return fmt.Errorf("availability checking has been interrupted: %w", err)
+}
+return nil
}

// trackInMemberClusterObjAvailabilityByGVR tracks the availability of an object in the member cluster based
5 changes: 4 additions & 1 deletion pkg/controllers/workapplier/availability_tracker_test.go
@@ -1126,7 +1126,10 @@ func TestTrackInMemberClusterObjAvailability(t *testing.T) {
parallelizer: parallelizer.NewParallelizer(2),
}

-r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef)
+if err := r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef); err != nil {
+// Normally this would never occur.
+t.Fatalf("trackInMemberClusterObjAvailability() = %v, want no error", err)
+}

// A special less func to sort the bundles by their ordinal.
lessFuncManifestProcessingBundle := func(i, j *manifestProcessingBundle) bool {
12 changes: 10 additions & 2 deletions pkg/controllers/workapplier/controller.go
@@ -479,18 +479,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// c) report configuration differences if applicable;
// d) check for configuration drifts if applicable;
// e) apply each manifest.
-r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef)
+if err := r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef); err != nil {
+klog.ErrorS(err, "Failed to process the manifests", "work", workRef)
+return ctrl.Result{}, err
+}

// Track the availability information.
-r.trackInMemberClusterObjAvailability(ctx, bundles, workRef)
+if err := r.trackInMemberClusterObjAvailability(ctx, bundles, workRef); err != nil {
+klog.ErrorS(err, "Failed to check for object availability", "work", workRef)
+return ctrl.Result{}, err
+}

// Refresh the status of the Work object.
if err := r.refreshWorkStatus(ctx, work, bundles); err != nil {
klog.ErrorS(err, "Failed to refresh work object status", "work", workRef)
return ctrl.Result{}, err
}

// Refresh the status of the AppliedWork object.
if err := r.refreshAppliedWorkStatus(ctx, appliedWork, bundles); err != nil {
klog.ErrorS(err, "Failed to refresh appliedWork object status", "appliedWork", klog.KObj(appliedWork))
return ctrl.Result{}, err
}

25 changes: 23 additions & 2 deletions pkg/controllers/workapplier/process.go
@@ -36,7 +36,7 @@ func (r *Reconciler) processManifests(
bundles []*manifestProcessingBundle,
work *fleetv1beta1.Work,
expectedAppliedWorkOwnerRef *metav1.OwnerReference,
-) {
+) error {
// Process all manifests in parallel.
//
// There are cases where certain groups of manifests should not be processed in parallel with
@@ -58,7 +58,17 @@
}

r.parallelizer.ParallelizeUntil(ctx, len(bundles), doWork, "processingManifestsInReportDiffMode")
-return
+
+// Unlike some other steps in the reconciliation loop, the manifest processing step does not end
+// with a contextual API call; consequently, if the context has been cancelled during this step,
+// some manifests might not get processed at all, and passing such bundles to the next step may trigger
+// unexpected behaviors. To address this, at the end of this step the work applier checks for context
+// cancellation directly.
+if err := ctx.Err(); err != nil {
+klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
+return fmt.Errorf("manifest processing has been interrupted: %w", err)
+}
+return nil
}

// Organize the bundles into different waves of bundles for parallel processing based on their
@@ -83,7 +93,18 @@ func (r *Reconciler) processManifests(
}

r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx))

+// Unlike some other steps in the reconciliation loop, the manifest processing step does not end
+// with a contextual API call; consequently, if the context has been cancelled during this step,
+// some manifests might not get processed at all, and passing such bundles to the next step may trigger
+// unexpected behaviors. To address this, at the end of this step the work applier checks for context
+// cancellation directly.
+if err := ctx.Err(); err != nil {
+klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
+return fmt.Errorf("manifest processing has been interrupted: %w", err)
+}
}
+return nil
}

// processOneManifest processes a manifest (in the JSON format) embedded in the Work object.
9 changes: 9 additions & 0 deletions pkg/utils/parallelizer/parallelizer.go
@@ -56,4 +56,13 @@ func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) {
}

workqueue.ParallelizeUntil(ctx, p.numOfWorkers, pieces, doWorkWithLogs)

+// Note (chenyu1): the ParallelizeUntil method is essentially a thin wrapper around the
+// workqueue.ParallelizeUntil method. Note that the workqueue.ParallelizeUntil method
+// right now does not return any error; it returns when the context is cancelled, possibly
+// willingly. Some of the KubeFleet code makes use of this to facilitate a
+// fail-fast pattern (i.e., pass in a child context to the parallelizer; if one worker
+// has exited, cancel the child context in the worker and consequently the whole parallelization).
+// As only the caller knows why a context is cancelled (willingly by a worker or not), we leave it to the
+// caller to inspect the context after this method returns rather than trying to do it here.
}
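
For illustration, here is a sketch of the alternative discussed in the review thread above, i.e., returning the error from inside the wrapper (parallelizeUntilWithErr is a hypothetical name, not actual KubeFleet code); as the note explains, the returned error would be ambiguous:

func (p *parallelizer) parallelizeUntilWithErr(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) error {
	workqueue.ParallelizeUntil(ctx, p.numOfWorkers, pieces, doWork)
	// Ambiguous: ctx may be a child context that a worker cancelled on purpose
	// (fail-fast), or it may have been cancelled because its parent was cancelled
	// by factors outside our control; either way the error is context.Canceled,
	// so the caller would still have to inspect the parent context to tell why.
	return ctx.Err()
}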