Skip to content
2 changes: 2 additions & 0 deletions cmd/memberagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var (
driftDetectionInterval = flag.Int("drift-detection-interval", 15, "The interval in seconds between attempts to detect configuration drifts in the cluster.")
watchWorkWithPriorityQueue = flag.Bool("enable-watch-work-with-priority-queue", false, "If set, the apply_work controller will watch/reconcile work objects that are created new or have recent updates")
watchWorkReconcileAgeMinutes = flag.Int("watch-work-reconcile-age", 60, "maximum age (in minutes) of work objects for apply_work controller to watch/reconcile")
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
Expand Down Expand Up @@ -395,6 +396,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
parallelizer.DefaultNumOfWorkers,
time.Second*time.Duration(*availabilityCheckInterval),
time.Second*time.Duration(*driftDetectionInterval),
time.Minute*time.Duration(*deletionWaitTime),
*watchWorkWithPriorityQueue,
*watchWorkReconcileAgeMinutes,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, 1, time.Second*5, time.Second*5, true, 60)
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, 1, time.Second*5, time.Second*5, time.Minute, true, 60)

propertyProvider1 = &manuallyUpdatedProvider{}
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
Expand All @@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, 1, time.Second*5, time.Second*5, true, 60)
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, 1, time.Second*5, time.Second*5, time.Minute, true, 60)

member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
Expect(err).NotTo(HaveOccurred())
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/workapplier/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package workapplier
import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
Expand Down Expand Up @@ -518,7 +517,7 @@ func validateOwnerReferences(
// expected AppliedWork object. For safety reasons, Fleet will still do a sanity check.
found := false
for _, ownerRef := range inMemberClusterObjOwnerRefs {
if reflect.DeepEqual(ownerRef, *expectedAppliedWorkOwnerRef) {
if areOwnerRefsEqual(&ownerRef, expectedAppliedWorkOwnerRef) {
found = true
break
}
Expand Down
133 changes: 102 additions & 31 deletions pkg/controllers/workapplier/controller.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Copyright 2025 The KubeFleet Authors.

Expand All @@ -34,6 +18,7 @@

import (
"context"
"fmt"
"time"

"go.uber.org/atomic"
Expand Down Expand Up @@ -212,6 +197,7 @@

availabilityCheckRequeueAfter time.Duration
driftCheckRequeueAfter time.Duration
deletionWaitTime time.Duration
}

func NewReconciler(
Expand All @@ -222,6 +208,7 @@
workerCount int,
availabilityCheckRequestAfter time.Duration,
driftCheckRequestAfter time.Duration,
deletionWaitTime time.Duration,
watchWorkWithPriorityQueue bool,
watchWorkReconcileAgeMinutes int,
) *Reconciler {
Expand Down Expand Up @@ -251,6 +238,7 @@
joined: atomic.NewBool(false),
availabilityCheckRequeueAfter: acRequestAfter,
driftCheckRequeueAfter: dcRequestAfter,
deletionWaitTime: deletionWaitTime,
}
}

Expand Down Expand Up @@ -417,7 +405,7 @@
Kind: fleetv1beta1.AppliedWorkKind,
Name: appliedWork.GetName(),
UID: appliedWork.GetUID(),
BlockOwnerDeletion: ptr.To(false),
BlockOwnerDeletion: ptr.To(true),
}

// Set the default values for the Work object to avoid additional validation logic in the
Expand Down Expand Up @@ -485,29 +473,112 @@
return ctrl.Result{RequeueAfter: r.driftCheckRequeueAfter}, nil
}

// garbageCollectAppliedWork deletes the appliedWork and all the manifests associated with it from the cluster.
func (r *Reconciler) garbageCollectAppliedWork(ctx context.Context, work *fleetv1beta1.Work) (ctrl.Result, error) {
deletePolicy := metav1.DeletePropagationBackground
deletePolicy := metav1.DeletePropagationForeground
if !controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) {
return ctrl.Result{}, nil
}
// delete the appliedWork which will remove all the manifests associated with it
// TODO: allow orphaned manifest
appliedWork := fleetv1beta1.AppliedWork{
appliedWork := &fleetv1beta1.AppliedWork{
ObjectMeta: metav1.ObjectMeta{Name: work.Name},
}
err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
switch {
case apierrors.IsNotFound(err):
klog.V(2).InfoS("The appliedWork is already deleted", "appliedWork", work.Name)
case err != nil:
klog.ErrorS(err, "Failed to delete the appliedWork", "appliedWork", work.Name)
// Get the AppliedWork object
if err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork); err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).InfoS("The appliedWork is already deleted, removing the finalizer from the work", "appliedWork", work.Name)
return r.removeWorkFinalizer(ctx, work)
}
klog.ErrorS(err, "Failed to get AppliedWork", "appliedWork", work.Name)

Check warning on line 490 in pkg/controllers/workapplier/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/workapplier/controller.go#L490

Added line #L490 was not covered by tests
return ctrl.Result{}, controller.NewAPIServerError(false, err)
default:
klog.InfoS("Successfully deleted the appliedWork", "appliedWork", work.Name)
}
controllerutil.RemoveFinalizer(work, fleetv1beta1.WorkFinalizer)

// Handle stuck deletion after 5 minutes where the other owner references might not exist or are invalid.
if !appliedWork.DeletionTimestamp.IsZero() && time.Since(appliedWork.DeletionTimestamp.Time) >= r.deletionWaitTime {
klog.V(2).InfoS("AppliedWork deletion appears stuck; attempting to patch owner references", "appliedWork", work.Name)
if err := r.updateOwnerReference(ctx, work, appliedWork); err != nil {
klog.ErrorS(err, "Failed to update owner references for AppliedWork", "appliedWork", work.Name)
return ctrl.Result{}, controller.NewAPIServerError(false, err)
}

Check warning on line 500 in pkg/controllers/workapplier/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/workapplier/controller.go#L498-L500

Added lines #L498 - L500 were not covered by tests
return ctrl.Result{}, fmt.Errorf("AppliedWork %s is being deleted, waiting for the deletion to complete", work.Name)
}

if err := r.spokeClient.Delete(ctx, appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy}); err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).InfoS("AppliedWork already deleted", "appliedWork", work.Name)
return r.removeWorkFinalizer(ctx, work)
}
klog.V(2).ErrorS(err, "Failed to delete the appliedWork", "appliedWork", work.Name)
return ctrl.Result{}, controller.NewAPIServerError(false, err)

Check warning on line 510 in pkg/controllers/workapplier/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/workapplier/controller.go#L505-L510

Added lines #L505 - L510 were not covered by tests
}

klog.V(2).InfoS("AppliedWork deletion in progress", "appliedWork", work.Name)
return ctrl.Result{}, fmt.Errorf("AppliedWork %s is being deleted, waiting for the deletion to complete", work.Name)
}

// updateOwnerReference updates the AppliedWork owner reference in the manifest objects.
// It changes the blockOwnerDeletion field to false, so that the AppliedWork can be deleted in cases where
// the other owner references do not exist or are invalid.
// https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/#owner-references-in-object-specifications
func (r *Reconciler) updateOwnerReference(ctx context.Context, work *fleetv1beta1.Work, appliedWork *fleetv1beta1.AppliedWork) error {
appliedWorkOwnerRef := &metav1.OwnerReference{
APIVersion: fleetv1beta1.GroupVersion.String(),
Kind: "AppliedWork",
Name: appliedWork.Name,
UID: appliedWork.UID,
}

if err := r.hubClient.Get(ctx, types.NamespacedName{Name: work.Name, Namespace: work.Namespace}, work); err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).InfoS("Work object not found, skipping owner reference update", "work", work.Name, "namespace", work.Namespace)
return nil
}
klog.ErrorS(err, "Failed to get Work object for owner reference update", "work", work.Name, "namespace", work.Namespace)
return controller.NewAPIServerError(false, err)

Check warning on line 535 in pkg/controllers/workapplier/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/workapplier/controller.go#L530-L535

Added lines #L530 - L535 were not covered by tests
}

for _, cond := range work.Status.ManifestConditions {
res := cond.Identifier
gvr := schema.GroupVersionResource{
Group: res.Group,
Version: res.Version,
Resource: res.Resource,
}

var obj *unstructured.Unstructured
var err error
if obj, err = r.spokeDynamicClient.Resource(gvr).Namespace(res.Namespace).Get(ctx, res.Name, metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
continue
}
klog.ErrorS(err, "Failed to get manifest", "gvr", gvr, "name", res.Name, "namespace", res.Namespace)
return err

Check warning on line 553 in pkg/controllers/workapplier/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/workapplier/controller.go#L552-L553

Added lines #L552 - L553 were not covered by tests
}
// Check if there is more than one owner reference. If there is only one owner reference, it is the appliedWork itself.
// Otherwise, at least one other owner reference exists, and we need to leave resource alone.
if len(obj.GetOwnerReferences()) > 1 {
ownerRefs := obj.GetOwnerReferences()
updated := false
for idx := range ownerRefs {
if areOwnerRefsEqual(&ownerRefs[idx], appliedWorkOwnerRef) {
ownerRefs[idx].BlockOwnerDeletion = ptr.To(false)
updated = true
}
}
if updated {
obj.SetOwnerReferences(ownerRefs)
if _, err = r.spokeDynamicClient.Resource(gvr).Namespace(obj.GetNamespace()).Update(ctx, obj, metav1.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to update manifest owner references", "gvr", gvr, "name", res.Name, "namespace", res.Namespace)
return err
}

Check warning on line 571 in pkg/controllers/workapplier/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/workapplier/controller.go#L569-L571

Added lines #L569 - L571 were not covered by tests
klog.V(4).InfoS("Patched manifest owner references", "gvr", gvr, "name", res.Name, "namespace", res.Namespace)
}
}
}
return nil
}

// removeWorkFinalizer removes the finalizer from the work and updates it in the hub.
func (r *Reconciler) removeWorkFinalizer(ctx context.Context, work *fleetv1beta1.Work) (ctrl.Result, error) {
controllerutil.RemoveFinalizer(work, fleetv1beta1.WorkFinalizer)
if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to remove the finalizer from the work", "work", klog.KObj(work))
return ctrl.Result{}, controller.NewAPIServerError(false, err)
Expand Down
Loading
Loading