From a2b1f96f3f728459e3eeb41a13aa265de356a6d0 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 4 Mar 2024 17:15:11 +0100 Subject: [PATCH 1/2] remove stream resources after drop from Postgres manifest --- pkg/cluster/cluster.go | 2 +- pkg/cluster/streams.go | 3 ++- pkg/cluster/streams_test.go | 17 +++++++++++++++++ pkg/util/k8sutil/k8sutil.go | 26 ++++++++++++++++++++++++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index fad7965ba..24c95d40f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1045,7 +1045,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } // streams - if len(newSpec.Spec.Streams) > 0 { + if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { if err := c.syncStreams(); err != nil { c.logger.Errorf("could not sync streams: %v", err) updateFailed = true diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index a135c5767..ec4221b4b 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -327,7 +327,8 @@ func (c *Cluster) syncStreams() error { if len(slotsToSync) > 0 { requiredPatroniConfig.Slots = slotsToSync } else { - return nil + // try to delete existing stream resources + return c.deleteStreams() } c.logger.Debug("syncing logical replication slots") diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index f71178823..63c38311b 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -455,4 +455,21 @@ func TestUpdateFabricEventStream(t *testing.T) { if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) } + + mockClient := k8sutil.NewMockKubernetesClient() + cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter + + // remove streams from manifest + pgPatched.Spec.Streams = nil + pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update( + context.TODO(), pgPatched, metav1.UpdateOptions{}) + assert.NoError(t, err) + + cluster.Postgresql.Spec = pgUpdated.Spec + cluster.syncStreams() + + streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + if len(streamList.Items) > 0 || err != nil { + t.Errorf("stream resource has not been removed or unexpected error %v", err) + } } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 44608856e..6efeccc86 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -19,8 +19,9 @@ import ( apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apipolicyv1 "k8s.io/api/policy/v1" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + apiextv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -62,7 +63,7 @@ type KubernetesClient struct { appsv1.DeploymentsGetter rbacv1.RoleBindingsGetter policyv1.PodDisruptionBudgetsGetter - apiextv1.CustomResourceDefinitionsGetter + apiextv1client.CustomResourceDefinitionsGetter clientbatchv1.CronJobsGetter acidv1.OperatorConfigurationsGetter acidv1.PostgresTeamsGetter @@ -74,6 +75,13 @@ type KubernetesClient struct { Zalandov1ClientSet *zalandoclient.Clientset } +type mockCustomResourceDefinition struct { + apiextv1client.CustomResourceDefinitionInterface +} + +type MockCustomResourceDefinitionsGetter struct { +} + type mockSecret struct { corev1.SecretInterface } @@ -293,6 +301,18 @@ func SameLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) return true, "" } +func (c *mockCustomResourceDefinition) Get(ctx context.Context, name string, options metav1.GetOptions) (*apiextv1.CustomResourceDefinition, error) { + return &apiextv1.CustomResourceDefinition{}, nil +} + +func (c *mockCustomResourceDefinition) Create(ctx context.Context, crd *apiextv1.CustomResourceDefinition, options metav1.CreateOptions) (*apiextv1.CustomResourceDefinition, error) { + return &apiextv1.CustomResourceDefinition{}, nil +} + +func (mock *MockCustomResourceDefinitionsGetter) CustomResourceDefinitions() apiextv1client.CustomResourceDefinitionInterface { + return &mockCustomResourceDefinition{} +} + func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { oldFormatSecret := &v1.Secret{} oldFormatSecret.Name = "testcluster" @@ -497,6 +517,8 @@ func NewMockKubernetesClient() KubernetesClient { ConfigMapsGetter: &MockConfigMapsGetter{}, DeploymentsGetter: &MockDeploymentGetter{}, ServicesGetter: &MockServiceGetter{}, + + CustomResourceDefinitionsGetter: &MockCustomResourceDefinitionsGetter{}, } } From 755b05e28aceebaa05e0322b07a6b39b66347a3d Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 24 May 2024 15:11:41 +0200 Subject: [PATCH 2/2] fix wrong conflict resolution --- pkg/util/k8sutil/k8sutil.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 6983f05fd..b237eb8fe 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -261,6 +261,18 @@ func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason stri return } +func (c *mockCustomResourceDefinition) Get(ctx context.Context, name string, options metav1.GetOptions) (*apiextv1.CustomResourceDefinition, error) { + return &apiextv1.CustomResourceDefinition{}, nil +} + +func (c *mockCustomResourceDefinition) Create(ctx context.Context, crd *apiextv1.CustomResourceDefinition, options metav1.CreateOptions) (*apiextv1.CustomResourceDefinition, error) { + return &apiextv1.CustomResourceDefinition{}, nil +} + +func (mock *MockCustomResourceDefinitionsGetter) CustomResourceDefinitions() apiextv1client.CustomResourceDefinitionInterface { + return &mockCustomResourceDefinition{} +} + func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { oldFormatSecret := &v1.Secret{} oldFormatSecret.Name = "testcluster"