diff --git a/cmd/memberagent/main.go b/cmd/memberagent/main.go index eca9da748..ee9082b2a 100644 --- a/cmd/memberagent/main.go +++ b/cmd/memberagent/main.go @@ -381,6 +381,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb return err } // create the work controller, so we can pass it to the internal member cluster reconciler + workObjAgeForPrioritizedProcessing := time.Minute * time.Duration(*watchWorkReconcileAgeMinutes) workController := workapplier.NewReconciler( hubMgr.GetClient(), targetNS, @@ -396,7 +397,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb time.Second*time.Duration(*availabilityCheckInterval), time.Second*time.Duration(*driftDetectionInterval), *watchWorkWithPriorityQueue, - *watchWorkReconcileAgeMinutes, + workObjAgeForPrioritizedProcessing, ) if err = workController.SetupWithManager(hubMgr); err != nil { diff --git a/go.mod b/go.mod index afa3d6529..34a91b093 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,10 @@ require ( github.com/crossplane/crossplane-runtime v1.17.0 github.com/evanphx/json-patch/v5 v5.9.11 github.com/go-logr/logr v1.4.2 - github.com/google/go-cmp v0.6.0 + github.com/google/go-cmp v0.7.0 github.com/onsi/ginkgo/v2 v2.22.0 github.com/onsi/gomega v1.36.1 - github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_golang v1.22.0 github.com/prometheus/client_model v0.6.1 github.com/qri-io/jsonpointer v0.1.1 github.com/spf13/cobra v1.8.1 @@ -24,12 +24,12 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6 golang.org/x/sync v0.12.0 - golang.org/x/time v0.7.0 - k8s.io/api v0.32.1 - k8s.io/apiextensions-apiserver v0.32.1 - k8s.io/apimachinery v0.32.1 - k8s.io/client-go v0.32.1 - k8s.io/component-base v0.32.1 + golang.org/x/time v0.9.0 + k8s.io/api v0.33.0 + k8s.io/apiextensions-apiserver v0.33.0 + k8s.io/apimachinery v0.33.0 + k8s.io/client-go v0.33.0 + k8s.io/component-base v0.33.0 k8s.io/component-helpers v0.28.3 k8s.io/klog/v2 v2.130.1 k8s.io/metrics v0.25.2 @@ -37,7 +37,7 @@ require ( sigs.k8s.io/cloud-provider-azure v1.28.2 sigs.k8s.io/cloud-provider-azure/pkg/azclient v0.0.50 sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 - sigs.k8s.io/controller-runtime v0.20.4 + sigs.k8s.io/controller-runtime v0.21.0 sigs.k8s.io/work-api v0.0.0-20220407021756-586d707fdb2c ) @@ -68,6 +68,7 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.3.1 // indirect github.com/aws/karpenter-core v0.32.2-0.20231109191441-e32aafc81fb5 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/blang/semver/v4 v4.0.0 // indirect github.com/blendle/zapdriver v1.3.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -82,10 +83,8 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.2 // indirect github.com/golang-jwt/jwt/v5 v5.2.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect - github.com/google/gnostic-models v0.6.8 // indirect - github.com/google/gofuzz v1.2.0 // indirect + github.com/google/gnostic-models v0.6.9 // indirect github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -101,7 +100,7 @@ require ( github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/samber/lo v1.38.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect @@ -110,25 +109,26 @@ require ( github.com/tidwall/pretty v1.2.1 // indirect github.com/tidwall/sjson v1.2.5 // indirect github.com/x448/float16 v0.8.4 // indirect - go.opentelemetry.io/otel v1.31.0 // indirect - go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel v1.33.0 // indirect + go.opentelemetry.io/otel/metric v1.33.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.36.0 // indirect golang.org/x/net v0.38.0 // indirect - golang.org/x/oauth2 v0.23.0 // indirect + golang.org/x/oauth2 v0.27.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect golang.org/x/tools v0.28.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect + k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect knative.dev/pkg v0.0.0-20231010144348-ca8c009405dd // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index f8835a13a..9f25067d3 100644 --- a/go.sum +++ b/go.sum @@ -143,15 +143,13 @@ github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeD github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= -github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= +github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -169,6 +167,8 @@ github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6 h1:IsMZxCuZqKu github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeWNIJaW+O5xpRQbPp0Ybqu1vJd/pm7s2F473HRrkw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -202,12 +202,12 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= -github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= -github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJiBA= @@ -228,6 +228,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -253,14 +255,16 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.goms.io/fleet-networking v0.3.3 h1:5rwBntaUoLF+E1CzaWAEL4GdvLJPQorKhjgkbLlllPE= go.goms.io/fleet-networking v0.3.3/go.mod h1:Qgbi8M1fGaz/p5rtb6HJPmTDATWRnMt9HD1gz57WKUc= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= +go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= +go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -299,8 +303,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= +golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -331,8 +335,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= -golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -346,8 +350,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -360,26 +364,26 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.32.1 h1:f562zw9cy+GvXzXf0CKlVQ7yHJVYzLfL6JAS4kOAaOc= -k8s.io/api v0.32.1/go.mod h1:/Yi/BqkuueW1BgpoePYBRdDYfjPF5sgTr5+YqDZra5k= -k8s.io/apiextensions-apiserver v0.32.1 h1:hjkALhRUeCariC8DiVmb5jj0VjIc1N0DREP32+6UXZw= -k8s.io/apiextensions-apiserver v0.32.1/go.mod h1:sxWIGuGiYov7Io1fAS2X06NjMIk5CbRHc2StSmbaQto= -k8s.io/apimachinery v0.32.1 h1:683ENpaCBjma4CYqsmZyhEzrGz6cjn1MY/X2jB2hkZs= -k8s.io/apimachinery v0.32.1/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= -k8s.io/client-go v0.32.1 h1:otM0AxdhdBIaQh7l1Q0jQpmo7WOFIk5FFa4bg6YMdUU= -k8s.io/client-go v0.32.1/go.mod h1:aTTKZY7MdxUaJ/KiUs8D+GssR9zJZi77ZqtzcGXIiDg= +k8s.io/api v0.33.0 h1:yTgZVn1XEe6opVpP1FylmNrIFWuDqe2H0V8CT5gxfIU= +k8s.io/api v0.33.0/go.mod h1:CTO61ECK/KU7haa3qq8sarQ0biLq2ju405IZAd9zsiM= +k8s.io/apiextensions-apiserver v0.33.0 h1:d2qpYL7Mngbsc1taA4IjJPRJ9ilnsXIrndH+r9IimOs= +k8s.io/apiextensions-apiserver v0.33.0/go.mod h1:VeJ8u9dEEN+tbETo+lFkwaaZPg6uFKLGj5vyNEwwSzc= +k8s.io/apimachinery v0.33.0 h1:1a6kHrJxb2hs4t8EE5wuR/WxKDwGN1FKH3JvDtA0CIQ= +k8s.io/apimachinery v0.33.0/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= +k8s.io/client-go v0.33.0 h1:UASR0sAYVUzs2kYuKn/ZakZlcs2bEHaizrrHUZg0G98= +k8s.io/client-go v0.33.0/go.mod h1:kGkd+l/gNGg8GYWAPr0xF1rRKvVWvzh9vmZAMXtaKOg= k8s.io/cloud-provider v0.28.3 h1:9u+JjA3zIn0nqLOOa8tWnprFkffguSAhfBvo8p7LhBQ= k8s.io/cloud-provider v0.28.3/go.mod h1:shAJxdrKu+SwwGUhkodxByPjaH8KBFZqXo6jU1F0ehI= -k8s.io/component-base v0.32.1 h1:/5IfJ0dHIKBWysGV0yKTFfacZ5yNV1sulPh3ilJjRZk= -k8s.io/component-base v0.32.1/go.mod h1:j1iMMHi/sqAHeG5z+O9BFNCF698a1u0186zkjMZQ28w= +k8s.io/component-base v0.33.0 h1:Ot4PyJI+0JAD9covDhwLp9UNkUja209OzsJ4FzScBNk= +k8s.io/component-base v0.33.0/go.mod h1:aXYZLbw3kihdkOPMDhWbjGCO6sg+luw554KP51t8qCU= k8s.io/component-helpers v0.28.3 h1:te9ieTGzcztVktUs92X53P6BamAoP73MK0qQP0WmDqc= k8s.io/component-helpers v0.28.3/go.mod h1:oJR7I9ist5UAQ3y/CTdbw6CXxdMZ1Lw2Ua/EZEwnVLs= k8s.io/csi-translation-lib v0.28.3 h1:7deV+HZjV418AGikSDPW8dyzTpm4K3tNbQUp3KmR7cs= k8s.io/csi-translation-lib v0.28.3/go.mod h1:zlrYwakCz2yji9/8EaJk+afIKPrYXPNXXLDO8DVuuTk= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= -k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4= +k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= +k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= k8s.io/metrics v0.25.2 h1:105TuPaIFfr4EHzN56WwZJO7r1UesuDytNTzeMqGySo= k8s.io/metrics v0.25.2/go.mod h1:4NDAauOuEJ+NWO2+hWkhFE4rWBx/plLWJOYU3vGl0sA= k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro= @@ -392,11 +396,14 @@ sigs.k8s.io/cloud-provider-azure/pkg/azclient v0.0.50 h1:l9igMANNptVwYmZrqGS51oW sigs.k8s.io/cloud-provider-azure/pkg/azclient v0.0.50/go.mod h1:1M90A+akyTabHVnveSKlvIO/Kk9kEr1LjRx+08twKVU= sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 h1:WYPi2PdQyZwZkHG648v2jQl6deyCgyjJ0fkLYgUJ618= sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848/go.mod h1:/aN4e7RWOMHgT4xAjCNkV4YFcpKfpZCeumMIL7S+KNM= -sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= -sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/controller-runtime v0.21.0 h1:CYfjpEuicjUecRk+KAeyYh+ouUBn4llGyDYytIGcJS8= +sigs.k8s.io/controller-runtime v0.21.0/go.mod h1:OSg14+F65eWqIu4DceX7k/+QRAbTTvxeQSNSOQpukWM= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= +sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc= +sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go index aca1e73e3..fa51ab229 100644 --- a/pkg/controllers/workapplier/controller.go +++ b/pkg/controllers/workapplier/controller.go @@ -34,10 +34,10 @@ package workapplier import ( "context" + "sync" "time" "go.uber.org/atomic" - "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -55,13 +55,11 @@ import ( ctrloption "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" "github.com/kubefleet-dev/kubefleet/pkg/controllers/work" - "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" "github.com/kubefleet-dev/kubefleet/pkg/utils/controller" "github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter" "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer" @@ -70,148 +68,35 @@ import ( const ( patchDetailPerObjLimit = 100 - minRequestAfterDuration = time.Second * 5 + minRequestAfterDuration = time.Second * 5 + minWorkObjAgeForPrioritizedQueueing = time.Minute * 30 ) const ( workFieldManagerName = "work-api-agent" ) -var ( - workAgeToReconcile = 1 * time.Hour -) - -// Custom type to hold a reconcile.Request and a priority value -type priorityQueueItem struct { - reconcile.Request - Priority int -} - -// PriorityQueueEventHandler is a custom event handler for adding objects to the priority queue. -type PriorityQueueEventHandler struct { - Queue priorityqueue.PriorityQueue[priorityQueueItem] // The priority queue to manage events - Client client.Client // store the client to make API calls -} - -// Implement priorityqueue.Item interface for priorityQueueItem -func (i priorityQueueItem) GetPriority() int { - return i.Priority -} - -func (h *PriorityQueueEventHandler) WorkPendingApply(ctx context.Context, obj client.Object) bool { - var work fleetv1beta1.Work - ns := obj.GetNamespace() - name := obj.GetName() - err := h.Client.Get(ctx, client.ObjectKey{ - Namespace: ns, - Name: name, - }, &work) - if err != nil { - // Log and return - klog.ErrorS(err, "Failed to get the work", "name", name, "ns", ns) - return true - } - availCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable) - appliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied) - - if availCond != nil && appliedCond != nil { - // check if the object has been recently modified - availCondLastUpdatedTime := availCond.LastTransitionTime.Time - appliedCondLastUpdatedTime := appliedCond.LastTransitionTime.Time - if time.Since(availCondLastUpdatedTime) < workAgeToReconcile || time.Since(appliedCondLastUpdatedTime) < workAgeToReconcile { - return true - } - } - - if condition.IsConditionStatusTrue(availCond, work.GetGeneration()) && - condition.IsConditionStatusTrue(appliedCond, work.GetGeneration()) { - return false - } - - // Work not yet applied - return true -} - -func (h *PriorityQueueEventHandler) AddToPriorityQueue(ctx context.Context, obj client.Object, alwaysAdd bool) { - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - } - - objAge := time.Since(obj.GetCreationTimestamp().Time) - - var objPriority int - if alwaysAdd || objAge < workAgeToReconcile || h.WorkPendingApply(ctx, obj) { - // Newer or pending objects get higher priority - // Negate the Unix timestamp to give higher priority to newer timestamps - objPriority = -int(time.Now().Unix()) - } else { - // skip adding older objects with no changes - klog.V(2).InfoS("adding old item to priorityQueueItem", "obj", req.Name, "age", objAge) - objPriority = int(obj.GetCreationTimestamp().Unix()) - } - - // Create the custom priorityQueueItem with the request and priority - item := priorityQueueItem{ - Request: req, - Priority: objPriority, - } - - h.Queue.Add(item) - klog.V(2).InfoS("Created PriorityQueueItem", "priority", objPriority, "obj", req.Name, "queue size", h.Queue.Len()) -} - -func (h *PriorityQueueEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.AddToPriorityQueue(ctx, evt.Object, false) -} - -func (h *PriorityQueueEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.AddToPriorityQueue(ctx, evt.Object, true) -} - -func (h *PriorityQueueEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - // Ignore updates where only the status changed - oldObj := evt.ObjectOld.DeepCopyObject() - newObj := evt.ObjectNew.DeepCopyObject() - - // Zero out the status - if oldWork, ok := oldObj.(*fleetv1beta1.Work); ok { - oldWork.Status = fleetv1beta1.WorkStatus{} - } - if newWork, ok := newObj.(*fleetv1beta1.Work); ok { - newWork.Status = fleetv1beta1.WorkStatus{} - } - - if !equality.Semantic.DeepEqual(oldObj, newObj) { - // ignore status changes to prevent noise - h.AddToPriorityQueue(ctx, evt.ObjectNew, true) - return - } - klog.V(4).InfoS("ignoring update event with only status change", "work", evt.ObjectNew.GetName()) -} - -func (h *PriorityQueueEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.AddToPriorityQueue(ctx, evt.Object, false) -} - // Reconciler reconciles a Work object. type Reconciler struct { - hubClient client.Client - workNameSpace string - spokeDynamicClient dynamic.Interface - spokeClient client.Client - restMapper meta.RESTMapper - recorder record.EventRecorder - concurrentReconciles int - watchWorkWithPriorityQueue bool - watchWorkReconcileAgeMinutes int - joined *atomic.Bool - parallelizer *parallelizer.Parallerlizer + hubClient client.Client + workNameSpace string + spokeDynamicClient dynamic.Interface + spokeClient client.Client + restMapper meta.RESTMapper + recorder record.EventRecorder + concurrentReconciles int + joined *atomic.Bool + parallelizer *parallelizer.Parallerlizer availabilityCheckRequeueAfter time.Duration driftCheckRequeueAfter time.Duration + + usePriorityQueue bool + workObjAgeForPrioritizedProcessing time.Duration + // The custom priority queue in use if the option watchWorkWithPriorityQueue is enabled. + // + // Note that this variable is set only after the controller starts. + pq priorityqueue.PriorityQueue[reconcile.Request] } func NewReconciler( @@ -222,8 +107,8 @@ func NewReconciler( workerCount int, availabilityCheckRequestAfter time.Duration, driftCheckRequestAfter time.Duration, - watchWorkWithPriorityQueue bool, - watchWorkReconcileAgeMinutes int, + usePriorityQueue bool, + workObjAgeForPrioritizedProcessing time.Duration, ) *Reconciler { acRequestAfter := availabilityCheckRequestAfter if acRequestAfter < minRequestAfterDuration { @@ -237,20 +122,26 @@ func NewReconciler( dcRequestAfter = minRequestAfterDuration } + woAgeForPrioritizedProcessing := workObjAgeForPrioritizedProcessing + if woAgeForPrioritizedProcessing < minWorkObjAgeForPrioritizedQueueing { + klog.V(2).InfoS("Work object age for prioritized processing is too short; set to the longer default", "workObjAgeForPrioritizedProcessing", woAgeForPrioritizedProcessing) + woAgeForPrioritizedProcessing = minWorkObjAgeForPrioritizedQueueing + } + return &Reconciler{ - hubClient: hubClient, - spokeDynamicClient: spokeDynamicClient, - spokeClient: spokeClient, - restMapper: restMapper, - recorder: recorder, - concurrentReconciles: concurrentReconciles, - parallelizer: parallelizer.NewParallelizer(workerCount), - watchWorkWithPriorityQueue: watchWorkWithPriorityQueue, - watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes, - workNameSpace: workNameSpace, - joined: atomic.NewBool(false), - availabilityCheckRequeueAfter: acRequestAfter, - driftCheckRequeueAfter: dcRequestAfter, + hubClient: hubClient, + spokeDynamicClient: spokeDynamicClient, + spokeClient: spokeClient, + restMapper: restMapper, + recorder: recorder, + concurrentReconciles: concurrentReconciles, + parallelizer: parallelizer.NewParallelizer(workerCount), + workNameSpace: workNameSpace, + joined: atomic.NewBool(false), + availabilityCheckRequeueAfter: acRequestAfter, + driftCheckRequeueAfter: dcRequestAfter, + usePriorityQueue: usePriorityQueue, + workObjAgeForPrioritizedProcessing: woAgeForPrioritizedProcessing, } } @@ -615,24 +506,35 @@ func (r *Reconciler) Leave(ctx context.Context) error { return nil } +func (r *Reconciler) PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] { + return r.pq +} + // SetupWithManager wires up the controller. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { - // Create the priority queue using the rate limiter and a queue name - queue := priorityqueue.New[priorityQueueItem]("apply-work-queue") + if r.usePriorityQueue { + eventHandler := &priorityBasedWorkObjEventHandler{ + qm: r, + workObjAgeForPrioritizedProcessing: r.workObjAgeForPrioritizedProcessing, + } - // Create the event handler that uses the priority queue - eventHandler := &PriorityQueueEventHandler{ - Queue: queue, // Attach the priority queue to the event handler - Client: r.hubClient, - } + var once sync.Once + newPQ := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + withRateLimiterOpt := func(opts *priorityqueue.Opts[reconcile.Request]) { + opts.RateLimiter = rateLimiter + } + once.Do(func() { + r.pq = priorityqueue.New(controllerName, withRateLimiterOpt) + }) + return r.pq + } - if r.watchWorkWithPriorityQueue { - workAgeToReconcile = time.Duration(r.watchWorkReconcileAgeMinutes) * time.Minute return ctrl.NewControllerManagedBy(mgr).Named("work-applier-controller"). WithOptions(ctrloption.Options{ MaxConcurrentReconciles: r.concurrentReconciles, + NewQueue: newPQ, }). - For(&fleetv1beta1.Work{}). + // Use custom event handler to allow access to the priority queue interface. Watches(&fleetv1beta1.Work{}, eventHandler). Complete(r) } diff --git a/pkg/controllers/workapplier/pq.go b/pkg/controllers/workapplier/pq.go new file mode 100644 index 000000000..49a1c4a02 --- /dev/null +++ b/pkg/controllers/workapplier/pq.go @@ -0,0 +1,228 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workapplier + +import ( + "context" + "fmt" + "time" + + "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" + "github.com/kubefleet-dev/kubefleet/pkg/utils/controller" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +const ( + // A list of priority levels and their targets for the work applier priority queue. + // + // The work applier, when a priority queue is in use, will prioritize requests in the following + // order: + // * with highest priority (-2): all Create/Delete events, and all Update events + // that concern recently created Work objects or Work objects that are in a failed/undeterminted + // state (apply op/availability check failure, or diff reporting failure). + // * with medium priority (-1): all other Update events. + // * with default priority (0): all requeues (with or with errors), and all Generic events. + // + // Note that requests with the same priority level will be processed in the FIFO order. + // + // TO-DO (chenyu1): evaluate if/how we need to/should prioritize requeues properly. + highPriorityLevel = 2 + mediumPriorityLevel = 1 + defaultPriorityLevel = 0 +) + +type CustomPriorityQueueManager interface { + PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] +} + +var _ handler.TypedEventHandler[client.Object, reconcile.Request] = &priorityBasedWorkObjEventHandler{} + +// priorityBasedWorkObjEventHandler implements the TypedEventHandler interface. +// +// It is used to process work object events in a priority-based manner with a priority queue. +type priorityBasedWorkObjEventHandler struct { + qm CustomPriorityQueueManager + workObjAgeForPrioritizedProcessing time.Duration +} + +// Create implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Create(_ context.Context, createEvent event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + // + // Normally when this method is called, the priority queue has been initialized. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received a Create event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Create event") + return + } + + // Enqueue the request with high priority. + opts := priorityqueue.AddOpts{ + Priority: highPriorityLevel, + } + workObjName := createEvent.Object.GetName() + workObjNS := createEvent.Object.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +// Delete implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Delete(_ context.Context, deleteEvent event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received a Delete event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Delete event") + return + } + + // Enqueue the request with high priority. + opts := priorityqueue.AddOpts{ + Priority: highPriorityLevel, + } + workObjName := deleteEvent.Object.GetName() + workObjNS := deleteEvent.Object.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +// Update implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Update(_ context.Context, updateEvent event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received an Update event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Update event") + return + } + + // Ignore status only updates. + if updateEvent.ObjectOld.GetGeneration() == updateEvent.ObjectNew.GetGeneration() { + return + } + + oldWorkObj, oldOK := updateEvent.ObjectOld.(*fleetv1beta1.Work) + newWorkObj, newOK := updateEvent.ObjectNew.(*fleetv1beta1.Work) + if !oldOK || !newOK { + wrappedErr := fmt.Errorf("received an Update event, but the objects cannot be cast to Work objects") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Update event") + return + } + + pri := h.determineUpdateEventPriority(oldWorkObj, newWorkObj) + opts := priorityqueue.AddOpts{ + Priority: pri, + } + workObjName := newWorkObj.GetName() + workObjNS := newWorkObj.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +// Generic implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Generic(_ context.Context, genericEvent event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received a Generic event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Generic event") + return + } + + // Enqueue the request with default priority. + opts := priorityqueue.AddOpts{ + Priority: defaultPriorityLevel, + } + workObjName := genericEvent.Object.GetName() + workObjNS := genericEvent.Object.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +func (h *priorityBasedWorkObjEventHandler) determineUpdateEventPriority(oldWorkObj, newWorkObj *fleetv1beta1.Work) int { + // If the work object is recently created (its age is within the given threshold), + // process its Update event with high priority. + + // The age is expected to be the same for both old and new work objects, as the field + // is immutable and not user configurable. + workObjAge := time.Since(newWorkObj.CreationTimestamp.Time) + if workObjAge <= h.workObjAgeForPrioritizedProcessing { + return highPriorityLevel + } + + // Check if the work object is in a failed/undetermined state. + oldApplyStrategy := oldWorkObj.Spec.ApplyStrategy + isReportDiffModeEnabled := oldApplyStrategy != nil && oldApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff + + appliedCond := meta.FindStatusCondition(oldWorkObj.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied) + availableCond := meta.FindStatusCondition(oldWorkObj.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable) + diffReportedCond := meta.FindStatusCondition(oldWorkObj.Status.Conditions, fleetv1beta1.WorkConditionTypeDiffReported) + + // Note (chenyu1): it might be true that the Update event involves an apply strategy change; however, the prioritization + // logic stays the same: if the old work object is in a failed/undetermined state, the apply strategy change + // should receive the highest priority; otherwise, the Update event should be processed with medium priority. + switch { + case isReportDiffModeEnabled && condition.IsConditionStatusTrue(diffReportedCond, oldWorkObj.Generation): + // The ReportDiff mode is enabled and the status suggests that the diff reporting has been completed successfully. + // Use medium priority for the Update event. + return mediumPriorityLevel + case isReportDiffModeEnabled: + // The ReportDiff mode is enabled, but the diff reporting has not been completed yet or has failed. + // Use high priority for the Update event. + return highPriorityLevel + case condition.IsConditionStatusTrue(appliedCond, oldWorkObj.Generation) && condition.IsConditionStatusTrue(availableCond, oldWorkObj.Generation): + // The apply strategy is set to the CSA/SSA mode and the work object is applied and available. + // Use medium priority for the Update event. + return mediumPriorityLevel + default: + // The apply strategy is set to the CSA/SSA mode and the work object is in a failed/undetermined state. + // Use high priority for the Update event. + return highPriorityLevel + } +} diff --git a/pkg/controllers/workapplier/suite_test.go b/pkg/controllers/workapplier/suite_test.go index d7491e25b..43664ee17 100644 --- a/pkg/controllers/workapplier/suite_test.go +++ b/pkg/controllers/workapplier/suite_test.go @@ -181,6 +181,8 @@ var _ = AfterSuite(func() { defer klog.Flush() cancel() + + time.Sleep(20 * time.Second) // Allow some time for the controller manager to shut down gracefully. By("Tearing down the test environment") Expect(hubEnv.Stop()).To(Succeed()) Expect(memberEnv.Stop()).To(Succeed())