Skip to content

Commit 08ad61a

Browse files
authored
test: add additional integration tests for waved work applier manifest processing (#244)
1 parent 8aefc88 commit 08ad61a

File tree

9 files changed

+1465
-66
lines changed

9 files changed

+1465
-66
lines changed

cmd/memberagent/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
418418
// resource processing.
419419
5,
420420
// Use the default worker count (4) for parallelized manifest processing.
421-
parallelizer.DefaultNumOfWorkers,
421+
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
422422
time.Minute*time.Duration(*deletionWaitTime),
423423
*watchWorkWithPriorityQueue,
424424
*watchWorkReconcileAgeMinutes,

pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {
379379

380380
// This controller is created for testing purposes only; no reconciliation loop is actually
381381
// run.
382-
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, 1, time.Minute, false, 60, nil)
382+
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
383383

384384
propertyProvider1 = &manuallyUpdatedProvider{}
385385
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {
402402

403403
// This controller is created for testing purposes only; no reconciliation loop is actually
404404
// run.
405-
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, 1, time.Minute, false, 60, nil)
405+
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
406406

407407
member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
408408
Expect(err).NotTo(HaveOccurred())

pkg/controllers/workapplier/controller.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import (
4949
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
5050
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
5151
"github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter"
52-
"github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
52+
parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
5353
)
5454

5555
const (
@@ -227,7 +227,7 @@ type Reconciler struct {
227227
watchWorkReconcileAgeMinutes int
228228
deletionWaitTime time.Duration
229229
joined *atomic.Bool
230-
parallelizer *parallelizer.Parallerlizer
230+
parallelizer parallelizerutil.Parallelizer
231231
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
232232
}
233233

@@ -237,15 +237,20 @@ func NewReconciler(
237237
spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper,
238238
recorder record.EventRecorder,
239239
concurrentReconciles int,
240-
workerCount int,
240+
parallelizer parallelizerutil.Parallelizer,
241241
deletionWaitTime time.Duration,
242242
watchWorkWithPriorityQueue bool,
243243
watchWorkReconcileAgeMinutes int,
244244
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter,
245245
) *Reconciler {
246246
if requeueRateLimiter == nil {
247+
klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter")
247248
requeueRateLimiter = defaultRequeueRateLimiter
248249
}
250+
if parallelizer == nil {
251+
klog.V(2).InfoS("parallelizer is not set; using the default parallelizer with a worker count of 1")
252+
parallelizer = parallelizerutil.NewParallelizer(1)
253+
}
249254

250255
return &Reconciler{
251256
hubClient: hubClient,
@@ -254,7 +259,7 @@ func NewReconciler(
254259
restMapper: restMapper,
255260
recorder: recorder,
256261
concurrentReconciles: concurrentReconciles,
257-
parallelizer: parallelizer.NewParallelizer(workerCount),
262+
parallelizer: parallelizer,
258263
watchWorkWithPriorityQueue: watchWorkWithPriorityQueue,
259264
watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes,
260265
workNameSpace: workNameSpace,

pkg/controllers/workapplier/controller_integration_test.go

Lines changed: 51 additions & 49 deletions
Large diffs are not rendered by default.

pkg/controllers/workapplier/process.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (r *Reconciler) processManifests(
8282
klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundlesInWave[piece].manifestObj), "work", klog.KObj(work))
8383
}
8484

85-
r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, "processingManifests")
85+
r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx))
8686
}
8787
}
8888

pkg/controllers/workapplier/suite_test.go

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"flag"
2222
"path/filepath"
23+
"strings"
2324
"sync"
2425
"testing"
2526
"time"
@@ -32,6 +33,7 @@ import (
3233
"k8s.io/client-go/dynamic"
3334
"k8s.io/client-go/kubernetes/scheme"
3435
"k8s.io/client-go/rest"
36+
"k8s.io/client-go/util/workqueue"
3537
"k8s.io/klog/v2"
3638
"k8s.io/klog/v2/textlogger"
3739
ctrl "sigs.k8s.io/controller-runtime"
@@ -46,6 +48,7 @@ import (
4648
"sigs.k8s.io/controller-runtime/pkg/predicate"
4749

4850
fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
51+
"github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
4952
testv1alpha1 "github.com/kubefleet-dev/kubefleet/test/apis/v1alpha1"
5053
)
5154

@@ -70,6 +73,13 @@ var (
7073
memberDynamicClient2 dynamic.Interface
7174
workApplier2 *Reconciler
7275

76+
memberCfg3 *rest.Config
77+
memberEnv3 *envtest.Environment
78+
hubMgr3 manager.Manager
79+
memberClient3 client.Client
80+
memberDynamicClient3 dynamic.Interface
81+
workApplier3 *Reconciler
82+
7383
ctx context.Context
7484
cancel context.CancelFunc
7585
wg sync.WaitGroup
@@ -83,8 +93,33 @@ const (
8393

8494
memberReservedNSName1 = "fleet-member-experimental-1"
8595
memberReservedNSName2 = "fleet-member-experimental-2"
96+
memberReservedNSName3 = "fleet-member-experimental-3"
97+
98+
parallelizerFixedDelay = time.Second * 5
8699
)
87100

101+
// tasks in parallel with a fixed delay after completing each task group.
102+
//
103+
// This is added to help verify the behavior of waved parallel processing in the work applier.
104+
type parallelizerWithFixedDelay struct {
105+
regularParallelizer parallelizer.Parallelizer
106+
delay time.Duration
107+
}
108+
109+
func (p *parallelizerWithFixedDelay) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) {
110+
p.regularParallelizer.ParallelizeUntil(ctx, pieces, doWork, operation)
111+
klog.V(2).InfoS("Parallelization completed, start to wait with a fixed delay", "operation", operation, "delay", p.delay)
112+
// No need to add delay for non-waved operations.
113+
if strings.HasPrefix(operation, "processingManifestsInWave") {
114+
// Only log the delay for operations that are actually related to waves.
115+
klog.V(2).InfoS("Waiting with a fixed delay after processing a wave", "operation", operation, "delay", p.delay)
116+
time.Sleep(p.delay)
117+
}
118+
}
119+
120+
// Verify that parallelizerWithFixedDelay implements the parallelizer.Parallelizer interface.
121+
var _ parallelizer.Parallelizer = &parallelizerWithFixedDelay{}
122+
88123
func TestAPIs(t *testing.T) {
89124
RegisterFailHandler(Fail)
90125

@@ -105,6 +140,13 @@ func setupResources() {
105140
},
106141
}
107142
Expect(hubClient.Create(ctx, ns2)).To(Succeed())
143+
144+
ns3 := &corev1.Namespace{
145+
ObjectMeta: metav1.ObjectMeta{
146+
Name: memberReservedNSName3,
147+
},
148+
}
149+
Expect(hubClient.Create(ctx, ns3)).To(Succeed())
108150
}
109151

110152
var _ = BeforeSuite(func() {
@@ -143,6 +185,14 @@ var _ = BeforeSuite(func() {
143185
filepath.Join("../../../", "test", "manifests"),
144186
},
145187
}
188+
// memberEnv3 is the test environment for verifying the behavior of waved parallel processing in
189+
// the work applier.
190+
memberEnv3 = &envtest.Environment{
191+
CRDDirectoryPaths: []string{
192+
filepath.Join("../../../", "config", "crd", "bases"),
193+
filepath.Join("../../../", "test", "manifests"),
194+
},
195+
}
146196

147197
var err error
148198
hubCfg, err = hubEnv.Start()
@@ -157,6 +207,14 @@ var _ = BeforeSuite(func() {
157207
Expect(err).ToNot(HaveOccurred())
158208
Expect(memberCfg2).ToNot(BeNil())
159209

210+
memberCfg3, err = memberEnv3.Start()
211+
Expect(err).ToNot(HaveOccurred())
212+
Expect(memberCfg3).ToNot(BeNil())
213+
214+
memberCfg2, err = memberEnv2.Start()
215+
Expect(err).ToNot(HaveOccurred())
216+
Expect(memberCfg2).ToNot(BeNil())
217+
160218
err = batchv1.AddToScheme(scheme.Scheme)
161219
Expect(err).NotTo(HaveOccurred())
162220
err = fleetv1beta1.AddToScheme(scheme.Scheme)
@@ -177,13 +235,20 @@ var _ = BeforeSuite(func() {
177235
Expect(err).ToNot(HaveOccurred())
178236
Expect(memberClient2).ToNot(BeNil())
179237

238+
memberClient3, err = client.New(memberCfg3, client.Options{Scheme: scheme.Scheme})
239+
Expect(err).ToNot(HaveOccurred())
240+
Expect(memberClient3).ToNot(BeNil())
241+
180242
// This setup also requires a client-go dynamic client for the member cluster.
181243
memberDynamicClient1, err = dynamic.NewForConfig(memberCfg1)
182244
Expect(err).ToNot(HaveOccurred())
183245

184246
memberDynamicClient2, err = dynamic.NewForConfig(memberCfg2)
185247
Expect(err).ToNot(HaveOccurred())
186248

249+
memberDynamicClient3, err = dynamic.NewForConfig(memberCfg3)
250+
Expect(err).ToNot(HaveOccurred())
251+
187252
By("Setting up the resources")
188253
setupResources()
189254

@@ -210,7 +275,7 @@ var _ = BeforeSuite(func() {
210275
memberClient1.RESTMapper(),
211276
hubMgr1.GetEventRecorderFor("work-applier"),
212277
maxConcurrentReconciles,
213-
workerCount,
278+
parallelizer.NewParallelizer(workerCount),
214279
30*time.Second,
215280
true,
216281
60,
@@ -259,7 +324,7 @@ var _ = BeforeSuite(func() {
259324
memberClient2.RESTMapper(),
260325
hubMgr2.GetEventRecorderFor("work-applier"),
261326
maxConcurrentReconciles,
262-
workerCount,
327+
parallelizer.NewParallelizer(workerCount),
263328
30*time.Second,
264329
true,
265330
60,
@@ -274,8 +339,52 @@ var _ = BeforeSuite(func() {
274339
Complete(workApplier2)
275340
Expect(err).NotTo(HaveOccurred())
276341

342+
By("Setting up the controller and the controller manager for member cluster 3")
343+
hubMgr3, err = ctrl.NewManager(hubCfg, ctrl.Options{
344+
Scheme: scheme.Scheme,
345+
Metrics: server.Options{
346+
BindAddress: "0",
347+
},
348+
Cache: cache.Options{
349+
DefaultNamespaces: map[string]cache.Config{
350+
memberReservedNSName3: {},
351+
},
352+
},
353+
Logger: textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(4))),
354+
})
355+
Expect(err).ToNot(HaveOccurred())
356+
357+
pWithDelay := &parallelizerWithFixedDelay{
358+
regularParallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
359+
// To avoid flakiness, use a fixed delay of 5 seconds so that we could reliably verify
360+
// if manifests are actually being processed in waves.
361+
delay: parallelizerFixedDelay,
362+
}
363+
workApplier3 = NewReconciler(
364+
hubClient,
365+
memberReservedNSName3,
366+
memberDynamicClient3,
367+
memberClient3,
368+
memberClient3.RESTMapper(),
369+
hubMgr3.GetEventRecorderFor("work-applier"),
370+
maxConcurrentReconciles,
371+
pWithDelay,
372+
30*time.Second,
373+
true,
374+
60,
375+
nil, // Use the default backoff rate limiter.
376+
)
377+
// Due to name conflicts, the third work applier must be set up manually.
378+
err = ctrl.NewControllerManagedBy(hubMgr3).Named("work-applier-controller-waved-parallel-processing").
379+
WithOptions(ctrloption.Options{
380+
MaxConcurrentReconciles: workApplier3.concurrentReconciles,
381+
}).
382+
For(&fleetv1beta1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
383+
Complete(workApplier3)
384+
Expect(err).NotTo(HaveOccurred())
385+
277386
wg = sync.WaitGroup{}
278-
wg.Add(2)
387+
wg.Add(3)
279388
go func() {
280389
defer GinkgoRecover()
281390
defer wg.Done()
@@ -289,6 +398,13 @@ var _ = BeforeSuite(func() {
289398
Expect(workApplier2.Join(ctx)).To(Succeed())
290399
Expect(hubMgr2.Start(ctx)).To(Succeed())
291400
}()
401+
402+
go func() {
403+
defer GinkgoRecover()
404+
defer wg.Done()
405+
Expect(workApplier3.Join(ctx)).To(Succeed())
406+
Expect(hubMgr3.Start(ctx)).To(Succeed())
407+
}()
292408
})
293409

294410
var _ = AfterSuite(func() {
@@ -300,4 +416,5 @@ var _ = AfterSuite(func() {
300416
Expect(hubEnv.Stop()).To(Succeed())
301417
Expect(memberEnv1.Stop()).To(Succeed())
302418
Expect(memberEnv2.Stop()).To(Succeed())
419+
Expect(memberEnv3.Stop()).To(Succeed())
303420
})

0 commit comments

Comments
 (0)