Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions config/crd/bases/postgres-operator.crunchydata.com_pgupgrades.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,7 @@ spec:
type: object
fromPostgresVersion:
description: The major version of PostgreSQL before the upgrade.
format: int32
maximum: 17
minimum: 11
type: integer
Expand All @@ -984,7 +985,7 @@ spec:
description: |-
The image pull secrets used to pull from a private registry.
Changing this value causes all running PGUpgrade pods to restart.
https://k8s.io/docs/tasks/configure-pod-container/pull-image-private-registry/
https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry
items:
description: |-
LocalObjectReference contains enough information to let you locate the
Expand All @@ -1002,6 +1003,13 @@ spec:
type: object
x-kubernetes-map-type: atomic
type: array
jobs:
description: |-
The number of simultaneous processes pg_upgrade should use.
More info: https://www.postgresql.org/docs/current/pgupgrade.html
format: int32
minimum: 0
type: integer
metadata:
description: Metadata contains metadata for custom resources
properties:
Expand All @@ -1015,14 +1023,14 @@ spec:
type: object
type: object
postgresClusterName:
description: The name of the cluster to be updated
description: The name of the Postgres cluster to upgrade.
minLength: 1
type: string
priorityClassName:
description: |-
Priority class name for the PGUpgrade pod. Changing this
value causes PGUpgrade pod to restart.
More info: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/
More info: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption
type: string
resources:
description: Resource requirements for the PGUpgrade container.
Expand Down Expand Up @@ -1083,13 +1091,9 @@ spec:
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
type: object
type: object
toPostgresImage:
description: |-
The image name to use for PostgreSQL containers after upgrade.
When omitted, the value comes from an operator environment variable.
type: string
toPostgresVersion:
description: The major version of PostgreSQL to be upgraded to.
format: int32
maximum: 17
minimum: 11
type: integer
Expand Down Expand Up @@ -1134,11 +1138,30 @@ spec:
type: string
type: object
type: array
transferMethod:
description: |-
The method pg_upgrade should use to transfer files to the new cluster.
More info: https://www.postgresql.org/docs/current/pgupgrade.html
enum:
- Clone
- Copy
- CopyFileRange
- Link
maxLength: 15
type: string
required:
- fromPostgresVersion
- postgresClusterName
- toPostgresVersion
type: object
x-kubernetes-validations:
- rule: self.fromPostgresVersion < self.toPostgresVersion
- message: Only Copy or Link before PostgreSQL 12
rule: '!has(self.transferMethod) || (self.toPostgresVersion < 12 ? self.transferMethod
in ["Copy","Link"] : true)'
- message: Only Clone, Copy, or Link before PostgreSQL 17
rule: '!has(self.transferMethod) || (self.toPostgresVersion < 17 ? self.transferMethod
in ["Clone","Copy","Link"] : true)'
status:
description: PGUpgradeStatus defines the observed state of PGUpgrade
properties:
Expand Down
41 changes: 24 additions & 17 deletions internal/controller/pgupgrade/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package pgupgrade

import (
"cmp"
"context"
"fmt"
"math"
"strings"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -35,9 +37,16 @@ func pgUpgradeJob(upgrade *v1beta1.PGUpgrade) metav1.ObjectMeta {

// upgradeCommand returns an entrypoint that prepares the filesystem for
// and performs a PostgreSQL major version upgrade using pg_upgrade.
func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availableCPUs int) []string {
// Use multiple CPUs when three or more are available.
argJobs := fmt.Sprintf(` --jobs=%d`, max(1, availableCPUs-1))
func upgradeCommand(spec *v1beta1.PGUpgradeSettings, fetchKeyCommand string) []string {
argJobs := fmt.Sprintf(` --jobs=%d`, max(1, spec.Jobs))
argMethod := cmp.Or(map[string]string{
"Clone": ` --clone`,
"Copy": ` --copy`,
"CopyFileRange": ` --copy-file-range`,
}[spec.TransferMethod], ` --link`)

oldVersion := spec.FromPostgresVersion
newVersion := spec.ToPostgresVersion

// if the fetch key command is set for TDE, provide the value during initialization
initdb := `/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}"`
Expand Down Expand Up @@ -99,14 +108,14 @@ func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availabl
`echo -e "Step 5: Running pg_upgrade check...\n"`,
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\`,
` --new-datadir /pgdata/pg"${new_version}" --link --check` + argJobs,
` --new-datadir /pgdata/pg"${new_version}" --check` + argMethod + argJobs,

// Assuming the check completes successfully, the pg_upgrade command will
// be run that actually prepares the upgraded pgdata directory.
`echo -e "\nStep 6: Running pg_upgrade...\n"`,
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \`,
`--new-datadir /pgdata/pg"${new_version}" --link` + argJobs,
`--new-datadir /pgdata/pg"${new_version}"` + argMethod + argJobs,

// Since we have cleared the Patroni cluster step by removing the EndPoints, we copy patroni.dynamic.json
// from the old data dir to help retain PostgreSQL parameters you had set before.
Expand All @@ -122,12 +131,12 @@ func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availabl

// largestWholeCPU returns the maximum CPU request or limit as a non-negative
// integer of CPUs. When resources lacks any CPU, the result is zero.
func largestWholeCPU(resources corev1.ResourceRequirements) int {
func largestWholeCPU(resources corev1.ResourceRequirements) int64 {
// Read CPU quantities as millicores then divide to get the "floor."
// NOTE: [resource.Quantity.Value] looks easier, but it rounds up.
return max(
int(resources.Limits.Cpu().ScaledValue(resource.Milli)/1000),
int(resources.Requests.Cpu().ScaledValue(resource.Milli)/1000),
resources.Limits.Cpu().ScaledValue(resource.Milli)/1000,
resources.Requests.Cpu().ScaledValue(resource.Milli)/1000,
0)
}

Expand Down Expand Up @@ -180,10 +189,12 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
job.Spec.BackoffLimit = initialize.Int32(0)
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever

// When enabled, calculate the number of CPUs for pg_upgrade.
wholeCPUs := 0
if feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency) {
wholeCPUs = largestWholeCPU(upgrade.Spec.Resources)
settings := upgrade.Spec.PGUpgradeSettings.DeepCopy()

// When jobs is undefined, use one less than the number of CPUs.
if settings.Jobs == 0 && feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency) {
wholeCPUs := int32(min(math.MaxInt32, largestWholeCPU(upgrade.Spec.Resources)))
settings.Jobs = wholeCPUs - 1
}

// Replace all containers with one that does the upgrade.
Expand All @@ -198,11 +209,7 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
VolumeMounts: database.VolumeMounts,

// Use our upgrade command and the specified image and resources.
Command: upgradeCommand(
upgrade.Spec.FromPostgresVersion,
upgrade.Spec.ToPostgresVersion,
fetchKeyCommand,
wholeCPUs),
Command: upgradeCommand(settings, fetchKeyCommand),
Image: pgUpgradeContainerImage(upgrade),
ImagePullPolicy: upgrade.Spec.ImagePullPolicy,
Resources: upgrade.Spec.Resources,
Expand Down
52 changes: 39 additions & 13 deletions internal/controller/pgupgrade/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
)

func TestLargestWholeCPU(t *testing.T) {
assert.Equal(t, 0,
assert.Equal(t, int64(0),
largestWholeCPU(corev1.ResourceRequirements{}),
"expected the zero value to be zero")

for _, tt := range []struct {
Name, ResourcesYAML string
Result int
Result int64
}{
{
Name: "Negatives", ResourcesYAML: `{requests: {cpu: -3}, limits: {cpu: -5}}`,
Expand Down Expand Up @@ -72,27 +72,53 @@ func TestUpgradeCommand(t *testing.T) {
})
}

t.Run("CPUs", func(t *testing.T) {
t.Run("Jobs", func(t *testing.T) {
for _, tt := range []struct {
CPUs int
Jobs string
Spec int32
Args string
}{
{CPUs: 0, Jobs: "--jobs=1"},
{CPUs: 1, Jobs: "--jobs=1"},
{CPUs: 2, Jobs: "--jobs=1"},
{CPUs: 3, Jobs: "--jobs=2"},
{CPUs: 10, Jobs: "--jobs=9"},
{Spec: -1, Args: "--jobs=1"},
{Spec: 0, Args: "--jobs=1"},
{Spec: 1, Args: "--jobs=1"},
{Spec: 2, Args: "--jobs=2"},
{Spec: 10, Args: "--jobs=10"},
} {
command := upgradeCommand(10, 11, "", tt.CPUs)
spec := &v1beta1.PGUpgradeSettings{Jobs: tt.Spec}
command := upgradeCommand(spec, "")
assert.Assert(t, len(command) > 3)
assert.DeepEqual(t, []string{"bash", "-ceu", "--"}, command[:3])

script := command[3]
assert.Assert(t, cmp.Contains(script, tt.Jobs))
assert.Assert(t, cmp.Contains(script, tt.Args))

expectScript(t, script)
}
})

t.Run("Method", func(t *testing.T) {
for _, tt := range []struct {
Spec string
Args string
}{
{Spec: "", Args: "--link"},
{Spec: "mystery!", Args: "--link"},
{Spec: "Link", Args: "--link"},
{Spec: "Clone", Args: "--clone"},
{Spec: "Copy", Args: "--copy"},
{Spec: "CopyFileRange", Args: "--copy-file-range"},
} {
spec := &v1beta1.PGUpgradeSettings{TransferMethod: tt.Spec}
command := upgradeCommand(spec, "")
assert.Assert(t, len(command) > 3)
assert.DeepEqual(t, []string{"bash", "-ceu", "--"}, command[:3])

script := command[3]
assert.Assert(t, cmp.Contains(script, tt.Args))

expectScript(t, script)
}

})
}

func TestGenerateUpgradeJob(t *testing.T) {
Expand Down Expand Up @@ -194,7 +220,7 @@ spec:
echo -e "Step 5: Running pg_upgrade check...\n"
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\
--new-datadir /pgdata/pg"${new_version}" --link --check --jobs=1
--new-datadir /pgdata/pg"${new_version}" --check --link --jobs=1
echo -e "\nStep 6: Running pg_upgrade...\n"
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/pgupgrade/pgupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (r *PGUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
setStatusToProgressingIfReasonWas("", upgrade)

// The "from" version must be smaller than the "to" version.
// NOTE: CRD validation also rejects these values.
// An invalid PGUpgrade should not be requeued.
if upgrade.Spec.FromPostgresVersion >= upgrade.Spec.ToPostgresVersion {

Expand Down Expand Up @@ -418,7 +419,7 @@ func (r *PGUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Set the cluster status when we know the upgrade has completed successfully.
// This will serve to help the user see that the upgrade has completed if they
// are only watching the PostgresCluster
patch.Status.PostgresVersion = upgrade.Spec.ToPostgresVersion
patch.Status.PostgresVersion = int(upgrade.Spec.ToPostgresVersion)

// Set the pgBackRest status for bootstrapping
patch.Status.PGBackRest.Repos = []v1beta1.RepoStatus{}
Expand Down
2 changes: 1 addition & 1 deletion internal/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewGate() MutableGate {
OpenTelemetryLogs: {Default: false, PreRelease: featuregate.Alpha},
OpenTelemetryMetrics: {Default: false, PreRelease: featuregate.Alpha},
PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha},
PGUpgradeCPUConcurrency: {Default: false, PreRelease: featuregate.Alpha},
PGUpgradeCPUConcurrency: {Default: true, PreRelease: featuregate.Beta},
TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha},
VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha},
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/feature/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestDefaults(t *testing.T) {
assert.Assert(t, false == gate.Enabled(OpenTelemetryLogs))
assert.Assert(t, false == gate.Enabled(OpenTelemetryMetrics))
assert.Assert(t, false == gate.Enabled(PGBouncerSidecars))
assert.Assert(t, false == gate.Enabled(PGUpgradeCPUConcurrency))
assert.Assert(t, true == gate.Enabled(PGUpgradeCPUConcurrency))
assert.Assert(t, false == gate.Enabled(TablespaceVolumes))
assert.Assert(t, false == gate.Enabled(VolumeSnapshots))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/upgradecheck/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestCheckForUpgrades(t *testing.T) {
assert.Equal(t, data.RegistrationToken, "speakFriend")
assert.Equal(t, data.BridgeClustersTotal, 2)
assert.Equal(t, data.PGOClustersTotal, 2)
assert.Equal(t, data.FeatureGatesEnabled, "AutoCreateUserSchema=true,TablespaceVolumes=true")
assert.Equal(t, data.FeatureGatesEnabled, "AutoCreateUserSchema=true,PGUpgradeCPUConcurrency=true,TablespaceVolumes=true")
}

t.Run("success", func(t *testing.T) {
Expand Down
Loading
Loading