diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 537a30bfc..e5521dbc9 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -38,11 +38,13 @@ import ( "github.com/kubernetes-csi/csi-lib-utils/standardflags" "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -178,6 +180,19 @@ func main() { Port: 9443, TLSOpts: tlsOpts, }), + Client: client.Options{ + Cache: &client.CacheOptions{ + DisableFor: []client.Object{ + // Pods are fetched by the CSIAddonsNode controller + // Since we do not do this frequently the cache for it can be disabled + // This benefits us a lot as there can be a large number of pods that are present in the cache + &corev1.Pod{}, + // Namespaces are fetched by the old PVC reconciler + // TODO: Remove this when the reconciler is phased out + &corev1.Namespace{}, + }, + }, + }, }) if err != nil { setupLog.Error(err, "unable to start manager") @@ -223,14 +238,26 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "ReclaimSpaceCronJob") os.Exit(1) } - if err = (&controllers.PersistentVolumeClaimReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConnPool: connPool, - SchedulePrecedence: cfg.SchedulePrecedence, - }).SetupWithManager(mgr, ctrlOptions); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "PersistentVolumeClaim") - os.Exit(1) + if cfg.SchedulePrecedence == util.ScheduleSC { + setupLog.Info("Using new PVC controller for precedence", "schedulePrecedence", cfg.SchedulePrecedence) + if err = (&controllers.PVCReconiler{ + Client: mgr.GetClient(), + Scheme: 
mgr.GetScheme(), + ConnPool: connPool, + }).SetupWithManager(mgr, ctrlOptions); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "PersistentVolumeClaim") + os.Exit(1) + } + } else { + if err = (&controllers.PersistentVolumeClaimReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ConnPool: connPool, + SchedulePrecedence: cfg.SchedulePrecedence, + }).SetupWithManager(mgr, ctrlOptions); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "PersistentVolumeClaim") + os.Exit(1) + } } if err = (&replicationController.VolumeReplicationReconciler{ Client: mgr.GetClient(), diff --git a/internal/controller/csiaddons/encryptionkeyrotationcronjob_controller.go b/internal/controller/csiaddons/encryptionkeyrotationcronjob_controller.go index 61ec926ad..0aae32304 100644 --- a/internal/controller/csiaddons/encryptionkeyrotationcronjob_controller.go +++ b/internal/controller/csiaddons/encryptionkeyrotationcronjob_controller.go @@ -23,6 +23,7 @@ import ( "time" csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" "github.com/go-logr/logr" "github.com/robfig/cron/v3" @@ -72,14 +73,14 @@ func (r *EncryptionKeyRotationCronJobReconciler) Reconcile(ctx context.Context, // Set default values for the optionals if krcJob.Spec.FailedJobsHistoryLimit == nil { - *krcJob.Spec.FailedJobsHistoryLimit = defaultFailedJobsHistoryLimit + krcJob.Spec.FailedJobsHistoryLimit = &utils.DefaultFailedJobsHistoryLimit } if krcJob.Spec.SuccessfulJobsHistoryLimit == nil { - *krcJob.Spec.SuccessfulJobsHistoryLimit = defaultSuccessfulJobsHistoryLimit + krcJob.Spec.SuccessfulJobsHistoryLimit = &utils.DefaultSuccessfulJobsHistoryLimit } var childJobs csiaddonsv1alpha1.EncryptionKeyRotationJobList - err = r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}) + err = r.List(ctx, 
&childJobs, client.InNamespace(req.Namespace), client.MatchingFields{utils.JobOwnerKey: req.Name}) if err != nil { logger.Error(err, "failed to fetch list of encryptionkeyrotationjob") return ctrl.Result{}, err @@ -367,7 +368,7 @@ func (r *EncryptionKeyRotationCronJobReconciler) constructEncryptionKeyRotationJ // SetupWithManager sets up the controller with the Manager. func (r *EncryptionKeyRotationCronJobReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOptions controller.Options) error { - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &csiaddonsv1alpha1.EncryptionKeyRotationJob{}, jobOwnerKey, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &csiaddonsv1alpha1.EncryptionKeyRotationJob{}, utils.JobOwnerKey, func(rawObj client.Object) []string { job, ok := rawObj.(*csiaddonsv1alpha1.EncryptionKeyRotationJob) if !ok { return nil diff --git a/internal/controller/csiaddons/encryptionkeyrotationjob_controller.go b/internal/controller/csiaddons/encryptionkeyrotationjob_controller.go index ca1e1a425..0774f101e 100644 --- a/internal/controller/csiaddons/encryptionkeyrotationjob_controller.go +++ b/internal/controller/csiaddons/encryptionkeyrotationjob_controller.go @@ -24,6 +24,7 @@ import ( csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" "github.com/csi-addons/kubernetes-csi-addons/internal/proto" "github.com/csi-addons/kubernetes-csi-addons/internal/util" "github.com/csi-addons/spec/lib/go/identity" @@ -38,8 +39,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" 
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -333,6 +336,12 @@ func (r *EncryptionKeyRotationJobReconciler) SetupWithManager(mgr ctrl.Manager, return ctrl.NewControllerManagedBy(mgr). For(&csiaddonsv1alpha1.EncryptionKeyRotationJob{}). WithEventFilter(predicate.GenerationChangedPredicate{}). + // This is to avoid "stop-the-world" events and wait for cache sync when we list VA + Watches( + &scv1.VolumeAttachment{}, + &handler.EnqueueRequestForObject{}, + builder.WithPredicates(utils.SilentPredicate()), + ). WithOptions(ctrlOptions). Complete(r) } diff --git a/internal/controller/csiaddons/persistentvolumeclaim_controller.go b/internal/controller/csiaddons/persistentvolumeclaim_controller.go index dbb8396e7..50f047bbe 100644 --- a/internal/controller/csiaddons/persistentvolumeclaim_controller.go +++ b/internal/controller/csiaddons/persistentvolumeclaim_controller.go @@ -28,6 +28,7 @@ import ( csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" "github.com/csi-addons/kubernetes-csi-addons/internal/util" "github.com/go-logr/logr" @@ -63,19 +64,8 @@ type PersistentVolumeClaimReconciler struct { type Operation string var ( - rsCronJobScheduleTimeAnnotation = "reclaimspace." + csiaddonsv1alpha1.GroupVersion.Group + "/schedule" - rsCronJobNameAnnotation = "reclaimspace." + csiaddonsv1alpha1.GroupVersion.Group + "/cronjob" - rsCSIAddonsDriverAnnotation = "reclaimspace." + csiaddonsv1alpha1.GroupVersion.Group + "/drivers" - - krEnableAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/enable" - krcJobScheduleTimeAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/schedule" - krcJobNameAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/cronjob" - krCSIAddonsDriverAnnotation = "keyrotation." 
+ csiaddonsv1alpha1.GroupVersion.Group + "/drivers" - - ErrConnNotFoundRequeueNeeded = errors.New("connection not found, requeue needed") - ErrScheduleNotFound = errors.New("schedule not found") - - csiAddonsStateAnnotation = csiaddonsv1alpha1.GroupVersion.Group + "/state" + rsCSIAddonsDriverAnnotation = "reclaimspace." + csiaddonsv1alpha1.GroupVersion.Group + "/drivers" + krCSIAddonsDriverAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/drivers" ) const ( @@ -83,9 +73,6 @@ const ( relciamSpaceOp Operation = "reclaimspace" keyRotationOp Operation = "keyrotation" - - // Represents the CRs that are managed by the PVC controller - csiAddonsStateManaged = "managed" ) //+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;patch @@ -250,7 +237,7 @@ func (r *PersistentVolumeClaimReconciler) determineScheduleAndRequeue( return "", err } - return "", ErrScheduleNotFound + return "", utils.ErrScheduleNotFound } // Check on PVC @@ -263,7 +250,7 @@ func (r *PersistentVolumeClaimReconciler) determineScheduleAndRequeue( if schedule, err = r.getScheduleFromNS(ctx, pvc, logger, driverName, annotationKey); schedule != "" { return schedule, nil } - if !errors.Is(err, ErrScheduleNotFound) { + if !errors.Is(err, utils.ErrScheduleNotFound) { return "", err } @@ -276,7 +263,7 @@ func (r *PersistentVolumeClaimReconciler) determineScheduleAndRequeue( } // If nothing matched, we did not find schedule - return "", ErrScheduleNotFound + return "", utils.ErrScheduleNotFound } // storageClassEventHandler returns an EventHandler that responds to changes @@ -305,9 +292,9 @@ func (r *PersistentVolumeClaimReconciler) storageClassEventHandler() handler.Eve } annotationsToWatch := []string{ - rsCronJobScheduleTimeAnnotation, - krcJobScheduleTimeAnnotation, - krEnableAnnotation, + utils.RsCronJobScheduleTimeAnnotation, + utils.KrcJobScheduleTimeAnnotation, + utils.KrEnableAnnotation, } var requests []reconcile.Request @@ -340,13 +327,13 @@ func (r 
*PersistentVolumeClaimReconciler) setupIndexers(mgr ctrl.Manager) error }{ { obj: &csiaddonsv1alpha1.ReclaimSpaceCronJob{}, - field: jobOwnerKey, - indexFn: extractOwnerNameFromPVCObj[*csiaddonsv1alpha1.ReclaimSpaceCronJob], + field: utils.JobOwnerKey, + indexFn: utils.ExtractOwnerNameFromPVCObj[*csiaddonsv1alpha1.ReclaimSpaceCronJob], }, { obj: &csiaddonsv1alpha1.EncryptionKeyRotationCronJob{}, - field: jobOwnerKey, - indexFn: extractOwnerNameFromPVCObj[*csiaddonsv1alpha1.EncryptionKeyRotationCronJob], + field: utils.JobOwnerKey, + indexFn: utils.ExtractOwnerNameFromPVCObj[*csiaddonsv1alpha1.EncryptionKeyRotationCronJob], }, { obj: &corev1.PersistentVolumeClaim{}, @@ -382,8 +369,8 @@ func (r *PersistentVolumeClaimReconciler) SetupWithManager(mgr ctrl.Manager, ctr return err } - pvcPred := createAnnotationPredicate(rsCronJobScheduleTimeAnnotation, krcJobScheduleTimeAnnotation, krEnableAnnotation) - scPred := createAnnotationPredicate(rsCronJobScheduleTimeAnnotation, krcJobScheduleTimeAnnotation, krEnableAnnotation) + pvcPred := createAnnotationPredicate(utils.RsCronJobScheduleTimeAnnotation, utils.KrcJobScheduleTimeAnnotation, utils.KrEnableAnnotation) + scPred := createAnnotationPredicate(utils.RsCronJobScheduleTimeAnnotation, utils.KrcJobScheduleTimeAnnotation, utils.KrEnableAnnotation) return ctrl.NewControllerManagedBy(mgr). For(&corev1.PersistentVolumeClaim{}). 
@@ -410,7 +397,7 @@ func (r *PersistentVolumeClaimReconciler) findChildCronJob( err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), - client.MatchingFields{jobOwnerKey: req.Name}) + client.MatchingFields{utils.JobOwnerKey: req.Name}) if err != nil { logger.Error(err, "failed to list child reclaimSpaceCronJobs") @@ -477,15 +464,15 @@ func getScheduleFromAnnotation( // constructKRCronJob constructs an EncryptionKeyRotationCronJob object func constructKRCronJob(name, namespace, schedule, pvcName string) *csiaddonsv1alpha1.EncryptionKeyRotationCronJob { - failedJobHistoryLimit := defaultFailedJobsHistoryLimit - successfulJobsHistoryLimit := defaultSuccessfulJobsHistoryLimit + failedJobHistoryLimit := utils.DefaultFailedJobsHistoryLimit + successfulJobsHistoryLimit := utils.DefaultSuccessfulJobsHistoryLimit return &csiaddonsv1alpha1.EncryptionKeyRotationCronJob{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.EncryptionKeyRotationCronJobSpec{ @@ -507,15 +494,15 @@ func constructKRCronJob(name, namespace, schedule, pvcName string) *csiaddonsv1a // constructRSCronJob constructs a ReclaimSpaceCronJob object func constructRSCronJob(name, namespace, schedule, pvcName string) *csiaddonsv1alpha1.ReclaimSpaceCronJob { - failedJobsHistoryLimit := defaultFailedJobsHistoryLimit - successfulJobsHistoryLimit := defaultSuccessfulJobsHistoryLimit + failedJobsHistoryLimit := utils.DefaultFailedJobsHistoryLimit + successfulJobsHistoryLimit := utils.DefaultSuccessfulJobsHistoryLimit return &csiaddonsv1alpha1.ReclaimSpaceCronJob{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: 
csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{ @@ -533,25 +520,6 @@ func constructRSCronJob(name, namespace, schedule, pvcName string) *csiaddonsv1a } } -// extractOwnerNameFromPVCObj extracts owner.Name from the object if it is -// of type `T` and has a PVC as its owner. -func extractOwnerNameFromPVCObj[T client.Object](rawObj client.Object) []string { - // extract the owner from job object. - job, ok := rawObj.(T) - if !ok { - return nil - } - owner := metav1.GetControllerOf(job) - if owner == nil { - return nil - } - if owner.APIVersion != "v1" || owner.Kind != "PersistentVolumeClaim" { - return nil - } - - return []string{owner.Name} -} - // generateCronJobName returns unique name by suffixing parent name // with time hash. func generateCronJobName(parentName string) string { @@ -611,17 +579,17 @@ func (r *PersistentVolumeClaimReconciler) processReclaimSpace( } if rsCronJob != nil { *logger = logger.WithValues("ReclaimSpaceCronJobName", rsCronJob.Name) - if state, ok := rsCronJob.GetAnnotations()[csiAddonsStateAnnotation]; ok && state != csiAddonsStateManaged { + if state, ok := rsCronJob.GetAnnotations()[utils.CSIAddonsStateAnnotation]; ok && state != utils.CSIAddonsStateManaged { logger.Info("ReclaimSpaceCronJob is not managed, exiting reconcile") return ctrl.Result{}, nil } } - schedule, err := r.determineScheduleAndRequeue(ctx, logger, pvc, pv.Spec.CSI.Driver, rsCronJobScheduleTimeAnnotation) - if errors.Is(err, ErrConnNotFoundRequeueNeeded) { + schedule, err := r.determineScheduleAndRequeue(ctx, logger, pvc, pv.Spec.CSI.Driver, utils.RsCronJobScheduleTimeAnnotation) + if errors.Is(err, utils.ErrConnNotFoundRequeueNeeded) { return ctrl.Result{Requeue: true}, nil } - if errors.Is(err, ErrScheduleNotFound) { + if errors.Is(err, utils.ErrScheduleNotFound) { // if schedule is not found, // delete cron job. if rsCronJob != nil { @@ -631,10 +599,10 @@ func (r *PersistentVolumeClaimReconciler) processReclaimSpace( } } // delete name from annotation. 
- _, nameFound := pvc.Annotations[rsCronJobNameAnnotation] + _, nameFound := pvc.Annotations[utils.RsCronJobNameAnnotation] if nameFound { // remove name annotation by patching it to null. - patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q: null}}}`, rsCronJobNameAnnotation)) + patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q: null}}}`, utils.RsCronJobNameAnnotation)) err = r.Patch(ctx, pvc, client.RawPatch(types.StrategicMergePatchType, patch)) if err != nil { logger.Error(err, "Failed to remove annotation") @@ -671,7 +639,7 @@ func (r *PersistentVolumeClaimReconciler) processReclaimSpace( // Update schedule on the pvc err = r.patchAnnotationsToResource(ctx, logger, map[string]string{ - rsCronJobScheduleTimeAnnotation: schedule, + utils.RsCronJobScheduleTimeAnnotation: schedule, }, pvc) if err != nil { return ctrl.Result{}, err @@ -686,8 +654,8 @@ func (r *PersistentVolumeClaimReconciler) processReclaimSpace( // adding annotation is required for the case when pvc does not have // have schedule annotation but namespace has. 
err = r.patchAnnotationsToResource(ctx, logger, map[string]string{ - rsCronJobNameAnnotation: rsCronJobName, - rsCronJobScheduleTimeAnnotation: schedule, + utils.RsCronJobNameAnnotation: rsCronJobName, + utils.RsCronJobScheduleTimeAnnotation: schedule, }, pvc) if err != nil { return ctrl.Result{}, err @@ -721,7 +689,7 @@ func (r *PersistentVolumeClaimReconciler) findChildEncryptionKeyRotationCronJob( var childJobs csiaddonsv1alpha1.EncryptionKeyRotationCronJobList var activeJob *csiaddonsv1alpha1.EncryptionKeyRotationCronJob - err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}) + err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{utils.JobOwnerKey: req.Name}) if err != nil { logger.Error(err, "failed to list child encryptionkeyrotationcronjobs") return activeJob, fmt.Errorf("failed to list encryptionkeyrotationcronjobs: %v", err) @@ -760,13 +728,13 @@ func (r *PersistentVolumeClaimReconciler) processKeyRotation( } if krcJob != nil { *logger = logger.WithValues("EncryptionKeyrotationCronJobName", krcJob.Name) - if state, ok := krcJob.GetAnnotations()[csiAddonsStateAnnotation]; ok && state != csiAddonsStateManaged { + if state, ok := krcJob.GetAnnotations()[utils.CSIAddonsStateAnnotation]; ok && state != utils.CSIAddonsStateManaged { logger.Info("EncryptionKeyRotationCronJob is not managed, exiting reconcile") return nil } } - disabled, err := r.checkDisabledByAnnotation(ctx, logger, pvc, krEnableAnnotation) + disabled, err := r.checkDisabledByAnnotation(ctx, logger, pvc, utils.KrEnableAnnotation) if err != nil { return err } @@ -786,8 +754,8 @@ func (r *PersistentVolumeClaimReconciler) processKeyRotation( } // Determine schedule - sched, err := r.determineScheduleAndRequeue(ctx, logger, pvc, pv.Spec.CSI.Driver, krcJobScheduleTimeAnnotation) - if errors.Is(err, ErrScheduleNotFound) { + sched, err := r.determineScheduleAndRequeue(ctx, logger, pvc, pv.Spec.CSI.Driver, 
utils.KrcJobScheduleTimeAnnotation) + if errors.Is(err, utils.ErrScheduleNotFound) { // No schedule, delete the job if krcJob != nil { err = r.Delete(ctx, krcJob) @@ -825,7 +793,7 @@ func (r *PersistentVolumeClaimReconciler) processKeyRotation( // update the schedule on the pvc err = r.patchAnnotationsToResource(ctx, logger, map[string]string{ - krcJobScheduleTimeAnnotation: sched, + utils.KrcJobScheduleTimeAnnotation: sched, }, pvc) if err != nil { return err @@ -836,8 +804,8 @@ func (r *PersistentVolumeClaimReconciler) processKeyRotation( // Add the annotation to the pvc, this will help us optimize reconciles krcJobName := generateCronJobName(req.Name) err = r.patchAnnotationsToResource(ctx, logger, map[string]string{ - krcJobNameAnnotation: krcJobName, - krcJobScheduleTimeAnnotation: sched, + utils.KrcJobNameAnnotation: krcJobName, + utils.KrcJobScheduleTimeAnnotation: sched, }, pvc) if err != nil { return err @@ -878,20 +846,6 @@ func annotationValueMissingOrDiff(scAnnotations, pvcAnnotations map[string]strin return false } -// AnnotationValueChanged checks if any of the specified keys have different values -// between the old and new annotations maps. -func annotationValueChanged(oldAnnotations, newAnnotations map[string]string, keys []string) bool { - for _, key := range keys { - oldVal, oldExists := oldAnnotations[key] - newVal, newExists := newAnnotations[key] - - if oldExists != newExists || oldVal != newVal { - return true - } - } - return false -} - // CreateAnnotationPredicate returns a predicate.Funcs that checks if any of the specified // annotation keys have different values between the old and new annotations maps. 
func createAnnotationPredicate(annotations ...string) predicate.Funcs { @@ -904,7 +858,7 @@ func createAnnotationPredicate(annotations ...string) predicate.Funcs { oldAnnotations := e.ObjectOld.GetAnnotations() newAnnotations := e.ObjectNew.GetAnnotations() - return annotationValueChanged(oldAnnotations, newAnnotations, annotations) + return utils.AnnotationValueChanged(oldAnnotations, newAnnotations, annotations) }, } } @@ -923,7 +877,7 @@ func (r *PersistentVolumeClaimReconciler) getScheduleFromSC( if err != nil { if apierrors.IsNotFound(err) { logger.Error(err, "StorageClass not found", "StorageClass", storageClassName) - return "", ErrScheduleNotFound + return "", utils.ErrScheduleNotFound } logger.Error(err, "Failed to get StorageClass", "StorageClass", storageClassName) @@ -935,7 +889,7 @@ func (r *PersistentVolumeClaimReconciler) getScheduleFromSC( } } - return "", ErrScheduleNotFound + return "", utils.ErrScheduleNotFound } func (r *PersistentVolumeClaimReconciler) getScheduleFromNS( @@ -966,7 +920,7 @@ func (r *PersistentVolumeClaimReconciler) getScheduleFromNS( // requeuing the request. // Depending on requeue value, it will return ErrorConnNotFoundRequeueNeeded. 
switch annotationKey { - case krcJobScheduleTimeAnnotation: + case utils.KrcJobScheduleTimeAnnotation: requeue, keyRotationSupported, err := r.checkDriverSupportCapability(logger, ns.Annotations, driverName, keyRotationOp) if err != nil { return "", err @@ -975,9 +929,9 @@ func (r *PersistentVolumeClaimReconciler) getScheduleFromNS( return schedule, nil } if requeue { - return "", ErrConnNotFoundRequeueNeeded + return "", utils.ErrConnNotFoundRequeueNeeded } - case rsCronJobScheduleTimeAnnotation: + case utils.RsCronJobScheduleTimeAnnotation: requeue, supportReclaimspace, err := r.checkDriverSupportCapability(logger, ns.Annotations, driverName, relciamSpaceOp) if err != nil { return "", err @@ -990,7 +944,7 @@ func (r *PersistentVolumeClaimReconciler) getScheduleFromNS( if requeue { // The request needs to be requeued for checking // driver support again. - return "", ErrConnNotFoundRequeueNeeded + return "", utils.ErrConnNotFoundRequeueNeeded } default: logger.Info("Unknown annotation key", "AnnotationKey", annotationKey) @@ -998,7 +952,7 @@ func (r *PersistentVolumeClaimReconciler) getScheduleFromNS( } } - return "", ErrScheduleNotFound + return "", utils.ErrScheduleNotFound } func (r *PersistentVolumeClaimReconciler) getScheduleFromPVC( diff --git a/internal/controller/csiaddons/persistentvolumeclaim_controller_test.go b/internal/controller/csiaddons/persistentvolumeclaim_controller_test.go index 1169adeb5..21bd163b2 100644 --- a/internal/controller/csiaddons/persistentvolumeclaim_controller_test.go +++ b/internal/controller/csiaddons/persistentvolumeclaim_controller_test.go @@ -23,6 +23,7 @@ import ( csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" "github.com/go-logr/logr" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -128,7 +129,7 @@ func 
TestExtractOwnerNameFromPVCObj(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := extractOwnerNameFromPVCObj[*csiaddonsv1alpha1.ReclaimSpaceCronJob](tt.args.rawObj) + got := utils.ExtractOwnerNameFromPVCObj[*csiaddonsv1alpha1.ReclaimSpaceCronJob](tt.args.rawObj) assert.Equal(t, tt.want, got) }) } @@ -185,7 +186,7 @@ func TestGetScheduleFromAnnotation(t *testing.T) { args: args{ logger: &logger, annotations: map[string]string{ - rsCronJobScheduleTimeAnnotation: "@weekly", + utils.RsCronJobScheduleTimeAnnotation: "@weekly", }, }, want: "@weekly", @@ -196,7 +197,7 @@ func TestGetScheduleFromAnnotation(t *testing.T) { args: args{ logger: &logger, annotations: map[string]string{ - rsCronJobScheduleTimeAnnotation: "@daytime", + utils.RsCronJobScheduleTimeAnnotation: "@daytime", }, }, want: defaultSchedule, @@ -205,7 +206,7 @@ func TestGetScheduleFromAnnotation(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1 := getScheduleFromAnnotation(rsCronJobScheduleTimeAnnotation, tt.args.logger, tt.args.annotations) + got, got1 := getScheduleFromAnnotation(utils.RsCronJobScheduleTimeAnnotation, tt.args.logger, tt.args.annotations) assert.Equal(t, tt.want, got) assert.Equal(t, tt.want1, got1) }) @@ -226,22 +227,22 @@ func TestDetermineScheduleAndRequeue(t *testing.T) { { name: "pvc annotation set", args: args{ - pvcAnnotations: map[string]string{rsCronJobScheduleTimeAnnotation: "@daily"}, + pvcAnnotations: map[string]string{utils.RsCronJobScheduleTimeAnnotation: "@daily"}, }, want: "@daily", }, { name: "sc annotation set", args: args{ - scAnnotations: map[string]string{rsCronJobScheduleTimeAnnotation: "@monthly"}, + scAnnotations: map[string]string{utils.RsCronJobScheduleTimeAnnotation: "@monthly"}, }, want: "@monthly", }, { name: "pvc & sc annotation set", args: args{ - pvcAnnotations: map[string]string{rsCronJobScheduleTimeAnnotation: "@daily"}, - scAnnotations: 
map[string]string{rsCronJobScheduleTimeAnnotation: "@weekly"}, + pvcAnnotations: map[string]string{utils.RsCronJobScheduleTimeAnnotation: "@daily"}, + scAnnotations: map[string]string{utils.RsCronJobScheduleTimeAnnotation: "@weekly"}, }, want: "@daily", }, @@ -299,7 +300,7 @@ func TestDetermineScheduleAndRequeue(t *testing.T) { err = r.Update(ctx, pvc) assert.NoError(t, err) - schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, rsCronJobScheduleTimeAnnotation) + schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, utils.RsCronJobScheduleTimeAnnotation) assert.NoError(t, error) assert.Equal(t, tt.want, schedule) }) @@ -309,8 +310,8 @@ func TestDetermineScheduleAndRequeue(t *testing.T) { emptyScName := "" pvc.Spec.StorageClassName = &emptyScName pvc.Annotations = nil - schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, rsCronJobScheduleTimeAnnotation) - assert.ErrorIs(t, error, ErrScheduleNotFound) + schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, utils.RsCronJobScheduleTimeAnnotation) + assert.ErrorIs(t, error, utils.ErrScheduleNotFound) assert.Equal(t, "", schedule) }) @@ -319,8 +320,8 @@ func TestDetermineScheduleAndRequeue(t *testing.T) { sc.Name = "non-existent-sc" pvc.Spec.StorageClassName = &sc.Name pvc.Annotations = nil - schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, rsCronJobScheduleTimeAnnotation) - assert.ErrorIs(t, error, ErrScheduleNotFound) + schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, utils.RsCronJobScheduleTimeAnnotation) + assert.ErrorIs(t, error, utils.ErrScheduleNotFound) assert.Equal(t, "", schedule) }) @@ -328,81 +329,12 @@ func TestDetermineScheduleAndRequeue(t *testing.T) { t.Run("StorageClassName is nil", func(t *testing.T) { pvc.Spec.StorageClassName = nil pvc.Annotations = nil - schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, 
rsCronJobScheduleTimeAnnotation) - assert.ErrorIs(t, error, ErrScheduleNotFound) + schedule, error := r.determineScheduleAndRequeue(ctx, &logger, pvc, driverName, utils.RsCronJobScheduleTimeAnnotation) + assert.ErrorIs(t, error, utils.ErrScheduleNotFound) assert.Equal(t, "", schedule) }) } -func TestAnnotationValueChanged(t *testing.T) { - tests := []struct { - name string - oldAnnotations map[string]string - newAnnotations map[string]string - keys []string - expected bool - }{ - { - name: "No changes", - oldAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, - newAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, - keys: []string{"key1", "key2"}, - expected: false, - }, - { - name: "Value changed", - oldAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, - newAnnotations: map[string]string{"key1": "value1", "key2": "newvalue2"}, - keys: []string{"key1", "key2"}, - expected: true, - }, - { - name: "Key added", - oldAnnotations: map[string]string{"key1": "value1"}, - newAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, - keys: []string{"key1", "key2"}, - expected: true, - }, - { - name: "Key removed", - oldAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, - newAnnotations: map[string]string{"key1": "value1"}, - keys: []string{"key1", "key2"}, - expected: true, - }, - { - name: "Change in non-specified key", - oldAnnotations: map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}, - newAnnotations: map[string]string{"key1": "value1", "key2": "value2", "key3": "newvalue3"}, - keys: []string{"key1", "key2"}, - expected: false, - }, - { - name: "Empty keys slice", - oldAnnotations: map[string]string{"key1": "value1"}, - newAnnotations: map[string]string{"key1": "newvalue1"}, - keys: []string{}, - expected: false, - }, - { - name: "Nil maps", - oldAnnotations: nil, - newAnnotations: nil, - keys: []string{"key1"}, - expected: false, - }, - } - - for _, tt := range tests { 
- t.Run(tt.name, func(t *testing.T) { - result := annotationValueChanged(tt.oldAnnotations, tt.newAnnotations, tt.keys) - if result != tt.expected { - t.Errorf("AnnotationValueChanged() = %v, want %v", result, tt.expected) - } - }) - } -} - func TestCreateAnnotationPredicate(t *testing.T) { tests := []struct { name string @@ -470,8 +402,8 @@ func TestCreateAnnotationPredicate(t *testing.T) { } func TestConstructKRCronJob(t *testing.T) { - failedJobHistoryLimit := defaultFailedJobsHistoryLimit - successfulJobsHistoryLimit := defaultSuccessfulJobsHistoryLimit + failedJobHistoryLimit := utils.DefaultFailedJobsHistoryLimit + successfulJobsHistoryLimit := utils.DefaultSuccessfulJobsHistoryLimit tests := []struct { name string cronName string @@ -491,7 +423,7 @@ func TestConstructKRCronJob(t *testing.T) { Name: "test-kr-cron", Namespace: "default", Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.EncryptionKeyRotationCronJobSpec{ @@ -521,7 +453,7 @@ func TestConstructKRCronJob(t *testing.T) { Name: "empty-schedule-cron", Namespace: "kube-system", Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.EncryptionKeyRotationCronJobSpec{ @@ -551,7 +483,7 @@ func TestConstructKRCronJob(t *testing.T) { Name: "special-!@#$%^&*()-cron", Namespace: "test-ns", Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.EncryptionKeyRotationCronJobSpec{ @@ -580,8 +512,8 @@ func TestConstructKRCronJob(t *testing.T) { } } func TestConstructRSCronJob(t *testing.T) { - failedJobHistoryLimit := defaultFailedJobsHistoryLimit - successfulJobsHistoryLimit := defaultSuccessfulJobsHistoryLimit + failedJobHistoryLimit := 
utils.DefaultFailedJobsHistoryLimit + successfulJobsHistoryLimit := utils.DefaultSuccessfulJobsHistoryLimit tests := []struct { name string cronName string @@ -601,7 +533,7 @@ func TestConstructRSCronJob(t *testing.T) { Name: "test-rs-cron", Namespace: "default", Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{ @@ -631,7 +563,7 @@ func TestConstructRSCronJob(t *testing.T) { Name: "empty-schedule-cron", Namespace: "kube-system", Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{ @@ -661,7 +593,7 @@ func TestConstructRSCronJob(t *testing.T) { Name: "special-!@#$%^&*()-cron", Namespace: "test-ns", Annotations: map[string]string{ - csiAddonsStateAnnotation: csiAddonsStateManaged, + utils.CSIAddonsStateAnnotation: utils.CSIAddonsStateManaged, }, }, Spec: csiaddonsv1alpha1.ReclaimSpaceCronJobSpec{ diff --git a/internal/controller/csiaddons/pvc_new_controller.go b/internal/controller/csiaddons/pvc_new_controller.go new file mode 100644 index 000000000..5d2987c65 --- /dev/null +++ b/internal/controller/csiaddons/pvc_new_controller.go @@ -0,0 +1,263 @@ +/* +Copyright 2025 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "fmt" + + csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" + "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type PVCReconiler struct { + client.Client + Scheme *runtime.Scheme + + // ConnectionPool consists of map of Connection objects. + ConnPool *connection.ConnectionPool +} + +//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;patch +//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims/finalizers,verbs=update +//+kubebuilder:rbac:groups=csiaddons.openshift.io,resources=reclaimspacecronjobs,verbs=get;list;watch;create;delete;update +//+kubebuilder:rbac:groups=csiaddons.openshift.io,resources=encryptionkeyrotationcronjobs,verbs=get;list;watch;create;delete;update +//+kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch +//+kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch + +func (r *PVCReconiler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Fetch the PVC + pvc := &corev1.PersistentVolumeClaim{} + if err := r.Get(ctx, req.NamespacedName, pvc); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Ignore if being deleted + if !pvc.DeletionTimestamp.IsZero() { + 
logger.Info("PVC is being deleted, exiting reconciliation", "PVCInfo", req.NamespacedName) + + return ctrl.Result{}, nil + } + + logger.Info("Reconciling PVC", "PVCInfo", req.NamespacedName) + + // The PVC must be in a bound state, if not, we requeue + if pvc.Status.Phase != corev1.ClaimBound { + logger.Info("PVC is not yet bound, requeue the request", "PVCInfo", req.NamespacedName) + + return ctrl.Result{Requeue: true}, nil + } + + // Fetch the PV and check if it is CSI provisioned, if not, do nothing + // TODO: Shall we also check if it is matches `csi.ceph`? + pv := &corev1.PersistentVolume{} + if err := r.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName}, pv); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Must be CSI provisioned to continue + if pv.Spec.CSI == nil { + logger.Info("PVC is not CSI provisioned, exiting reconciliation", "PVCInfo", req.NamespacedName) + + return ctrl.Result{}, nil + } + + // Should not be a static PVC + if !hasValidStorageClassName(pvc) { + logger.Info("The PVC is statically provisioned, exiting reconciliation", "PVCInfo", req.NamespacedName) + + return ctrl.Result{}, nil + } + + // Now we fetch the StorageClass + sc := &storagev1.StorageClass{} + if err := r.Get(ctx, types.NamespacedName{Name: *pvc.Spec.StorageClassName}, sc); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Reconcile for dependent features + // Reconcile - Key rotation + keyRotationName := fmt.Sprintf("%s-keyrotation", pvc.Name) + keyRotationSched := sc.Annotations[utils.KrcJobScheduleTimeAnnotation] + keyRotationEnabled := sc.Annotations[utils.KrEnableAnnotation] + keyRotationChild := &csiaddonsv1alpha1.EncryptionKeyRotationCronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: keyRotationName, + Namespace: pvc.Namespace, + }, + } + + // FIXME: This is a shim and should be removed in later releases + // along with the field indexers on child objects + if requeue, err := utils.CleanOldJobs(ctx, + r.Client, + 
logger, + req, + &csiaddonsv1alpha1.EncryptionKeyRotationCronJobList{}, + keyRotationName); err != nil || requeue { + return ctrl.Result{Requeue: requeue}, err + } + + if err := r.reoncileFeature(ctx, logger, pvc, keyRotationChild, keyRotationSched, keyRotationEnabled); err != nil { + return ctrl.Result{}, err + } + + // Reconcile - Reclaim space + reclaimSpaceName := fmt.Sprintf("%s-reclaimspace", pvc.Name) + reclaimSpaceSched := sc.Annotations[utils.RsCronJobScheduleTimeAnnotation] + reclaimSpaceChild := &csiaddonsv1alpha1.ReclaimSpaceCronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: reclaimSpaceName, + Namespace: pvc.Namespace, + }, + } + + // FIXME: This is a shim and should be removed in later releases + // along with the field indexers on child objects + if requeue, err := utils.CleanOldJobs(ctx, + r.Client, + logger, + req, + &csiaddonsv1alpha1.ReclaimSpaceCronJobList{}, + reclaimSpaceName); err != nil || requeue { + return ctrl.Result{Requeue: requeue}, err + } + + if err := r.reoncileFeature(ctx, logger, pvc, reclaimSpaceChild, reclaimSpaceSched, "true"); err != nil { + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +func (r *PVCReconiler) reoncileFeature( + ctx context.Context, + logger logr.Logger, + pvc *corev1.PersistentVolumeClaim, + childObj client.Object, + schedule string, + enabledVal string, +) error { + defer logger.Info("Completed reconcile for feature") + + // Determine if the object should exists in the cluster + // Note: we do not return early if feature is disabled as we might need to garbage collect + shouldExist := (schedule != "") && (enabledVal != "false") + logger.Info("Reconciling feature for child object with values", "childObj", childObj, "schedule", schedule, "isEnabled", enabledVal, "shouldExist", shouldExist) + + // Now check if it actually exists in the cluster + exists := true + existingObj := childObj.DeepCopyObject().(client.Object) + if err := r.Get(ctx, types.NamespacedName{Name: childObj.GetName(), 
Namespace: childObj.GetNamespace()}, existingObj); err != nil { + if apierrors.IsNotFound(err) { + exists = false + } else { + return err + } + } + logger.Info("determined the state to reconcile with", "shouldExist", shouldExist, "exists", exists) + + // Object should not be present in the cluster, garbage collect or return + if !shouldExist { + if exists { + logger.Info("Deleting the undersired object from the cluster", "childObj", existingObj) + return client.IgnoreNotFound(r.Delete(ctx, existingObj)) + } + + return nil + } + + // We reached here, means we need to either create or update the object in cluster + // -- Create + if !exists { + // Set the required fields on the bare bones object + utils.SetSpec(childObj, schedule, pvc.Name) + + // Set controller reference + if err := ctrl.SetControllerReference(pvc, childObj, r.Scheme); err != nil { + return err + } + + logger.Info("creating a new object in the cluster", "newObj", childObj) + + // We use deterministic names for the resources we create + // This saves us additional listing and managing of stale resources + return client.IgnoreAlreadyExists(r.Create(ctx, childObj)) + } + + // -- Update + currentSched := utils.GetSchedule(existingObj) + if currentSched != schedule { + // Update and set spec + utils.SetSpec(existingObj, schedule, pvc.Name) + + // Update the object in the cluster + logger.Info("calling update for new schedule on object", "newObj", childObj, "currentSchedule", currentSched, "desiredSchedule", schedule) + return r.Update(ctx, existingObj) + } + + return nil +} + +func (r *PVCReconiler) SetupWithManager(mgr ctrl.Manager, ctrlOptions controller.Options) error { + // Setup the required indexers to optimize lookups + if err := utils.SetupPVCControllerIndexers(mgr); err != nil { + return err + } + + return ctrl.NewControllerManagedBy(mgr). + // Primary source + For(&corev1.PersistentVolumeClaim{}). + + // Secondary sources + Owns(&csiaddonsv1alpha1.ReclaimSpaceCronJob{}). 
+ Owns(&csiaddonsv1alpha1.EncryptionKeyRotationCronJob{}). + + // Watch the storageclass and fan out according to predicates + Watches( + &storagev1.StorageClass{}, + handler.EnqueueRequestsFromMapFunc(utils.ScMapFunc(r.Client)), + builder.WithPredicates(utils.StorageClassPredicate()), + ). + + // Watch the PVs silently, this is to avoid client/server rate limits + // And to make Get/List O(1) + Watches( + &corev1.PersistentVolume{}, + &handler.EnqueueRequestForObject{}, // This handler is never called due to `silentPredicate` + builder.WithPredicates(utils.SilentPredicate()), + ). + WithOptions(ctrlOptions). + Complete(r) +} diff --git a/internal/controller/csiaddons/reclaimspacecronjob_controller.go b/internal/controller/csiaddons/reclaimspacecronjob_controller.go index 13d8fcf0b..43b11f214 100644 --- a/internal/controller/csiaddons/reclaimspacecronjob_controller.go +++ b/internal/controller/csiaddons/reclaimspacecronjob_controller.go @@ -23,6 +23,7 @@ import ( "time" csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" "github.com/go-logr/logr" "github.com/robfig/cron/v3" @@ -42,13 +43,6 @@ type ReclaimSpaceCronJobReconciler struct { Scheme *runtime.Scheme } -const ( - jobOwnerKey = ".metadata.controller" - // default values for Spec parameters. - defaultFailedJobsHistoryLimit int32 = 1 - defaultSuccessfulJobsHistoryLimit int32 = 3 -) - var ( apiGVStr = csiaddonsv1alpha1.GroupVersion.String() scheduledTimeAnnotation = csiaddonsv1alpha1.GroupVersion.Group + "/scheduled-at" @@ -88,14 +82,14 @@ func (r *ReclaimSpaceCronJobReconciler) Reconcile(ctx context.Context, req ctrl. // set history limit defaults, if not specified. 
if rsCronJob.Spec.FailedJobsHistoryLimit == nil { - *rsCronJob.Spec.FailedJobsHistoryLimit = defaultFailedJobsHistoryLimit + *rsCronJob.Spec.FailedJobsHistoryLimit = utils.DefaultFailedJobsHistoryLimit } if rsCronJob.Spec.SuccessfulJobsHistoryLimit == nil { - *rsCronJob.Spec.SuccessfulJobsHistoryLimit = defaultSuccessfulJobsHistoryLimit + *rsCronJob.Spec.SuccessfulJobsHistoryLimit = utils.DefaultSuccessfulJobsHistoryLimit } var childJobs csiaddonsv1alpha1.ReclaimSpaceJobList - err = r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}) + err = r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{utils.JobOwnerKey: req.Name}) if err != nil { logger.Error(err, "Failed to list child ReclaimSpaceJobs") return ctrl.Result{}, err @@ -198,7 +192,7 @@ func (r *ReclaimSpaceCronJobReconciler) Reconcile(ctx context.Context, req ctrl. // SetupWithManager sets up the controller with the Manager. func (r *ReclaimSpaceCronJobReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOptions controller.Options) error { - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &csiaddonsv1alpha1.ReclaimSpaceJob{}, jobOwnerKey, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &csiaddonsv1alpha1.ReclaimSpaceJob{}, utils.JobOwnerKey, func(rawObj client.Object) []string { // extract the owner from job object. 
job, ok := rawObj.(*csiaddonsv1alpha1.ReclaimSpaceJob) if !ok { diff --git a/internal/controller/csiaddons/reclaimspacejob_controller.go b/internal/controller/csiaddons/reclaimspacejob_controller.go index 9c07e176e..a15d9ffbc 100644 --- a/internal/controller/csiaddons/reclaimspacejob_controller.go +++ b/internal/controller/csiaddons/reclaimspacejob_controller.go @@ -25,6 +25,7 @@ import ( csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + "github.com/csi-addons/kubernetes-csi-addons/internal/controller/utils" "github.com/csi-addons/kubernetes-csi-addons/internal/proto" "github.com/csi-addons/kubernetes-csi-addons/internal/util" @@ -40,8 +41,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -164,6 +167,12 @@ func (r *ReclaimSpaceJobReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOptio return ctrl.NewControllerManagedBy(mgr). For(&csiaddonsv1alpha1.ReclaimSpaceJob{}). WithEventFilter(predicate.GenerationChangedPredicate{}). + // This is to avoid "stop-the-world" events and wait for cache sync when we list VA + Watches( + &scv1.VolumeAttachment{}, + &handler.EnqueueRequestForObject{}, + builder.WithPredicates(utils.SilentPredicate()), + ). WithOptions(ctrlOptions). 
Complete(r) } diff --git a/internal/controller/utils/annotations.go b/internal/controller/utils/annotations.go new file mode 100644 index 000000000..a533e77ec --- /dev/null +++ b/internal/controller/utils/annotations.go @@ -0,0 +1,39 @@ +package utils + +import ( + csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" +) + +var ( + RsCronJobScheduleTimeAnnotation = "reclaimspace." + csiaddonsv1alpha1.GroupVersion.Group + "/schedule" + RsCronJobNameAnnotation = "reclaimspace." + csiaddonsv1alpha1.GroupVersion.Group + "/cronjob" + + KrEnableAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/enable" + KrcJobScheduleTimeAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/schedule" + KrcJobNameAnnotation = "keyrotation." + csiaddonsv1alpha1.GroupVersion.Group + "/cronjob" + + CSIAddonsStateAnnotation = csiaddonsv1alpha1.GroupVersion.Group + "/state" +) + +const ( + // Index keys + StorageClassIndex = "spec.storageClassName" + JobOwnerKey = ".metadata.controller" + + // Represents the CRs that are managed by the PVC controller + CSIAddonsStateManaged = "managed" +) + +// AnnotationValueChanged checks if any of the specified keys have different values +// between the old and new annotations maps. 
+func AnnotationValueChanged(oldAnnotations, newAnnotations map[string]string, keys []string) bool { + for _, key := range keys { + oldVal, oldExists := oldAnnotations[key] + newVal, newExists := newAnnotations[key] + + if oldExists != newExists || oldVal != newVal { + return true + } + } + return false +} diff --git a/internal/controller/utils/annotations_test.go b/internal/controller/utils/annotations_test.go new file mode 100644 index 000000000..610f7db0c --- /dev/null +++ b/internal/controller/utils/annotations_test.go @@ -0,0 +1,72 @@ +package utils + +import "testing" + +func TestAnnotationValueChanged(t *testing.T) { + tests := []struct { + name string + oldAnnotations map[string]string + newAnnotations map[string]string + keys []string + expected bool + }{ + { + name: "No changes", + oldAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, + newAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, + keys: []string{"key1", "key2"}, + expected: false, + }, + { + name: "Value changed", + oldAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, + newAnnotations: map[string]string{"key1": "value1", "key2": "newvalue2"}, + keys: []string{"key1", "key2"}, + expected: true, + }, + { + name: "Key added", + oldAnnotations: map[string]string{"key1": "value1"}, + newAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, + keys: []string{"key1", "key2"}, + expected: true, + }, + { + name: "Key removed", + oldAnnotations: map[string]string{"key1": "value1", "key2": "value2"}, + newAnnotations: map[string]string{"key1": "value1"}, + keys: []string{"key1", "key2"}, + expected: true, + }, + { + name: "Change in non-specified key", + oldAnnotations: map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}, + newAnnotations: map[string]string{"key1": "value1", "key2": "value2", "key3": "newvalue3"}, + keys: []string{"key1", "key2"}, + expected: false, + }, + { + name: "Empty keys slice", + oldAnnotations: 
map[string]string{"key1": "value1"}, + newAnnotations: map[string]string{"key1": "newvalue1"}, + keys: []string{}, + expected: false, + }, + { + name: "Nil maps", + oldAnnotations: nil, + newAnnotations: nil, + keys: []string{"key1"}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := AnnotationValueChanged(tt.oldAnnotations, tt.newAnnotations, tt.keys) + if result != tt.expected { + t.Errorf("AnnotationValueChanged() = %v, want %v", result, tt.expected) + } + }) + } +} diff --git a/internal/controller/utils/predicates.go b/internal/controller/utils/predicates.go new file mode 100644 index 000000000..8400648e8 --- /dev/null +++ b/internal/controller/utils/predicates.go @@ -0,0 +1,122 @@ +package utils + +import ( + "context" + + csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" + + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SilentPredicate returns a predicate that rejects every event. 
+func SilentPredicate() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return false }, + UpdateFunc: func(e event.UpdateEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return false }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + } +} + +func StorageClassPredicate() predicate.Predicate { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldSC, oldOk := e.ObjectOld.(*storagev1.StorageClass) + newSC, newOk := e.ObjectNew.(*storagev1.StorageClass) + if !oldOk || !newOk { + return false + } + + // Only trigger if relevant annotations change + relevantAnnotations := []string{ + KrcJobScheduleTimeAnnotation, + KrEnableAnnotation, + RsCronJobScheduleTimeAnnotation, + } + + return AnnotationValueChanged(oldSC.GetAnnotations(), newSC.GetAnnotations(), relevantAnnotations) + }, + CreateFunc: func(e event.CreateEvent) bool { return true }, + } +} + +func ScMapFunc(r client.Client) handler.MapFunc { + return func(ctx context.Context, obj client.Object) []reconcile.Request { + ok := false + sc, ok := obj.(*storagev1.StorageClass) + if !ok { + return nil + } + + // List all PVCs using this SC + pvcList := &corev1.PersistentVolumeClaimList{} + if err := r.List(ctx, pvcList, client.MatchingFields{StorageClassIndex: sc.Name}); err != nil { + return nil + } + + requests := make([]reconcile.Request, len(pvcList.Items)) + for i, pvc := range pvcList.Items { + requests[i] = reconcile.Request{NamespacedName: types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}} + } + return requests + } +} + +// SetupPVCControllerIndexers adds field indexers to the manager to optimize lookups. 
+// +// The following indexers are set: +// - PersistentVolumeClaims - indexed by spec.storageClassName +func SetupPVCControllerIndexers(mgr ctrl.Manager) error { + indices := []struct { + obj client.Object + field string + indexFn client.IndexerFunc + }{ + { + // Map PVC to SC + obj: &corev1.PersistentVolumeClaim{}, + field: StorageClassIndex, + indexFn: func(rawObj client.Object) []string { + pvc, ok := rawObj.(*corev1.PersistentVolumeClaim) + if !ok || (pvc.Spec.StorageClassName == nil) || len(*pvc.Spec.StorageClassName) == 0 { + return nil + } + return []string{*pvc.Spec.StorageClassName} + }, + }, + // FIXME: Remove this shim in later releases + { + obj: &csiaddonsv1alpha1.ReclaimSpaceCronJob{}, + field: JobOwnerKey, + indexFn: ExtractOwnerNameFromPVCObj[*csiaddonsv1alpha1.ReclaimSpaceCronJob], + }, + // FIXME: Remove this shim in later releases + { + obj: &csiaddonsv1alpha1.EncryptionKeyRotationCronJob{}, + field: JobOwnerKey, + indexFn: ExtractOwnerNameFromPVCObj[*csiaddonsv1alpha1.EncryptionKeyRotationCronJob], + }, + } + + for _, index := range indices { + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), + index.obj, + index.field, + index.indexFn, + ); err != nil { + return err + } + } + + return nil + +} diff --git a/internal/controller/utils/reclaimspace_keyrotation_utils.go b/internal/controller/utils/reclaimspace_keyrotation_utils.go new file mode 100644 index 000000000..7d6ac8b5c --- /dev/null +++ b/internal/controller/utils/reclaimspace_keyrotation_utils.go @@ -0,0 +1,164 @@ +package utils + +import ( + "context" + "errors" + + csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + DefaultFailedJobsHistoryLimit int32 = 1 + DefaultSuccessfulJobsHistoryLimit int32 = 3 + + 
DefaultBackoffLimit = 6 + DefaultRetryDeadlineSeconds = 600 +) + +var ( + ErrConnNotFoundRequeueNeeded = errors.New("connection not found, requeue needed") + ErrScheduleNotFound = errors.New("schedule not found") +) + +func setKeyrotationSpec(v *csiaddonsv1alpha1.EncryptionKeyRotationCronJob, schedule, pvcName string) { + failedJobsHistoryLimit := DefaultFailedJobsHistoryLimit + successfulJobsHistoryLimit := DefaultSuccessfulJobsHistoryLimit + + if v.Annotations == nil { + v.Annotations = map[string]string{} + } + v.Annotations[CSIAddonsStateAnnotation] = CSIAddonsStateManaged + + v.Spec.Schedule = schedule + v.Spec.FailedJobsHistoryLimit = &failedJobsHistoryLimit + v.Spec.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit + + v.Spec.JobSpec = csiaddonsv1alpha1.EncryptionKeyRotationJobTemplateSpec{ + Spec: csiaddonsv1alpha1.EncryptionKeyRotationJobSpec{ + Target: csiaddonsv1alpha1.TargetSpec{PersistentVolumeClaim: pvcName}, + BackoffLimit: DefaultBackoffLimit, + RetryDeadlineSeconds: DefaultRetryDeadlineSeconds, + }, + } +} + +func setReclaimspaceSpec(v *csiaddonsv1alpha1.ReclaimSpaceCronJob, schedule, pvcName string) { + failedJobsHistoryLimit := DefaultFailedJobsHistoryLimit + successfulJobsHistoryLimit := DefaultSuccessfulJobsHistoryLimit + + if v.Annotations == nil { + v.Annotations = map[string]string{} + } + v.Annotations[CSIAddonsStateAnnotation] = CSIAddonsStateManaged + + v.Spec.Schedule = schedule + v.Spec.FailedJobsHistoryLimit = &failedJobsHistoryLimit + v.Spec.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit + + v.Spec.JobSpec = csiaddonsv1alpha1.ReclaimSpaceJobTemplateSpec{ + Spec: csiaddonsv1alpha1.ReclaimSpaceJobSpec{ + Target: csiaddonsv1alpha1.TargetSpec{PersistentVolumeClaim: pvcName}, + BackoffLimit: DefaultBackoffLimit, + RetryDeadlineSeconds: DefaultRetryDeadlineSeconds, + }, + } +} + +func SetSpec(obj client.Object, schedule, pvcName string) { + + switch v := obj.(type) { + case *csiaddonsv1alpha1.EncryptionKeyRotationCronJob: + 
setKeyrotationSpec(v, schedule, pvcName) + case *csiaddonsv1alpha1.ReclaimSpaceCronJob: + setReclaimspaceSpec(v, schedule, pvcName) + } +} + +func GetSchedule(obj client.Object) string { + switch v := obj.(type) { + case *csiaddonsv1alpha1.EncryptionKeyRotationCronJob: + return v.Spec.Schedule + case *csiaddonsv1alpha1.ReclaimSpaceCronJob: + return v.Spec.Schedule + default: + return "" + } +} + +// ExtractOwnerNameFromPVCObj extracts owner.Name from the object if it is +// of type `T` and has a PVC as its owner. +func ExtractOwnerNameFromPVCObj[T client.Object](rawObj client.Object) []string { + // extract the owner from job object. + job, ok := rawObj.(T) + if !ok { + return nil + } + owner := metav1.GetControllerOf(job) + if owner == nil { + return nil + } + if owner.APIVersion != "v1" || owner.Kind != "PersistentVolumeClaim" { + return nil + } + + return []string{owner.Name} +} + +func CleanOldJobs( + ctx context.Context, + c client.Client, + log logr.Logger, + req ctrl.Request, + objList client.ObjectList, + expectedName string, +) (bool, error) { + // We need to find all the EKRCronJobs in the namespace of the PVC which + // are owned by this controller and remove it if it doesn't match the expected name + shouldRequeue := false + + if err := c.List(ctx, objList, client.InNamespace(req.Namespace), client.MatchingFields{JobOwnerKey: req.Name}); client.IgnoreNotFound(err) != nil { + return shouldRequeue, err + } + + items, err := meta.ExtractList(objList) + if err != nil { + return shouldRequeue, err + } + + for _, item := range items { + obj, ok := item.(client.Object) + if !ok { + // As long as objList is a k8s object + // we will never hit this + continue + } + objName := obj.GetName() + + // Only delete what we might have created + if owner := metav1.GetControllerOf(obj); owner == nil || + owner.Kind != "PersistentVolumeClaim" || + owner.Name != req.Name { + log.Info("Found an object without any owner", "jobName", objName) + + continue + } + + // If the 
name does not match, delete the resource + if obj.GetName() != expectedName { + if err := c.Delete(ctx, obj); client.IgnoreNotFound(err) != nil { + return shouldRequeue, err + } + + shouldRequeue = true + log.Info("Deleted old job", "jobName", objName) + } + } + + return shouldRequeue, nil +}