Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/controllers/workapplier/availability_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
)

// trackInMemberClusterObjAvailability tracks the availability of an applied objects in the member cluster.
func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) {
// trackInMemberClusterObjAvailability tracks the availability of applied objects in the member cluster.
func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) error {
// Track the availability of all the applied objects in the member cluster in parallel.
//
// This is concurrency-safe as the bundles slice has been pre-allocated.
Expand Down Expand Up @@ -83,6 +83,15 @@ func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bu

// Run the availability check in parallel.
r.parallelizer.ParallelizeUntil(childCtx, len(bundles), doWork, "trackInMemberClusterObjAvailability")

// The workqueue.ParallelizeUntil utility does not return errors even if its context has been
// cancelled (and some availability checks might not be completed yet); to catch such
// premature termination, the work applier checks for context cancellation directly.
if err := ctx.Err(); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this check in framework.go and preprocess.go?

If this is applicable everywhere, maybe do this check and return error from

func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string)

whenever ctx is cancelled?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, ParallelizeUntil is an interface we own. We can probably return error from there.

Copy link
Collaborator

@weng271190436 weng271190436 Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason that I raised this originally is "can we make it easier for the next person using ParallelizeUntil so that they won't forget to check for context cancellation if we can encapsulate the handling inside the parallelizer package." But it seems that there are some nuances so okay with this approach.

For example, in framework.go we are already handling the context cancellation error separately

Copy link
Collaborator Author

@michaelawyu michaelawyu Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Wei and Wantong! Yeah, actually that was my first attempt as well. The concern I had was that, there are cases where we run ParallelizeUntil in its own child context (primarily for the reason that the child context can be cancelled on its own to terminate the parallelization promptly); and if we do the checking with our wrapper, e.g.,

func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) {
	...
	workqueue.ParallelizeUntil(ctx, p.numOfWorkers, pieces, doWorkWithLogs)

        if err := ctx.Err(); err != nil {...}
}

two possibilities exist:

a) the child context is cancelled willingly by us, and the method returns an error;
b) the parent context gets cancelled by factors outside our control, which in turn cancels the child context, and we also receive an error

The complication is that, we couldn't really tell between the two possibilities by looking at the error, so we still need to inspect the parent context at the caller level to find out why things fail, which kind of defeats the purpose of putting the logic inside the wrapper. This is why in this PR the check is done at the caller level instead at the moment. If there's any concern or things that I've missed, please let me know 🙏

klog.V(2).InfoS("availability checking has been interrupted as the main context has been cancelled")
return fmt.Errorf("availability checking has been interrupted: %w", err)
}
return nil
}

// trackInMemberClusterObjAvailabilityByGVR tracks the availability of an object in the member cluster based
Expand Down
5 changes: 4 additions & 1 deletion pkg/controllers/workapplier/availability_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,10 @@ func TestTrackInMemberClusterObjAvailability(t *testing.T) {
parallelizer: parallelizer.NewParallelizer(2),
}

r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef)
if err := r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef); err != nil {
// Normally this would never occur.
t.Fatalf("trackInMemberClusterObjAvailability() = %v, want no error", err)
}

// A special less func to sort the bundles by their ordinal.
lessFuncManifestProcessingBundle := func(i, j *manifestProcessingBundle) bool {
Expand Down
12 changes: 10 additions & 2 deletions pkg/controllers/workapplier/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// c) report configuration differences if applicable;
// d) check for configuration drifts if applicable;
// e) apply each manifest.
r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef)
if err := r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef); err != nil {
klog.ErrorS(err, "Failed to process the manifests", "work", workRef)
return ctrl.Result{}, err
}

// Track the availability information.
r.trackInMemberClusterObjAvailability(ctx, bundles, workRef)
if err := r.trackInMemberClusterObjAvailability(ctx, bundles, workRef); err != nil {
klog.ErrorS(err, "Failed to check for object availability", "work", workRef)
return ctrl.Result{}, err
}

// Refresh the status of the Work object.
if err := r.refreshWorkStatus(ctx, work, bundles); err != nil {
klog.ErrorS(err, "Failed to refresh work object status", "work", workRef)
return ctrl.Result{}, err
}

// Refresh the status of the AppliedWork object.
if err := r.refreshAppliedWorkStatus(ctx, appliedWork, bundles); err != nil {
klog.ErrorS(err, "Failed to refresh appliedWork object status", "appliedWork", klog.KObj(appliedWork))
return ctrl.Result{}, err
}

Expand Down
21 changes: 19 additions & 2 deletions pkg/controllers/workapplier/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r *Reconciler) processManifests(
bundles []*manifestProcessingBundle,
work *fleetv1beta1.Work,
expectedAppliedWorkOwnerRef *metav1.OwnerReference,
) {
) error {
// Process all manifests in parallel.
//
// There are cases where certain groups of manifests should not be processed in parallel with
Expand All @@ -58,7 +58,15 @@ func (r *Reconciler) processManifests(
}

r.parallelizer.ParallelizeUntil(ctx, len(bundles), doWork, "processingManifestsInReportDiffMode")
return

// The workqueue.ParallelizeUntil utility does not return errors even if its context has been
// cancelled (and some manifests might not be fully processed yet); to catch such
// premature termination, the work applier checks for context cancellation directly.
if err := ctx.Err(); err != nil {
klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
return fmt.Errorf("manifest processing has been interrupted: %w", err)
}
return nil
}

// Organize the bundles into different waves of bundles for parallel processing based on their
Expand All @@ -83,7 +91,16 @@ func (r *Reconciler) processManifests(
}

r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx))

// The workqueue.ParallelizeUntil utility does not return errors even if its context has been
// cancelled (and some manifests might not be fully processed yet); to catch such
// premature termination, the work applier checks for context cancellation directly.
if err := ctx.Err(); err != nil {
klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
return fmt.Errorf("manifest processing has been interrupted: %w", err)
}
}
return nil
}

// processOneManifest processes a manifest (in the JSON format) embedded in the Work object.
Expand Down
Loading