Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions internal/bridge/crunchybridgecluster/apply.go

This file was deleted.

3 changes: 2 additions & 1 deletion internal/bridge/crunchybridgecluster/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crunchydata/postgres-operator/internal/bridge"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (r *CrunchyBridgeClusterReconciler) reconcilePostgresRoleSecrets(
roleSecrets[roleName], err = r.generatePostgresRoleSecret(cluster, role, clusterRole)
}
if err == nil {
err = errors.WithStack(r.apply(ctx, roleSecrets[roleName]))
err = errors.WithStack(runtime.Apply(ctx, r.Writer, roleSecrets[roleName]))
}
if err != nil {
log.Error(err, "Issue creating role secret.")
Expand Down
31 changes: 0 additions & 31 deletions internal/controller/pgupgrade/apply.go

This file was deleted.

4 changes: 2 additions & 2 deletions internal/controller/pgupgrade/pgupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (r *PGUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// TODO: error from apply could mean that the job exists with a different spec.
if err == nil && !upgradeJobComplete {
err = errors.WithStack(r.apply(ctx,
err = errors.WithStack(runtime.Apply(ctx, r.Writer,
r.generateUpgradeJob(ctx, upgrade, world.ClusterPrimary, config.FetchKeyCommand(&world.Cluster.Spec))))
}

Expand All @@ -464,7 +464,7 @@ func (r *PGUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err == nil && upgradeJobComplete && !removeDataJobsComplete {
for _, sts := range world.ClusterReplicas {
if err == nil {
err = r.apply(ctx, r.generateRemoveDataJob(ctx, upgrade, sts))
err = runtime.Apply(ctx, r.Writer, r.generateRemoveDataJob(ctx, upgrade, sts))
}
}
}
Expand Down
44 changes: 2 additions & 42 deletions internal/controller/postgrescluster/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ package postgrescluster

import (
"context"
"reflect"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crunchydata/postgres-operator/internal/kubeapi"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
)

// apply sends an apply patch to object's endpoint in the Kubernetes API and
Expand All @@ -21,42 +18,5 @@ import (
// - https://docs.k8s.io/reference/using-api/server-side-apply/#managers
// - https://docs.k8s.io/reference/using-api/server-side-apply/#conflicts
func (r *Reconciler) apply(ctx context.Context, object client.Object) error {
// Generate an apply-patch by comparing the object to its zero value.
zero := reflect.New(reflect.TypeOf(object).Elem()).Interface()
data, err := client.MergeFrom(zero.(client.Object)).Data(object)
apply := client.RawPatch(client.Apply.Type(), data)

// Keep a copy of the object before any API calls.
intent := object.DeepCopyObject()
patch := kubeapi.NewJSONPatch()

// Send the apply-patch with force=true.
if err == nil {
err = r.Writer.Patch(ctx, object, apply, client.ForceOwnership)
}

// Some fields cannot be server-side applied correctly. When their outcome
// does not match the intent, send a json-patch to get really specific.
switch actual := object.(type) {
case *corev1.Service:
applyServiceSpec(patch, actual.Spec, intent.(*corev1.Service).Spec, "spec")
}

// Send the json-patch when necessary.
if err == nil && !patch.IsEmpty() {
err = r.Writer.Patch(ctx, object, patch)
}
return err
}

// applyServiceSpec is called by Reconciler.apply to work around issues
// with server-side apply.
func applyServiceSpec(
patch *kubeapi.JSON6902, actual, intent corev1.ServiceSpec, path ...string,
) {
// Service.Spec.Selector is not +mapType=atomic until Kubernetes 1.22.
// - https://issue.k8s.io/97970
if !equality.Semantic.DeepEqual(actual.Selector, intent.Selector) {
patch.Replace(append(path, "selector")...)(intent.Selector)
}
return runtime.Apply(ctx, r.Writer, object)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you thinking about a separate PR to remove this file and change the many applys in internal/controller/postgrescluster?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't planning on it, but that seems reasonable. I skipped it here because it is called a lot.

}
8 changes: 4 additions & 4 deletions internal/controller/postgrescluster/controller_ref_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/crunchydata/postgres-operator/internal/kubeapi"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
Expand All @@ -31,7 +31,7 @@ func (r *Reconciler) adoptObject(ctx context.Context, postgresCluster *v1beta1.P
return err
}

patchBytes, err := kubeapi.NewMergePatch().
patchBytes, err := runtime.NewMergePatch().
Add("metadata", "ownerReferences")(obj.GetOwnerReferences()).Bytes()
if err != nil {
return err
Expand Down Expand Up @@ -160,8 +160,8 @@ func (r *Reconciler) manageControllerRefs(ctx context.Context,
func (r *Reconciler) releaseObject(ctx context.Context,
postgresCluster *v1beta1.PostgresCluster, obj client.Object) error {

// TODO create a strategic merge type in kubeapi instead of using Merge7386
patch, err := kubeapi.NewMergePatch().
// TODO create a strategic merge type instead of using Merge7386
patch, err := runtime.NewMergePatch().
Add("metadata", "ownerReferences")([]map[string]string{{
"$patch": "delete",
"uid": string(postgresCluster.GetUID()),
Expand Down
76 changes: 24 additions & 52 deletions internal/controller/postgrescluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -342,59 +341,32 @@ spec:
//
// The "metadata.finalizers" field is also okay.
// - https://book.kubebuilder.io/reference/using-finalizers.html
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Highly recommend turning off whitespace differences to see what's actually going on here for anyone reviewing this

// NOTE(cbandy): Kubernetes prior to v1.16.10 and v1.17.6 does not track
// managed fields on the status subresource: https://issue.k8s.io/88901
switch {
case suite.ServerVersion.LessThan(version.MustParseGeneric("1.22")):

// Kubernetes 1.22 began tracking subresources in managed fields.
// - https://pr.k8s.io/100970
Expect(existing.ManagedFields).To(ContainElement(
MatchFields(IgnoreExtras, Fields{
"Manager": Equal(test.Owner),
"FieldsV1": PointTo(MatchAllFields(Fields{
"Raw": WithTransform(func(in []byte) (out map[string]any) {
Expect(yaml.Unmarshal(in, &out)).To(Succeed())
return out
}, MatchAllKeys(Keys{
"f:metadata": MatchAllKeys(Keys{
"f:finalizers": Not(BeZero()),
}),
"f:status": Not(BeZero()),
})),
})),
}),
), `controller should manage only "finalizers" and "status"`)

default:
Expect(existing.ManagedFields).To(ContainElements(
MatchFields(IgnoreExtras, Fields{
"Manager": Equal(test.Owner),
"FieldsV1": PointTo(MatchAllFields(Fields{
"Raw": WithTransform(func(in []byte) (out map[string]any) {
Expect(yaml.Unmarshal(in, &out)).To(Succeed())
return out
}, MatchAllKeys(Keys{
"f:metadata": MatchAllKeys(Keys{
"f:finalizers": Not(BeZero()),
}),
})),
Expect(existing.ManagedFields).To(ContainElements(
MatchFields(IgnoreExtras, Fields{
"Manager": Equal(test.Owner),
"FieldsV1": PointTo(MatchAllFields(Fields{
"Raw": WithTransform(func(in []byte) (out map[string]any) {
Expect(yaml.Unmarshal(in, &out)).To(Succeed())
return out
}, MatchAllKeys(Keys{
"f:metadata": MatchAllKeys(Keys{
"f:finalizers": Not(BeZero()),
}),
})),
}),
MatchFields(IgnoreExtras, Fields{
"Manager": Equal(test.Owner),
"FieldsV1": PointTo(MatchAllFields(Fields{
"Raw": WithTransform(func(in []byte) (out map[string]any) {
Expect(yaml.Unmarshal(in, &out)).To(Succeed())
return out
}, MatchAllKeys(Keys{
"f:status": Not(BeZero()),
})),
})),
}),
MatchFields(IgnoreExtras, Fields{
"Manager": Equal(test.Owner),
"FieldsV1": PointTo(MatchAllFields(Fields{
"Raw": WithTransform(func(in []byte) (out map[string]any) {
Expect(yaml.Unmarshal(in, &out)).To(Succeed())
return out
}, MatchAllKeys(Keys{
"f:status": Not(BeZero()),
})),
}),
), `controller should manage only "finalizers" and "status"`)
}
})),
}),
), `controller should manage only "finalizers" and "status"`)
})

Specify("Patroni Distributed Configuration", func() {
Expand Down
27 changes: 1 addition & 26 deletions internal/controller/postgrescluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,19 @@ package postgrescluster

import (
"context"
"os"
"strings"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/testing/require"
)

var suite struct {
Client client.Client
Config *rest.Config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the only use of ServerVersion was in the test in internal/controller/postgrescluster/controller_test.go where we were checking for older versions of k8s than we care about, ah


ServerVersion *version.Version

Manager manager.Manager
}

func TestAPIs(t *testing.T) {
Expand All @@ -39,24 +28,10 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
if os.Getenv("KUBEBUILDER_ASSETS") == "" && !strings.EqualFold(os.Getenv("USE_EXISTING_CLUSTER"), "true") {
Skip("skipping")
}
suite.Client = require.Kubernetes(GinkgoT())

logging.SetLogSink(logging.Logrus(GinkgoWriter, "test", 1, 1))
log.SetLogger(logging.FromContext(context.Background()))

By("bootstrapping test environment")
suite.Config, suite.Client = require.Kubernetes2(GinkgoT())

dc, err := discovery.NewDiscoveryClientForConfig(suite.Config)
Expect(err).ToNot(HaveOccurred())

server, err := dc.ServerVersion()
Expect(err).ToNot(HaveOccurred())

suite.ServerVersion, err = version.ParseGeneric(server.GitVersion)
Expect(err).ToNot(HaveOccurred())
})

var _ = AfterSuite(func() {
Expand Down
58 changes: 58 additions & 0 deletions internal/controller/runtime/apply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 - 2025 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package runtime

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Apply sends an apply patch with force=true using cc and updates object with any returned content.
// The client is responsible for setting fieldManager; see [client.WithFieldOwner].
//
// - https://docs.k8s.io/reference/using-api/server-side-apply#managers
// - https://docs.k8s.io/reference/using-api/server-side-apply#conflicts
func Apply[
// NOTE: This interface can go away following https://go.dev/issue/47487.
ClientPatch interface {
Patch(context.Context, client.Object, client.Patch, ...client.PatchOption) error
},
T interface{ client.Object },
](ctx context.Context, cc ClientPatch, object T) error {
// Generate an apply-patch by comparing the object to its zero value.
data, err := client.MergeFrom(*new(T)).Data(object)
apply := client.RawPatch(client.Apply.Type(), data)

// Keep a copy of the object before any API calls.
intent := object.DeepCopyObject()

// Send the apply-patch with force=true.
if err == nil {
err = cc.Patch(ctx, object, apply, client.ForceOwnership)
}

// Some fields cannot be server-side applied correctly.
// When their outcome does not match the intent, send a json-patch to get really specific.
patch := NewJSONPatch()

switch actual := any(object).(type) {
case *corev1.Service:
intent := intent.(*corev1.Service)

// Service.Spec.Selector cannot be unset; perhaps https://issue.k8s.io/117447
if !equality.Semantic.DeepEqual(actual.Spec.Selector, intent.Spec.Selector) {
patch.Replace("spec", "selector")(intent.Spec.Selector)
}
}

// Send the json-patch when necessary.
if err == nil && !patch.IsEmpty() {
err = cc.Patch(ctx, object, patch)
}
return err
}
Loading
Loading