Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/controllers/placement/resource_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
pvutil "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/deployment"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -444,6 +445,15 @@ func generateRawContent(object *unstructured.Unstructured) ([]byte, error) {
delete(annots, corev1.LastAppliedConfigAnnotation)
// Remove the revision annotation set by deployment controller.
delete(annots, deployment.RevisionAnnotation)
// Remove node-specific annotations from PVCs that would break when propagated to member clusters
// These annotations reference specific nodes from the hub cluster which don't exist on member clusters
// The member cluster's storage provisioner will set appropriate values for its own nodes
// All annotations below are listed in well-known labels, annotations and taints document:
// https://kubernetes.io/docs/reference/labels-annotations-taints/
delete(annots, pvutil.AnnSelectedNode) // Node selected for volume binding
delete(annots, pvutil.AnnBindCompleted) // Binding completion status
delete(annots, pvutil.AnnBoundByController) // Controller binding status
delete(annots, pvutil.AnnBetaStorageProvisioner) // Beta storage provisioner annotation
if len(annots) == 0 {
object.SetAnnotations(nil)
} else {
Expand Down Expand Up @@ -491,6 +501,10 @@ func generateRawContent(object *unstructured.Unstructured) ([]byte, error) {
unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "labels", "controller-uid")
unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "labels", "batch.kubernetes.io/controller-uid")
}
} else if object.GetKind() == "PersistentVolumeClaim" && object.GetAPIVersion() == "v1" {
// Remove volumeName which references a specific PV from the hub cluster that won't exist on member clusters.
// The member cluster's storage provisioner will create and bind a new PV.
unstructured.RemoveNestedField(object.Object, "spec", "volumeName")
}

rawContent, err := object.MarshalJSON()
Expand Down
58 changes: 58 additions & 0 deletions pkg/controllers/placement/resource_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
pvutil "k8s.io/component-helpers/storage/volume"
"k8s.io/kubectl/pkg/util/deployment"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -243,6 +245,62 @@ func TestGenerateResourceContent(t *testing.T) {
},
},
},
"PersistentVolumeClaim with node-specific annotations": {
resource: corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pvc",
Namespace: "test-namespace",
Annotations: map[string]string{
pvutil.AnnSelectedNode: "hub-control-plane",
pvutil.AnnBindCompleted: "yes",
pvutil.AnnBoundByController: "yes",
pvutil.AnnBetaStorageProvisioner: "kubernetes.io/no-provisioner",
"custom-annotation": "should-remain",
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
StorageClassName: ptr.To("standard"),
VolumeName: "pvc-12345-from-hub-cluster",
},
},
wantResource: corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pvc",
Namespace: "test-namespace",
Annotations: map[string]string{
"custom-annotation": "should-remain",
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
StorageClassName: ptr.To("standard"),
// VolumeName should be removed
},
},
},
}

for testName, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/enveloped_object_placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ var _ = Describe("placing wrapped resources using a CRP", func() {
// read the test resources.
readDeploymentTestManifest(&testDeployment)
readDaemonSetTestManifest(&testDaemonSet)
readStatefulSetTestManifest(&testStatefulSet, true)
readStatefulSetTestManifest(&testStatefulSet, StatefulSetInvalidStorage)
readEnvelopeResourceTestManifest(&testResourceEnvelope)
})

Expand Down
92 changes: 91 additions & 1 deletion test/e2e/resource_placement_hub_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ var _ = Describe("placing workloads using a CRP with PickAll policy", Label("res
var testDeployment appsv1.Deployment
var testDaemonSet appsv1.DaemonSet
var testJob batchv1.Job
var testStatefulSet appsv1.StatefulSet

BeforeAll(func() {
// Read the test manifests
readDeploymentTestManifest(&testDeployment)
readDaemonSetTestManifest(&testDaemonSet)
readJobTestManifest(&testJob)
readStatefulSetTestManifest(&testStatefulSet, StatefulSetWithStorage)
workNamespace := appNamespace()

// Create namespace and workloads
Expand All @@ -51,9 +53,11 @@ var _ = Describe("placing workloads using a CRP with PickAll policy", Label("res
testDeployment.Namespace = workNamespace.Name
testDaemonSet.Namespace = workNamespace.Name
testJob.Namespace = workNamespace.Name
testStatefulSet.Namespace = workNamespace.Name
Expect(hubClient.Create(ctx, &testDeployment)).To(Succeed(), "Failed to create test deployment %s", testDeployment.Name)
Expect(hubClient.Create(ctx, &testDaemonSet)).To(Succeed(), "Failed to create test daemonset %s", testDaemonSet.Name)
Expect(hubClient.Create(ctx, &testJob)).To(Succeed(), "Failed to create test job %s", testJob.Name)
Expect(hubClient.Create(ctx, &testStatefulSet)).To(Succeed(), "Failed to create test statefulset %s", testStatefulSet.Name)

// Create the CRP that selects the namespace
By("creating CRP that selects the namespace")
Expand Down Expand Up @@ -105,9 +109,30 @@ var _ = Describe("placing workloads using a CRP with PickAll policy", Label("res
Name: testJob.Name,
Namespace: workNamespace.Name,
},
{
Group: "apps",
Version: "v1",
Kind: "StatefulSet",
Name: testStatefulSet.Name,
Namespace: workNamespace.Name,
},
// PVCs created by StatefulSet controller from volumeClaimTemplates
// Kubernetes StatefulSet controller uses naming convention: <volumeClaimTemplate-name>-<statefulset-name>-<replica-index>
{
Version: "v1",
Kind: "PersistentVolumeClaim",
Name: fmt.Sprintf("%s-%s-%d", testStatefulSet.Spec.VolumeClaimTemplates[0].Name, testStatefulSet.Name, 0),
Namespace: workNamespace.Name,
},
{
Version: "v1",
Kind: "PersistentVolumeClaim",
Name: fmt.Sprintf("%s-%s-%d", testStatefulSet.Spec.VolumeClaimTemplates[0].Name, testStatefulSet.Name, 1),
Namespace: workNamespace.Name,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to the PVs created by the PVC? Do we ever place PV?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the controller on the member cluster will create PVCs for the statefulSet that we place on the member cluster too. From the look of it, their name would be the same (deterministic) but this can lead to problems.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the PVs are cluster scoped resources so they are not selected by the CRP

}
// Use customizedPlacementStatusUpdatedActual with resourceIsTrackable=false
// because Jobs don't have availability tracking like Deployments/DaemonSets do
// because Jobs and PVCs don't have availability tracking like Deployments/DaemonSets do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PVC has conditions, I wonder why it's not trackable? However, do we really want to place PVC if they are the by-product of a statefulSet?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added logic to track PVC availability in the availability tracker.
I don't think we should place PVCs when they are a by-product of a StatefulSet.

I originally wanted to skip propagating such PVCs, but realized that the StatefulSet controller doesn't set an owner reference on the PVCs it creates, so we cannot reliably tell whether a PVC exists because of a StatefulSet (unless we infer it from the name).

The hub PVCs look like this during the hub workload e2e test

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    pv.kubernetes.io/bind-completed: "yes"
    pv.kubernetes.io/bound-by-controller: "yes"
    volume.beta.kubernetes.io/storage-provisioner: rancher.io/local-path
    volume.kubernetes.io/selected-node: hub-control-plane
    volume.kubernetes.io/storage-provisioner: rancher.io/local-path
  creationTimestamp: "2025-11-26T17:02:01Z"
  finalizers:
  - kubernetes.io/pvc-protection
  labels:
    app: test-ss
  name: test-ss-pvc-test-ss-0
  namespace: application-1
  resourceVersion: "2328"
  uid: 1bab1a0e-d24d-44c6-bbbb-8a1f64a70e5c
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi
  storageClassName: standard
  volumeMode: Filesystem
  volumeName: pvc-1bab1a0e-d24d-44c6-bbbb-8a1f64a70e5c
status:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 100Mi
  phase: Bound
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    pv.kubernetes.io/bind-completed: "yes"
    pv.kubernetes.io/bound-by-controller: "yes"
    volume.beta.kubernetes.io/storage-provisioner: rancher.io/local-path
    volume.kubernetes.io/selected-node: hub-control-plane
    volume.kubernetes.io/storage-provisioner: rancher.io/local-path
  creationTimestamp: "2025-11-26T17:02:08Z"
  finalizers:
  - kubernetes.io/pvc-protection
  labels:
    app: test-ss
  name: test-ss-pvc-test-ss-1
  namespace: application-1
  resourceVersion: "2424"
  uid: 6a89a3cd-7a26-47eb-82bf-af929a1d2f8b
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi
  storageClassName: standard
  volumeMode: Filesystem
  volumeName: pvc-6a89a3cd-7a26-47eb-82bf-af929a1d2f8b
status:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 100Mi
  phase: Bound

crpKey := types.NamespacedName{Name: crpName}
crpStatusUpdatedActual := customizedPlacementStatusUpdatedActual(crpKey, wantSelectedResources, allMemberClusterNames, nil, "0", false)
Eventually(crpStatusUpdatedActual, workloadEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
Expand Down Expand Up @@ -170,6 +195,13 @@ var _ = Describe("placing workloads using a CRP with PickAll policy", Label("res
"Hub job should complete successfully")
})

It("should verify hub statefulset is ready", func() {
By("checking hub statefulset status")
statefulSetReadyActual := waitForStatefulSetToBeReady(hubClient, &testStatefulSet)
Eventually(statefulSetReadyActual, workloadEventuallyDuration, eventuallyInterval).Should(Succeed(),
"Hub statefulset should be ready before placement")
})

It("should place the deployment on all member clusters", func() {
By("verifying deployment is placed and ready on all member clusters")
for idx := range allMemberClusters {
Expand Down Expand Up @@ -206,6 +238,24 @@ var _ = Describe("placing workloads using a CRP with PickAll policy", Label("res
}
})

It("should place the statefulset on all member clusters", func() {
By("verifying statefulset is placed and ready on all member clusters")
for idx := range allMemberClusters {
memberCluster := allMemberClusters[idx]
statefulsetPlacedActual := waitForStatefulSetPlacementToReady(memberCluster, &testStatefulSet)
Eventually(statefulsetPlacedActual, workloadEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to place statefulset on member cluster %s", memberCluster.ClusterName)
}
})

It("should verify statefulset replicas are ready on all clusters", func() {
By("checking statefulset status on each cluster")
for _, cluster := range allMemberClusters {
statefulSetReadyActual := waitForStatefulSetToBeReady(cluster.KubeClient, &testStatefulSet)
Eventually(statefulSetReadyActual, workloadEventuallyDuration, eventuallyInterval).Should(Succeed(),
"StatefulSet should be ready on cluster %s", cluster.ClusterName)
}
})

It("should verify deployment replicas are ready on all clusters", func() {
By("checking deployment status on each cluster")
for _, cluster := range allMemberClusters {
Expand All @@ -232,6 +282,46 @@ var _ = Describe("placing workloads using a CRP with PickAll policy", Label("res
})
})

// waitForStatefulSetToBeReady returns a poll function (for use with Gomega's
// Eventually) that succeeds once the given StatefulSet's status reports all
// replicas current, updated, AND ready.
func waitForStatefulSetToBeReady(kubeClient client.Client, testStatefulSet *appsv1.StatefulSet) func() error {
	return func() error {
		var statefulSet appsv1.StatefulSet
		if err := kubeClient.Get(ctx, types.NamespacedName{
			Name:      testStatefulSet.Name,
			Namespace: testStatefulSet.Namespace,
		}, &statefulSet); err != nil {
			return err
		}

		// The status is only meaningful once the controller has observed the
		// latest generation of the spec.
		if statefulSet.Status.ObservedGeneration != statefulSet.Generation {
			return fmt.Errorf("statefulset has stale status: observed generation %d != generation %d",
				statefulSet.Status.ObservedGeneration, statefulSet.Generation)
		}

		// spec.replicas defaults to 1 when unset, matching the API default.
		requiredReplicas := int32(1)
		if statefulSet.Spec.Replicas != nil {
			requiredReplicas = *statefulSet.Spec.Replicas
		}

		if statefulSet.Status.CurrentReplicas != requiredReplicas {
			return fmt.Errorf("statefulset not ready: %d/%d current replicas",
				statefulSet.Status.CurrentReplicas, requiredReplicas)
		}

		if statefulSet.Status.UpdatedReplicas != requiredReplicas {
			return fmt.Errorf("statefulset not updated: %d/%d updated replicas",
				statefulSet.Status.UpdatedReplicas, requiredReplicas)
		}

		// CurrentReplicas/UpdatedReplicas only count created pods;
		// ReadyReplicas is what indicates the pods passed their readiness
		// checks, which is the condition this helper is named for.
		if statefulSet.Status.ReadyReplicas != requiredReplicas {
			return fmt.Errorf("statefulset not ready: %d/%d ready replicas",
				statefulSet.Status.ReadyReplicas, requiredReplicas)
		}

		return nil
	}
}

func waitForJobToComplete(kubeClient client.Client, testJob *batchv1.Job) func() error {
return func() error {
var job batchv1.Job
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/resource_placement_rollout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var _ = Describe("placing namespaced scoped resources using a RP with rollout",
testDaemonSet = appv1.DaemonSet{}
readDaemonSetTestManifest(&testDaemonSet)
testStatefulSet = appv1.StatefulSet{}
readStatefulSetTestManifest(&testStatefulSet, false)
readStatefulSetTestManifest(&testStatefulSet, StatefulSetBasic)
testService = corev1.Service{}
readServiceTestManifest(&testService)
testJob = batchv1.Job{}
Expand Down
28 changes: 28 additions & 0 deletions test/e2e/resources/statefulset-with-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: test-ss
spec:
  selector:
    matchLabels:
      app: test-ss
  serviceName: "test-ss-svc"
  replicas: 2
  template:
    metadata:
      labels:
        app: test-ss
    spec:
      terminationGracePeriodSeconds: 10
      containers:
        - name: pause
          # registry.k8s.io is the official Kubernetes image registry;
          # k8s.gcr.io is frozen and only serves via redirect.
          image: registry.k8s.io/pause:3.8
  volumeClaimTemplates:
    - metadata:
        name: test-ss-pvc
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "standard"
        resources:
          requests:
            storage: 100Mi
2 changes: 1 addition & 1 deletion test/e2e/rollout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ var _ = Describe("placing wrapped resources using a CRP", Ordered, func() {

BeforeAll(func() {
// Create the test resources.
readStatefulSetTestManifest(&testStatefulSet, false)
readStatefulSetTestManifest(&testStatefulSet, StatefulSetBasic)
readEnvelopeResourceTestManifest(&testStatefulSetEnvelope)
wantSelectedResources = []placementv1beta1.ResourceIdentifier{
{
Expand Down
29 changes: 23 additions & 6 deletions test/e2e/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ import (
"github.com/kubefleet-dev/kubefleet/test/e2e/framework"
)

// StatefulSetVariant represents different StatefulSet configurations for testing.
type StatefulSetVariant int

const (
	// StatefulSetBasic is a StatefulSet without any persistent volume claims.
	StatefulSetBasic StatefulSetVariant = iota
	// StatefulSetInvalidStorage is a StatefulSet with a non-existent storage class.
	StatefulSetInvalidStorage
	// StatefulSetWithStorage is a StatefulSet with a valid standard storage class.
	StatefulSetWithStorage
)

var (
croTestAnnotationKey = "cro-test-annotation"
croTestAnnotationValue = "cro-test-annotation-val"
Expand Down Expand Up @@ -1537,13 +1549,18 @@ func readDaemonSetTestManifest(testDaemonSet *appsv1.DaemonSet) {
Expect(err).Should(Succeed())
}

func readStatefulSetTestManifest(testStatefulSet *appsv1.StatefulSet, withVolume bool) {
func readStatefulSetTestManifest(testStatefulSet *appsv1.StatefulSet, variant StatefulSetVariant) {
By("Read the statefulSet resource")
if withVolume {
Expect(utils.GetObjectFromManifest("resources/statefulset-with-volume.yaml", testStatefulSet)).Should(Succeed())
} else {
Expect(utils.GetObjectFromManifest("resources/test-statefulset.yaml", testStatefulSet)).Should(Succeed())
}
var manifestPath string
switch variant {
case StatefulSetBasic:
manifestPath = "resources/statefulset-basic.yaml"
case StatefulSetInvalidStorage:
manifestPath = "resources/statefulset-invalid-storage.yaml"
case StatefulSetWithStorage:
manifestPath = "resources/statefulset-with-storage.yaml"
}
Expect(utils.GetObjectFromManifest(manifestPath, testStatefulSet)).Should(Succeed())
}

func readServiceTestManifest(testService *corev1.Service) {
Expand Down
Loading